"""Polling loop — async cron + state update.""" from __future__ import annotations import asyncio import logging from collections import deque from datetime import datetime from signal_v2.kis_client import KISClient from signal_v2.scheduler import ( KST, _is_market_day, _is_polling_window, _next_interval, ) from signal_v2.state import PollState from signal_v2.stock_client import StockClient logger = logging.getLogger(__name__) async def poll_loop( client: StockClient, state: PollState, shutdown: asyncio.Event, kis_client: KISClient | None = None, ) -> None: """FastAPI lifespan 에서 asyncio.create_task 로 시작.""" logger.info("poll_loop started") while not shutdown.is_set(): now = datetime.now(KST) if _is_market_day(now) and _is_polling_window(now): try: await _run_polling_cycle(client, state, kis_client=kis_client) except Exception: logger.exception("poll cycle failed") interval = _next_interval(now) try: await asyncio.wait_for(shutdown.wait(), timeout=interval) break except asyncio.TimeoutError: continue logger.info("poll_loop ended") async def _run_polling_cycle( client: StockClient, state: PollState, kis_client: KISClient | None = None, ) -> None: """기존 3 endpoint (stock) + KIS 분봉 fetch.""" portfolio, sentiment, screener = await asyncio.gather( client.get_portfolio(), client.get_news_sentiment(), client.run_screener_preview(), return_exceptions=True, ) now_iso = datetime.now(KST).isoformat() for name, result in ( ("portfolio", portfolio), ("news_sentiment", sentiment), ("screener_preview", screener), ): if isinstance(result, dict): setattr(state, name, result) state.last_updated[name] = now_iso state.fetch_errors[name] = 0 else: state.fetch_errors[name] = state.fetch_errors.get(name, 0) + 1 logger.warning("fetch %s failed: %r", name, result) # KIS 분봉 + 호가 (kis_client 주어졌을 때만) if kis_client is not None: try: await _run_kis_minute_cycle(kis_client, state) except Exception: logger.exception("kis minute cycle failed") async def _run_kis_minute_cycle(kis_client: KISClient, state: PollState) -> None: """KIS 분봉 + 호가 fetch + state 갱신. - 분봉: portfolio + screener Top-N union 종목 모두 - 호가 (REST): screener-only 종목 (portfolio 는 WebSocket 으로 들어옴) """ portfolio_tickers = _portfolio_tickers(state) screener_tickers = _screener_tickers(state) all_tickers = list(set(portfolio_tickers) | set(screener_tickers)) # 분봉 fetch (병렬) minute_results = await asyncio.gather(*[ kis_client.get_minute_ohlcv(t) for t in all_tickers ], return_exceptions=True) now_iso = datetime.now(KST).isoformat() for ticker, result in zip(all_tickers, minute_results): if isinstance(result, list): buf = state.minute_bars.setdefault(ticker, deque(maxlen=60)) buf.extend(result) state.last_updated[f"minute_bars/{ticker}"] = now_iso else: state.fetch_errors[f"minute_bars/{ticker}"] = ( state.fetch_errors.get(f"minute_bars/{ticker}", 0) + 1 ) # 호가 fetch (REST) — screener-only screener_only = list(set(screener_tickers) - set(portfolio_tickers)) asking_results = await asyncio.gather(*[ kis_client.get_asking_price(t) for t in screener_only ], return_exceptions=True) for ticker, result in zip(screener_only, asking_results): if isinstance(result, dict): state.asking_price[ticker] = result state.last_updated[f"asking_price/{ticker}"] = now_iso def make_asking_price_callback(state: PollState): """KIS WebSocket on_asking_price callback factory.""" def _cb(ticker: str, data: dict) -> None: state.asking_price[ticker] = data state.last_updated[f"asking_price/{ticker}"] = datetime.now(KST).isoformat() return _cb def _portfolio_tickers(state: PollState) -> list[str]: if state.portfolio is None: return [] return [h["ticker"] for h in state.portfolio.get("holdings", []) if "ticker" in h] def _screener_tickers(state: PollState) -> list[str]: if state.screener_preview is None: return [] return [i["ticker"] for i in state.screener_preview.get("items", []) if "ticker" in i]