From 94c684bab8b73def08fff70a5b43e6a8361d555d Mon Sep 17 00:00:00 2001 From: gahusb Date: Sat, 16 May 2026 03:49:50 +0900 Subject: [PATCH] feat(signal_v2): pull_worker + FastAPI app + 2 integration tests poll_loop: asyncio.gather parallel fetch of 3 endpoints (portfolio, news_sentiment, screener_preview) + state update. main.py: FastAPI lifespan creates StockClient/SignalDedup/shutdown.Event then spawns poll_loop as background task. GET /health reports status, last poll times, cache size. Signal V2 test suite: 19/19 PASS. Co-Authored-By: Claude Opus 4.7 (1M context) --- signal_v2/main.py | 70 ++++++++++++++++++++++++++++++++++++ signal_v2/pull_worker.py | 58 ++++++++++++++++++++++++++++++ signal_v2/tests/test_main.py | 36 +++++++++++++++++++ 3 files changed, 164 insertions(+) create mode 100644 signal_v2/main.py create mode 100644 signal_v2/pull_worker.py create mode 100644 signal_v2/tests/test_main.py diff --git a/signal_v2/main.py b/signal_v2/main.py new file mode 100644 index 0000000..e9281d7 --- /dev/null +++ b/signal_v2/main.py @@ -0,0 +1,70 @@ +"""FastAPI app — Signal V2 Pull Worker.""" +from __future__ import annotations +import asyncio +import logging +from contextlib import asynccontextmanager + +from fastapi import FastAPI + +from signal_v2 import state as state_mod +from signal_v2.config import get_settings +from signal_v2.pull_worker import poll_loop +from signal_v2.rate_limit import SignalDedup +from signal_v2.stock_client import StockClient + +logger = logging.getLogger(__name__) + + +class AppContext: + client: StockClient | None = None + dedup: SignalDedup | None = None + shutdown: asyncio.Event | None = None + poll_task: asyncio.Task | None = None + + +_ctx = AppContext() + + +@asynccontextmanager +async def lifespan(app: FastAPI): + settings = get_settings() + if not settings.webai_api_key: + logger.warning( + "WEBAI_API_KEY not configured — stock API calls will fail with 401" + ) + + _ctx.client = StockClient(settings.stock_api_url, settings.webai_api_key) + _ctx.dedup = SignalDedup(settings.db_path) + _ctx.shutdown = asyncio.Event() + _ctx.poll_task = asyncio.create_task( + poll_loop(_ctx.client, state_mod.state, _ctx.shutdown) + ) + + yield + + # Shutdown + if _ctx.shutdown is not None: + _ctx.shutdown.set() + if _ctx.poll_task is not None: + try: + await asyncio.wait_for(_ctx.poll_task, timeout=5.0) + except asyncio.TimeoutError: + _ctx.poll_task.cancel() + if _ctx.client is not None: + await _ctx.client.close() + + +app = FastAPI( + title="Signal V2 Pull Worker", version="0.1.0", lifespan=lifespan +) + + +@app.get("/health") +async def health(): + settings = get_settings() + return { + "status": "online", + "stock_api_url": settings.stock_api_url, + "last_poll": state_mod.state.last_updated, + "cache_size": len(_ctx.client._cache) if _ctx.client is not None else 0, + } diff --git a/signal_v2/pull_worker.py b/signal_v2/pull_worker.py new file mode 100644 index 0000000..b369867 --- /dev/null +++ b/signal_v2/pull_worker.py @@ -0,0 +1,58 @@ +"""Polling loop — async cron + state update.""" +from __future__ import annotations +import asyncio +import logging +from datetime import datetime + +from signal_v2.scheduler import ( + KST, _is_market_day, _is_polling_window, _next_interval, +) +from signal_v2.state import PollState +from signal_v2.stock_client import StockClient + +logger = logging.getLogger(__name__) + + +async def poll_loop( + client: StockClient, state: PollState, shutdown: asyncio.Event +) -> None: + """FastAPI lifespan 에서 asyncio.create_task 로 시작.""" + logger.info("poll_loop started") + while not shutdown.is_set(): + now = datetime.now(KST) + if _is_market_day(now) and _is_polling_window(now): + try: + await _run_polling_cycle(client, state) + except Exception: + logger.exception("poll cycle failed") + interval = _next_interval(now) + try: + await asyncio.wait_for(shutdown.wait(), timeout=interval) + break + except asyncio.TimeoutError: + continue + logger.info("poll_loop ended") + + +async def _run_polling_cycle(client: StockClient, state: PollState) -> None: + """3 endpoint 병렬 fetch + state 갱신.""" + portfolio, sentiment, screener = await asyncio.gather( + client.get_portfolio(), + client.get_news_sentiment(), + client.run_screener_preview(), + return_exceptions=True, + ) + now_iso = datetime.now(KST).isoformat() + + for name, result in ( + ("portfolio", portfolio), + ("news_sentiment", sentiment), + ("screener_preview", screener), + ): + if isinstance(result, dict): + setattr(state, name, result) + state.last_updated[name] = now_iso + state.fetch_errors[name] = 0 + else: + state.fetch_errors[name] = state.fetch_errors.get(name, 0) + 1 + logger.warning("fetch %s failed: %r", name, result) diff --git a/signal_v2/tests/test_main.py b/signal_v2/tests/test_main.py new file mode 100644 index 0000000..058211c --- /dev/null +++ b/signal_v2/tests/test_main.py @@ -0,0 +1,36 @@ +"""Tests for FastAPI main app.""" +import logging + +import pytest +from fastapi.testclient import TestClient + + +def test_health_endpoint_returns_status_online(monkeypatch): + monkeypatch.setenv("STOCK_API_URL", "https://test.stock.local") + monkeypatch.setenv("WEBAI_API_KEY", "test-secret") + # Reload modules so they pick up the new env + import importlib + from signal_v2 import config as cfg + importlib.reload(cfg) + from signal_v2 import main as main_mod + importlib.reload(main_mod) + with TestClient(main_mod.app) as client: + r = client.get("/health") + assert r.status_code == 200 + body = r.json() + assert body["status"] == "online" + assert body["stock_api_url"] == "https://test.stock.local" + + +def test_startup_warns_if_webai_api_key_missing(monkeypatch, caplog): + monkeypatch.delenv("WEBAI_API_KEY", raising=False) + monkeypatch.setenv("STOCK_API_URL", "https://test.stock.local") + import importlib + from signal_v2 import config as cfg + importlib.reload(cfg) + from signal_v2 import main as main_mod + importlib.reload(main_mod) + with caplog.at_level(logging.WARNING, logger="signal_v2.main"): + with TestClient(main_mod.app) as client: + client.get("/health") + assert any("WEBAI_API_KEY" in rec.message for rec in caplog.records)