feat(signal_v2): pull_worker + FastAPI app + 2 integration tests

poll_loop: asyncio.gather parallel fetch of 3 endpoints (portfolio,
news_sentiment, screener_preview) + state update. main.py: FastAPI
lifespan creates StockClient/SignalDedup/shutdown.Event then spawns
poll_loop as background task. GET /health reports status, last poll
times, cache size.

Signal V2 test suite: 19/19 PASS.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-16 03:49:50 +09:00
parent 1a6d9fcb39
commit 94c684bab8
3 changed files with 164 additions and 0 deletions

70
signal_v2/main.py Normal file
View File

@@ -0,0 +1,70 @@
"""FastAPI app — Signal V2 Pull Worker."""
from __future__ import annotations
import asyncio
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from signal_v2 import state as state_mod
from signal_v2.config import get_settings
from signal_v2.pull_worker import poll_loop
from signal_v2.rate_limit import SignalDedup
from signal_v2.stock_client import StockClient
logger = logging.getLogger(__name__)
class AppContext:
client: StockClient | None = None
dedup: SignalDedup | None = None
shutdown: asyncio.Event | None = None
poll_task: asyncio.Task | None = None
_ctx = AppContext()
@asynccontextmanager
async def lifespan(app: FastAPI):
settings = get_settings()
if not settings.webai_api_key:
logger.warning(
"WEBAI_API_KEY not configured — stock API calls will fail with 401"
)
_ctx.client = StockClient(settings.stock_api_url, settings.webai_api_key)
_ctx.dedup = SignalDedup(settings.db_path)
_ctx.shutdown = asyncio.Event()
_ctx.poll_task = asyncio.create_task(
poll_loop(_ctx.client, state_mod.state, _ctx.shutdown)
)
yield
# Shutdown
if _ctx.shutdown is not None:
_ctx.shutdown.set()
if _ctx.poll_task is not None:
try:
await asyncio.wait_for(_ctx.poll_task, timeout=5.0)
except asyncio.TimeoutError:
_ctx.poll_task.cancel()
if _ctx.client is not None:
await _ctx.client.close()
app = FastAPI(
title="Signal V2 Pull Worker", version="0.1.0", lifespan=lifespan
)
@app.get("/health")
async def health():
settings = get_settings()
return {
"status": "online",
"stock_api_url": settings.stock_api_url,
"last_poll": state_mod.state.last_updated,
"cache_size": len(_ctx.client._cache) if _ctx.client is not None else 0,
}

58
signal_v2/pull_worker.py Normal file
View File

@@ -0,0 +1,58 @@
"""Polling loop — async cron + state update."""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime
from signal_v2.scheduler import (
KST, _is_market_day, _is_polling_window, _next_interval,
)
from signal_v2.state import PollState
from signal_v2.stock_client import StockClient
logger = logging.getLogger(__name__)
async def poll_loop(
client: StockClient, state: PollState, shutdown: asyncio.Event
) -> None:
"""FastAPI lifespan 에서 asyncio.create_task 로 시작."""
logger.info("poll_loop started")
while not shutdown.is_set():
now = datetime.now(KST)
if _is_market_day(now) and _is_polling_window(now):
try:
await _run_polling_cycle(client, state)
except Exception:
logger.exception("poll cycle failed")
interval = _next_interval(now)
try:
await asyncio.wait_for(shutdown.wait(), timeout=interval)
break
except asyncio.TimeoutError:
continue
logger.info("poll_loop ended")
async def _run_polling_cycle(client: StockClient, state: PollState) -> None:
"""3 endpoint 병렬 fetch + state 갱신."""
portfolio, sentiment, screener = await asyncio.gather(
client.get_portfolio(),
client.get_news_sentiment(),
client.run_screener_preview(),
return_exceptions=True,
)
now_iso = datetime.now(KST).isoformat()
for name, result in (
("portfolio", portfolio),
("news_sentiment", sentiment),
("screener_preview", screener),
):
if isinstance(result, dict):
setattr(state, name, result)
state.last_updated[name] = now_iso
state.fetch_errors[name] = 0
else:
state.fetch_errors[name] = state.fetch_errors.get(name, 0) + 1
logger.warning("fetch %s failed: %r", name, result)

View File

@@ -0,0 +1,36 @@
"""Tests for FastAPI main app."""
import logging
import pytest
from fastapi.testclient import TestClient
def test_health_endpoint_returns_status_online(monkeypatch):
monkeypatch.setenv("STOCK_API_URL", "https://test.stock.local")
monkeypatch.setenv("WEBAI_API_KEY", "test-secret")
# Reload modules so they pick up the new env
import importlib
from signal_v2 import config as cfg
importlib.reload(cfg)
from signal_v2 import main as main_mod
importlib.reload(main_mod)
with TestClient(main_mod.app) as client:
r = client.get("/health")
assert r.status_code == 200
body = r.json()
assert body["status"] == "online"
assert body["stock_api_url"] == "https://test.stock.local"
def test_startup_warns_if_webai_api_key_missing(monkeypatch, caplog):
monkeypatch.delenv("WEBAI_API_KEY", raising=False)
monkeypatch.setenv("STOCK_API_URL", "https://test.stock.local")
import importlib
from signal_v2 import config as cfg
importlib.reload(cfg)
from signal_v2 import main as main_mod
importlib.reload(main_mod)
with caplog.at_level(logging.WARNING, logger="signal_v2.main"):
with TestClient(main_mod.app) as client:
client.get("/health")
assert any("WEBAI_API_KEY" in rec.message for rec in caplog.records)