poll_loop: asyncio.gather parallel fetch of 3 endpoints (portfolio, news_sentiment, screener_preview) + state update. main.py: FastAPI lifespan creates StockClient/SignalDedup/shutdown.Event then spawns poll_loop as background task. GET /health reports status, last poll times, cache size. Signal V2 test suite: 19/19 PASS. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
59 lines
1.9 KiB
Python
59 lines
1.9 KiB
Python
"""Polling loop — async cron + state update."""
|
|
from __future__ import annotations
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
from signal_v2.scheduler import (
|
|
KST, _is_market_day, _is_polling_window, _next_interval,
|
|
)
|
|
from signal_v2.state import PollState
|
|
from signal_v2.stock_client import StockClient
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def poll_loop(
|
|
client: StockClient, state: PollState, shutdown: asyncio.Event
|
|
) -> None:
|
|
"""FastAPI lifespan 에서 asyncio.create_task 로 시작."""
|
|
logger.info("poll_loop started")
|
|
while not shutdown.is_set():
|
|
now = datetime.now(KST)
|
|
if _is_market_day(now) and _is_polling_window(now):
|
|
try:
|
|
await _run_polling_cycle(client, state)
|
|
except Exception:
|
|
logger.exception("poll cycle failed")
|
|
interval = _next_interval(now)
|
|
try:
|
|
await asyncio.wait_for(shutdown.wait(), timeout=interval)
|
|
break
|
|
except asyncio.TimeoutError:
|
|
continue
|
|
logger.info("poll_loop ended")
|
|
|
|
|
|
async def _run_polling_cycle(client: StockClient, state: PollState) -> None:
|
|
"""3 endpoint 병렬 fetch + state 갱신."""
|
|
portfolio, sentiment, screener = await asyncio.gather(
|
|
client.get_portfolio(),
|
|
client.get_news_sentiment(),
|
|
client.run_screener_preview(),
|
|
return_exceptions=True,
|
|
)
|
|
now_iso = datetime.now(KST).isoformat()
|
|
|
|
for name, result in (
|
|
("portfolio", portfolio),
|
|
("news_sentiment", sentiment),
|
|
("screener_preview", screener),
|
|
):
|
|
if isinstance(result, dict):
|
|
setattr(state, name, result)
|
|
state.last_updated[name] = now_iso
|
|
state.fetch_errors[name] = 0
|
|
else:
|
|
state.fetch_errors[name] = state.fetch_errors.get(name, 0) + 1
|
|
logger.warning("fetch %s failed: %r", name, result)
|