Files
web-page/docs/superpowers/plans/2026-05-25-ai-trade-hotfix.md

22 KiB

ai_trade Hotfix — Code Review F1·F2·F3·F4 Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: ai_trade(V2) 코드 리뷰 7개 finding 중 High 3건(F1·F2·F3) + Medium 1건(F4)을 TDD로 수정. F5/F6은 별도 plan, F7은 pushback.

Architecture: 모두 ai_trade/ 내부 단일 모듈 수정. (1) config.py default 경로 — legacy/ 경유. (2) kis_client.py — asyncio.Lock으로 _throttle() 직렬화. (3) scheduler.py + pull_worker.py — post-close를 시간 윈도우가 아닌 "일 1회 + 16:00 이후" 상태기반으로 변경. (4) chronos_predictor.py — confidence 산식을 absolute spread 기반으로 통일.

Tech Stack: Python 3.12, asyncio, pytest + pytest-asyncio + respx, httpx.

Test runner: .venv 한글 경로 깨짐 + 리뷰어 Python 312 경로 부재 보고로, 시스템 Python 사용. 정확한 경로는 where python 으로 우선 확인. 기본 시도 순서:

  1. python -m pytest ai_trade/tests -q (PATH의 Python)
  2. py -3.12 -m pytest ai_trade/tests -q (py launcher)
  3. 둘 다 실패 시 환경 셋업이 선행 작업 — plan 진행 중단하고 박재오에게 보고.

Working directory: C:\Users\jaeoh\Desktop\workspace\web-ai (web-ai repo). Commit/push도 이 디렉토리에서만.


File Map

파일 변경 종류 책임
ai_trade/config.py Modify L31-36 V1_TOKEN_PATH default를 legacy/signal_v1/data/kis_token.json
ai_trade/kis_client.py Modify L40-62 _throttle_lock: asyncio.Lock 추가, _throttle()을 lock 안에서 실행
ai_trade/scheduler.py Modify L79-84 _is_post_close_trigger(now, last_post_close_date) 시그니처 변경 — 상태기반
ai_trade/pull_worker.py Modify L1-58 poll_looplast_post_close_date state 추가, 호출부 갱신
ai_trade/chronos_predictor.py Modify L106, L127 spread 계산을 absolute (q90-q10)로, confidence 산식 max(0, 1 - spread/0.6)
ai_trade/tests/test_kis_client.py Add 1 test concurrent gather throttle test
ai_trade/tests/test_scheduler.py Add 3 tests post-close 상태기반 트리거
ai_trade/tests/test_pull_worker.py Add 1 test 첫 호출 안 됐다가 16:00 이후 5분 cycle에서 호출됨
ai_trade/tests/test_chronos_predictor.py Add 2 tests median≈0에서도 conf 정상, spread 클수록 conf↓
ai_trade/tests/test_main.py Modify v1_token_path default 변경 반영 (있다면)

Task 1: F1 — KIS 토큰 경로 default를 legacy/ 경유로

Files:

  • Modify: ai_trade/config.py:31-36

  • Test: ai_trade/tests/test_config_token_path.py (Create)

  • Step 1: Write the failing test

# ai_trade/tests/test_config_token_path.py
"""F1 — V1_TOKEN_PATH default가 legacy/signal_v1/ 경유인지 검증."""
import os
from pathlib import Path

from ai_trade.config import Settings


def test_v1_token_default_path_uses_legacy_dir(monkeypatch):
    """env에 V1_TOKEN_PATH 없으면 legacy/signal_v1/data/kis_token.json"""
    monkeypatch.delenv("V1_TOKEN_PATH", raising=False)
    settings = Settings()
    expected_suffix = Path("legacy") / "signal_v1" / "data" / "kis_token.json"
    assert str(settings.v1_token_path).endswith(str(expected_suffix)), (
        f"expected default to end with {expected_suffix}, got {settings.v1_token_path}"
    )


