Files
ai-trade/signal_v2/pull_worker.py
gahusb 94c684bab8 feat(signal_v2): pull_worker + FastAPI app + 2 integration tests
poll_loop: asyncio.gather parallel fetch of 3 endpoints (portfolio,
news_sentiment, screener_preview) + state update. main.py: FastAPI
lifespan creates StockClient/SignalDedup/shutdown.Event then spawns
poll_loop as background task. GET /health reports status, last poll
times, cache size.

Signal V2 test suite: 19/19 PASS.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 03:49:50 +09:00

59 lines
1.9 KiB
Python

"""Polling loop — async cron + state update."""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime
from signal_v2.scheduler import (
KST, _is_market_day, _is_polling_window, _next_interval,
)
from signal_v2.state import PollState
from signal_v2.stock_client import StockClient
logger = logging.getLogger(__name__)
async def poll_loop(
    client: StockClient, state: PollState, shutdown: asyncio.Event
) -> None:
    """Run polling cycles until ``shutdown`` is set.

    Started from the FastAPI lifespan via ``asyncio.create_task``.

    Each iteration takes the current time in ``KST`` and, when the scheduler
    helpers report a market day inside the polling window, runs one polling
    cycle. An ``Exception`` raised by the cycle is logged and does not stop
    the loop. The loop then waits for ``shutdown`` for at most the number of
    seconds returned by ``_next_interval``; it exits as soon as the event is
    set and otherwise starts the next iteration.

    Args:
        client: Client handed to each polling cycle.
        state: Shared poll state handed to each polling cycle.
        shutdown: Event that ends the loop once set.
    """
    logger.info("poll_loop started")
    while True:
        if shutdown.is_set():
            break

        tick = datetime.now(KST)
        in_window = _is_market_day(tick) and _is_polling_window(tick)
        if in_window:
            try:
                await _run_polling_cycle(client, state)
            except Exception:
                # Top-level boundary: log and keep the loop alive.
                logger.exception("poll cycle failed")

        # The delay is derived from the timestamp taken before the cycle ran.
        delay = _next_interval(tick)
        try:
            # Sleep that can be interrupted by the shutdown event.
            await asyncio.wait_for(shutdown.wait(), timeout=delay)
        except asyncio.TimeoutError:
            # Timed out without a shutdown request: poll again.
            continue
        else:
            # shutdown.wait() returned, so the event is set.
            break
    logger.info("poll_loop ended")
async def _run_polling_cycle(client: StockClient, state: PollState) -> None:
    """Fetch the three endpoints concurrently and update ``state``.

    The portfolio, news-sentiment and screener-preview calls are awaited
    together with ``return_exceptions=True``, so a failure in one of them is
    returned as a value instead of being raised.

    For every endpoint whose result is a ``dict``, the result is stored on
    ``state`` under the endpoint name, ``state.last_updated`` receives the
    ISO timestamp of this cycle, and the error counter is reset to 0. Any
    other result (including a returned exception) increments the endpoint's
    counter in ``state.fetch_errors`` and is logged as a warning.

    Args:
        client: Client providing the three fetch coroutines.
        state: Poll state object that is mutated in place.
    """
    endpoint_names = ("portfolio", "news_sentiment", "screener_preview")
    outcomes = await asyncio.gather(
        client.get_portfolio(),
        client.get_news_sentiment(),
        client.run_screener_preview(),
        return_exceptions=True,
    )
    # One timestamp shared by every endpoint updated in this cycle.
    stamp = datetime.now(KST).isoformat()

    for name, outcome in zip(endpoint_names, outcomes):
        if not isinstance(outcome, dict):
            # Anything that is not a dict counts as a failed fetch.
            previous = state.fetch_errors.get(name, 0)
            state.fetch_errors[name] = previous + 1
            logger.warning("fetch %s failed: %r", name, outcome)
            continue

        setattr(state, name, outcome)
        state.last_updated[name] = stamp
        state.fetch_errors[name] = 0