Files
ai-trade/signal_v2/main.py
gahusb b690900cfc fix(signal_v2-phase3a): V1 KIS env pattern + test isolation fix
config.py: kis_env_type (virtual/real) + KIS_REAL_*/KIS_VIRTUAL_* env
variables (V1 호환). kis_app_key/kis_app_secret/kis_account properties
auto-select based on env type.

main.py: KIS not-configured warning uses descriptive message including
env type + expected var name.

test_main.py: monkeypatch load_dotenv to no-op + setenv empty string
(instead of delenv) — defeats .env re-read on importlib.reload.
Pre-existing test_startup_warns_if_webai_api_key_missing also fixed.

33/33 tests pass (was 31/33).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 10:25:58 +09:00

115 lines
3.5 KiB
Python

"""FastAPI app — Signal V2 Pull Worker."""
from __future__ import annotations
import asyncio
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from signal_v2 import state as state_mod
from signal_v2.config import get_settings
from signal_v2.kis_client import KISClient
from signal_v2.kis_websocket import KISWebSocket
from signal_v2.pull_worker import poll_loop, make_asking_price_callback
from signal_v2.rate_limit import SignalDedup
from signal_v2.stock_client import StockClient
logger = logging.getLogger(__name__)
class AppContext:
    """Holder for the long-lived objects the app creates at startup.

    Every slot defaults to ``None`` at class level; ``lifespan`` assigns the
    real objects onto the module-level ``_ctx`` instance.
    """

    # Stock API client and the signal de-duplication helper.
    client: StockClient | None = None
    dedup: SignalDedup | None = None

    # KIS REST / WebSocket clients — stay None when no KIS app key is set.
    kis_client: KISClient | None = None
    kis_ws: KISWebSocket | None = None

    # Background poll task and the event that is set to request its stop.
    shutdown: asyncio.Event | None = None
    poll_task: asyncio.Task | None = None


# Single shared context used by the lifespan handler and the routes.
_ctx = AppContext()
async def _shutdown_resources() -> None:
    """Stop the poll task and close every client stored on ``_ctx``.

    Each step is isolated: a failure in the poll task or in one ``close()``
    call is logged and does not prevent the remaining resources from being
    released.
    """
    if _ctx.shutdown is not None:
        _ctx.shutdown.set()

    poll_task = _ctx.poll_task
    if poll_task is not None:
        try:
            # Give the poll loop a short grace period to notice the event.
            await asyncio.wait_for(poll_task, timeout=5.0)
        except asyncio.TimeoutError:
            poll_task.cancel()
            try:
                await poll_task
            except asyncio.CancelledError:
                pass
            except Exception:
                logger.exception("Poll task raised while being cancelled")
        except Exception:
            # The poll loop died with its own error; still close the clients.
            logger.exception("Poll task exited with an error")

    closables = (
        ("KIS WebSocket", _ctx.kis_ws),
        ("KIS client", _ctx.kis_client),
        ("stock client", _ctx.client),
    )
    for label, resource in closables:
        if resource is None:
            continue
        try:
            await resource.close()
        except Exception:
            logger.exception("Failed to close %s during shutdown", label)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the worker's clients on startup and release them on shutdown.

    Startup: reads settings, warns about missing credentials, builds the
    stock client, dedup helper and shutdown event, optionally builds the KIS
    REST/WebSocket clients (only when a KIS app key is configured), and
    launches ``poll_loop`` as a background task.

    Shutdown: runs in a ``finally`` block so cleanup also happens when an
    exception is thrown into the context manager while the app is running.

    Args:
        app: The FastAPI application (not used by this handler).
    """
    settings = get_settings()
    if not settings.webai_api_key:
        logger.warning(
            "WEBAI_API_KEY not configured — stock API calls will fail with 401"
        )
    if not settings.kis_app_key:
        logger.warning(
            "KIS app_key not configured (KIS_ENV_TYPE=%s, KIS_%s_APP_KEY missing) — KIS REST/WebSocket disabled",
            settings.kis_env_type, settings.kis_env_type.upper(),
        )

    _ctx.client = StockClient(settings.stock_api_url, settings.webai_api_key)
    _ctx.dedup = SignalDedup(settings.db_path)
    _ctx.shutdown = asyncio.Event()

    # KIS only if app_key configured
    if settings.kis_app_key:
        _ctx.kis_client = KISClient(
            app_key=settings.kis_app_key,
            app_secret=settings.kis_app_secret,
            account=settings.kis_account,
            is_virtual=settings.kis_is_virtual,
            v1_token_path=settings.v1_token_path,
        )
        _ctx.kis_ws = KISWebSocket(
            app_key=settings.kis_app_key,
            app_secret=settings.kis_app_secret,
            is_virtual=settings.kis_is_virtual,
        )
        # Subscribe portfolio holdings (if any). Best-effort: a failure here
        # is logged and the worker keeps running without realtime prices.
        try:
            portfolio = await _ctx.client.get_portfolio()
            tickers = [
                h["ticker"]
                for h in portfolio.get("holdings", [])
                if "ticker" in h
            ]
            cb = make_asking_price_callback(state_mod.state)
            await _ctx.kis_ws.start(tickers, cb)
        except Exception:
            logger.exception("KIS WebSocket startup failed — continuing without realtime asking_price")

    _ctx.poll_task = asyncio.create_task(
        poll_loop(
            _ctx.client, state_mod.state, _ctx.shutdown,
            kis_client=_ctx.kis_client,
        )
    )
    try:
        yield
    finally:
        await _shutdown_resources()
# ASGI application object; startup/shutdown is handled by `lifespan`.
app = FastAPI(
    title="Signal V2 Pull Worker",
    version="0.1.0",
    lifespan=lifespan,
)
@app.get("/health")
async def health():
    """Return a small status payload for liveness checks.

    The payload contains a constant ``"online"`` status, the configured stock
    API URL, the ``last_updated`` value of the shared state, and the stock
    client's cache size (``0`` while no client has been created).
    """
    stock_api_url = get_settings().stock_api_url
    last_poll = state_mod.state.last_updated

    stock_client = _ctx.client
    if stock_client is None:
        cached_entries = 0
    else:
        cached_entries = stock_client.cache_size()

    return {
        "status": "online",
        "stock_api_url": stock_api_url,
        "last_poll": last_poll,
        "cache_size": cached_entries,
    }