def test_v1_token_env_override_wins(monkeypatch, tmp_path):
    """env로 명시한 경로가 default를 덮어씀."""
    custom = tmp_path / "custom_token.json"
    monkeypatch.setenv("V1_TOKEN_PATH", str(custom))
    settings = Settings()
    assert settings.v1_token_path == custom
  • Step 2: Run test to verify it fails
python -m pytest ai_trade/tests/test_config_token_path.py -v

Expected: test_v1_token_default_path_uses_legacy_dir FAILs (default가 signal_v1/... 임). env override는 PASS.

  • Step 3: Fix config.py

ai_trade/config.py:31-36 변경:

    v1_token_path: Path = field(
        default_factory=lambda: Path(
            os.getenv("V1_TOKEN_PATH",
                      str(Path(__file__).parent.parent / "legacy" / "signal_v1" / "data" / "kis_token.json"))
        )
    )
  • Step 4: Run test to verify it passes
python -m pytest ai_trade/tests/test_config_token_path.py -v

Expected: 2 passed.

  • Step 5: Verify full test suite still passes
python -m pytest ai_trade/tests -q

Expected: 모든 기존 테스트 PASS (token path 기본값 변경이 다른 test에 영향 없는지 확인).

  • Step 6: Commit
git add ai_trade/config.py ai_trade/tests/test_config_token_path.py
git commit -m "fix(ai_trade): V1_TOKEN_PATH default를 legacy/signal_v1/ 경유로 수정 (F1)"

Task 2: F2 — KIS throttle을 asyncio.Lock으로 직렬화

Files:

  • Modify: ai_trade/kis_client.py:40-62

  • Test: ai_trade/tests/test_kis_client.py (Modify — 새 test 추가)

  • Step 1: Write the failing test

ai_trade/tests/test_kis_client.py 파일 끝에 추가:

import asyncio
import time as time_module


@respx.mock
async def test_throttle_serializes_concurrent_gather(kis_client_factory):
    """5개 동시 요청이 asyncio.gather로 들어와도 0.5초 간격으로 직렬화되어야 함 (F2).

    초당 2회 = 0.5초 간격. 5개 요청이면 최소 (5-1)*0.5 = 2.0초 소요.
    Race condition 있으면 5개가 거의 동시에 나가서 2초 훨씬 안쪽에 끝남.
    """
    sample = {"output2": []}
    respx.get(
        "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
    ).mock(return_value=httpx.Response(200, json=sample))

    client = kis_client_factory()
    try:
        start = time_module.monotonic()
        await asyncio.gather(*[client.get_minute_ohlcv(f"00593{i}") for i in range(5)])
        elapsed = time_module.monotonic() - start
        # 5개 throttle = 최소 (5-1)*0.5 = 2.0초. tolerance 0.3초.
        assert elapsed >= 1.7, (
            f"throttle race condition: 5 concurrent calls took only {elapsed:.2f}s, "
            f"expected >=1.7s (0.5s * 4 inter-call gaps)"
        )
    finally:
        await client.close()
  • Step 2: Run test to verify it fails
python -m pytest ai_trade/tests/test_kis_client.py::test_throttle_serializes_concurrent_gather -v

Expected: FAIL — elapsed가 0.5초 이하 (race로 동시 깸).

  • Step 3: Add asyncio.Lock to KISClient

ai_trade/kis_client.py:40 __init__ 끝에 한 줄 추가:

        self._token_cache: tuple[str, float] | None = None  # (token, file_mtime)
        self._last_throttle_at = 0.0
        self._throttle_lock = asyncio.Lock()

그리고 _throttle() (L58-62)을 lock으로 감쌈:

    async def _throttle(self) -> None:
        async with self._throttle_lock:
            elapsed = time.monotonic() - self._last_throttle_at
            if elapsed < _THROTTLE_INTERVAL:
                await asyncio.sleep(_THROTTLE_INTERVAL - elapsed)
            self._last_throttle_at = time.monotonic()
  • Step 4: Run test to verify it passes
python -m pytest ai_trade/tests/test_kis_client.py::test_throttle_serializes_concurrent_gather -v

