diff --git a/docs/superpowers/plans/2026-05-16-signal-v2-phase3b-chronos-momentum.md b/docs/superpowers/plans/2026-05-16-signal-v2-phase3b-chronos-momentum.md new file mode 100644 index 0000000..f43d72c --- /dev/null +++ b/docs/superpowers/plans/2026-05-16-signal-v2-phase3b-chronos-momentum.md @@ -0,0 +1,1160 @@ +# Signal V2 Phase 3b — Chronos-2 + Minute Momentum Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** signal_v2 에 Chronos-2 zero-shot 추론 (종가 후 1회 batch) + 1분봉 → 5분봉 aggregate 후 5-level 모멘텀 분류 추가. Phase 4 신호 룰의 핵심 입력 (chronos_predictions + minute_momentum) 채워 넣기. + +**Architecture:** HuggingFace `chronos-forecasting` 라이브러리 + `amazon/chronos-2` 모델 (env-configurable). 종가 후 16:00 KST 트리거 시 KIS REST 60일 일봉 fetch → Chronos batch predict → 메모리 state. 분봉 모멘텀은 순수 함수 (1분봉 deque → 5분봉 aggregate → 5-level 분류) 매 분봉 cycle 마다 갱신. + +**Tech Stack:** transformers / chronos-forecasting / torch (CUDA) / numpy / pytest-asyncio + respx / unittest.mock + +**Spec:** `web-ui/docs/superpowers/specs/2026-05-16-signal-v2-phase3b-chronos-momentum.md` + +**참고**: V1 venv (`web-ai/signal_v1/.venv` 또는 system Python) 에 PyTorch CUDA 이미 설치되어 있을 가능성. signal_v2 도 같은 venv 사용 권장 (재설치 회피). + +--- + +## 파일 구조 + +| 파일 | 책임 | +|------|------| +| `signal_v2/config.py` | (수정) `chronos_model` env field | +| `signal_v2/state.py` | (수정) `daily_ohlcv`, `chronos_predictions`, `minute_momentum` 추가 | +| `signal_v2/requirements.txt` | (수정) transformers, chronos-forecasting, torch | +| `signal_v2/kis_client.py` | (수정) `get_daily_ohlcv` 메서드 | +| `signal_v2/momentum_classifier.py` | (신규) `aggregate_1min_to_5min` + `classify_minute_momentum` | +| `signal_v2/chronos_predictor.py` | (신규) `ChronosPredictor` 클래스 + `ChronosPrediction` dataclass | +| `signal_v2/scheduler.py` | (수정) `_is_post_close_trigger` 헬퍼 | +| `signal_v2/pull_worker.py` | (수정) `_run_post_close_cycle` + `update_minute_momentum_for_all` | +| `signal_v2/main.py` | (수정) lifespan ChronosPredictor 로드 + poll_loop 에 chronos 전달 | +| `signal_v2/tests/test_kis_client.py` | (수정) `get_daily_ohlcv` 1 케이스 | +| `signal_v2/tests/test_momentum_classifier.py` | (신규) 6 케이스 | +| `signal_v2/tests/test_chronos_predictor.py` | (신규) 4 케이스 (모델 mock) | +| `signal_v2/tests/test_pull_worker.py` | (수정) post-close cycle 1 케이스 | +| `web-ai/.env` | (수정, 사용자 Task 7) `CHRONOS_MODEL` 추가 (optional) | + +13 파일 변경, 12 신규 테스트 (33 → 45). + +--- + +## Task 순서 + +``` +Task 1: foundation (config + state + requirements) + pip install +Task 2: kis_client.get_daily_ohlcv + 1 test (TDD) +Task 3: momentum_classifier + 6 tests (TDD, 순수 함수) +Task 4: chronos_predictor + 4 tests (TDD, mock) +Task 5: pull_worker post-close cycle + scheduler trigger + 1 test +Task 6: main.py lifespan ChronosPredictor 로드 +Task 7: 사용자 수동 — pip install (필요시) + .env + manual smoke + push +``` + +--- + +### Task 1: foundation (config + state + requirements) + +**Files:** +- Modify: `web-ai/signal_v2/config.py` +- Modify: `web-ai/signal_v2/state.py` +- Modify: `web-ai/requirements.txt` + +- [ ] **Step 1: Update config.py** + +Read current `web-ai/signal_v2/config.py`. Add `chronos_model` field to Settings dataclass (between `v1_token_path` and the properties): + +```python + chronos_model: str = field(default_factory=lambda: os.getenv("CHRONOS_MODEL", "amazon/chronos-2")) +``` + +- [ ] **Step 2: Update state.py** + +Read current `web-ai/signal_v2/state.py`. Replace with: + +```python +"""PollState — process-wide singleton.""" +from collections import deque +from dataclasses import dataclass, field + + +@dataclass +class PollState: + portfolio: dict | None = None + news_sentiment: dict | None = None + screener_preview: dict | None = None + minute_bars: dict[str, deque] = field(default_factory=dict) + asking_price: dict[str, dict] = field(default_factory=dict) + # Phase 3b additions + daily_ohlcv: dict[str, list[dict]] = field(default_factory=dict) + chronos_predictions: dict[str, dict] = field(default_factory=dict) + minute_momentum: dict[str, str] = field(default_factory=dict) + last_updated: dict[str, str] = field(default_factory=dict) + fetch_errors: dict[str, int] = field(default_factory=dict) + + +state = PollState() +``` + +- [ ] **Step 3: Update requirements.txt** + +Read current `web-ai/requirements.txt`. Append (if not present): + +``` +# Phase 3b dependencies (Chronos-2 + ML) +transformers>=4.40 +chronos-forecasting>=1.4 +# torch: typically already installed via V1 venv; if not, install with CUDA support manually +``` + +Do NOT add `torch` directly — V1 likely has it via CUDA-specific install. Document only. + +- [ ] **Step 4: pip install attempt** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +pip install -r requirements.txt 2>&1 | tail -10 +``` +Expected: `transformers` + `chronos-forecasting` install success. If `chronos-forecasting` fails due to network or dependency conflict, report DONE_WITH_CONCERNS — user will install manually in Task 7. + +- [ ] **Step 5: Smoke import test** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -c "from signal_v2.config import get_settings; from signal_v2.state import state; s = get_settings(); print(f'chronos_model={s.chronos_model}'); print(state)" +``` +Expected: `chronos_model=amazon/chronos-2` + state print (with new empty dicts). + +- [ ] **Step 6: Run existing tests — no regression** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests -q 2>&1 | tail -3 +``` +Expected: 33 passed. + +- [ ] **Step 7: Commit** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +git add signal_v2/config.py signal_v2/state.py requirements.txt +git commit -m "$(cat <<'EOF' +feat(signal_v2-phase3b): foundation — config + state + requirements + +- config.py: CHRONOS_MODEL env (default amazon/chronos-2) +- state.py: PollState extended with daily_ohlcv + chronos_predictions + + minute_momentum +- requirements.txt: transformers + chronos-forecasting (torch via V1 venv) + +33 existing tests still pass. + +Co-Authored-By: Claude Opus 4.7 (1M context) +EOF +)" +``` + +--- + +### Task 2: kis_client.get_daily_ohlcv + 1 test + +**Files:** +- Modify: `web-ai/signal_v2/kis_client.py` +- Modify: `web-ai/signal_v2/tests/test_kis_client.py` + +- [ ] **Step 1: Write failing test** + +Append to `web-ai/signal_v2/tests/test_kis_client.py`: + +```python +@respx.mock +async def test_get_daily_ohlcv_returns_60_bars(kis_client_factory): + """KIS daily endpoint returns 60 ascending bars after parsing.""" + sample_output2 = [ + { + "stck_bsop_date": f"2026{m:02d}{d:02d}", + "stck_oprc": "78000", "stck_hgpr": "78500", + "stck_lwpr": "77800", "stck_clpr": "78300", + "acml_vol": "12345", + } + # 60 daily bars (descending order from KIS) + for m, d in [(5, 18 - i) if (18 - i) >= 1 else (4, 30 + (18 - i)) for i in range(60)] + ] + respx.get( + "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice" + ).mock(return_value=httpx.Response(200, json={"output2": sample_output2})) + + client = kis_client_factory() + try: + bars = await client.get_daily_ohlcv("005930", days=60) + # KIS returns descending; client reverses to ascending + assert len(bars) == 60 + assert bars[0]["datetime"] < bars[-1]["datetime"] + assert bars[-1]["close"] == 78300 + assert "datetime" in bars[0] + assert isinstance(bars[0]["open"], int) + finally: + await client.close() +``` + +- [ ] **Step 2: Run test to verify FAIL** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests/test_kis_client.py::test_get_daily_ohlcv_returns_60_bars -v 2>&1 | tail -10 +``` +Expected: FAIL — `get_daily_ohlcv` not defined. + +- [ ] **Step 3: Implement get_daily_ohlcv** + +Edit `web-ai/signal_v2/kis_client.py`. Add the `timedelta` import to existing `from datetime import ...` line if needed, then add at the end of `KISClient` class (after `get_asking_price`): + +```python + async def get_daily_ohlcv(self, ticker: str, days: int = 60) -> list[dict]: + """KRX 일봉 OHLCV (TR_ID FHKST03010100). + + Returns: [{"datetime", "open", "high", "low", "close", "volume"}, ...] + 시간 오름차순. + """ + from datetime import timedelta + path = "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice" + today = datetime.now(KST).strftime("%Y%m%d") + start_date = (datetime.now(KST) - timedelta(days=days * 2)).strftime("%Y%m%d") + params = { + "FID_COND_MRKT_DIV_CODE": "J", + "FID_INPUT_ISCD": ticker, + "FID_INPUT_DATE_1": start_date, + "FID_INPUT_DATE_2": today, + "FID_PERIOD_DIV_CODE": "D", + "FID_ORG_ADJ_PRC": "1", + } + raw = await self._request_with_retry( + "GET", path, tr_id="FHKST03010100", params=params, + ) + output2 = raw.get("output2", []) + bars = [] + for row in output2: + try: + date = row["stck_bsop_date"] + bars.append({ + "datetime": f"{date[:4]}-{date[4:6]}-{date[6:]}", + "open": int(row["stck_oprc"]), + "high": int(row["stck_hgpr"]), + "low": int(row["stck_lwpr"]), + "close": int(row["stck_clpr"]), + "volume": int(row["acml_vol"]), + }) + except (KeyError, ValueError): + continue + bars.reverse() + return bars[-days:] +``` + +- [ ] **Step 4: Run test to verify PASS** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests/test_kis_client.py -v 2>&1 | tail -10 +``` +Expected: 5 passed (4 existing + 1 new). + +Full suite: +```bash +python -m pytest signal_v2/tests -q 2>&1 | tail -3 +``` +Expected: 34 passed. + +- [ ] **Step 5: Commit** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +git add signal_v2/kis_client.py signal_v2/tests/test_kis_client.py +git commit -m "$(cat <<'EOF' +feat(signal_v2-phase3b): kis_client.get_daily_ohlcv (60 daily bars) + +TR_ID FHKST03010100 (수정주가 일봉). KIS returns descending; client +reverses to ascending and trims to last N days. + +1 new test, 34 total. + +Co-Authored-By: Claude Opus 4.7 (1M context) +EOF +)" +``` + +--- + +### Task 3: momentum_classifier + 6 tests + +**Files:** +- Create: `web-ai/signal_v2/momentum_classifier.py` +- Create: `web-ai/signal_v2/tests/test_momentum_classifier.py` + +- [ ] **Step 1: Write 6 failing tests** + +Create `web-ai/signal_v2/tests/test_momentum_classifier.py`: + +```python +"""Tests for minute momentum classifier.""" +from collections import deque + +from signal_v2.momentum_classifier import ( + aggregate_1min_to_5min, classify_minute_momentum, + STRONG_UP, WEAK_UP, NEUTRAL, WEAK_DOWN, STRONG_DOWN, +) + + +def _bar(open_, high, low, close, volume): + return { + "datetime": "2026-05-18T09:00:00+09:00", + "open": open_, "high": high, "low": low, "close": close, "volume": volume, + } + + +def _make_minute_bars(n: int, *, up: int, vol_mult: float = 1.0): + """n개 1분봉. up=양봉 개수, vol_mult=평균 거래량 multiplier.""" + base_vol = 1000 + bars = [] + for i in range(n): + is_up = i < up + o, c = (100, 110) if is_up else (110, 100) + bars.append(_bar(o, max(o, c) + 5, min(o, c) - 5, c, int(base_vol * vol_mult))) + return bars + + +def test_strong_up_5_consecutive_green_with_high_volume(): + # 25 bars (5 chunks of 5) → 5개 5분봉 + # 최근 25 bars: 25/25 양봉 + 거래량 1.5x + # 거기에 35 bars 추가 (총 60) — long avg 계산용. 추가는 normal volume. + older = _make_minute_bars(35, up=15, vol_mult=1.0) + recent = _make_minute_bars(25, up=25, vol_mult=1.5) + minute_bars = deque(older + recent, maxlen=60) + assert classify_minute_momentum(minute_bars) == STRONG_UP + + +def test_weak_up_3of5_green_normal_volume(): + # 25 recent bars: 3-4 of 5 5분봉 이 양봉 + 거래량 1.0x + # 각 5분봉 chunk 5개 1분봉: 양봉 chunk = 모든 1분봉 양봉 + older = _make_minute_bars(35, up=15, vol_mult=1.0) + # 5 chunks: 3 up (양봉) + 2 down (음봉). 각 5 bars. + chunks_up = _make_minute_bars(5, up=5, vol_mult=1.0) + chunks_down = _make_minute_bars(5, up=0, vol_mult=1.0) + recent = chunks_up + chunks_up + chunks_up + chunks_down + chunks_down + minute_bars = deque(older + recent, maxlen=60) + assert classify_minute_momentum(minute_bars) == WEAK_UP + + +def test_neutral_mixed(): + # 25 recent: 2-3 양봉 + 거래량 normal + older = _make_minute_bars(35, up=15, vol_mult=1.0) + chunks_up = _make_minute_bars(5, up=5, vol_mult=1.0) + chunks_down = _make_minute_bars(5, up=0, vol_mult=1.0) + recent = chunks_up + chunks_up + chunks_down + chunks_down + chunks_down + # 5 5분봉: 2 up + 3 down → up_count=2, vol=1.0 + minute_bars = deque(older + recent, maxlen=60) + result = classify_minute_momentum(minute_bars) + # up_count=2, vol_mult=1.0 → 다른 카테고리 매치 안 됨 → NEUTRAL + assert result == NEUTRAL + + +def test_weak_down_low_green_low_volume(): + older = _make_minute_bars(35, up=15, vol_mult=1.0) + chunks_up = _make_minute_bars(5, up=5, vol_mult=0.5) + chunks_down = _make_minute_bars(5, up=0, vol_mult=0.5) + recent = chunks_up + chunks_down + chunks_down + chunks_down + chunks_down + # up_count=1, vol_mult=0.5 → WEAK_DOWN + minute_bars = deque(older + recent, maxlen=60) + assert classify_minute_momentum(minute_bars) == WEAK_DOWN + + +def test_strong_down_5_consecutive_red_high_volume(): + older = _make_minute_bars(35, up=15, vol_mult=1.0) + recent = _make_minute_bars(25, up=0, vol_mult=1.5) + minute_bars = deque(older + recent, maxlen=60) + assert classify_minute_momentum(minute_bars) == STRONG_DOWN + + +def test_aggregate_1min_to_5min_correctness(): + # 5 1분봉을 1개 5분봉으로 — open=첫, close=마지막, high=max, low=min, volume=sum + bars = [ + _bar(100, 105, 99, 102, 1000), + _bar(102, 108, 101, 107, 1500), + _bar(107, 110, 105, 106, 800), + _bar(106, 109, 104, 108, 1200), + _bar(108, 112, 107, 111, 900), + ] + result = aggregate_1min_to_5min(bars) + assert len(result) == 1 + assert result[0]["open"] == 100 + assert result[0]["close"] == 111 + assert result[0]["high"] == 112 + assert result[0]["low"] == 99 + assert result[0]["volume"] == 5400 +``` + +- [ ] **Step 2: Run tests to verify FAIL** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests/test_momentum_classifier.py -v 2>&1 | tail -10 +``` +Expected: ImportError. + +- [ ] **Step 3: Implement momentum_classifier.py** + +Create `web-ai/signal_v2/momentum_classifier.py`: + +```python +"""분봉 OHLCV → 5-level 모멘텀 분류.""" +from __future__ import annotations +from collections import deque + +# 분류 카테고리 +STRONG_UP = "strong_up" +WEAK_UP = "weak_up" +NEUTRAL = "neutral" +WEAK_DOWN = "weak_down" +STRONG_DOWN = "strong_down" + +_BARS_PER_5MIN = 5 +_LOOKBACK_5MIN_BARS = 5 +_VOLUME_AVG_WINDOW = 12 # 60분 = 5분봉 12개 + + +def aggregate_1min_to_5min(minute_bars: list[dict]) -> list[dict]: + """1분봉 N개 → 5분봉 floor(N/5) 개. 시간 오름차순. + + 각 5분봉: open=첫 1분봉 open, high=max, low=min, close=마지막 close, volume=sum. + """ + bars_5min = [] + chunks = len(minute_bars) // _BARS_PER_5MIN + for i in range(chunks): + chunk = minute_bars[i * _BARS_PER_5MIN : (i + 1) * _BARS_PER_5MIN] + bars_5min.append({ + "datetime": chunk[0]["datetime"], + "open": chunk[0]["open"], + "high": max(b["high"] for b in chunk), + "low": min(b["low"] for b in chunk), + "close": chunk[-1]["close"], + "volume": sum(b["volume"] for b in chunk), + }) + return bars_5min + + +def classify_minute_momentum(minute_bars: deque) -> str: + """1분봉 deque → 5-level 모멘텀 분류. + + Returns: STRONG_UP / WEAK_UP / NEUTRAL / WEAK_DOWN / STRONG_DOWN + """ + minute_list = list(minute_bars) + if len(minute_list) < _BARS_PER_5MIN * _LOOKBACK_5MIN_BARS: + return NEUTRAL # 데이터 부족 + + bars_5min = aggregate_1min_to_5min(minute_list) + if len(bars_5min) < _LOOKBACK_5MIN_BARS: + return NEUTRAL + + recent = bars_5min[-_LOOKBACK_5MIN_BARS:] + up_count = sum(1 for b in recent if b["close"] > b["open"]) + + # 거래량 multiplier: recent 5 avg vs 60분 avg + recent_vol_avg = sum(b["volume"] for b in recent) / len(recent) + long_window = bars_5min[-_VOLUME_AVG_WINDOW:] + long_vol_avg = sum(b["volume"] for b in long_window) / len(long_window) + vol_mult = recent_vol_avg / long_vol_avg if long_vol_avg > 0 else 1.0 + + # 5-level 분류 + if up_count == 5 and vol_mult >= 1.5: + return STRONG_UP + elif up_count >= 3 and vol_mult >= 1.0: + return WEAK_UP + elif up_count == 0 and vol_mult >= 1.5: + return STRONG_DOWN + elif up_count <= 2 and vol_mult < 1.0: + return WEAK_DOWN + else: + return NEUTRAL +``` + +- [ ] **Step 4: Run tests to verify PASS** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests/test_momentum_classifier.py -v 2>&1 | tail -15 +``` +Expected: 6 passed. + +Full suite: +```bash +python -m pytest signal_v2/tests -q 2>&1 | tail -3 +``` +Expected: 40 passed. + +If any test fails (e.g. test_neutral_mixed or test_weak_up_3of5), check whether the volume multiplier calculation matches the test fixtures. The recent 5 chunks' volume avg vs the trailing 12 chunks' avg may differ depending on whether `vol_mult=1.0` chunks pad both ranges. Adjust either tests or impl as needed for correctness. + +- [ ] **Step 5: Commit** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +git add signal_v2/momentum_classifier.py signal_v2/tests/test_momentum_classifier.py +git commit -m "$(cat <<'EOF' +feat(signal_v2-phase3b): momentum_classifier + 6 unit tests + +aggregate_1min_to_5min: 5분봉 OHLCV synthesis (open=첫, close=마지막, +high=max, low=min, volume=sum). classify_minute_momentum: 직전 5개 +5분봉 양봉 개수 + 거래량 60분 multiplier → 5-level +(strong_up/weak_up/neutral/weak_down/strong_down). + +40 tests pass. + +Co-Authored-By: Claude Opus 4.7 (1M context) +EOF +)" +``` + +--- + +### Task 4: chronos_predictor + 4 tests (mock) + +**Files:** +- Create: `web-ai/signal_v2/chronos_predictor.py` +- Create: `web-ai/signal_v2/tests/test_chronos_predictor.py` + +- [ ] **Step 1: Write 4 failing tests** + +Create `web-ai/signal_v2/tests/test_chronos_predictor.py`: + +```python +"""Tests for ChronosPredictor (model mock).""" +from unittest.mock import MagicMock, patch + +import numpy as np +import pytest + + +@pytest.fixture +def mock_pipeline(): + """Mock ChronosPipeline.from_pretrained returning a mock pipeline object.""" + with patch("chronos.ChronosPipeline") as cls: + instance = MagicMock() + cls.from_pretrained.return_value = instance + yield instance + + +@pytest.fixture +def mock_torch(): + with patch("torch.cuda.is_available", return_value=False): + yield + + +def _daily_ohlcv(close_seq): + return [{"datetime": f"2026-05-{i+1:02d}", "open": c, "high": c, "low": c, + "close": c, "volume": 1000} for i, c in enumerate(close_seq)] + + +def test_predict_batch_returns_prediction_dict(mock_pipeline, mock_torch): + """mock pipeline → dict[ticker, ChronosPrediction].""" + import torch + # mock samples shape [num_tickers=1, num_samples=100, prediction_length=1] + # last_close = 100; samples around 102 → return ~+2% + samples = np.full((100,), 102.0) + mock_pipeline.predict.return_value = torch.tensor(samples.reshape(1, 100, 1)) + + from signal_v2.chronos_predictor import ChronosPredictor, ChronosPrediction + predictor = ChronosPredictor(model_name="mock-model") + daily = {"005930": _daily_ohlcv([100] * 60)} + result = predictor.predict_batch(daily) + assert "005930" in result + pred = result["005930"] + assert isinstance(pred, ChronosPrediction) + assert abs(pred.median - 0.02) < 0.001 # +2% return + + +def test_conf_high_when_distribution_narrow(mock_pipeline, mock_torch): + """좁은 distribution → conf 높음.""" + import torch + # Tight distribution: all samples ≈ 102 + samples = np.random.normal(102.0, 0.1, 100) + mock_pipeline.predict.return_value = torch.tensor(samples.reshape(1, 100, 1)) + + from signal_v2.chronos_predictor import ChronosPredictor + predictor = ChronosPredictor(model_name="mock-model") + daily = {"005930": _daily_ohlcv([100] * 60)} + result = predictor.predict_batch(daily) + assert result["005930"].conf > 0.8 + + +def test_conf_low_when_distribution_wide(mock_pipeline, mock_torch): + """넓은 distribution → conf 낮음.""" + import torch + # Wide distribution: samples spread far + samples = np.random.normal(100.0, 30.0, 100) + mock_pipeline.predict.return_value = torch.tensor(samples.reshape(1, 100, 1)) + + from signal_v2.chronos_predictor import ChronosPredictor + predictor = ChronosPredictor(model_name="mock-model") + daily = {"005930": _daily_ohlcv([100] * 60)} + result = predictor.predict_batch(daily) + assert result["005930"].conf < 0.3 + + +def test_return_computed_from_price_relative_to_last_close(mock_pipeline, mock_torch): + """price 예측 → last_close 대비 return 변환.""" + import torch + samples = np.full((100,), 110.0) # predict 110 + mock_pipeline.predict.return_value = torch.tensor(samples.reshape(1, 100, 1)) + + from signal_v2.chronos_predictor import ChronosPredictor + predictor = ChronosPredictor(model_name="mock-model") + # last_close = 100 → return = +10% + daily = {"005930": _daily_ohlcv(list(range(41, 101)))} # last value = 100 + result = predictor.predict_batch(daily) + assert abs(result["005930"].median - 0.10) < 0.001 +``` + +- [ ] **Step 2: Run tests to verify FAIL** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests/test_chronos_predictor.py -v 2>&1 | tail -10 +``` +Expected: ImportError (signal_v2.chronos_predictor not yet exists). + +- [ ] **Step 3: Implement chronos_predictor.py** + +Create `web-ai/signal_v2/chronos_predictor.py`: + +```python +"""Chronos-2 zero-shot forecaster wrapper.""" +from __future__ import annotations +import logging +from dataclasses import dataclass +from datetime import datetime +from zoneinfo import ZoneInfo + +import numpy as np + +logger = logging.getLogger(__name__) +KST = ZoneInfo("Asia/Seoul") + + +@dataclass +class ChronosPrediction: + median: float + q10: float + q90: float + conf: float + as_of: str + + +class ChronosPredictor: + """HuggingFace Chronos-2 zero-shot forecaster.""" + + def __init__(self, model_name: str = "amazon/chronos-2", device: str | None = None): + from chronos import ChronosPipeline + import torch + + self._device = device or ("cuda" if torch.cuda.is_available() else "cpu") + logger.info("Loading Chronos pipeline: %s on %s", model_name, self._device) + self._pipeline = ChronosPipeline.from_pretrained( + model_name, + device_map=self._device, + torch_dtype=torch.float16 if self._device == "cuda" else torch.float32, + ) + logger.info("Chronos pipeline loaded.") + + def predict_batch( + self, + daily_ohlcv_dict: dict[str, list[dict]], + prediction_length: int = 1, + num_samples: int = 100, + ) -> dict[str, ChronosPrediction]: + """종목별 1-day return 분포 예측.""" + import torch + + tickers = list(daily_ohlcv_dict.keys()) + if not tickers: + return {} + + contexts = [ + torch.tensor([bar["close"] for bar in daily_ohlcv_dict[t]], dtype=torch.float32) + for t in tickers + ] + forecasts = self._pipeline.predict( + context=contexts, + prediction_length=prediction_length, + num_samples=num_samples, + ) + forecasts_np = forecasts.numpy() if hasattr(forecasts, "numpy") else np.asarray(forecasts) + + now_iso = datetime.now(KST).isoformat() + results: dict[str, ChronosPrediction] = {} + for i, ticker in enumerate(tickers): + samples = forecasts_np[i, :, 0] + last_close = daily_ohlcv_dict[ticker][-1]["close"] + returns = (samples - last_close) / last_close + median = float(np.quantile(returns, 0.5)) + q10 = float(np.quantile(returns, 0.1)) + q90 = float(np.quantile(returns, 0.9)) + spread = (q90 - q10) / max(abs(median), 0.001) + conf = float(max(0.0, min(1.0, 1.0 - spread / 2.0))) + results[ticker] = ChronosPrediction( + median=median, q10=q10, q90=q90, conf=conf, as_of=now_iso, + ) + return results +``` + +- [ ] **Step 4: Run tests to verify PASS** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests/test_chronos_predictor.py -v 2>&1 | tail -15 +``` +Expected: 4 passed. + +If `chronos-forecasting` import fails (Task 1 의 pip install 실패), the tests will still fail at import. In that case the implementer should mock `chronos` module at sys.modules level OR skip and report DONE_WITH_CONCERNS — Task 7 user manual will install. + +Full suite: +```bash +python -m pytest signal_v2/tests -q 2>&1 | tail -3 +``` +Expected: 44 passed. + +- [ ] **Step 5: Commit** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +git add signal_v2/chronos_predictor.py signal_v2/tests/test_chronos_predictor.py +git commit -m "$(cat <<'EOF' +feat(signal_v2-phase3b): chronos_predictor + 4 mock tests + +ChronosPredictor wraps HuggingFace ChronosPipeline. Batch predict +returns ChronosPrediction(median, q10, q90, conf, as_of) per ticker. +Confidence = 1 - clamp(spread/2, 0, 1) where spread = (q90-q10) / |median|. +Lazy import of chronos lib (heavy). GPU auto-detect with FP16. + +44 tests pass. + +Co-Authored-By: Claude Opus 4.7 (1M context) +EOF +)" +``` + +--- + +### Task 5: pull_worker post-close cycle + scheduler trigger + 1 test + +**Files:** +- Modify: `web-ai/signal_v2/scheduler.py` +- Modify: `web-ai/signal_v2/pull_worker.py` +- Modify: `web-ai/signal_v2/tests/test_pull_worker.py` + +- [ ] **Step 1: Write failing test** + +Append to `web-ai/signal_v2/tests/test_pull_worker.py`: + +```python +async def test_post_close_cycle_updates_chronos_predictions(): + """mock kis + mock chronos → state.chronos_predictions + state.daily_ohlcv 갱신.""" + from unittest.mock import AsyncMock, MagicMock + from signal_v2.pull_worker import _run_post_close_cycle + from signal_v2.chronos_predictor import ChronosPrediction + from signal_v2.state import PollState + + state = PollState() + state.portfolio = {"holdings": [{"ticker": "005930"}]} + state.screener_preview = {"items": [{"ticker": "000660"}]} + + kis_mock = MagicMock() + daily_005930 = [{"datetime": f"2026-05-{i+1:02d}", "open": 100, "high": 105, + "low": 95, "close": 100 + i, "volume": 1000} for i in range(60)] + daily_000660 = [{"datetime": f"2026-05-{i+1:02d}", "open": 200, "high": 210, + "low": 190, "close": 200 + i, "volume": 2000} for i in range(60)] + kis_mock.get_daily_ohlcv = AsyncMock(side_effect=[daily_005930, daily_000660]) + + chronos_mock = MagicMock() + chronos_mock.predict_batch = MagicMock(return_value={ + "005930": ChronosPrediction(0.02, -0.01, 0.04, 0.85, "2026-05-18T16:00:00+09:00"), + "000660": ChronosPrediction(0.03, -0.02, 0.06, 0.75, "2026-05-18T16:00:00+09:00"), + }) + + await _run_post_close_cycle(kis_mock, chronos_mock, state) + + assert "005930" in state.chronos_predictions + assert "000660" in state.chronos_predictions + assert state.chronos_predictions["005930"]["median"] == 0.02 + assert state.chronos_predictions["005930"]["conf"] == 0.85 + assert "005930" in state.daily_ohlcv + assert "chronos/005930" in state.last_updated +``` + +- [ ] **Step 2: Run test to verify FAIL** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests/test_pull_worker.py::test_post_close_cycle_updates_chronos_predictions -v 2>&1 | tail -10 +``` +Expected: ImportError or AttributeError. + +- [ ] **Step 3: Update scheduler.py with _is_post_close_trigger** + +Append to `web-ai/signal_v2/scheduler.py`: + +```python +def _is_post_close_trigger(now: datetime) -> bool: + """16:00 KST ±1분 (post-close cycle 트리거). 평일/영업일만.""" + if not _is_market_day(now): + return False + t = now.time() + return time(16, 0) <= t < time(16, 1) +``` + +- [ ] **Step 4: Update pull_worker.py with _run_post_close_cycle + update_minute_momentum_for_all** + +Read current `web-ai/signal_v2/pull_worker.py`. Add at the end of the file: + +```python +async def _run_post_close_cycle(kis_client, chronos, state) -> None: + """16:00 KST 종가 후 1회: daily fetch + chronos predict.""" + tickers = list(set(_portfolio_tickers(state)) | set(_screener_tickers(state))) + if not tickers: + return + + daily_results = await asyncio.gather(*[ + kis_client.get_daily_ohlcv(t, days=60) for t in tickers + ], return_exceptions=True) + daily_dict = {} + for ticker, result in zip(tickers, daily_results): + if isinstance(result, list) and len(result) >= 30: + daily_dict[ticker] = result + state.daily_ohlcv[ticker] = result + elif isinstance(result, Exception): + state.fetch_errors[f"daily_ohlcv/{ticker}"] = ( + state.fetch_errors.get(f"daily_ohlcv/{ticker}", 0) + 1 + ) + + if daily_dict and chronos is not None: + try: + predictions = chronos.predict_batch(daily_dict) + except Exception: + logger.exception("chronos predict_batch failed") + return + for ticker, pred in predictions.items(): + state.chronos_predictions[ticker] = { + "median": pred.median, + "q10": pred.q10, + "q90": pred.q90, + "conf": pred.conf, + "as_of": pred.as_of, + } + state.last_updated[f"chronos/{ticker}"] = pred.as_of + + +def update_minute_momentum_for_all(state) -> None: + """매 분봉 cycle 후 호출 — 모든 종목 모멘텀 갱신.""" + from signal_v2.momentum_classifier import classify_minute_momentum + now_iso = datetime.now(KST).isoformat() + for ticker, bars in state.minute_bars.items(): + state.minute_momentum[ticker] = classify_minute_momentum(bars) + state.last_updated[f"momentum/{ticker}"] = now_iso +``` + +Also update `poll_loop` and `_run_polling_cycle` signatures to accept `chronos` optional param: + +```python +async def poll_loop( + client, state, shutdown, + kis_client=None, chronos=None, +) -> None: + """...existing...""" + logger.info("poll_loop started") + while not shutdown.is_set(): + now = datetime.now(KST) + if _is_market_day(now) and _is_polling_window(now): + try: + await _run_polling_cycle(client, state, kis_client=kis_client) + except Exception: + logger.exception("poll cycle failed") + # Minute momentum 갱신 (매 cycle) + try: + update_minute_momentum_for_all(state) + except Exception: + logger.exception("minute momentum update failed") + # Post-close trigger (16:00 KST) + if _is_post_close_trigger(now) and chronos is not None and kis_client is not None: + try: + await _run_post_close_cycle(kis_client, chronos, state) + except Exception: + logger.exception("post-close cycle failed") + interval = _next_interval(now) + try: + await asyncio.wait_for(shutdown.wait(), timeout=interval) + break + except asyncio.TimeoutError: + continue + logger.info("poll_loop ended") +``` + +Add `_is_post_close_trigger` to the scheduler import block at the top of pull_worker.py: +```python +from signal_v2.scheduler import ( + KST, _is_market_day, _is_polling_window, _next_interval, _is_post_close_trigger, +) +``` + +- [ ] **Step 5: Run tests to verify PASS** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests/test_pull_worker.py -v 2>&1 | tail -10 +``` +Expected: 3 passed (2 existing + 1 new). + +Full suite: +```bash +python -m pytest signal_v2/tests -q 2>&1 | tail -3 +``` +Expected: 45 passed. + +- [ ] **Step 6: Commit** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +git add signal_v2/scheduler.py signal_v2/pull_worker.py signal_v2/tests/test_pull_worker.py +git commit -m "$(cat <<'EOF' +feat(signal_v2-phase3b): post-close cycle + minute momentum update + +scheduler._is_post_close_trigger: 16:00 KST ±1min detection (market day). +pull_worker: +- _run_post_close_cycle: daily fetch (60일) + chronos batch predict → + state.chronos_predictions + state.daily_ohlcv. +- update_minute_momentum_for_all: 매 cycle 마다 state.minute_momentum 갱신. +- poll_loop signature 확장 (chronos optional). + +45 tests pass (44 → 45). + +Co-Authored-By: Claude Opus 4.7 (1M context) +EOF +)" +``` + +--- + +### Task 6: main.py lifespan ChronosPredictor + +**Files:** +- Modify: `web-ai/signal_v2/main.py` + +- [ ] **Step 1: Update main.py** + +Read current `web-ai/signal_v2/main.py`. Update lifespan to load ChronosPredictor and pass to poll_loop. + +Add import (with the existing imports): +```python +from signal_v2.chronos_predictor import ChronosPredictor +``` + +Extend `AppContext`: +```python +class AppContext: + client: StockClient | None = None + dedup: SignalDedup | None = None + shutdown: asyncio.Event | None = None + poll_task: asyncio.Task | None = None + kis_client: KISClient | None = None + kis_ws: KISWebSocket | None = None + chronos: ChronosPredictor | None = None +``` + +Inside `lifespan`, after `_ctx.kis_ws` setup, add chronos initialization (only if kis_app_key set): + +```python + if settings.kis_app_key: + # ... existing KISClient + KISWebSocket setup ... + + # Load Chronos (heavy: ~1GB model download first time) + try: + _ctx.chronos = ChronosPredictor(model_name=settings.chronos_model) + except Exception: + logger.exception("ChronosPredictor load failed — continuing without chronos predictions") +``` + +Update poll_task creation to pass chronos: +```python + _ctx.poll_task = asyncio.create_task( + poll_loop( + _ctx.client, state_mod.state, _ctx.shutdown, + kis_client=_ctx.kis_client, + chronos=_ctx.chronos, + ) + ) +``` + +No new tests for this task (lifespan is tested implicitly by existing `test_main.py`). + +- [ ] **Step 2: Run full test suite** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests -q 2>&1 | tail -3 +``` +Expected: 45 passed. + +- [ ] **Step 3: Commit** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +git add signal_v2/main.py +git commit -m "$(cat <<'EOF' +feat(signal_v2-phase3b): main.py lifespan loads ChronosPredictor + +AppContext.chronos field. lifespan: if KIS_APP_KEY set, load +ChronosPredictor(model_name=settings.chronos_model). Exceptions +during load logged + signal_v2 continues without chronos (other +endpoints unaffected). poll_loop receives chronos param. + +45 tests pass. + +Co-Authored-By: Claude Opus 4.7 (1M context) +EOF +)" +``` + +--- + +### Task 7: 사용자 수동 — pip install + .env + manual smoke + push + +**This task requires user action.** + +- [ ] **Step 1: pip install (필요시)** + +만약 Task 1 의 pip install 이 일부 실패 (chronos-forecasting / torch CUDA 등), 사용자가 수동: + +```powershell +cd C:\Users\jaeoh\Desktop\workspace\web-ai +# V1 venv 활성화 (이미 있으면) +# .\signal_v1\.venv\Scripts\Activate.ps1 + +# transformers + chronos-forecasting 설치 +pip install transformers>=4.40 chronos-forecasting>=1.4 + +# torch (CUDA 12.x) — V1 의 PyTorch 가 이미 설치되어 있다면 skip +# pip install torch --index-url https://download.pytorch.org/whl/cu124 +``` + +- [ ] **Step 2: .env (optional)** + +`CHRONOS_MODEL` 기본값 `amazon/chronos-2` 유지하면 .env 변경 불필요. 다른 모델 시도 시: +``` +CHRONOS_MODEL=amazon/chronos-bolt-base +``` + +- [ ] **Step 3: signal_v2 시작** + +```powershell +cd C:\Users\jaeoh\Desktop\workspace\web-ai\signal_v2 +.\start.bat +``` + +⚠️ 첫 시작 시 Chronos 모델 ~1GB 다운로드 (~수십 초). 콘솔에: +- `Loading Chronos pipeline: amazon/chronos-2 on cuda` (또는 cpu) +- `Chronos pipeline loaded.` + +만약 다운로드 실패 또는 OOM → `ChronosPredictor load failed` 로그. signal_v2 는 chronos 없이 계속 가동 (다른 기능 정상). + +- [ ] **Step 4: /health smoke** + +```powershell +curl http://localhost:8001/health +``` + +- [ ] **Step 5: post-close cycle 검증 (다음 16:00 KST)** + +평일 16:00 KST 시점 (또는 manual trigger): +- state.chronos_predictions 갱신 확인 +- 다시 `/health` 호출 → `last_poll` 의 `chronos/` timestamp 표시 + +장외 시간 검증 (수동): +```powershell +cd C:\Users\jaeoh\Desktop\workspace\web-ai +python -c " +import asyncio +from signal_v2.config import get_settings +from signal_v2.kis_client import KISClient +from signal_v2.chronos_predictor import ChronosPredictor +from signal_v2.state import PollState +from signal_v2.pull_worker import _run_post_close_cycle + +async def main(): + s = get_settings() + kc = KISClient(app_key=s.kis_app_key, app_secret=s.kis_app_secret, + account=s.kis_account, is_virtual=s.kis_is_virtual, + v1_token_path=s.v1_token_path) + chr_p = ChronosPredictor(model_name=s.chronos_model) + state = PollState() + state.portfolio = {'holdings': [{'ticker': '005930'}]} + state.screener_preview = {'items': []} + try: + await _run_post_close_cycle(kc, chr_p, state) + print(state.chronos_predictions) + finally: + await kc.close() + +asyncio.run(main()) +" +``` +Expected: `{'005930': {'median': ..., 'q10': ..., 'q90': ..., 'conf': ..., 'as_of': ...}}` + +- [ ] **Step 6: V1 무영향** + +V1 봇 정상 가동 + Telegram /status 응답 + GPU OOM 없음. + +- [ ] **Step 7: push** + +```powershell +cd C:\Users\jaeoh\Desktop\workspace\web-ai +git push +``` + +- [ ] **Step 8: 결과 보고** + +- Step 3 (signal_v2 시작 + Chronos load): PASS / FAIL — 에러 메시지 +- Step 4 (/health): PASS / FAIL +- Step 5 (post-close 검증): PASS / FAIL — state.chronos_predictions 결과 공유 +- Step 6 (V1 무영향): PASS / FAIL +- Step 7 (push): PASS / FAIL + +전체 PASS 시 **Phase 3b 완료** → Phase 4 (Signal Generator) brainstorming. + +--- + +## Self-Review + +**1. Spec coverage:** + +| Spec § | 요구사항 | Plan task | +|--------|----------|----------| +| §2 포함 ① kis_client.get_daily_ohlcv | Task 2 ✅ | +| §2 포함 ② chronos_predictor | Task 4 ✅ | +| §2 포함 ③ momentum_classifier | Task 3 ✅ | +| §2 포함 ④ pull_worker post-close + momentum | Task 5 ✅ | +| §2 포함 ⑤ scheduler `_is_post_close_trigger` | Task 5 ✅ | +| §2 포함 ⑥ state.py 3 필드 | Task 1 ✅ | +| §2 포함 ⑦ main.py lifespan chronos | Task 6 ✅ | +| §2 포함 ⑧ config CHRONOS_MODEL | Task 1 ✅ | +| §2 포함 ⑨ requirements 3 deps | Task 1 ✅ | +| §8 12 신규 테스트 | Task 2 (1) + Task 3 (6) + Task 4 (4) + Task 5 (1) = 12 ✅ | +| §11 DoD 13 항목 | Task 1-7 합산 ✅ | + +No gaps. + +**2. Placeholder scan**: No "TBD" / "implement later". Manual smoke (Task 7) has user-action steps clearly labeled, not placeholders. + +**3. Type consistency:** +- `ChronosPredictor(model_name, device=None)` consistent Task 4 + Task 6 ✅ +- `ChronosPrediction(median, q10, q90, conf, as_of)` consistent across tests + impl + state ✅ +- `classify_minute_momentum(minute_bars: deque) -> str` consistent Task 3 + Task 5 ✅ +- `aggregate_1min_to_5min(minute_bars: list[dict]) -> list[dict]` consistent ✅ +- `_run_post_close_cycle(kis_client, chronos, state)` consistent Task 5 + Task 6 ✅ +- `_is_post_close_trigger(now: datetime) -> bool` consistent Task 5 ✅ +- State fields (daily_ohlcv / chronos_predictions / minute_momentum) consistent Task 1 + Task 5 ✅ +- env names (CHRONOS_MODEL) consistent Task 1 + Task 6 ✅ + +Plan passes self-review.