"""FastAPI app — Signal V2 Pull Worker.""" from __future__ import annotations import asyncio import logging from contextlib import asynccontextmanager from fastapi import FastAPI from signal_v2 import state as state_mod from signal_v2.config import get_settings from signal_v2.kis_client import KISClient from signal_v2.kis_websocket import KISWebSocket from signal_v2.pull_worker import poll_loop, make_asking_price_callback from signal_v2.rate_limit import SignalDedup from signal_v2.stock_client import StockClient logger = logging.getLogger(__name__) class AppContext: client: StockClient | None = None dedup: SignalDedup | None = None shutdown: asyncio.Event | None = None poll_task: asyncio.Task | None = None kis_client: KISClient | None = None kis_ws: KISWebSocket | None = None _ctx = AppContext() @asynccontextmanager async def lifespan(app: FastAPI): settings = get_settings() if not settings.webai_api_key: logger.warning( "WEBAI_API_KEY not configured — stock API calls will fail with 401" ) if not settings.kis_app_key: logger.warning( "KIS_APP_KEY not configured — KIS REST/WebSocket disabled" ) _ctx.client = StockClient(settings.stock_api_url, settings.webai_api_key) _ctx.dedup = SignalDedup(settings.db_path) _ctx.shutdown = asyncio.Event() # KIS only if app_key configured if settings.kis_app_key: _ctx.kis_client = KISClient( app_key=settings.kis_app_key, app_secret=settings.kis_app_secret, account=settings.kis_account, is_virtual=settings.kis_is_virtual, v1_token_path=settings.v1_token_path, ) _ctx.kis_ws = KISWebSocket( app_key=settings.kis_app_key, app_secret=settings.kis_app_secret, is_virtual=settings.kis_is_virtual, ) # Subscribe portfolio holdings (if any) try: portfolio = await _ctx.client.get_portfolio() tickers = [h["ticker"] for h in portfolio.get("holdings", []) if "ticker" in h] cb = make_asking_price_callback(state_mod.state) await _ctx.kis_ws.start(tickers, cb) except Exception: logger.exception("KIS WebSocket startup failed — continuing without realtime asking_price") _ctx.poll_task = asyncio.create_task( poll_loop( _ctx.client, state_mod.state, _ctx.shutdown, kis_client=_ctx.kis_client, ) ) yield # Shutdown if _ctx.shutdown is not None: _ctx.shutdown.set() if _ctx.poll_task is not None: try: await asyncio.wait_for(_ctx.poll_task, timeout=5.0) except asyncio.TimeoutError: _ctx.poll_task.cancel() try: await _ctx.poll_task except asyncio.CancelledError: pass if _ctx.kis_ws is not None: await _ctx.kis_ws.close() if _ctx.kis_client is not None: await _ctx.kis_client.close() if _ctx.client is not None: await _ctx.client.close() app = FastAPI( title="Signal V2 Pull Worker", version="0.1.0", lifespan=lifespan ) @app.get("/health") async def health(): settings = get_settings() return { "status": "online", "stock_api_url": settings.stock_api_url, "last_poll": state_mod.state.last_updated, "cache_size": _ctx.client.cache_size() if _ctx.client is not None else 0, }