Expected: PASS — elapsed >= 1.7s.

  • Step 5: Verify full kis_client suite still passes
python -m pytest ai_trade/tests/test_kis_client.py -v

Expected: 모든 test PASS (기존 429 retry 등 영향 없는지 확인).

  • Step 6: Commit
git add ai_trade/kis_client.py ai_trade/tests/test_kis_client.py
git commit -m "fix(ai_trade): KIS throttle을 asyncio.Lock으로 직렬화 (F2)"

Task 3: F3 — post-close 트리거를 상태기반으로 변경

Files:

  • Modify: ai_trade/scheduler.py:79-84
  • Modify: ai_trade/pull_worker.py:1-58
  • Test: ai_trade/tests/test_scheduler.py (add 3 tests)

Why state-based: 16:00:00-16:00:59 윈도우는 5분 sleep + 비결정적 cycle 시작 시각과 충돌. "오늘 아직 post-close 안 돌렸고 현재 시각 ≥ 16:00 이면 trigger 후 today 표시" 로 변경.

  • Step 1: Write the failing tests

ai_trade/tests/test_scheduler.py 파일 끝에 추가:

from datetime import date as _date
from ai_trade.scheduler import _is_post_close_trigger


def test_post_close_trigger_fires_at_1601_if_not_yet_today():
    """16:01에 깬 cycle도 오늘 아직 안 돌렸으면 trigger (F3)."""
    now = _kst(2026, 5, 18, 16, 1)
    assert _is_post_close_trigger(now, last_post_close_date=None) is True


def test_post_close_trigger_skips_if_already_today():
    """이미 오늘 돌렸으면 trigger 안 함."""
    now = _kst(2026, 5, 18, 16, 5)
    today = _date(2026, 5, 18)
    assert _is_post_close_trigger(now, last_post_close_date=today) is False


def test_post_close_trigger_skips_before_1600():
    """16:00 전에는 trigger 안 함."""
    now = _kst(2026, 5, 18, 15, 59)
    assert _is_post_close_trigger(now, last_post_close_date=None) is False


def test_post_close_trigger_fires_next_day_after_reset():
    """다음 영업일이 되면 last_post_close_date < today.date() 이므로 다시 trigger."""
    now = _kst(2026, 5, 19, 16, 0)
    yesterday = _date(2026, 5, 18)
    assert _is_post_close_trigger(now, last_post_close_date=yesterday) is True


def test_post_close_trigger_skips_on_holiday():
    """휴장일에는 trigger 안 함 (2026-05-05 어린이날)."""
    now = _kst(2026, 5, 5, 16, 30)
    assert _is_post_close_trigger(now, last_post_close_date=None) is False
  • Step 2: Run tests to verify they fail
python -m pytest ai_trade/tests/test_scheduler.py -v -k post_close

Expected: FAIL — _is_post_close_trigger가 신규 시그니처(last_post_close_date 인자) 미지원.

  • Step 3: Modify scheduler.py:79-84
def _is_post_close_trigger(now: datetime, last_post_close_date) -> bool:
    """16:00 KST 이후 오늘 아직 post-close cycle 안 돌렸으면 True (F3 상태기반).

    Args:
        now: 현재 KST datetime.
        last_post_close_date: 마지막 post-close 실행 영업일 date 객체 (None=미실행).
    """
    if not _is_market_day(now):
        return False
    if now.time() < time(16, 0):
        return False
    today = now.date()
    return last_post_close_date != today
  • Step 4: Run scheduler tests
python -m pytest ai_trade/tests/test_scheduler.py -v

Expected: 신규 5개 PASS. 기존 test도 PASS (다른 함수 영향 없음).

  • Step 5: Update pull_worker.py to track last_post_close_date

ai_trade/pull_worker.pypoll_loop (L18-58)을 다음으로 교체:

