Files
ai-trade/ai_trade/pull_worker.py
gahusb 139e4e3382 refactor(web-ai): rename signal_v2→ai_trade, deprecate signal_v1
박재오 결정 2026-05-19 — V2를 정식 명칭 ai_trade로 graduation,
V1은 deprecated 마킹 (legacy 디렉토리 이동은 file lock 풀린 후 후속).

변경 사항:
- signal_v2/ → ai_trade/ (git mv, import 일괄 sed: signal_v2.x → ai_trade.x)
- root start.bat → legacy/start_v1.bat (V1 자동 시작 차단)
- ai_trade/start.bat 내부 uvicorn target signal_v2.main → ai_trade.main
- signal_v1/DEPRECATED.md 추가 (사용 금지 명시)
- CLAUDE.md 디렉토리 표·서버 시작 방식 갱신
- services/ 디렉토리 미래 예정 (Plan-B-Insta 작업 시 신설)

ai_trade tests 59/59 PASS 확인.

signal_v1/ 디렉토리 자체 이동(legacy/signal_v1/)은 telegram_bot.log +
data/news_snapshots.db file lock으로 보류. lock 해제 후 후속 커밋.

후속 작업: Plan-B-Insta (services/insta-render + NAS insta 분할)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 01:31:47 +09:00

194 lines
7.2 KiB
Python

