# ai_trade Hotfix — Code Review F1·F2·F3·F4 Implementation Plan > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** ai_trade(V2) 코드 리뷰 7개 finding 중 High 3건(F1·F2·F3) + Medium 1건(F4)을 TDD로 수정. F5/F6은 별도 plan, F7은 pushback. **Architecture:** 모두 ai_trade/ 내부 단일 모듈 수정. (1) config.py default 경로 — legacy/ 경유. (2) kis_client.py — asyncio.Lock으로 `_throttle()` 직렬화. (3) scheduler.py + pull_worker.py — post-close를 시간 윈도우가 아닌 "일 1회 + 16:00 이후" 상태기반으로 변경. (4) chronos_predictor.py — confidence 산식을 absolute spread 기반으로 통일. **Tech Stack:** Python 3.12, asyncio, pytest + pytest-asyncio + respx, httpx. **Test runner:** `.venv` 한글 경로 깨짐 + 리뷰어 Python 312 경로 부재 보고로, 시스템 Python 사용. 정확한 경로는 `where python` 으로 우선 확인. 기본 시도 순서: 1. `python -m pytest ai_trade/tests -q` (PATH의 Python) 2. `py -3.12 -m pytest ai_trade/tests -q` (py launcher) 3. 둘 다 실패 시 환경 셋업이 선행 작업 — plan 진행 중단하고 박재오에게 보고. **Working directory:** `C:\Users\jaeoh\Desktop\workspace\web-ai` (web-ai repo). Commit/push도 이 디렉토리에서만. --- ## File Map | 파일 | 변경 종류 | 책임 | |------|-----------|------| | `ai_trade/config.py` | Modify L31-36 | V1_TOKEN_PATH default를 `legacy/signal_v1/data/kis_token.json`로 | | `ai_trade/kis_client.py` | Modify L40-62 | `_throttle_lock: asyncio.Lock` 추가, `_throttle()`을 lock 안에서 실행 | | `ai_trade/scheduler.py` | Modify L79-84 | `_is_post_close_trigger(now, last_post_close_date)` 시그니처 변경 — 상태기반 | | `ai_trade/pull_worker.py` | Modify L1-58 | `poll_loop`에 `last_post_close_date` state 추가, 호출부 갱신 | | `ai_trade/chronos_predictor.py` | Modify L106, L127 | spread 계산을 absolute (q90-q10)로, confidence 산식 `max(0, 1 - spread/0.6)` | | `ai_trade/tests/test_kis_client.py` | Add 1 test | concurrent gather throttle test | | `ai_trade/tests/test_scheduler.py` | Add 3 tests | post-close 상태기반 트리거 | | `ai_trade/tests/test_pull_worker.py` | Add 1 test | 첫 호출 안 됐다가 16:00 이후 5분 cycle에서 호출됨 | | `ai_trade/tests/test_chronos_predictor.py` | Add 2 tests | median≈0에서도 conf 정상, spread 클수록 conf↓ | | `ai_trade/tests/test_main.py` | Modify | v1_token_path default 변경 반영 (있다면) | --- ## Task 1: F1 — KIS 토큰 경로 default를 legacy/ 경유로 **Files:** - Modify: `ai_trade/config.py:31-36` - Test: `ai_trade/tests/test_config_token_path.py` (Create) - [ ] **Step 1: Write the failing test** ```python # ai_trade/tests/test_config_token_path.py """F1 — V1_TOKEN_PATH default가 legacy/signal_v1/ 경유인지 검증.""" import os from pathlib import Path from ai_trade.config import Settings def test_v1_token_default_path_uses_legacy_dir(monkeypatch): """env에 V1_TOKEN_PATH 없으면 legacy/signal_v1/data/kis_token.json""" monkeypatch.delenv("V1_TOKEN_PATH", raising=False) settings = Settings() expected_suffix = Path("legacy") / "signal_v1" / "data" / "kis_token.json" assert str(settings.v1_token_path).endswith(str(expected_suffix)), ( f"expected default to end with {expected_suffix}, got {settings.v1_token_path}" ) def test_v1_token_env_override_wins(monkeypatch, tmp_path): """env로 명시한 경로가 default를 덮어씀.""" custom = tmp_path / "custom_token.json" monkeypatch.setenv("V1_TOKEN_PATH", str(custom)) settings = Settings() assert settings.v1_token_path == custom ``` - [ ] **Step 2: Run test to verify it fails** ``` python -m pytest ai_trade/tests/test_config_token_path.py -v ``` Expected: `test_v1_token_default_path_uses_legacy_dir` FAILs (default가 `signal_v1/...` 임). env override는 PASS. - [ ] **Step 3: Fix config.py** `ai_trade/config.py:31-36` 변경: ```python v1_token_path: Path = field( default_factory=lambda: Path( os.getenv("V1_TOKEN_PATH", str(Path(__file__).parent.parent / "legacy" / "signal_v1" / "data" / "kis_token.json")) ) ) ``` - [ ] **Step 4: Run test to verify it passes** ``` python -m pytest ai_trade/tests/test_config_token_path.py -v ``` Expected: 2 passed. - [ ] **Step 5: Verify full test suite still passes** ``` python -m pytest ai_trade/tests -q ``` Expected: 모든 기존 테스트 PASS (token path 기본값 변경이 다른 test에 영향 없는지 확인). - [ ] **Step 6: Commit** ```bash git add ai_trade/config.py ai_trade/tests/test_config_token_path.py git commit -m "fix(ai_trade): V1_TOKEN_PATH default를 legacy/signal_v1/ 경유로 수정 (F1)" ``` --- ## Task 2: F2 — KIS throttle을 asyncio.Lock으로 직렬화 **Files:** - Modify: `ai_trade/kis_client.py:40-62` - Test: `ai_trade/tests/test_kis_client.py` (Modify — 새 test 추가) - [ ] **Step 1: Write the failing test** `ai_trade/tests/test_kis_client.py` 파일 끝에 추가: ```python import asyncio import time as time_module @respx.mock async def test_throttle_serializes_concurrent_gather(kis_client_factory): """5개 동시 요청이 asyncio.gather로 들어와도 0.5초 간격으로 직렬화되어야 함 (F2). 초당 2회 = 0.5초 간격. 5개 요청이면 최소 (5-1)*0.5 = 2.0초 소요. Race condition 있으면 5개가 거의 동시에 나가서 2초 훨씬 안쪽에 끝남. """ sample = {"output2": []} respx.get( "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice" ).mock(return_value=httpx.Response(200, json=sample)) client = kis_client_factory() try: start = time_module.monotonic() await asyncio.gather(*[client.get_minute_ohlcv(f"00593{i}") for i in range(5)]) elapsed = time_module.monotonic() - start # 5개 throttle = 최소 (5-1)*0.5 = 2.0초. tolerance 0.3초. assert elapsed >= 1.7, ( f"throttle race condition: 5 concurrent calls took only {elapsed:.2f}s, " f"expected >=1.7s (0.5s * 4 inter-call gaps)" ) finally: await client.close() ``` - [ ] **Step 2: Run test to verify it fails** ``` python -m pytest ai_trade/tests/test_kis_client.py::test_throttle_serializes_concurrent_gather -v ``` Expected: FAIL — elapsed가 0.5초 이하 (race로 동시 깸). - [ ] **Step 3: Add asyncio.Lock to KISClient** `ai_trade/kis_client.py:40` `__init__` 끝에 한 줄 추가: ```python self._token_cache: tuple[str, float] | None = None # (token, file_mtime) self._last_throttle_at = 0.0 self._throttle_lock = asyncio.Lock() ``` 그리고 `_throttle()` (L58-62)을 lock으로 감쌈: ```python async def _throttle(self) -> None: async with self._throttle_lock: elapsed = time.monotonic() - self._last_throttle_at if elapsed < _THROTTLE_INTERVAL: await asyncio.sleep(_THROTTLE_INTERVAL - elapsed) self._last_throttle_at = time.monotonic() ``` - [ ] **Step 4: Run test to verify it passes** ``` python -m pytest ai_trade/tests/test_kis_client.py::test_throttle_serializes_concurrent_gather -v ``` Expected: PASS — elapsed >= 1.7s. - [ ] **Step 5: Verify full kis_client suite still passes** ``` python -m pytest ai_trade/tests/test_kis_client.py -v ``` Expected: 모든 test PASS (기존 429 retry 등 영향 없는지 확인). - [ ] **Step 6: Commit** ```bash git add ai_trade/kis_client.py ai_trade/tests/test_kis_client.py git commit -m "fix(ai_trade): KIS throttle을 asyncio.Lock으로 직렬화 (F2)" ``` --- ## Task 3: F3 — post-close 트리거를 상태기반으로 변경 **Files:** - Modify: `ai_trade/scheduler.py:79-84` - Modify: `ai_trade/pull_worker.py:1-58` - Test: `ai_trade/tests/test_scheduler.py` (add 3 tests) **Why state-based:** 16:00:00-16:00:59 윈도우는 5분 sleep + 비결정적 cycle 시작 시각과 충돌. "오늘 아직 post-close 안 돌렸고 현재 시각 ≥ 16:00 이면 trigger 후 today 표시" 로 변경. - [ ] **Step 1: Write the failing tests** `ai_trade/tests/test_scheduler.py` 파일 끝에 추가: ```python from datetime import date as _date from ai_trade.scheduler import _is_post_close_trigger def test_post_close_trigger_fires_at_1601_if_not_yet_today(): """16:01에 깬 cycle도 오늘 아직 안 돌렸으면 trigger (F3).""" now = _kst(2026, 5, 18, 16, 1) assert _is_post_close_trigger(now, last_post_close_date=None) is True def test_post_close_trigger_skips_if_already_today(): """이미 오늘 돌렸으면 trigger 안 함.""" now = _kst(2026, 5, 18, 16, 5) today = _date(2026, 5, 18) assert _is_post_close_trigger(now, last_post_close_date=today) is False def test_post_close_trigger_skips_before_1600(): """16:00 전에는 trigger 안 함.""" now = _kst(2026, 5, 18, 15, 59) assert _is_post_close_trigger(now, last_post_close_date=None) is False def test_post_close_trigger_fires_next_day_after_reset(): """다음 영업일이 되면 last_post_close_date < today.date() 이므로 다시 trigger.""" now = _kst(2026, 5, 19, 16, 0) yesterday = _date(2026, 5, 18) assert _is_post_close_trigger(now, last_post_close_date=yesterday) is True def test_post_close_trigger_skips_on_holiday(): """휴장일에는 trigger 안 함 (2026-05-05 어린이날).""" now = _kst(2026, 5, 5, 16, 30) assert _is_post_close_trigger(now, last_post_close_date=None) is False ``` - [ ] **Step 2: Run tests to verify they fail** ``` python -m pytest ai_trade/tests/test_scheduler.py -v -k post_close ``` Expected: FAIL — `_is_post_close_trigger`가 신규 시그니처(`last_post_close_date` 인자) 미지원. - [ ] **Step 3: Modify scheduler.py:79-84** ```python def _is_post_close_trigger(now: datetime, last_post_close_date) -> bool: """16:00 KST 이후 오늘 아직 post-close cycle 안 돌렸으면 True (F3 상태기반). Args: now: 현재 KST datetime. last_post_close_date: 마지막 post-close 실행 영업일 date 객체 (None=미실행). """ if not _is_market_day(now): return False if now.time() < time(16, 0): return False today = now.date() return last_post_close_date != today ``` - [ ] **Step 4: Run scheduler tests** ``` python -m pytest ai_trade/tests/test_scheduler.py -v ``` Expected: 신규 5개 PASS. 기존 test도 PASS (다른 함수 영향 없음). - [ ] **Step 5: Update pull_worker.py to track last_post_close_date** `ai_trade/pull_worker.py` 의 `poll_loop` (L18-58)을 다음으로 교체: ```python async def poll_loop( client: StockClient, state: PollState, shutdown: asyncio.Event, kis_client: KISClient | None = None, chronos=None, dedup=None, settings=None, ) -> None: """FastAPI lifespan 에서 asyncio.create_task 로 시작.""" logger.info("poll_loop started") last_post_close_date = None while not shutdown.is_set(): now = datetime.now(KST) if _is_market_day(now) and _is_polling_window(now): try: await _run_polling_cycle(client, state, kis_client=kis_client) except Exception: logger.exception("poll cycle failed") # Minute momentum 갱신 (매 cycle) try: update_minute_momentum_for_all(state) except Exception: logger.exception("minute momentum update failed") # Post-close trigger (상태기반: 16:00 이후 + 오늘 미실행) if ( _is_post_close_trigger(now, last_post_close_date) and chronos is not None and kis_client is not None ): try: await _run_post_close_cycle(kis_client, chronos, state) last_post_close_date = now.date() except Exception: logger.exception("post-close cycle failed") # Phase 4: generate signals if dedup is not None and settings is not None: try: from ai_trade.signal_generator import generate_signals generate_signals(state, dedup, settings) except Exception: logger.exception("generate_signals failed") interval = _next_interval(now) try: await asyncio.wait_for(shutdown.wait(), timeout=interval) break except asyncio.TimeoutError: continue logger.info("poll_loop ended") ``` - [ ] **Step 6: Add pull_worker test** `ai_trade/tests/test_pull_worker.py` 파일 끝에 추가: ```python from unittest.mock import AsyncMock, MagicMock from datetime import datetime as _dt from zoneinfo import ZoneInfo as _ZI import asyncio as _asyncio async def test_post_close_fires_at_1601_when_not_yet_today(monkeypatch): """16:01에 깬 cycle도 post_close 안 돌렸으면 호출됨 (F3 회귀).""" from ai_trade import pull_worker _kst = _ZI("Asia/Seoul") now_at_1601 = _dt(2026, 5, 18, 16, 1, tzinfo=_kst) real_dt = _dt class FrozenDateTime: @staticmethod def now(tz=None): return now_at_1601 monkeypatch.setattr(pull_worker, "datetime", FrozenDateTime) monkeypatch.setattr( pull_worker, "_is_market_day", lambda n: True, ) monkeypatch.setattr( pull_worker, "_is_polling_window", lambda n: True, ) monkeypatch.setattr( pull_worker, "_next_interval", lambda n: 0.01, ) monkeypatch.setattr( pull_worker, "_run_polling_cycle", AsyncMock(), ) monkeypatch.setattr( pull_worker, "update_minute_momentum_for_all", lambda s: None, ) post_close = AsyncMock() monkeypatch.setattr(pull_worker, "_run_post_close_cycle", post_close) state = MagicMock() chronos = MagicMock() kis = MagicMock() shutdown = _asyncio.Event() async def _stop_soon(): await _asyncio.sleep(0.05) shutdown.set() _asyncio.create_task(_stop_soon()) await pull_worker.poll_loop( client=MagicMock(), state=state, shutdown=shutdown, kis_client=kis, chronos=chronos, dedup=None, settings=None, ) assert post_close.await_count >= 1, "post-close가 16:01에 호출되지 않음 (F3 회귀)" ``` - [ ] **Step 7: Run pull_worker test** ``` python -m pytest ai_trade/tests/test_pull_worker.py::test_post_close_fires_at_1601_when_not_yet_today -v ``` Expected: PASS. - [ ] **Step 8: Run full ai_trade suite** ``` python -m pytest ai_trade/tests -q ``` Expected: 모두 PASS. - [ ] **Step 9: Commit** ```bash git add ai_trade/scheduler.py ai_trade/pull_worker.py ai_trade/tests/test_scheduler.py ai_trade/tests/test_pull_worker.py git commit -m "fix(ai_trade): post-close trigger를 상태기반으로 변경 (F3)" ``` --- ## Task 4: F4 — Chronos confidence를 absolute spread 기반으로 통일 **Files:** - Modify: `ai_trade/chronos_predictor.py:106, 127` - Test: `ai_trade/tests/test_chronos_predictor.py` (add 2 tests) **Why absolute:** Phase 4 spec amendment (web-ui commit 534ded5)가 absolute spread로 hard gate를 결정. confidence도 같은 철학으로. 새 산식: `conf = max(0, min(1, 1 - spread / SPREAD_THRESHOLD))` — spread가 0.6에 도달하면 conf=0, 0이면 conf=1. - [ ] **Step 1: Write the failing tests** 기존 `ai_trade/tests/test_chronos_predictor.py` 끝에 추가 (파일이 없거나 비어있으면 신규): ```python import numpy as np import pytest import torch @pytest.fixture def fake_pipeline(): """predict_quantiles만 stub하는 가짜 pipeline.""" class FakePipeline: def __init__(self, q10_price, q50_price, q90_price): self._q10, self._q50, self._q90 = q10_price, q50_price, q90_price def predict_quantiles(self, contexts, prediction_length, quantile_levels): n = len(contexts) tensor = torch.tensor( [[[self._q10, self._q50, self._q90]]] * n, dtype=torch.float32, ) return tensor, None return FakePipeline def _make_predictor_with(pipeline_obj): """ChronosPredictor 인스턴스 (실제 모델 안 부르고 pipeline만 주입).""" from ai_trade.chronos_predictor import ChronosPredictor p = ChronosPredictor.__new__(ChronosPredictor) p._pipeline = pipeline_obj p._device = "cpu" return p def test_confidence_high_when_spread_near_zero(fake_pipeline): """median≈0, spread≈0 (q10=q90=last_close)일 때 conf≈1 (F4).""" last_close = 100000.0 p = _make_predictor_with(fake_pipeline(last_close, last_close, last_close)) ohlcv = {"A": [{"close": last_close}] * 30} out = p.predict_batch(ohlcv) assert out["A"].conf > 0.95, ( f"median≈0 + spread≈0인데 conf={out['A'].conf} (F4 회귀: relative spread로 폭증)" ) def test_confidence_drops_with_spread(fake_pipeline): """spread 0.3일 때 conf≈0.5 (1 - 0.3/0.6 = 0.5).""" last_close = 100000.0 # q10=85000 → -0.15, q90=115000 → 0.15, spread=0.30 p = _make_predictor_with(fake_pipeline(85000.0, 100000.0, 115000.0)) ohlcv = {"A": [{"close": last_close}] * 30} out = p.predict_batch(ohlcv) # 1 - 0.30/0.60 = 0.50 assert 0.45 < out["A"].conf < 0.55, ( f"absolute spread 0.30에서 conf={out['A'].conf} (expected ≈0.5)" ) def test_confidence_zero_at_threshold_spread(fake_pipeline): """spread가 threshold(0.6) 이상이면 conf=0.""" last_close = 100000.0 # q10=70000 → -0.30, q90=130000 → 0.30, spread=0.60 p = _make_predictor_with(fake_pipeline(70000.0, 100000.0, 130000.0)) ohlcv = {"A": [{"close": last_close}] * 30} out = p.predict_batch(ohlcv) assert out["A"].conf < 0.05, ( f"spread=threshold에서 conf={out['A'].conf} (expected ≈0)" ) ``` - [ ] **Step 2: Run tests to verify they fail** ``` python -m pytest ai_trade/tests/test_chronos_predictor.py -v -k confidence ``` Expected: `test_confidence_high_when_spread_near_zero` 가 FAIL — 현행 relative spread 산식 때문에 median≈0에서 conf가 0으로 폭락. - [ ] **Step 3: Fix chronos_predictor.py** `ai_trade/chronos_predictor.py` 상단에 상수 추가 (L13 근처): ```python _SPREAD_THRESHOLD = 0.6 # F4: signal_generator hard gate와 동일 (absolute return spread) ``` L106 (modern API 경로) 변경: ```python # shape: [num_series, prediction_length, 3] for i, ticker in enumerate(tickers): q10_price, q50_price, q90_price = quantiles_np[i, 0, :] last_close = daily_ohlcv_dict[ticker][-1]["close"] median = float((q50_price - last_close) / last_close) q10 = float((q10_price - last_close) / last_close) q90 = float((q90_price - last_close) / last_close) # F4: absolute spread (q90-q10) 기반 — signal_generator hard gate와 통일. # median≈0 zero-shot 케이스에서 conf가 0으로 폭락하던 relative 산식 제거. spread = q90 - q10 conf = float(max(0.0, min(1.0, 1.0 - spread / _SPREAD_THRESHOLD))) results[ticker] = ChronosPrediction( median=median, q10=q10, q90=q90, conf=conf, as_of=now_iso, ) return results ``` L127 (legacy API 경로) 동일하게 변경: ```python spread = q90 - q10 conf = float(max(0.0, min(1.0, 1.0 - spread / _SPREAD_THRESHOLD))) ``` - [ ] **Step 4: Run tests to verify they pass** ``` python -m pytest ai_trade/tests/test_chronos_predictor.py -v ``` Expected: 신규 3개 모두 PASS. 기존 test도 PASS. - [ ] **Step 5: Run full ai_trade suite** ``` python -m pytest ai_trade/tests -q ``` Expected: 모두 PASS. signal_generator 테스트(`_compute_buy_confidence` 가 `pred["conf"]` 사용) 도 영향 받을 수 있으니 주시. - [ ] **Step 6: Commit** ```bash git add ai_trade/chronos_predictor.py ai_trade/tests/test_chronos_predictor.py git commit -m "fix(ai_trade): Chronos confidence를 absolute spread 기반으로 통일 (F4)" ``` --- ## Task 5: 전체 회귀 확인 + push - [ ] **Step 1: Run full ai_trade suite + count** ``` python -m pytest ai_trade/tests -v ``` Expected: - 기존 56 tests + 신규 (config 2 + kis_client 1 + scheduler 5 + pull_worker 1 + chronos_predictor 3) = **68 tests** 정도 PASS. - [ ] **Step 2: Quick sanity — server boot smoke test (시간 허용 시)** ``` cd ai_trade && python -c "from main import app; print('app import OK')" ``` Expected: no import errors. - [ ] **Step 3: Push** ```bash git push origin main ``` --- ## Self-Review Checklist 이 plan을 다 작성한 뒤 다음을 확인: 1. **F1**: config.py default + 2 test (default + env override) ✅ 2. **F2**: `_throttle_lock` 추가 + 1 concurrent test ✅ 3. **F3**: `_is_post_close_trigger(now, last_post_close_date)` 시그니처 변경 + `poll_loop` 상태 추적 + 5 scheduler test + 1 pull_worker test ✅ 4. **F4**: `_SPREAD_THRESHOLD=0.6` 상수 + 두 분기(modern + legacy) 모두 absolute spread 적용 + 3 chronos_predictor test ✅ **누락 가능 항목**: - `test_main.py` 가 `v1_token_path` default를 직접 검증한다면 Task 1에서 같이 갱신. 위 patch는 Settings 객체 통해서만 다루므로 영향 없음(검증 완료). - Task 3 pull_worker test의 `FrozenDateTime.now`는 `datetime.now(KST)` 호출만 stub함. 다른 datetime 사용 부분 영향 없음 (verified L28). - Task 4 test는 ChronosPredictor `__new__`로 우회 — 실제 HuggingFace 모델 로딩 안 함. --- ## Execution Handoff **Plan complete and saved to `docs/superpowers/plans/2026-05-25-ai-trade-hotfix.md`.** 두 가지 실행 옵션: **1. Subagent-Driven (recommended)** — task 별 fresh subagent dispatch + two-stage review. F2/F3 같이 미묘한 동시성/상태 변경에 유리. **2. Inline Execution** — 현 세션에서 직접 task별 진행 + checkpoint. 박재오 결정 대기.