async def poll_loop(
    client: StockClient, state: PollState, shutdown: asyncio.Event,
    kis_client: KISClient | None = None,
    chronos=None,
    dedup=None,
    settings=None,
) -> None:
    """FastAPI lifespan 에서 asyncio.create_task 로 시작."""
    logger.info("poll_loop started")
    last_post_close_date = None
    while not shutdown.is_set():
        now = datetime.now(KST)
        if _is_market_day(now) and _is_polling_window(now):
            try:
                await _run_polling_cycle(client, state, kis_client=kis_client)
            except Exception:
                logger.exception("poll cycle failed")
            # Minute momentum 갱신 (매 cycle)
            try:
                update_minute_momentum_for_all(state)
            except Exception:
                logger.exception("minute momentum update failed")
            # Post-close trigger (상태기반: 16:00 이후 + 오늘 미실행)
            if (
                _is_post_close_trigger(now, last_post_close_date)
                and chronos is not None and kis_client is not None
            ):
                try:
                    await _run_post_close_cycle(kis_client, chronos, state)
                    last_post_close_date = now.date()
                except Exception:
                    logger.exception("post-close cycle failed")
            # Phase 4: generate signals
            if dedup is not None and settings is not None:
                try:
                    from ai_trade.signal_generator import generate_signals
                    generate_signals(state, dedup, settings)
                except Exception:
                    logger.exception("generate_signals failed")
        interval = _next_interval(now)
        try:
            await asyncio.wait_for(shutdown.wait(), timeout=interval)
            break
        except asyncio.TimeoutError:
            continue
    logger.info("poll_loop ended")
  • Step 6: Add pull_worker test

ai_trade/tests/test_pull_worker.py 파일 끝에 추가:

from unittest.mock import AsyncMock, MagicMock
from datetime import datetime as _dt
from zoneinfo import ZoneInfo as _ZI
import asyncio as _asyncio


async def test_post_close_fires_at_1601_when_not_yet_today(monkeypatch):
    """16:01에 깬 cycle도 post_close 안 돌렸으면 호출됨 (F3 회귀)."""
    from ai_trade import pull_worker

    _kst = _ZI("Asia/Seoul")
    now_at_1601 = _dt(2026, 5, 18, 16, 1, tzinfo=_kst)

    real_dt = _dt

    class FrozenDateTime:
        @staticmethod
        def now(tz=None):
            return now_at_1601

    monkeypatch.setattr(pull_worker, "datetime", FrozenDateTime)
    monkeypatch.setattr(
        pull_worker, "_is_market_day", lambda n: True,
    )
    monkeypatch.setattr(
        pull_worker, "_is_polling_window", lambda n: True,
    )
    monkeypatch.setattr(
        pull_worker, "_next_interval", lambda n: 0.01,
    )
    monkeypatch.setattr(
        pull_worker, "_run_polling_cycle", AsyncMock(),
    )
    monkeypatch.setattr(
        pull_worker, "update_minute_momentum_for_all", lambda s: None,
    )
    post_close = AsyncMock()
    monkeypatch.setattr(pull_worker, "_run_post_close_cycle", post_close)

    state = MagicMock()
    chronos = MagicMock()
    kis = MagicMock()
    shutdown = _asyncio.Event()

    async def _stop_soon():
        await _asyncio.sleep(0.05)
        shutdown.set()

    _asyncio.create_task(_stop_soon())
    await pull_worker.poll_loop(
        client=MagicMock(),
        state=state,
        shutdown=shutdown,
        kis_client=kis,
        chronos=chronos,
        dedup=None,
        settings=None,
    )

    assert post_close.await_count >= 1, "post-close가 16:01에 호출되지 않음 (F3 회귀)"
  • Step 7: Run pull_worker test
python -m pytest ai_trade/tests/test_pull_worker.py::test_post_close_fires_at_1601_when_not_yet_today -v

Expected: PASS.

  • Step 8: Run full ai_trade suite
python -m pytest ai_trade/tests -q

Expected: 모두 PASS.

  • Step 9: Commit
git add ai_trade/scheduler.py ai_trade/pull_worker.py ai_trade/tests/test_scheduler.py ai_trade/tests/test_pull_worker.py
git commit -m "fix(ai_trade): post-close trigger를 상태기반으로 변경 (F3)"