"""Polling loop — async cron + state update."""
from __future__ import annotations
import asyncio
import logging
from collections import deque
from datetime import datetime
from ai_trade.kis_client import KISClient
from ai_trade.scheduler import (
KST, _is_market_day, _is_polling_window, _next_interval, _is_post_close_trigger,
)
from ai_trade.state import PollState
from ai_trade.stock_client import StockClient
logger = logging.getLogger(__name__)
async def poll_loop(
client: StockClient, state: PollState, shutdown: asyncio.Event,
kis_client: KISClient | None = None,
chronos=None,
dedup=None,
settings=None,
) -> None:
"""FastAPI lifespan 에서 asyncio.create_task 로 시작."""
logger.info("poll_loop started")
while not shutdown.is_set():
now = datetime.now(KST)
if _is_market_day(now) and _is_polling_window(now):
try:
await _run_polling_cycle(client, state, kis_client=kis_client)
except Exception:
logger.exception("poll cycle failed")
# Minute momentum 갱신 (매 cycle)
try:
update_minute_momentum_for_all(state)
except Exception:
logger.exception("minute momentum update failed")
# Post-close trigger (16:00 KST)
if _is_post_close_trigger(now) and chronos is not None and kis_client is not None:
try:
await _run_post_close_cycle(kis_client, chronos, state)
except Exception:
logger.exception("post-close cycle failed")
# Phase 4: generate signals
if dedup is not None and settings is not None:
try:
from ai_trade.signal_generator import generate_signals
generate_signals(state, dedup, settings)
except Exception:
logger.exception("generate_signals failed")
interval = _next_interval(now)
try:
await asyncio.wait_for(shutdown.wait(), timeout=interval)
break
except asyncio.TimeoutError:
continue
logger.info("poll_loop ended")
async def _run_polling_cycle(
client: StockClient, state: PollState,
kis_client: KISClient | None = None,
) -> None:
"""기존 3 endpoint (stock) + KIS 분봉 fetch."""
portfolio, sentiment, screener = await asyncio.gather(
client.get_portfolio(),
client.get_news_sentiment(),
client.run_screener_preview(),
return_exceptions=True,
)
now_iso = datetime.now(KST).isoformat()
for name, result in (
("portfolio", portfolio),
("news_sentiment", sentiment),
("screener_preview", screener),
):
if isinstance(result, dict):
setattr(state, name, result)
state.last_updated[name] = now_iso
state.fetch_errors[name] = 0
else:
state.fetch_errors[name] = state.fetch_errors.get(name, 0) + 1
logger.warning("fetch %s failed: %r", name, result)
# KIS 분봉 + 호가 (kis_client 주어졌을 때만)
if kis_client is not None:
try:
await _run_kis_minute_cycle(kis_client, state)
except Exception:
logger.exception("kis minute cycle failed")
async def _run_kis_minute_cycle(kis_client: KISClient, state: PollState) -> None:
"""KIS 분봉 + 호가 fetch + state 갱신.
- 분봉: portfolio + screener Top-N union 종목 모두
- 호가 (REST): screener-only 종목 (portfolio 는 WebSocket 으로 들어옴)
"""
portfolio_tickers = _portfolio_tickers(state)
screener_tickers = _screener_tickers(state)
all_tickers = list(set(portfolio_tickers) | set(screener_tickers))
# 분봉 fetch (병렬)
minute_results = await asyncio.gather(*[
kis_client.get_minute_ohlcv(t) for t in all_tickers
], return_exceptions=True)
now_iso = datetime.now(KST).isoformat()
for ticker, result in zip(all_tickers, minute_results):
if isinstance(result, list):
buf = state.minute_bars.setdefault(ticker, deque(maxlen=60))
buf.extend(result)
state.last_updated[f"minute_bars/{ticker}"] = now_iso
else:
state.fetch_errors[f"minute_bars/{ticker}"] = (
state.fetch_errors.get(f"minute_bars/{ticker}", 0) + 1
)
# 호가 fetch (REST) — screener-only
screener_only = list(set(screener_tickers) - set(portfolio_tickers))
asking_results = await asyncio.gather(*[
kis_client.get_asking_price(t) for t in screener_only
], return_exceptions=True)
for ticker, result in zip(screener_only, asking_results):
if isinstance(result, dict):
state.asking_price[ticker] = result
state.last_updated[f"asking_price/{ticker}"] = now_iso
def make_asking_price_callback(state: PollState):
"""KIS WebSocket on_asking_price callback factory."""
def _cb(ticker: str, data: dict) -> None:
state.asking_price[ticker] = data
state.last_updated[f"asking_price/{ticker}"] = datetime.now(KST).isoformat()
return _cb
def _portfolio_tickers(state: PollState) -> list[str]:
if state.portfolio is None:
return []
return [h["ticker"] for h in state.portfolio.get("holdings", []) if "ticker" in h]
def _screener_tickers(state: PollState) -> list[str]:
if state.screener_preview is None:
return []
return [i["ticker"] for i in state.screener_preview.get("items", []) if "ticker" in i]
async def _run_post_close_cycle(kis_client, chronos, state) -> None:
"""16:00 KST 종가 후 1회: daily fetch + chronos predict."""
tickers = list(set(_portfolio_tickers(state)) | set(_screener_tickers(state)))
if not tickers:
return
daily_results = await asyncio.gather(*[
kis_client.get_daily_ohlcv(t, days=60) for t in tickers
], return_exceptions=True)
daily_dict = {}
for ticker, result in zip(tickers, daily_results):
if isinstance(result, list) and len(result) >= 30:
daily_dict[ticker] = result
state.daily_ohlcv[ticker] = result
elif isinstance(result, Exception):
state.fetch_errors[f"daily_ohlcv/{ticker}"] = (
state.fetch_errors.get(f"daily_ohlcv/{ticker}", 0) + 1
)
if daily_dict and chronos is not None:
try:
predictions = chronos.predict_batch(daily_dict)
except Exception:
logger.exception("chronos predict_batch failed")
return
for ticker, pred in predictions.items():
state.chronos_predictions[ticker] = {
"median": pred.median,
"q10": pred.q10,
"q90": pred.q90,
"conf": pred.conf,
"as_of": pred.as_of,
}
state.last_updated[f"chronos/{ticker}"] = pred.as_of
def update_minute_momentum_for_all(state) -> None:
"""매 분봉 cycle 후 호출 — 모든 종목 모멘텀 갱신."""
from ai_trade.momentum_classifier import classify_minute_momentum
now_iso = datetime.now(KST).isoformat()
for ticker, bars in state.minute_bars.items():
state.minute_momentum[ticker] = classify_minute_momentum(bars)
state.last_updated[f"momentum/{ticker}"] = now_iso