# ai_trade Hotfix — Code Review F1·F2·F3·F4 Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Of the 7 findings from the ai_trade (V2) code review, fix the 3 High findings (F1, F2, F3) and the 1 Medium finding (F4) using TDD. F5/F6 go into a separate plan; F7 is a pushback.
**Architecture:** Every change is confined to a single module inside ai_trade/. (1) config.py: the default path goes through legacy/. (2) kis_client.py: serialize `_throttle()` with an asyncio.Lock. (3) scheduler.py + pull_worker.py: make post-close state-based ("once per day, at or after 16:00") instead of a time window. (4) chronos_predictor.py: unify the confidence formula on absolute spread.
**Tech Stack:** Python 3.12, asyncio, pytest + pytest-asyncio + respx, httpx.
**Test runner:** The `.venv` breaks on the Korean-character path, and the reviewer reported that the Python 3.12 path is missing, so use the system Python. Confirm the exact path with `where python` first. Default order to try:
1. `python -m pytest ai_trade/tests -q` (Python on PATH)
2. `py -3.12 -m pytest ai_trade/tests -q` (py launcher)
3. If both fail, environment setup is a prerequisite: stop working through the plan and report to 박재오.
**Working directory:** `C:\Users\jaeoh\Desktop\workspace\web-ai` (web-ai repo). Commit and push only from this directory.
---
## File Map
| File | Change | Responsibility |
|------|--------|----------------|
| `ai_trade/config.py` | Modify L31-36 | Change the `V1_TOKEN_PATH` default to `legacy/signal_v1/data/kis_token.json` |
| `ai_trade/kis_client.py` | Modify L40-62 | Add `_throttle_lock: asyncio.Lock`; run `_throttle()` inside the lock |
| `ai_trade/scheduler.py` | Modify L79-84 | Change the signature to `_is_post_close_trigger(now, last_post_close_date)` (state-based) |
| `ai_trade/pull_worker.py` | Modify L1-58 | Add `last_post_close_date` state to `poll_loop`; update the call site |
| `ai_trade/chronos_predictor.py` | Modify L106, L127 | Compute spread as absolute (q90-q10); confidence formula `max(0, 1 - spread/0.6)` |
| `ai_trade/tests/test_config_token_path.py` | Create (2 tests) | Default token path and env override |
| `ai_trade/tests/test_kis_client.py` | Add 1 test | Concurrent gather throttle test |
| `ai_trade/tests/test_scheduler.py` | Add 5 tests | State-based post-close trigger |
| `ai_trade/tests/test_pull_worker.py` | Add 1 test | Post-close not yet run, then called on the 5-minute cycle that wakes after 16:00 |
| `ai_trade/tests/test_chronos_predictor.py` | Add 3 tests | conf stays sane when median≈0; conf falls as spread grows |
| `ai_trade/tests/test_main.py` | Modify | Reflect the `v1_token_path` default change (if applicable) |
---
## Task 1 (F1): Route the KIS token path default through legacy/
**Files:**
- Modify: `ai_trade/config.py:31-36`
- Test: `ai_trade/tests/test_config_token_path.py` (Create)
- [ ] **Step 1: Write the failing test**
```python
# ai_trade/tests/test_config_token_path.py
"""F1: verify that the V1_TOKEN_PATH default goes through legacy/signal_v1/."""
from pathlib import Path

from ai_trade.config import Settings


def test_v1_token_default_path_uses_legacy_dir(monkeypatch):
    """With no V1_TOKEN_PATH in env, the default is legacy/signal_v1/data/kis_token.json."""
    monkeypatch.delenv("V1_TOKEN_PATH", raising=False)
    settings = Settings()
    expected_suffix = Path("legacy") / "signal_v1" / "data" / "kis_token.json"
    assert str(settings.v1_token_path).endswith(str(expected_suffix)), (
        f"expected default to end with {expected_suffix}, got {settings.v1_token_path}"
    )


def test_v1_token_env_override_wins(monkeypatch, tmp_path):
    """A path set explicitly via env overrides the default."""
    custom = tmp_path / "custom_token.json"
    monkeypatch.setenv("V1_TOKEN_PATH", str(custom))
    settings = Settings()
    assert settings.v1_token_path == custom
```
- [ ] **Step 2: Run test to verify it fails**
```
python -m pytest ai_trade/tests/test_config_token_path.py -v
```
Expected: `test_v1_token_default_path_uses_legacy_dir` FAILs (the default is still `signal_v1/...`). The env override test PASSes.
- [ ] **Step 3: Fix config.py**
Change `ai_trade/config.py:31-36`:
```python
    v1_token_path: Path = field(
        default_factory=lambda: Path(
            os.getenv(
                "V1_TOKEN_PATH",
                str(Path(__file__).parent.parent / "legacy" / "signal_v1" / "data" / "kis_token.json"),
            )
        )
    )
```
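The resolution order in Step 3 (env override first, otherwise a path built from the module location) can be sketched in isolation. This is a minimal illustration, not the project's code: `resolve_token_path` and its `module_file` and `env` parameters are hypothetical names, with `module_file` standing in for `config.py`'s `__file__`.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def resolve_token_path(module_file: str, env: Optional[Mapping[str, str]] = None) -> Path:
    """Return the V1_TOKEN_PATH override if present, else the legacy/ default."""
    env = os.environ if env is None else env
    # Two .parent hops: <repo>/ai_trade/config.py -> <repo>/ai_trade -> <repo>
    repo_root = Path(module_file).parent.parent
    default = repo_root / "legacy" / "signal_v1" / "data" / "kis_token.json"
    return Path(env.get("V1_TOKEN_PATH", str(default)))


if __name__ == "__main__":
    # Hypothetical repo layout, used only for illustration.
    print(resolve_token_path("/repo/ai_trade/config.py", env={}))
    print(resolve_token_path("/repo/ai_trade/config.py", env={"V1_TOKEN_PATH": "/tmp/t.json"}))
```

Passing `env` explicitly keeps the sketch testable without touching the real environment, which is the same reason the tests in Step 1 use `monkeypatch`.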
- [ ] **Step 4: Run test to verify it passes**
```
python -m pytest ai_trade/tests/test_config_token_path.py -v
```
Expected: 2 passed.
- [ ] **Step 5: Verify full test suite still passes**
```
python -m pytest ai_trade/tests -q
```
Expected: all existing tests PASS (confirm that changing the token path default does not affect other tests).
- [ ] **Step 6: Commit**
```bash
git add ai_trade/config.py ai_trade/tests/test_config_token_path.py
git commit -m "fix(ai_trade): route V1_TOKEN_PATH default through legacy/signal_v1/ (F1)"
```
---
## Task 2 (F2): Serialize the KIS throttle with asyncio.Lock
**Files:**
- Modify: `ai_trade/kis_client.py:40-62`
- Test: `ai_trade/tests/test_kis_client.py` (Modify: add a new test)
- [ ] **Step 1: Write the failing test**
Append to the end of `ai_trade/tests/test_kis_client.py`:
```python
import asyncio
import time as time_module


@respx.mock
async def test_throttle_serializes_concurrent_gather(kis_client_factory):
    """Five concurrent asyncio.gather requests must still be serialized 0.5 s apart (F2).

    2 calls per second = 0.5 s spacing. Five requests need at least (5-1)*0.5 = 2.0 s.
    With the race condition, all five go out almost together and finish well under 2 s.
    """
    sample = {"output2": []}
    respx.get(
        "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
    ).mock(return_value=httpx.Response(200, json=sample))
    client = kis_client_factory()
    try:
        start = time_module.monotonic()
        await asyncio.gather(*[client.get_minute_ohlcv(f"00593{i}") for i in range(5)])
        elapsed = time_module.monotonic() - start
        # Five throttled calls take at least (5-1)*0.5 = 2.0 s; allow 0.3 s tolerance.
        assert elapsed >= 1.7, (
            f"throttle race condition: 5 concurrent calls took only {elapsed:.2f}s, "
            f"expected >=1.7s (0.5s * 4 inter-call gaps)"
        )
    finally:
        await client.close()
```
- [ ] **Step 2: Run test to verify it fails**
```
python -m pytest ai_trade/tests/test_kis_client.py::test_throttle_serializes_concurrent_gather -v
```
Expected: FAIL. Elapsed is roughly 0.5 s or less, because the race lets the waiting calls wake at the same time.
- [ ] **Step 3: Add asyncio.Lock to KISClient**
Add one line (`_throttle_lock`) at the end of `__init__` in `ai_trade/kis_client.py:40`:
```python
        self._token_cache: tuple[str, float] | None = None  # (token, file_mtime)
        self._last_throttle_at = 0.0
        self._throttle_lock = asyncio.Lock()
```
Then wrap `_throttle()` (L58-62) in the lock:
```python
    async def _throttle(self) -> None:
        async with self._throttle_lock:
            elapsed = time.monotonic() - self._last_throttle_at
            if elapsed < _THROTTLE_INTERVAL:
                await asyncio.sleep(_THROTTLE_INTERVAL - elapsed)
            self._last_throttle_at = time.monotonic()
```
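To see why the lock matters, the same throttle body can be run with and without it on a burst of concurrent callers. The sketch below is self-contained and uses hypothetical names (`Throttle`, `timed_burst`, `compare`) and a shortened `INTERVAL` so it finishes quickly; it is not the `KISClient` code. Without the lock, every caller after the first reads the same timestamp, sleeps in parallel, and wakes together.

```python
import asyncio
import time
from typing import Optional, Tuple

INTERVAL = 0.05  # illustrative spacing; the plan's test assumes 0.5 s for KIS


class Throttle:
    def __init__(self, lock: Optional[asyncio.Lock]) -> None:
        self._last = float("-inf")  # first caller never waits
        self._lock = lock

    async def _wait_turn(self) -> None:
        elapsed = time.monotonic() - self._last
        if elapsed < INTERVAL:
            await asyncio.sleep(INTERVAL - elapsed)
        self._last = time.monotonic()

    async def acquire(self) -> None:
        if self._lock is None:
            # Racy: callers interleave at the sleep and all wake together.
            await self._wait_turn()
        else:
            # Serialized: the next caller measures elapsed only after the
            # previous caller has recorded its timestamp.
            async with self._lock:
                await self._wait_turn()


async def timed_burst(use_lock: bool, n: int = 4) -> float:
    throttle = Throttle(asyncio.Lock() if use_lock else None)
    start = time.monotonic()
    await asyncio.gather(*(throttle.acquire() for _ in range(n)))
    return time.monotonic() - start


def compare() -> Tuple[float, float]:
    unlocked = asyncio.run(timed_burst(use_lock=False))
    locked = asyncio.run(timed_burst(use_lock=True))
    return unlocked, locked


if __name__ == "__main__":
    unlocked, locked = compare()
    print(f"unlocked={unlocked:.3f}s locked={locked:.3f}s")
```

With four callers, the locked burst should take about three intervals while the unlocked one takes about one, which mirrors the 2.0 s versus 0.5 s gap the regression test checks.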
- [ ] **Step 4: Run test to verify it passes**
```
python -m pytest ai_trade/tests/test_kis_client.py::test_throttle_serializes_concurrent_gather -v
```
Expected: PASS — elapsed >= 1.7s.
- [ ] **Step 5: Verify full kis_client suite still passes**
```
python -m pytest ai_trade/tests/test_kis_client.py -v
```
Expected: all tests PASS (confirm that the existing 429 retry tests and others are unaffected).
- [ ] **Step 6: Commit**
```bash
git add ai_trade/kis_client.py ai_trade/tests/test_kis_client.py
git commit -m "fix(ai_trade): serialize KIS throttle with asyncio.Lock (F2)"
```
---
## Task 3 (F3): Make the post-close trigger state-based
**Files:**
- Modify: `ai_trade/scheduler.py:79-84`
- Modify: `ai_trade/pull_worker.py:1-58`
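- Test: `ai_trade/tests/test_scheduler.py` (add 5 tests)
**Why state-based:** The 16:00:00-16:00:59 window conflicts with the 5-minute sleep and the non-deterministic start time of each cycle, so a cycle can easily miss it. The new rule is: "if post-close has not run yet today and the current time is ≥ 16:00, trigger it and then mark today as done."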
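The difference between the two rules can be replayed over a few wake-up times. The sketch below is illustrative only: `window_trigger`, `state_trigger`, and `replay` are hypothetical names, the datetimes are naive for brevity, and the market-day check from the real scheduler is omitted.

```python
from datetime import date, datetime, time
from typing import List, Optional, Tuple


def window_trigger(now: datetime) -> bool:
    """Window rule: fires only if the loop wakes during 16:00:00-16:00:59."""
    return now.hour == 16 and now.minute == 0


def state_trigger(now: datetime, last_run: Optional[date]) -> bool:
    """State rule: fires at or after 16:00 if it has not fired today."""
    return now.time() >= time(16, 0) and last_run != now.date()


def replay(wakes: List[datetime]) -> Tuple[List[bool], List[bool]]:
    last_run: Optional[date] = None
    window_hits, state_hits = [], []
    for now in wakes:
        window_hits.append(window_trigger(now))
        fired = state_trigger(now, last_run)
        state_hits.append(fired)
        if fired:
            last_run = now.date()  # mark today as done
    return window_hits, state_hits


# A 5-minute cadence that happens to straddle 16:00.
WAKES = [
    datetime(2026, 5, 18, 15, 58),
    datetime(2026, 5, 18, 16, 3),
    datetime(2026, 5, 18, 16, 8),
]

if __name__ == "__main__":
    print(replay(WAKES))
```

On this cadence the window rule never fires, while the state rule fires once at 16:03 and stays quiet at 16:08.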
- [ ] **Step 1: Write the failing tests**
Append to the end of `ai_trade/tests/test_scheduler.py`:
```python
from datetime import date as _date

from ai_trade.scheduler import _is_post_close_trigger


def test_post_close_trigger_fires_at_1601_if_not_yet_today():
    """A cycle that wakes at 16:01 triggers if post-close has not run today (F3)."""
    now = _kst(2026, 5, 18, 16, 1)
    assert _is_post_close_trigger(now, last_post_close_date=None) is True


def test_post_close_trigger_skips_if_already_today():
    """No trigger if post-close already ran today."""
    now = _kst(2026, 5, 18, 16, 5)
    today = _date(2026, 5, 18)
    assert _is_post_close_trigger(now, last_post_close_date=today) is False


def test_post_close_trigger_skips_before_1600():
    """No trigger before 16:00."""
    now = _kst(2026, 5, 18, 15, 59)
    assert _is_post_close_trigger(now, last_post_close_date=None) is False


def test_post_close_trigger_fires_next_day_after_reset():
    """On the next business day last_post_close_date differs from today, so it triggers again."""
    now = _kst(2026, 5, 19, 16, 0)
    yesterday = _date(2026, 5, 18)
    assert _is_post_close_trigger(now, last_post_close_date=yesterday) is True


def test_post_close_trigger_skips_on_holiday():
    """No trigger on a market holiday (2026-05-05, Children's Day)."""
    now = _kst(2026, 5, 5, 16, 30)
    assert _is_post_close_trigger(now, last_post_close_date=None) is False
```
- [ ] **Step 2: Run tests to verify they fail**
```
python -m pytest ai_trade/tests/test_scheduler.py -v -k post_close
```
Expected: FAIL. `_is_post_close_trigger` does not yet support the new signature (the `last_post_close_date` argument).
- [ ] **Step 3: Modify scheduler.py:79-84**
```python
def _is_post_close_trigger(now: datetime, last_post_close_date) -> bool:
    """Return True at or after 16:00 KST if post-close has not run today (F3, state-based).

    Args:
        now: current KST datetime.
        last_post_close_date: date of the last post-close run (None = never run).
    """
    if not _is_market_day(now):
        return False
    if now.time() < time(16, 0):
        return False
    today = now.date()
    return last_post_close_date != today
```
- [ ] **Step 4: Run scheduler tests**
```
python -m pytest ai_trade/tests/test_scheduler.py -v
```
Expected: the 5 new tests PASS. Existing tests also PASS (other functions are unaffected).
- [ ] **Step 5: Update pull_worker.py to track last_post_close_date**
Replace `poll_loop` (L18-58) in `ai_trade/pull_worker.py` with:
```python
async def poll_loop(
    client: StockClient, state: PollState, shutdown: asyncio.Event,
    kis_client: KISClient | None = None,
    chronos=None,
    dedup=None,
    settings=None,
) -> None:
    """Started from the FastAPI lifespan via asyncio.create_task."""
    logger.info("poll_loop started")
    last_post_close_date = None
    while not shutdown.is_set():
        now = datetime.now(KST)
        if _is_market_day(now) and _is_polling_window(now):
            try:
                await _run_polling_cycle(client, state, kis_client=kis_client)
            except Exception:
                logger.exception("poll cycle failed")
            # Refresh minute momentum (every cycle)
            try:
                update_minute_momentum_for_all(state)
            except Exception:
                logger.exception("minute momentum update failed")
        # Post-close trigger (state-based: at or after 16:00 and not yet run today)
        if (
            _is_post_close_trigger(now, last_post_close_date)
            and chronos is not None and kis_client is not None
        ):
            try:
                await _run_post_close_cycle(kis_client, chronos, state)
                # Mark today only on success so a failed run is retried next cycle.
                last_post_close_date = now.date()
            except Exception:
                logger.exception("post-close cycle failed")
        # Phase 4: generate signals
        if dedup is not None and settings is not None:
            try:
                from ai_trade.signal_generator import generate_signals
                generate_signals(state, dedup, settings)
            except Exception:
                logger.exception("generate_signals failed")
        interval = _next_interval(now)
        try:
            await asyncio.wait_for(shutdown.wait(), timeout=interval)
            break
        except asyncio.TimeoutError:
            continue
    logger.info("poll_loop ended")
```
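One consequence of setting `last_post_close_date` inside the `try`, after the awaited call, is that a failed post-close run is retried on the next cycle instead of being skipped for the day. A minimal synchronous sketch of that bookkeeping, under stated assumptions: `run_cycles` is a hypothetical helper, each wake carries a flag saying whether the simulated job succeeds, and the market-day check is left out.

```python
from datetime import date, datetime, time
from typing import Iterable, List, Optional, Tuple


def run_cycles(wakes: Iterable[Tuple[datetime, bool]]) -> List[str]:
    """Return the wake times at which a post-close run was attempted."""
    last_post_close_date: Optional[date] = None
    attempts: List[str] = []
    for now, job_succeeds in wakes:
        due = now.time() >= time(16, 0) and last_post_close_date != now.date()
        if not due:
            continue
        attempts.append(now.strftime("%m-%d %H:%M"))
        try:
            if not job_succeeds:
                raise RuntimeError("simulated post-close failure")
            # Reached only on success, mirroring the loop above.
            last_post_close_date = now.date()
        except RuntimeError:
            pass  # the real loop logs the exception and carries on
    return attempts


SCHEDULE = [
    (datetime(2026, 5, 18, 16, 3), False),  # first attempt fails
    (datetime(2026, 5, 18, 16, 8), True),   # retried and succeeds
    (datetime(2026, 5, 18, 16, 13), True),  # already done today, skipped
    (datetime(2026, 5, 19, 16, 2), True),   # new day, runs again
]

if __name__ == "__main__":
    print(run_cycles(SCHEDULE))
```

If the date were recorded before the call instead, the 16:03 failure would suppress the 16:08 retry, so the ordering inside the `try` is worth preserving during the edit.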
- [ ] **Step 6: Add pull_worker test**
Append to the end of `ai_trade/tests/test_pull_worker.py`:
```python
from unittest.mock import AsyncMock, MagicMock
from datetime import datetime as _dt
from zoneinfo import ZoneInfo as _ZI
import asyncio as _asyncio


async def test_post_close_fires_at_1601_when_not_yet_today(monkeypatch):
    """A cycle that wakes at 16:01 still runs post-close if it has not run today (F3 regression)."""
    from ai_trade import pull_worker

    _kst = _ZI("Asia/Seoul")
    now_at_1601 = _dt(2026, 5, 18, 16, 1, tzinfo=_kst)

    class FrozenDateTime:
        """Stub for pull_worker.datetime; only now() is used by poll_loop."""

        @staticmethod
        def now(tz=None):
            return now_at_1601

    monkeypatch.setattr(pull_worker, "datetime", FrozenDateTime)
    monkeypatch.setattr(pull_worker, "_is_market_day", lambda n: True)
    monkeypatch.setattr(pull_worker, "_is_polling_window", lambda n: True)
    monkeypatch.setattr(pull_worker, "_next_interval", lambda n: 0.01)
    monkeypatch.setattr(pull_worker, "_run_polling_cycle", AsyncMock())
    monkeypatch.setattr(pull_worker, "update_minute_momentum_for_all", lambda s: None)
    post_close = AsyncMock()
    monkeypatch.setattr(pull_worker, "_run_post_close_cycle", post_close)

    state = MagicMock()
    chronos = MagicMock()
    kis = MagicMock()
    shutdown = _asyncio.Event()

    async def _stop_soon():
        await _asyncio.sleep(0.05)
        shutdown.set()

    # Keep a reference so the task is not garbage-collected mid-test.
    stopper = _asyncio.create_task(_stop_soon())
    await pull_worker.poll_loop(
        client=MagicMock(),
        state=state,
        shutdown=shutdown,
        kis_client=kis,
        chronos=chronos,
        dedup=None,
        settings=None,
    )
    await stopper
    assert post_close.await_count >= 1, "post-close was not called at 16:01 (F3 regression)"
```
- [ ] **Step 7: Run pull_worker test**
```
python -m pytest ai_trade/tests/test_pull_worker.py::test_post_close_fires_at_1601_when_not_yet_today -v
```
Expected: PASS.
- [ ] **Step 8: Run full ai_trade suite**
```
python -m pytest ai_trade/tests -q
```
Expected: all PASS.
- [ ] **Step 9: Commit**
```bash
git add ai_trade/scheduler.py ai_trade/pull_worker.py ai_trade/tests/test_scheduler.py ai_trade/tests/test_pull_worker.py
git commit -m "fix(ai_trade): make post-close trigger state-based (F3)"
```
---
## Task 4 (F4): Unify Chronos confidence on absolute spread
**Files:**
- Modify: `ai_trade/chronos_predictor.py:106, 127`
- Test: `ai_trade/tests/test_chronos_predictor.py` (add 3 tests)
**Why absolute:** The Phase 4 spec amendment (web-ui commit 534ded5) decides the hard gate on absolute spread, so confidence should follow the same philosophy. New formula: `conf = max(0, min(1, 1 - spread / SPREAD_THRESHOLD))`. When spread reaches 0.6, conf=0; when spread is 0, conf=1.
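The formula can be checked by hand with the same price triples the tests below use. This is a small worked sketch, assuming quantile prices are converted to returns relative to the last close as in the fix; `confidence` and its parameter names are illustrative, not the predictor's API.

```python
SPREAD_THRESHOLD = 0.6  # same value as the plan's _SPREAD_THRESHOLD


def confidence(last_close: float, q10_price: float, q90_price: float,
               threshold: float = SPREAD_THRESHOLD) -> float:
    """Map the absolute q90-q10 return spread onto [0, 1]."""
    q10 = (q10_price - last_close) / last_close
    q90 = (q90_price - last_close) / last_close
    spread = q90 - q10  # absolute spread; the median is not involved
    return max(0.0, min(1.0, 1.0 - spread / threshold))


if __name__ == "__main__":
    last_close = 100_000.0
    for q10_price, q90_price in [
        (100_000.0, 100_000.0),  # spread 0.00
        (85_000.0, 115_000.0),   # spread 0.30
        (70_000.0, 130_000.0),   # spread 0.60
        (60_000.0, 140_000.0),   # spread 0.80, clamped
    ]:
        print(q10_price, q90_price, round(confidence(last_close, q10_price, q90_price), 3))
```

Because the median never appears in the expression, a forecast centered on zero return no longer distorts the score.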
- [ ] **Step 1: Write the failing tests**
Append to the end of the existing `ai_trade/tests/test_chronos_predictor.py` (create the file if it is missing or empty):
```python
import numpy as np
import pytest
import torch


@pytest.fixture
def fake_pipeline():
    """Fake pipeline that stubs only predict_quantiles."""
    class FakePipeline:
        def __init__(self, q10_price, q50_price, q90_price):
            self._q10, self._q50, self._q90 = q10_price, q50_price, q90_price

        def predict_quantiles(self, contexts, prediction_length, quantile_levels):
            n = len(contexts)
            # shape: [n, 1, 3]
            tensor = torch.tensor(
                [[[self._q10, self._q50, self._q90]]] * n,
                dtype=torch.float32,
            )
            return tensor, None

    return FakePipeline


def _make_predictor_with(pipeline_obj):
    """ChronosPredictor instance with the pipeline injected (no real model load)."""
    from ai_trade.chronos_predictor import ChronosPredictor
    p = ChronosPredictor.__new__(ChronosPredictor)
    p._pipeline = pipeline_obj
    p._device = "cpu"
    return p


def test_confidence_high_when_spread_near_zero(fake_pipeline):
    """With median≈0 and spread≈0 (q10=q90=last_close), conf≈1 (F4)."""
    last_close = 100000.0
    p = _make_predictor_with(fake_pipeline(last_close, last_close, last_close))
    ohlcv = {"A": [{"close": last_close}] * 30}
    out = p.predict_batch(ohlcv)
    assert out["A"].conf > 0.95, (
        f"median≈0 and spread≈0 but conf={out['A'].conf} (F4 regression: relative spread blows up)"
    )


def test_confidence_drops_with_spread(fake_pipeline):
    """At spread 0.3, conf≈0.5 (1 - 0.3/0.6 = 0.5)."""
    last_close = 100000.0
    # q10=85000 → -0.15, q90=115000 → 0.15, spread=0.30
    p = _make_predictor_with(fake_pipeline(85000.0, 100000.0, 115000.0))
    ohlcv = {"A": [{"close": last_close}] * 30}
    out = p.predict_batch(ohlcv)
    # 1 - 0.30/0.60 = 0.50
    assert 0.45 < out["A"].conf < 0.55, (
        f"conf={out['A'].conf} at absolute spread 0.30 (expected ≈0.5)"
    )


def test_confidence_zero_at_threshold_spread(fake_pipeline):
    """When spread is at or above the threshold (0.6), conf=0."""
    last_close = 100000.0
    # q10=70000 → -0.30, q90=130000 → 0.30, spread=0.60
    p = _make_predictor_with(fake_pipeline(70000.0, 100000.0, 130000.0))
    ohlcv = {"A": [{"close": last_close}] * 30}
    out = p.predict_batch(ohlcv)
    assert out["A"].conf < 0.05, (
        f"conf={out['A'].conf} at spread=threshold (expected ≈0)"
    )
```
- [ ] **Step 2: Run tests to verify they fail**
```
python -m pytest ai_trade/tests/test_chronos_predictor.py -v -k confidence
```
Expected: `test_confidence_high_when_spread_near_zero` FAILs, because the current relative-spread formula makes conf collapse to 0 when median≈0.
- [ ] **Step 3: Fix chronos_predictor.py**
Add a constant near the top of `ai_trade/chronos_predictor.py` (around L13):
```python
_SPREAD_THRESHOLD = 0.6  # F4: same as the signal_generator hard gate (absolute return spread)
```
Change L106 (modern API path):
```python
# shape: [num_series, prediction_length, 3]
for i, ticker in enumerate(tickers):
    q10_price, q50_price, q90_price = quantiles_np[i, 0, :]
    last_close = daily_ohlcv_dict[ticker][-1]["close"]
    median = float((q50_price - last_close) / last_close)
    q10 = float((q10_price - last_close) / last_close)
    q90 = float((q90_price - last_close) / last_close)
    # F4: based on absolute spread (q90-q10), consistent with the signal_generator hard gate.
    # Removes the relative formula that made conf collapse to 0 in the median≈0 zero-shot case.
    spread = q90 - q10
    conf = float(max(0.0, min(1.0, 1.0 - spread / _SPREAD_THRESHOLD)))
    results[ticker] = ChronosPrediction(
        median=median, q10=q10, q90=q90, conf=conf, as_of=now_iso,
    )
return results
```
Apply the same change at L127 (legacy API path):
```python
spread = q90 - q10
conf = float(max(0.0, min(1.0, 1.0 - spread / _SPREAD_THRESHOLD)))
```
- [ ] **Step 4: Run tests to verify they pass**
```
python -m pytest ai_trade/tests/test_chronos_predictor.py -v
```
Expected: all 3 new tests PASS. Existing tests also PASS.
- [ ] **Step 5: Run full ai_trade suite**
```
python -m pytest ai_trade/tests -q
```
Expected: all PASS. Watch the signal_generator tests too, since `_compute_buy_confidence` uses `pred["conf"]` and may be affected.
- [ ] **Step 6: Commit**
```bash
git add ai_trade/chronos_predictor.py ai_trade/tests/test_chronos_predictor.py
git commit -m "fix(ai_trade): unify Chronos confidence on absolute spread (F4)"
```
---
## Task 5: Full regression check + push
- [ ] **Step 1: Run full ai_trade suite + count**
```
python -m pytest ai_trade/tests -v
```
Expected:
- 56 existing tests + 12 new tests (config 2 + kis_client 1 + scheduler 5 + pull_worker 1 + chronos_predictor 3) = about **68 tests** PASS.
- [ ] **Step 2: Quick sanity: server boot smoke test (time permitting)**
```
cd ai_trade && python -c "from main import app; print('app import OK')"
```
Expected: no import errors.
- [ ] **Step 3: Push**
```bash
git push origin main
```
---
## Self-Review Checklist
After finishing this plan, confirm the following:
1. **F1**: config.py default + 2 tests (default + env override) ✅
2. **F2**: `_throttle_lock` added + 1 concurrent test ✅
3. **F3**: `_is_post_close_trigger(now, last_post_close_date)` signature change + `poll_loop` state tracking + 5 scheduler tests + 1 pull_worker test ✅
4. **F4**: `_SPREAD_THRESHOLD=0.6` constant + absolute spread applied in both branches (modern + legacy) + 3 chronos_predictor tests ✅
**Items that may have been missed**:
- If `test_main.py` asserts the `v1_token_path` default directly, update it together with Task 1. The patch above only goes through the Settings object, so it is unaffected (verified).
- In the Task 3 pull_worker test, `FrozenDateTime.now` stubs only the `datetime.now(KST)` call. Other datetime usage is unaffected (verified L28).
- The Task 4 tests bypass `__init__` via ChronosPredictor `__new__`, so the real HuggingFace model is never loaded.
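The `__new__` bypass mentioned in the last bullet works because `cls.__new__(cls)` allocates an instance without calling `__init__`, so any attributes the methods need must be set by hand. A minimal sketch with a hypothetical `ExpensiveModel` class (not the real `ChronosPredictor`):

```python
class ExpensiveModel:
    def __init__(self) -> None:
        # Stand-in for a slow model download; the test must never reach this.
        raise RuntimeError("would load model weights here")

    def double(self, x: int) -> int:
        return self._scale * x


def make_without_init() -> ExpensiveModel:
    obj = ExpensiveModel.__new__(ExpensiveModel)  # __init__ is skipped
    obj._scale = 2  # inject only the attributes the method under test reads
    return obj


if __name__ == "__main__":
    print(make_without_init().double(21))
```

The trade-off is that the test has to know which private attributes the method reads, so it can break if `__init__` later starts setting additional state that `predict_batch` depends on.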
---
## Execution Handoff
**Plan complete and saved to `docs/superpowers/plans/2026-05-25-ai-trade-hotfix.md`.**
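Two execution options:
**1. Subagent-Driven (recommended)**: dispatch a fresh subagent per task, with two-stage review. Better suited to subtle concurrency and state changes such as F2/F3.
**2. Inline Execution**: work through the tasks directly in the current session, with checkpoints.
Awaiting 박재오's decision.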