Task 4: F4 — Chronos confidence를 absolute spread 기반으로 통일

Files:

  • Modify: ai_trade/chronos_predictor.py:106, 127
  • Test: ai_trade/tests/test_chronos_predictor.py (add 2 tests)

Why absolute: Phase 4 spec amendment (web-ui commit 534ded5)가 absolute spread로 hard gate를 결정. confidence도 같은 철학으로. 새 산식: conf = max(0, min(1, 1 - spread / SPREAD_THRESHOLD)) — spread가 0.6에 도달하면 conf=0, 0이면 conf=1.

  • Step 1: Write the failing tests

기존 ai_trade/tests/test_chronos_predictor.py 끝에 추가 (파일이 없거나 비어있으면 신규):

import numpy as np
import pytest
import torch


@pytest.fixture
def fake_pipeline():
    """predict_quantiles만 stub하는 가짜 pipeline."""
    class FakePipeline:
        def __init__(self, q10_price, q50_price, q90_price):
            self._q10, self._q50, self._q90 = q10_price, q50_price, q90_price
        def predict_quantiles(self, contexts, prediction_length, quantile_levels):
            n = len(contexts)
            tensor = torch.tensor(
                [[[self._q10, self._q50, self._q90]]] * n,
                dtype=torch.float32,
            )
            return tensor, None
    return FakePipeline


def _make_predictor_with(pipeline_obj):
    """ChronosPredictor 인스턴스 (실제 모델 안 부르고 pipeline만 주입)."""
    from ai_trade.chronos_predictor import ChronosPredictor
    p = ChronosPredictor.__new__(ChronosPredictor)
    p._pipeline = pipeline_obj
    p._device = "cpu"
    return p


def test_confidence_high_when_spread_near_zero(fake_pipeline):
    """median≈0, spread≈0 (q10=q90=last_close)일 때 conf≈1 (F4)."""
    last_close = 100000.0
    p = _make_predictor_with(fake_pipeline(last_close, last_close, last_close))
    ohlcv = {"A": [{"close": last_close}] * 30}
    out = p.predict_batch(ohlcv)
    assert out["A"].conf > 0.95, (
        f"median≈0 + spread≈0인데 conf={out['A'].conf} (F4 회귀: relative spread로 폭증)"
    )


def test_confidence_drops_with_spread(fake_pipeline):
    """spread 0.3일 때 conf≈0.5 (1 - 0.3/0.6 = 0.5)."""
    last_close = 100000.0
    # q10=85000 → -0.15, q90=115000 → 0.15, spread=0.30
    p = _make_predictor_with(fake_pipeline(85000.0, 100000.0, 115000.0))
    ohlcv = {"A": [{"close": last_close}] * 30}
    out = p.predict_batch(ohlcv)
    # 1 - 0.30/0.60 = 0.50
    assert 0.45 < out["A"].conf < 0.55, (
        f"absolute spread 0.30에서 conf={out['A'].conf} (expected ≈0.5)"
    )


def test_confidence_zero_at_threshold_spread(fake_pipeline):
    """spread가 threshold(0.6) 이상이면 conf=0."""
    last_close = 100000.0
    # q10=70000 → -0.30, q90=130000 → 0.30, spread=0.60
    p = _make_predictor_with(fake_pipeline(70000.0, 100000.0, 130000.0))
    ohlcv = {"A": [{"close": last_close}] * 30}
    out = p.predict_batch(ohlcv)
    assert out["A"].conf < 0.05, (
        f"spread=threshold에서 conf={out['A'].conf} (expected ≈0)"
    )
  • Step 2: Run tests to verify they fail
python -m pytest ai_trade/tests/test_chronos_predictor.py -v -k confidence

Expected: test_confidence_high_when_spread_near_zero 가 FAIL — 현행 relative spread 산식 때문에 median≈0에서 conf가 0으로 폭락.

  • Step 3: Fix chronos_predictor.py

ai_trade/chronos_predictor.py 상단에 상수 추가 (L13 근처):

