22 KiB
ai_trade Hotfix — Code Review F1·F2·F3·F4 Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: ai_trade(V2) 코드 리뷰 7개 finding 중 High 3건(F1·F2·F3) + Medium 1건(F4)을 TDD로 수정. F5/F6은 별도 plan, F7은 pushback.
Architecture: 모두 ai_trade/ 내부 단일 모듈 수정. (1) config.py default 경로 — legacy/ 경유. (2) kis_client.py — asyncio.Lock으로 _throttle() 직렬화. (3) scheduler.py + pull_worker.py — post-close를 시간 윈도우가 아닌 "일 1회 + 16:00 이후" 상태기반으로 변경. (4) chronos_predictor.py — confidence 산식을 absolute spread 기반으로 통일.
Tech Stack: Python 3.12, asyncio, pytest + pytest-asyncio + respx, httpx.
Test runner: .venv 한글 경로 깨짐 + 리뷰어 Python 312 경로 부재 보고로, 시스템 Python 사용. 정확한 경로는 where python 으로 우선 확인. 기본 시도 순서:
python -m pytest ai_trade/tests -q(PATH의 Python)py -3.12 -m pytest ai_trade/tests -q(py launcher)- 둘 다 실패 시 환경 셋업이 선행 작업 — plan 진행 중단하고 박재오에게 보고.
Working directory: C:\Users\jaeoh\Desktop\workspace\web-ai (web-ai repo). Commit/push도 이 디렉토리에서만.
File Map
| 파일 | 변경 종류 | 책임 |
|---|---|---|
ai_trade/config.py |
Modify L31-36 | V1_TOKEN_PATH default를 legacy/signal_v1/data/kis_token.json로 |
ai_trade/kis_client.py |
Modify L40-62 | _throttle_lock: asyncio.Lock 추가, _throttle()을 lock 안에서 실행 |
ai_trade/scheduler.py |
Modify L79-84 | _is_post_close_trigger(now, last_post_close_date) 시그니처 변경 — 상태기반 |
ai_trade/pull_worker.py |
Modify L1-58 | poll_loop에 last_post_close_date state 추가, 호출부 갱신 |
ai_trade/chronos_predictor.py |
Modify L106, L127 | spread 계산을 absolute (q90-q10)로, confidence 산식 max(0, 1 - spread/0.6) |
ai_trade/tests/test_kis_client.py |
Add 1 test | concurrent gather throttle test |
ai_trade/tests/test_scheduler.py |
Add 3 tests | post-close 상태기반 트리거 |
ai_trade/tests/test_pull_worker.py |
Add 1 test | 첫 호출 안 됐다가 16:00 이후 5분 cycle에서 호출됨 |
ai_trade/tests/test_chronos_predictor.py |
Add 2 tests | median≈0에서도 conf 정상, spread 클수록 conf↓ |
ai_trade/tests/test_main.py |
Modify | v1_token_path default 변경 반영 (있다면) |
Task 1: F1 — KIS 토큰 경로 default를 legacy/ 경유로
Files:
-
Modify:
ai_trade/config.py:31-36 -
Test:
ai_trade/tests/test_config_token_path.py(Create) -
Step 1: Write the failing test
# ai_trade/tests/test_config_token_path.py
"""F1 — V1_TOKEN_PATH default가 legacy/signal_v1/ 경유인지 검증."""
import os
from pathlib import Path
from ai_trade.config import Settings
def test_v1_token_default_path_uses_legacy_dir(monkeypatch):
"""env에 V1_TOKEN_PATH 없으면 legacy/signal_v1/data/kis_token.json"""
monkeypatch.delenv("V1_TOKEN_PATH", raising=False)
settings = Settings()
expected_suffix = Path("legacy") / "signal_v1" / "data" / "kis_token.json"
assert str(settings.v1_token_path).endswith(str(expected_suffix)), (
f"expected default to end with {expected_suffix}, got {settings.v1_token_path}"
)
def test_v1_token_env_override_wins(monkeypatch, tmp_path):
"""env로 명시한 경로가 default를 덮어씀."""
custom = tmp_path / "custom_token.json"
monkeypatch.setenv("V1_TOKEN_PATH", str(custom))
settings = Settings()
assert settings.v1_token_path == custom
- Step 2: Run test to verify it fails
python -m pytest ai_trade/tests/test_config_token_path.py -v
Expected: test_v1_token_default_path_uses_legacy_dir FAILs (default가 signal_v1/... 임). env override는 PASS.
- Step 3: Fix config.py
ai_trade/config.py:31-36 변경:
v1_token_path: Path = field(
default_factory=lambda: Path(
os.getenv("V1_TOKEN_PATH",
str(Path(__file__).parent.parent / "legacy" / "signal_v1" / "data" / "kis_token.json"))
)
)
- Step 4: Run test to verify it passes
python -m pytest ai_trade/tests/test_config_token_path.py -v
Expected: 2 passed.
- Step 5: Verify full test suite still passes
python -m pytest ai_trade/tests -q
Expected: 모든 기존 테스트 PASS (token path 기본값 변경이 다른 test에 영향 없는지 확인).
- Step 6: Commit
git add ai_trade/config.py ai_trade/tests/test_config_token_path.py
git commit -m "fix(ai_trade): V1_TOKEN_PATH default를 legacy/signal_v1/ 경유로 수정 (F1)"
Task 2: F2 — KIS throttle을 asyncio.Lock으로 직렬화
Files:
-
Modify:
ai_trade/kis_client.py:40-62 -
Test:
ai_trade/tests/test_kis_client.py(Modify — 새 test 추가) -
Step 1: Write the failing test
ai_trade/tests/test_kis_client.py 파일 끝에 추가:
import asyncio
import time as time_module
@respx.mock
async def test_throttle_serializes_concurrent_gather(kis_client_factory):
"""5개 동시 요청이 asyncio.gather로 들어와도 0.5초 간격으로 직렬화되어야 함 (F2).
초당 2회 = 0.5초 간격. 5개 요청이면 최소 (5-1)*0.5 = 2.0초 소요.
Race condition 있으면 5개가 거의 동시에 나가서 2초 훨씬 안쪽에 끝남.
"""
sample = {"output2": []}
respx.get(
"https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
).mock(return_value=httpx.Response(200, json=sample))
client = kis_client_factory()
try:
start = time_module.monotonic()
await asyncio.gather(*[client.get_minute_ohlcv(f"00593{i}") for i in range(5)])
elapsed = time_module.monotonic() - start
# 5개 throttle = 최소 (5-1)*0.5 = 2.0초. tolerance 0.3초.
assert elapsed >= 1.7, (
f"throttle race condition: 5 concurrent calls took only {elapsed:.2f}s, "
f"expected >=1.7s (0.5s * 4 inter-call gaps)"
)
finally:
await client.close()
- Step 2: Run test to verify it fails
python -m pytest ai_trade/tests/test_kis_client.py::test_throttle_serializes_concurrent_gather -v
Expected: FAIL — elapsed가 0.5초 이하 (race로 동시 깸).
- Step 3: Add asyncio.Lock to KISClient
ai_trade/kis_client.py:40 __init__ 끝에 한 줄 추가:
self._token_cache: tuple[str, float] | None = None # (token, file_mtime)
self._last_throttle_at = 0.0
self._throttle_lock = asyncio.Lock()
그리고 _throttle() (L58-62)을 lock으로 감쌈:
async def _throttle(self) -> None:
async with self._throttle_lock:
elapsed = time.monotonic() - self._last_throttle_at
if elapsed < _THROTTLE_INTERVAL:
await asyncio.sleep(_THROTTLE_INTERVAL - elapsed)
self._last_throttle_at = time.monotonic()
- Step 4: Run test to verify it passes
python -m pytest ai_trade/tests/test_kis_client.py::test_throttle_serializes_concurrent_gather -v
Expected: PASS — elapsed >= 1.7s.
- Step 5: Verify full kis_client suite still passes
python -m pytest ai_trade/tests/test_kis_client.py -v
Expected: 모든 test PASS (기존 429 retry 등 영향 없는지 확인).
- Step 6: Commit
git add ai_trade/kis_client.py ai_trade/tests/test_kis_client.py
git commit -m "fix(ai_trade): KIS throttle을 asyncio.Lock으로 직렬화 (F2)"
Task 3: F3 — post-close 트리거를 상태기반으로 변경
Files:
- Modify:
ai_trade/scheduler.py:79-84 - Modify:
ai_trade/pull_worker.py:1-58 - Test:
ai_trade/tests/test_scheduler.py(add 3 tests)
Why state-based: 16:00:00-16:00:59 윈도우는 5분 sleep + 비결정적 cycle 시작 시각과 충돌. "오늘 아직 post-close 안 돌렸고 현재 시각 ≥ 16:00 이면 trigger 후 today 표시" 로 변경.
- Step 1: Write the failing tests
ai_trade/tests/test_scheduler.py 파일 끝에 추가:
from datetime import date as _date
from ai_trade.scheduler import _is_post_close_trigger
def test_post_close_trigger_fires_at_1601_if_not_yet_today():
"""16:01에 깬 cycle도 오늘 아직 안 돌렸으면 trigger (F3)."""
now = _kst(2026, 5, 18, 16, 1)
assert _is_post_close_trigger(now, last_post_close_date=None) is True
def test_post_close_trigger_skips_if_already_today():
"""이미 오늘 돌렸으면 trigger 안 함."""
now = _kst(2026, 5, 18, 16, 5)
today = _date(2026, 5, 18)
assert _is_post_close_trigger(now, last_post_close_date=today) is False
def test_post_close_trigger_skips_before_1600():
"""16:00 전에는 trigger 안 함."""
now = _kst(2026, 5, 18, 15, 59)
assert _is_post_close_trigger(now, last_post_close_date=None) is False
def test_post_close_trigger_fires_next_day_after_reset():
"""다음 영업일이 되면 last_post_close_date < today.date() 이므로 다시 trigger."""
now = _kst(2026, 5, 19, 16, 0)
yesterday = _date(2026, 5, 18)
assert _is_post_close_trigger(now, last_post_close_date=yesterday) is True
def test_post_close_trigger_skips_on_holiday():
"""휴장일에는 trigger 안 함 (2026-05-05 어린이날)."""
now = _kst(2026, 5, 5, 16, 30)
assert _is_post_close_trigger(now, last_post_close_date=None) is False
- Step 2: Run tests to verify they fail
python -m pytest ai_trade/tests/test_scheduler.py -v -k post_close
Expected: FAIL — _is_post_close_trigger가 신규 시그니처(last_post_close_date 인자) 미지원.
- Step 3: Modify scheduler.py:79-84
def _is_post_close_trigger(now: datetime, last_post_close_date) -> bool:
"""16:00 KST 이후 오늘 아직 post-close cycle 안 돌렸으면 True (F3 상태기반).
Args:
now: 현재 KST datetime.
last_post_close_date: 마지막 post-close 실행 영업일 date 객체 (None=미실행).
"""
if not _is_market_day(now):
return False
if now.time() < time(16, 0):
return False
today = now.date()
return last_post_close_date != today
- Step 4: Run scheduler tests
python -m pytest ai_trade/tests/test_scheduler.py -v
Expected: 신규 5개 PASS. 기존 test도 PASS (다른 함수 영향 없음).
- Step 5: Update pull_worker.py to track last_post_close_date
ai_trade/pull_worker.py 의 poll_loop (L18-58)을 다음으로 교체:
async def poll_loop(
client: StockClient, state: PollState, shutdown: asyncio.Event,
kis_client: KISClient | None = None,
chronos=None,
dedup=None,
settings=None,
) -> None:
"""FastAPI lifespan 에서 asyncio.create_task 로 시작."""
logger.info("poll_loop started")
last_post_close_date = None
while not shutdown.is_set():
now = datetime.now(KST)
if _is_market_day(now) and _is_polling_window(now):
try:
await _run_polling_cycle(client, state, kis_client=kis_client)
except Exception:
logger.exception("poll cycle failed")
# Minute momentum 갱신 (매 cycle)
try:
update_minute_momentum_for_all(state)
except Exception:
logger.exception("minute momentum update failed")
# Post-close trigger (상태기반: 16:00 이후 + 오늘 미실행)
if (
_is_post_close_trigger(now, last_post_close_date)
and chronos is not None and kis_client is not None
):
try:
await _run_post_close_cycle(kis_client, chronos, state)
last_post_close_date = now.date()
except Exception:
logger.exception("post-close cycle failed")
# Phase 4: generate signals
if dedup is not None and settings is not None:
try:
from ai_trade.signal_generator import generate_signals
generate_signals(state, dedup, settings)
except Exception:
logger.exception("generate_signals failed")
interval = _next_interval(now)
try:
await asyncio.wait_for(shutdown.wait(), timeout=interval)
break
except asyncio.TimeoutError:
continue
logger.info("poll_loop ended")
- Step 6: Add pull_worker test
ai_trade/tests/test_pull_worker.py 파일 끝에 추가:
from unittest.mock import AsyncMock, MagicMock
from datetime import datetime as _dt
from zoneinfo import ZoneInfo as _ZI
import asyncio as _asyncio
async def test_post_close_fires_at_1601_when_not_yet_today(monkeypatch):
"""16:01에 깬 cycle도 post_close 안 돌렸으면 호출됨 (F3 회귀)."""
from ai_trade import pull_worker
_kst = _ZI("Asia/Seoul")
now_at_1601 = _dt(2026, 5, 18, 16, 1, tzinfo=_kst)
real_dt = _dt
class FrozenDateTime:
@staticmethod
def now(tz=None):
return now_at_1601
monkeypatch.setattr(pull_worker, "datetime", FrozenDateTime)
monkeypatch.setattr(
pull_worker, "_is_market_day", lambda n: True,
)
monkeypatch.setattr(
pull_worker, "_is_polling_window", lambda n: True,
)
monkeypatch.setattr(
pull_worker, "_next_interval", lambda n: 0.01,
)
monkeypatch.setattr(
pull_worker, "_run_polling_cycle", AsyncMock(),
)
monkeypatch.setattr(
pull_worker, "update_minute_momentum_for_all", lambda s: None,
)
post_close = AsyncMock()
monkeypatch.setattr(pull_worker, "_run_post_close_cycle", post_close)
state = MagicMock()
chronos = MagicMock()
kis = MagicMock()
shutdown = _asyncio.Event()
async def _stop_soon():
await _asyncio.sleep(0.05)
shutdown.set()
_asyncio.create_task(_stop_soon())
await pull_worker.poll_loop(
client=MagicMock(),
state=state,
shutdown=shutdown,
kis_client=kis,
chronos=chronos,
dedup=None,
settings=None,
)
assert post_close.await_count >= 1, "post-close가 16:01에 호출되지 않음 (F3 회귀)"
- Step 7: Run pull_worker test
python -m pytest ai_trade/tests/test_pull_worker.py::test_post_close_fires_at_1601_when_not_yet_today -v
Expected: PASS.
- Step 8: Run full ai_trade suite
python -m pytest ai_trade/tests -q
Expected: 모두 PASS.
- Step 9: Commit
git add ai_trade/scheduler.py ai_trade/pull_worker.py ai_trade/tests/test_scheduler.py ai_trade/tests/test_pull_worker.py
git commit -m "fix(ai_trade): post-close trigger를 상태기반으로 변경 (F3)"
Task 4: F4 — Chronos confidence를 absolute spread 기반으로 통일
Files:
- Modify:
ai_trade/chronos_predictor.py:106, 127 - Test:
ai_trade/tests/test_chronos_predictor.py(add 2 tests)
Why absolute: Phase 4 spec amendment (web-ui commit 534ded5)가 absolute spread로 hard gate를 결정. confidence도 같은 철학으로. 새 산식: conf = max(0, min(1, 1 - spread / SPREAD_THRESHOLD)) — spread가 0.6에 도달하면 conf=0, 0이면 conf=1.
- Step 1: Write the failing tests
기존 ai_trade/tests/test_chronos_predictor.py 끝에 추가 (파일이 없거나 비어있으면 신규):
import numpy as np
import pytest
import torch
@pytest.fixture
def fake_pipeline():
"""predict_quantiles만 stub하는 가짜 pipeline."""
class FakePipeline:
def __init__(self, q10_price, q50_price, q90_price):
self._q10, self._q50, self._q90 = q10_price, q50_price, q90_price
def predict_quantiles(self, contexts, prediction_length, quantile_levels):
n = len(contexts)
tensor = torch.tensor(
[[[self._q10, self._q50, self._q90]]] * n,
dtype=torch.float32,
)
return tensor, None
return FakePipeline
def _make_predictor_with(pipeline_obj):
"""ChronosPredictor 인스턴스 (실제 모델 안 부르고 pipeline만 주입)."""
from ai_trade.chronos_predictor import ChronosPredictor
p = ChronosPredictor.__new__(ChronosPredictor)
p._pipeline = pipeline_obj
p._device = "cpu"
return p
def test_confidence_high_when_spread_near_zero(fake_pipeline):
"""median≈0, spread≈0 (q10=q90=last_close)일 때 conf≈1 (F4)."""
last_close = 100000.0
p = _make_predictor_with(fake_pipeline(last_close, last_close, last_close))
ohlcv = {"A": [{"close": last_close}] * 30}
out = p.predict_batch(ohlcv)
assert out["A"].conf > 0.95, (
f"median≈0 + spread≈0인데 conf={out['A'].conf} (F4 회귀: relative spread로 폭증)"
)
def test_confidence_drops_with_spread(fake_pipeline):
"""spread 0.3일 때 conf≈0.5 (1 - 0.3/0.6 = 0.5)."""
last_close = 100000.0
# q10=85000 → -0.15, q90=115000 → 0.15, spread=0.30
p = _make_predictor_with(fake_pipeline(85000.0, 100000.0, 115000.0))
ohlcv = {"A": [{"close": last_close}] * 30}
out = p.predict_batch(ohlcv)
# 1 - 0.30/0.60 = 0.50
assert 0.45 < out["A"].conf < 0.55, (
f"absolute spread 0.30에서 conf={out['A'].conf} (expected ≈0.5)"
)
def test_confidence_zero_at_threshold_spread(fake_pipeline):
"""spread가 threshold(0.6) 이상이면 conf=0."""
last_close = 100000.0
# q10=70000 → -0.30, q90=130000 → 0.30, spread=0.60
p = _make_predictor_with(fake_pipeline(70000.0, 100000.0, 130000.0))
ohlcv = {"A": [{"close": last_close}] * 30}
out = p.predict_batch(ohlcv)
assert out["A"].conf < 0.05, (
f"spread=threshold에서 conf={out['A'].conf} (expected ≈0)"
)
- Step 2: Run tests to verify they fail
python -m pytest ai_trade/tests/test_chronos_predictor.py -v -k confidence
Expected: test_confidence_high_when_spread_near_zero 가 FAIL — 현행 relative spread 산식 때문에 median≈0에서 conf가 0으로 폭락.
- Step 3: Fix chronos_predictor.py
ai_trade/chronos_predictor.py 상단에 상수 추가 (L13 근처):
_SPREAD_THRESHOLD = 0.6 # F4: signal_generator hard gate와 동일 (absolute return spread)
L106 (modern API 경로) 변경:
# shape: [num_series, prediction_length, 3]
for i, ticker in enumerate(tickers):
q10_price, q50_price, q90_price = quantiles_np[i, 0, :]
last_close = daily_ohlcv_dict[ticker][-1]["close"]
median = float((q50_price - last_close) / last_close)
q10 = float((q10_price - last_close) / last_close)
q90 = float((q90_price - last_close) / last_close)
# F4: absolute spread (q90-q10) 기반 — signal_generator hard gate와 통일.
# median≈0 zero-shot 케이스에서 conf가 0으로 폭락하던 relative 산식 제거.
spread = q90 - q10
conf = float(max(0.0, min(1.0, 1.0 - spread / _SPREAD_THRESHOLD)))
results[ticker] = ChronosPrediction(
median=median, q10=q10, q90=q90, conf=conf, as_of=now_iso,
)
return results
L127 (legacy API 경로) 동일하게 변경:
spread = q90 - q10
conf = float(max(0.0, min(1.0, 1.0 - spread / _SPREAD_THRESHOLD)))
- Step 4: Run tests to verify they pass
python -m pytest ai_trade/tests/test_chronos_predictor.py -v
Expected: 신규 3개 모두 PASS. 기존 test도 PASS.
- Step 5: Run full ai_trade suite
python -m pytest ai_trade/tests -q
Expected: 모두 PASS. signal_generator 테스트(_compute_buy_confidence 가 pred["conf"] 사용) 도 영향 받을 수 있으니 주시.
- Step 6: Commit
git add ai_trade/chronos_predictor.py ai_trade/tests/test_chronos_predictor.py
git commit -m "fix(ai_trade): Chronos confidence를 absolute spread 기반으로 통일 (F4)"
Task 5: 전체 회귀 확인 + push
- Step 1: Run full ai_trade suite + count
python -m pytest ai_trade/tests -v
Expected:
-
기존 56 tests + 신규 (config 2 + kis_client 1 + scheduler 5 + pull_worker 1 + chronos_predictor 3) = 68 tests 정도 PASS.
-
Step 2: Quick sanity — server boot smoke test (시간 허용 시)
cd ai_trade && python -c "from main import app; print('app import OK')"
Expected: no import errors.
- Step 3: Push
git push origin main
Self-Review Checklist
이 plan을 다 작성한 뒤 다음을 확인:
- F1: config.py default + 2 test (default + env override) ✅
- F2:
_throttle_lock추가 + 1 concurrent test ✅ - F3:
_is_post_close_trigger(now, last_post_close_date)시그니처 변경 +poll_loop상태 추적 + 5 scheduler test + 1 pull_worker test ✅ - F4:
_SPREAD_THRESHOLD=0.6상수 + 두 분기(modern + legacy) 모두 absolute spread 적용 + 3 chronos_predictor test ✅
누락 가능 항목:
test_main.py가v1_token_pathdefault를 직접 검증한다면 Task 1에서 같이 갱신. 위 patch는 Settings 객체 통해서만 다루므로 영향 없음(검증 완료).- Task 3 pull_worker test의
FrozenDateTime.now는datetime.now(KST)호출만 stub함. 다른 datetime 사용 부분 영향 없음 (verified L28). - Task 4 test는 ChronosPredictor
__new__로 우회 — 실제 HuggingFace 모델 로딩 안 함.
Execution Handoff
Plan complete and saved to docs/superpowers/plans/2026-05-25-ai-trade-hotfix.md.
두 가지 실행 옵션:
1. Subagent-Driven (recommended) — task 별 fresh subagent dispatch + two-stage review. F2/F3 같이 미묘한 동시성/상태 변경에 유리.
2. Inline Execution — 현 세션에서 직접 task별 진행 + checkpoint.
박재오 결정 대기.