_SPREAD_THRESHOLD = 0.6  # F4: signal_generator hard gate와 동일 (absolute return spread)

L106 (modern API 경로) 변경:

            # shape: [num_series, prediction_length, 3]
            for i, ticker in enumerate(tickers):
                q10_price, q50_price, q90_price = quantiles_np[i, 0, :]
                last_close = daily_ohlcv_dict[ticker][-1]["close"]
                median = float((q50_price - last_close) / last_close)
                q10 = float((q10_price - last_close) / last_close)
                q90 = float((q90_price - last_close) / last_close)
                # F4: absolute spread (q90-q10) 기반 — signal_generator hard gate와 통일.
                # median≈0 zero-shot 케이스에서 conf가 0으로 폭락하던 relative 산식 제거.
                spread = q90 - q10
                conf = float(max(0.0, min(1.0, 1.0 - spread / _SPREAD_THRESHOLD)))
                results[ticker] = ChronosPrediction(
                    median=median, q10=q10, q90=q90, conf=conf, as_of=now_iso,
                )
            return results

L127 (legacy API 경로) 동일하게 변경:

            spread = q90 - q10
            conf = float(max(0.0, min(1.0, 1.0 - spread / _SPREAD_THRESHOLD)))
  • Step 4: Run tests to verify they pass
python -m pytest ai_trade/tests/test_chronos_predictor.py -v

Expected: 신규 3개 모두 PASS. 기존 test도 PASS.

  • Step 5: Run full ai_trade suite
python -m pytest ai_trade/tests -q

Expected: 모두 PASS. signal_generator 테스트(_compute_buy_confidencepred["conf"] 사용) 도 영향 받을 수 있으니 주시.

  • Step 6: Commit
git add ai_trade/chronos_predictor.py ai_trade/tests/test_chronos_predictor.py
git commit -m "fix(ai_trade): Chronos confidence를 absolute spread 기반으로 통일 (F4)"

Task 5: 전체 회귀 확인 + push

  • Step 1: Run full ai_trade suite + count
python -m pytest ai_trade/tests -v

Expected:

  • 기존 56 tests + 신규 (config 2 + kis_client 1 + scheduler 5 + pull_worker 1 + chronos_predictor 3) = 68 tests 정도 PASS.

  • Step 2: Quick sanity — server boot smoke test (시간 허용 시)

cd ai_trade && python -c "from main import app; print('app import OK')"

Expected: no import errors.

  • Step 3: Push
git push origin main

Self-Review Checklist

이 plan을 다 작성한 뒤 다음을 확인:

  1. F1: config.py default + 2 test (default + env override)
  2. F2: _throttle_lock 추가 + 1 concurrent test
  3. F3: _is_post_close_trigger(now, last_post_close_date) 시그니처 변경 + poll_loop 상태 추적 + 5 scheduler test + 1 pull_worker test
  4. F4: _SPREAD_THRESHOLD=0.6 상수 + 두 분기(modern + legacy) 모두 absolute spread 적용 + 3 chronos_predictor test

누락 가능 항목:

  • test_main.pyv1_token_path default를 직접 검증한다면 Task 1에서 같이 갱신. 위 patch는 Settings 객체 통해서만 다루므로 영향 없음(검증 완료).
  • Task 3 pull_worker test의 FrozenDateTime.nowdatetime.now(KST) 호출만 stub함. 다른 datetime 사용 부분 영향 없음 (verified L28).
  • Task 4 test는 ChronosPredictor __new__로 우회 — 실제 HuggingFace 모델 로딩 안 함.

Execution Handoff

Plan complete and saved to docs/superpowers/plans/2026-05-25-ai-trade-hotfix.md.

두 가지 실행 옵션:

1. Subagent-Driven (recommended) — task 별 fresh subagent dispatch + two-stage review. F2/F3 같이 미묘한 동시성/상태 변경에 유리.

2. Inline Execution — 현 세션에서 직접 task별 진행 + checkpoint.

박재오 결정 대기.