web-page/docs/superpowers/plans/2026-05-16-signal-v2-phase3b-chronos-momentum.md
gahusb 1f55d24ce6 docs(signal-v2): Phase 3b implementation plan — 7 tasks TDD
Task 1: foundation (config + state + requirements)
Task 2: kis_client.get_daily_ohlcv + 1 test
Task 3: momentum_classifier (pure functions) + 6 tests
Task 4: chronos_predictor + 4 tests (mock pipeline)
Task 5: pull_worker post-close cycle + scheduler trigger + 1 test
Task 6: main.py lifespan ChronosPredictor
Task 7: user manual (pip install + .env + smoke + push)

12 new tests, total 45 signal_v2 tests. ~1 week.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 17:42:17 +09:00

Signal V2 Phase 3b — Chronos-2 + Minute Momentum Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

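Goal: Add two things to signal_v2: (1) Chronos-2 zero-shot inference, run once as a batch after the market close, and (2) a 5-level momentum classification computed from 1-minute bars aggregated into 5-minute bars. Together they fill in the core inputs of the Phase 4 signal rules (chronos_predictions + minute_momentum).

Architecture: HuggingFace chronos-forecasting library + the amazon/chronos-2 model (env-configurable). When the post-close trigger fires at 16:00 KST, fetch 60 days of daily bars over KIS REST → Chronos batch predict → in-memory state. Minute momentum is a pure function (1-minute bar deque → 5-minute aggregate → 5-level classification), refreshed on every minute-bar cycle.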

Tech Stack: transformers / chronos-forecasting / torch (CUDA) / numpy / pytest-asyncio + respx / unittest.mock

Spec: web-ui/docs/superpowers/specs/2026-05-16-signal-v2-phase3b-chronos-momentum.md

Note: PyTorch with CUDA is probably already installed in the V1 venv (web-ai/signal_v1/.venv or the system Python). Using the same venv for signal_v2 is recommended (avoids a reinstall).


File structure

| File | Change | Responsibility |
| --- | --- | --- |
| signal_v2/config.py | modify | chronos_model env field |
| signal_v2/state.py | modify | add daily_ohlcv, chronos_predictions, minute_momentum |
| requirements.txt | modify | transformers, chronos-forecasting (torch noted in a comment only) |
| signal_v2/kis_client.py | modify | get_daily_ohlcv method |
| signal_v2/momentum_classifier.py | new | aggregate_1min_to_5min + classify_minute_momentum |
| signal_v2/chronos_predictor.py | new | ChronosPredictor class + ChronosPrediction dataclass |
| signal_v2/scheduler.py | modify | _is_post_close_trigger helper |
| signal_v2/pull_worker.py | modify | _run_post_close_cycle + update_minute_momentum_for_all |
| signal_v2/main.py | modify | load ChronosPredictor in lifespan + pass chronos to poll_loop |
| signal_v2/tests/test_kis_client.py | modify | 1 get_daily_ohlcv case |
| signal_v2/tests/test_momentum_classifier.py | new | 6 cases |
| signal_v2/tests/test_chronos_predictor.py | new | 4 cases (model mocked) |
| signal_v2/tests/test_pull_worker.py | modify | 1 post-close cycle case |
| web-ai/.env | modify (user, Task 7) | add CHRONOS_MODEL (optional) |

13 files changed (plus the user-edited web-ai/.env), 12 new tests (33 → 45).


Task order

Task 1: foundation (config + state + requirements) + pip install
Task 2: kis_client.get_daily_ohlcv + 1 test (TDD)
Task 3: momentum_classifier + 6 tests (TDD, pure functions)
Task 4: chronos_predictor + 4 tests (TDD, mock)
Task 5: pull_worker post-close cycle + scheduler trigger + 1 test
Task 6: main.py lifespan ChronosPredictor load
Task 7: manual user step: pip install (if needed) + .env + manual smoke + push

Task 1: foundation (config + state + requirements)

Files:

  • Modify: web-ai/signal_v2/config.py

  • Modify: web-ai/signal_v2/state.py

  • Modify: web-ai/requirements.txt

  • Step 1: Update config.py

Read current web-ai/signal_v2/config.py. Add chronos_model field to Settings dataclass (between v1_token_path and the properties):

    chronos_model: str = field(default_factory=lambda: os.getenv("CHRONOS_MODEL", "amazon/chronos-2"))
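A minimal sketch of how this field behaves, assuming Settings is a plain dataclass. SettingsSketch and the override value are hypothetical stand-ins, not the project's code. Because the default comes from a default_factory, CHRONOS_MODEL is read each time an instance is created rather than at import time:

```python
import os
from dataclasses import dataclass, field


@dataclass
class SettingsSketch:
    """Hypothetical stand-in for the real Settings dataclass."""
    chronos_model: str = field(
        default_factory=lambda: os.getenv("CHRONOS_MODEL", "amazon/chronos-2")
    )


# With the variable unset, the default model name is used.
os.environ.pop("CHRONOS_MODEL", None)
default_model = SettingsSketch().chronos_model

# With the variable set, a new instance picks up the override.
os.environ["CHRONOS_MODEL"] = "my-org/custom-model"  # hypothetical value
override_model = SettingsSketch().chronos_model
os.environ.pop("CHRONOS_MODEL", None)
```

If get_settings() caches its result, an override set after the first call would presumably not be visible until the process restarts.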
  • Step 2: Update state.py

Read current web-ai/signal_v2/state.py. Replace with:

"""PollState — process-wide singleton."""
from collections import deque
from dataclasses import dataclass, field


@dataclass
class PollState:
    portfolio: dict | None = None
    news_sentiment: dict | None = None
    screener_preview: dict | None = None
    minute_bars: dict[str, deque] = field(default_factory=dict)
    asking_price: dict[str, dict] = field(default_factory=dict)
    # Phase 3b additions
    daily_ohlcv: dict[str, list[dict]] = field(default_factory=dict)
    chronos_predictions: dict[str, dict] = field(default_factory=dict)
    minute_momentum: dict[str, str] = field(default_factory=dict)
    last_updated: dict[str, str] = field(default_factory=dict)
    fetch_errors: dict[str, int] = field(default_factory=dict)


state = PollState()
  • Step 3: Update requirements.txt

Read current web-ai/requirements.txt. Append (if not present):

# Phase 3b dependencies (Chronos-2 + ML)
transformers>=4.40
chronos-forecasting>=1.4
# torch: typically already installed via V1 venv; if not, install with CUDA support manually

Do NOT add torch directly — V1 likely has it via CUDA-specific install. Document only.

  • Step 4: pip install attempt
cd /c/Users/jaeoh/Desktop/workspace/web-ai
pip install -r requirements.txt 2>&1 | tail -10

Expected: transformers + chronos-forecasting install success. If chronos-forecasting fails due to network or dependency conflict, report DONE_WITH_CONCERNS — user will install manually in Task 7.

  • Step 5: Smoke import test
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -c "from signal_v2.config import get_settings; from signal_v2.state import state; s = get_settings(); print(f'chronos_model={s.chronos_model}'); print(state)"

Expected: chronos_model=amazon/chronos-2 + state print (with new empty dicts).

  • Step 6: Run existing tests — no regression
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests -q 2>&1 | tail -3

Expected: 33 passed.

  • Step 7: Commit
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/config.py signal_v2/state.py requirements.txt
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3b): foundation — config + state + requirements

- config.py: CHRONOS_MODEL env (default amazon/chronos-2)
- state.py: PollState extended with daily_ohlcv + chronos_predictions
  + minute_momentum
- requirements.txt: transformers + chronos-forecasting (torch via V1 venv)

33 existing tests still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"

Task 2: kis_client.get_daily_ohlcv + 1 test

Files:

  • Modify: web-ai/signal_v2/kis_client.py

  • Modify: web-ai/signal_v2/tests/test_kis_client.py

  • Step 1: Write failing test

Append to web-ai/signal_v2/tests/test_kis_client.py:

@respx.mock
async def test_get_daily_ohlcv_returns_60_bars(kis_client_factory):
    """KIS daily endpoint returns 60 ascending bars after parsing."""
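    from datetime import date, timedelta  # local import keeps the appended test self-contained

    # 60 consecutive calendar dates ending 2026-05-18, newest first
    # (KIS returns descending order). Generated with timedelta so every date is valid.
    sample_output2 = [
        {
            "stck_bsop_date": (date(2026, 5, 18) - timedelta(days=i)).strftime("%Y%m%d"),
            "stck_oprc": "78000", "stck_hgpr": "78500",
            "stck_lwpr": "77800", "stck_clpr": "78300",
            "acml_vol": "12345",
        }
        for i in range(60)
    ]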
    respx.get(
        "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
    ).mock(return_value=httpx.Response(200, json={"output2": sample_output2}))

    client = kis_client_factory()
    try:
        bars = await client.get_daily_ohlcv("005930", days=60)
        # KIS returns descending; client reverses to ascending
        assert len(bars) == 60
        assert bars[0]["datetime"] < bars[-1]["datetime"]
        assert bars[-1]["close"] == 78300
        assert "datetime" in bars[0]
        assert isinstance(bars[0]["open"], int)
    finally:
        await client.close()
  • Step 2: Run test to verify FAIL
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_kis_client.py::test_get_daily_ohlcv_returns_60_bars -v 2>&1 | tail -10

Expected: FAIL — get_daily_ohlcv not defined.

  • Step 3: Implement get_daily_ohlcv

Edit web-ai/signal_v2/kis_client.py. Add the timedelta import to existing from datetime import ... line if needed, then add at the end of KISClient class (after get_asking_price):

    async def get_daily_ohlcv(self, ticker: str, days: int = 60) -> list[dict]:
        """KRX daily OHLCV bars (TR_ID FHKST03010100).

        Returns: [{"datetime", "open", "high", "low", "close", "volume"}, ...]
                 in ascending date order.
        """
        from datetime import timedelta
        path = "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
        # Look back 2x `days` calendar days so weekends and holidays still leave
        # enough trading days in the response.
        now = datetime.now(KST)
        today = now.strftime("%Y%m%d")
        start_date = (now - timedelta(days=days * 2)).strftime("%Y%m%d")
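        params = {
            "FID_COND_MRKT_DIV_CODE": "J",
            "FID_INPUT_ISCD": ticker,
            "FID_INPUT_DATE_1": start_date,
            "FID_INPUT_DATE_2": today,
            "FID_PERIOD_DIV_CODE": "D",
            # Assumption to verify against the KIS API docs: "0" requests adjusted
            # prices and "1" unadjusted ones; this plan intends adjusted daily bars.
            "FID_ORG_ADJ_PRC": "0",
        }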
        raw = await self._request_with_retry(
            "GET", path, tr_id="FHKST03010100", params=params,
        )
        output2 = raw.get("output2", [])
        bars = []
        for row in output2:
            try:
                date = row["stck_bsop_date"]
                bars.append({
                    "datetime": f"{date[:4]}-{date[4:6]}-{date[6:]}",
                    "open": int(row["stck_oprc"]),
                    "high": int(row["stck_hgpr"]),
                    "low": int(row["stck_lwpr"]),
                    "close": int(row["stck_clpr"]),
                    "volume": int(row["acml_vol"]),
                })
            except (KeyError, ValueError):
                continue
        bars.reverse()
        return bars[-days:]
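The parsing half of get_daily_ohlcv can be exercised without any HTTP. The sketch below uses a hypothetical helper, parse_daily_rows, that mirrors the loop above on a reduced set of fields. It shows that malformed rows are skipped, the order is flipped to ascending, and only the last `days` bars are kept:

```python
def parse_daily_rows(output2: list[dict], days: int) -> list[dict]:
    """Hypothetical helper mirroring the parsing loop in get_daily_ohlcv."""
    bars = []
    for row in output2:
        try:
            d = row["stck_bsop_date"]
            bars.append({
                "datetime": f"{d[:4]}-{d[4:6]}-{d[6:]}",
                "close": int(row["stck_clpr"]),
            })
        except (KeyError, ValueError):
            continue  # skip empty or malformed rows
    bars.reverse()       # newest-first becomes oldest-first
    return bars[-days:]  # keep the most recent `days` bars


rows = [
    {"stck_bsop_date": "20260518", "stck_clpr": "78300"},
    {},                                               # malformed: no keys
    {"stck_bsop_date": "20260515", "stck_clpr": "77900"},
    {"stck_bsop_date": "20260514", "stck_clpr": ""},  # malformed: empty price
    {"stck_bsop_date": "20260513", "stck_clpr": "77000"},
]
parsed = parse_daily_rows(rows, days=2)
```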
  • Step 4: Run test to verify PASS
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_kis_client.py -v 2>&1 | tail -10

Expected: 5 passed (4 existing + 1 new).

Full suite:

python -m pytest signal_v2/tests -q 2>&1 | tail -3

Expected: 34 passed.

  • Step 5: Commit
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/kis_client.py signal_v2/tests/test_kis_client.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3b): kis_client.get_daily_ohlcv (60 daily bars)

TR_ID FHKST03010100 (adjusted-price daily bars). KIS returns descending; client
reverses to ascending and trims to last N days.

1 new test, 34 total.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"

Task 3: momentum_classifier + 6 tests

Files:

  • Create: web-ai/signal_v2/momentum_classifier.py

  • Create: web-ai/signal_v2/tests/test_momentum_classifier.py

  • Step 1: Write 6 failing tests

Create web-ai/signal_v2/tests/test_momentum_classifier.py:

"""Tests for minute momentum classifier."""
from collections import deque

from signal_v2.momentum_classifier import (
    aggregate_1min_to_5min, classify_minute_momentum,
    STRONG_UP, WEAK_UP, NEUTRAL, WEAK_DOWN, STRONG_DOWN,
)


def _bar(open_, high, low, close, volume):
    return {
        "datetime": "2026-05-18T09:00:00+09:00",
        "open": open_, "high": high, "low": low, "close": close, "volume": volume,
    }


def _make_minute_bars(n: int, *, up: int, vol_mult: float = 1.0):
    """n 1-minute bars. up = number of green (up) bars, vol_mult = multiplier on the base volume."""
    base_vol = 1000
    bars = []
    for i in range(n):
        is_up = i < up
        o, c = (100, 110) if is_up else (110, 100)
        bars.append(_bar(o, max(o, c) + 5, min(o, c) - 5, c, int(base_vol * vol_mult)))
    return bars


def test_strong_up_5_consecutive_green_with_high_volume():
    # 25 bars (5 chunks of 5) → five 5-minute bars
    # Most recent 25 bars: 25/25 green with 1.5x volume
    # Plus 35 older bars (60 total) at normal volume, used for the long-run average.
    older = _make_minute_bars(35, up=15, vol_mult=1.0)
    recent = _make_minute_bars(25, up=25, vol_mult=1.5)
    minute_bars = deque(older + recent, maxlen=60)
    assert classify_minute_momentum(minute_bars) == STRONG_UP


def test_weak_up_3of5_green_normal_volume():
    # 25 recent bars: 3 of the five 5-minute bars are green, volume 1.0x
    # Each 5-minute chunk is five 1-minute bars; a green chunk has all five green
    older = _make_minute_bars(35, up=15, vol_mult=1.0)
    # 5 chunks: 3 up (green) + 2 down (red), 5 bars each.
    chunks_up = _make_minute_bars(5, up=5, vol_mult=1.0)
    chunks_down = _make_minute_bars(5, up=0, vol_mult=1.0)
    recent = chunks_up + chunks_up + chunks_up + chunks_down + chunks_down
    minute_bars = deque(older + recent, maxlen=60)
    assert classify_minute_momentum(minute_bars) == WEAK_UP


def test_neutral_mixed():
    # 25 recent bars: 2 of the five 5-minute bars are green, normal volume
    older = _make_minute_bars(35, up=15, vol_mult=1.0)
    chunks_up = _make_minute_bars(5, up=5, vol_mult=1.0)
    chunks_down = _make_minute_bars(5, up=0, vol_mult=1.0)
    recent = chunks_up + chunks_up + chunks_down + chunks_down + chunks_down
    # five 5-minute bars: 2 up + 3 down → up_count=2, vol=1.0
    minute_bars = deque(older + recent, maxlen=60)
    result = classify_minute_momentum(minute_bars)
    # up_count=2, vol_mult=1.0 → no other category matches → NEUTRAL
    assert result == NEUTRAL


def test_weak_down_low_green_low_volume():
    older = _make_minute_bars(35, up=15, vol_mult=1.0)
    chunks_up = _make_minute_bars(5, up=5, vol_mult=0.5)
    chunks_down = _make_minute_bars(5, up=0, vol_mult=0.5)
    recent = chunks_up + chunks_down + chunks_down + chunks_down + chunks_down
    # up_count=1, vol_mult=0.5 → WEAK_DOWN
    minute_bars = deque(older + recent, maxlen=60)
    assert classify_minute_momentum(minute_bars) == WEAK_DOWN


def test_strong_down_5_consecutive_red_high_volume():
    older = _make_minute_bars(35, up=15, vol_mult=1.0)
    recent = _make_minute_bars(25, up=0, vol_mult=1.5)
    minute_bars = deque(older + recent, maxlen=60)
    assert classify_minute_momentum(minute_bars) == STRONG_DOWN


def test_aggregate_1min_to_5min_correctness():
    # Five 1-minute bars into one 5-minute bar: open=first, close=last, high=max, low=min, volume=sum
    bars = [
        _bar(100, 105, 99, 102, 1000),
        _bar(102, 108, 101, 107, 1500),
        _bar(107, 110, 105, 106, 800),
        _bar(106, 109, 104, 108, 1200),
        _bar(108, 112, 107, 111, 900),
    ]
    result = aggregate_1min_to_5min(bars)
    assert len(result) == 1
    assert result[0]["open"] == 100
    assert result[0]["close"] == 111
    assert result[0]["high"] == 112
    assert result[0]["low"] == 99
    assert result[0]["volume"] == 5400
  • Step 2: Run tests to verify FAIL
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_momentum_classifier.py -v 2>&1 | tail -10

Expected: ImportError.

  • Step 3: Implement momentum_classifier.py

Create web-ai/signal_v2/momentum_classifier.py:

"""Minute-bar OHLCV → 5-level momentum classification."""
from __future__ import annotations
from collections import deque

# Classification categories
STRONG_UP = "strong_up"
WEAK_UP = "weak_up"
NEUTRAL = "neutral"
WEAK_DOWN = "weak_down"
STRONG_DOWN = "strong_down"

_BARS_PER_5MIN = 5
_LOOKBACK_5MIN_BARS = 5
_VOLUME_AVG_WINDOW = 12  # 60 minutes = twelve 5-minute bars


def aggregate_1min_to_5min(minute_bars: list[dict]) -> list[dict]:
    """N 1-minute bars → floor(N/5) 5-minute bars, in ascending time order.

    Each 5-minute bar: open=first 1-minute open, high=max, low=min, close=last close, volume=sum.
    """
    bars_5min = []
    chunks = len(minute_bars) // _BARS_PER_5MIN
    for i in range(chunks):
        chunk = minute_bars[i * _BARS_PER_5MIN : (i + 1) * _BARS_PER_5MIN]
        bars_5min.append({
            "datetime": chunk[0]["datetime"],
            "open": chunk[0]["open"],
            "high": max(b["high"] for b in chunk),
            "low": min(b["low"] for b in chunk),
            "close": chunk[-1]["close"],
            "volume": sum(b["volume"] for b in chunk),
        })
    return bars_5min


def classify_minute_momentum(minute_bars: deque) -> str:
    """1-minute bar deque → 5-level momentum classification.

    Returns: STRONG_UP / WEAK_UP / NEUTRAL / WEAK_DOWN / STRONG_DOWN
    """
    minute_list = list(minute_bars)
    if len(minute_list) < _BARS_PER_5MIN * _LOOKBACK_5MIN_BARS:
        return NEUTRAL  # not enough data

    bars_5min = aggregate_1min_to_5min(minute_list)
    if len(bars_5min) < _LOOKBACK_5MIN_BARS:
        return NEUTRAL

    recent = bars_5min[-_LOOKBACK_5MIN_BARS:]
    up_count = sum(1 for b in recent if b["close"] > b["open"])

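    # Volume multiplier: average of the recent 5 bars vs. the average of the bars
    # that precede them inside the 60-minute window. Including the recent bars in
    # the baseline would dilute a volume burst.
    recent_vol_avg = sum(b["volume"] for b in recent) / len(recent)
    baseline = bars_5min[-_VOLUME_AVG_WINDOW:-_LOOKBACK_5MIN_BARS]
    baseline_vol_avg = (
        sum(b["volume"] for b in baseline) / len(baseline) if baseline else 0.0
    )
    # With no baseline bars (or zero baseline volume) fall back to a neutral 1.0.
    vol_mult = recent_vol_avg / baseline_vol_avg if baseline_vol_avg > 0 else 1.0

    # 5-level classification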
    if up_count == 5 and vol_mult >= 1.5:
        return STRONG_UP
    elif up_count >= 3 and vol_mult >= 1.0:
        return WEAK_UP
    elif up_count == 0 and vol_mult >= 1.5:
        return STRONG_DOWN
    elif up_count <= 2 and vol_mult < 1.0:
        return WEAK_DOWN
    else:
        return NEUTRAL
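To make the threshold rule easier to review, the branch order can be tabulated on its own. The classify function below is a hypothetical standalone copy of the if/elif chain, taking up_count and vol_mult directly instead of bars:

```python
def classify(up_count: int, vol_mult: float) -> str:
    """Hypothetical standalone copy of the threshold rule, for illustration."""
    if up_count == 5 and vol_mult >= 1.5:
        return "strong_up"
    if up_count >= 3 and vol_mult >= 1.0:
        return "weak_up"
    if up_count == 0 and vol_mult >= 1.5:
        return "strong_down"
    if up_count <= 2 and vol_mult < 1.0:
        return "weak_down"
    return "neutral"


# Every combination of green-bar count (0..5) and three volume levels.
grid = {(u, v): classify(u, v) for u in range(6) for v in (0.5, 1.0, 1.5)}
```

The rule is order-dependent and not symmetric: for example, four green bars on low volume (4, 0.5) and one green bar on high volume (1, 1.5) both fall through to neutral.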
  • Step 4: Run tests to verify PASS
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_momentum_classifier.py -v 2>&1 | tail -15

Expected: 6 passed.

Full suite:

python -m pytest signal_v2/tests -q 2>&1 | tail -3

Expected: 40 passed.

If any test fails (most likely test_strong_up or test_strong_down), check how the volume baseline is formed. When the trailing 12-bar average includes the five most recent 5-minute bars, a burst in recent volume is diluted, and the multiplier can fall below the 1.5 threshold even though the fixtures use 1.5x volume. Adjust either the tests or the impl as needed for correctness.
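To see how the choice of baseline interacts with the 1.5 threshold, the arithmetic for the strong_up fixture (35 older 1-minute bars at volume 1000, then 25 recent bars at volume 1500) can be checked directly. This is an illustrative calculation, not part of the plan's code:

```python
# Per-5-minute-bar volumes from the fixture: 7 older bars at 5 x 1000,
# then 5 recent bars at 5 x 1500.
older = [5 * 1000] * 7
recent = [5 * 1500] * 5

recent_avg = sum(recent) / len(recent)

# Baseline over all 12 bars (recent bars included).
inclusive_mult = recent_avg / (sum(older + recent) / 12)

# Baseline over the 7 preceding bars only.
exclusive_mult = recent_avg / (sum(older) / len(older))
```

With the inclusive baseline the multiplier comes out near 1.24, so the strong_up and strong_down branches cannot be reached with these fixtures; with the exclusive baseline it is exactly 1.5.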

  • Step 5: Commit
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/momentum_classifier.py signal_v2/tests/test_momentum_classifier.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3b): momentum_classifier + 6 unit tests

aggregate_1min_to_5min: 5-minute OHLCV synthesis (open=first, close=last,
high=max, low=min, volume=sum). classify_minute_momentum: number of green
bars among the last five 5-minute bars + 60-minute volume multiplier → 5-level
(strong_up/weak_up/neutral/weak_down/strong_down).

40 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"

Task 4: chronos_predictor + 4 tests (mock)

Files:

  • Create: web-ai/signal_v2/chronos_predictor.py

  • Create: web-ai/signal_v2/tests/test_chronos_predictor.py

  • Step 1: Write 4 failing tests

Create web-ai/signal_v2/tests/test_chronos_predictor.py:

"""Tests for ChronosPredictor (model mock)."""
from unittest.mock import MagicMock, patch

import numpy as np
import pytest


@pytest.fixture
def mock_pipeline():
    """Mock ChronosPipeline.from_pretrained returning a mock pipeline object."""
    with patch("chronos.ChronosPipeline") as cls:
        instance = MagicMock()
        cls.from_pretrained.return_value = instance
        yield instance


@pytest.fixture
def mock_torch():
    with patch("torch.cuda.is_available", return_value=False):
        yield


def _daily_ohlcv(close_seq):
    return [{"datetime": f"2026-05-{i+1:02d}", "open": c, "high": c, "low": c,
             "close": c, "volume": 1000} for i, c in enumerate(close_seq)]


def test_predict_batch_returns_prediction_dict(mock_pipeline, mock_torch):
    """mock pipeline → dict[ticker, ChronosPrediction]."""
    import torch
    # mock samples shape [num_tickers=1, num_samples=100, prediction_length=1]
    # last_close = 100; samples around 102 → return ~+2%
    samples = np.full((100,), 102.0)
    mock_pipeline.predict.return_value = torch.tensor(samples.reshape(1, 100, 1))

    from signal_v2.chronos_predictor import ChronosPredictor, ChronosPrediction
    predictor = ChronosPredictor(model_name="mock-model")
    daily = {"005930": _daily_ohlcv([100] * 60)}
    result = predictor.predict_batch(daily)
    assert "005930" in result
    pred = result["005930"]
    assert isinstance(pred, ChronosPrediction)
    assert abs(pred.median - 0.02) < 0.001  # +2% return


def test_conf_high_when_distribution_narrow(mock_pipeline, mock_torch):
    """Narrow distribution → high conf."""
    import torch
    # Tight distribution: all samples ≈ 102
    samples = np.random.normal(102.0, 0.1, 100)
    mock_pipeline.predict.return_value = torch.tensor(samples.reshape(1, 100, 1))

    from signal_v2.chronos_predictor import ChronosPredictor
    predictor = ChronosPredictor(model_name="mock-model")
    daily = {"005930": _daily_ohlcv([100] * 60)}
    result = predictor.predict_batch(daily)
    assert result["005930"].conf > 0.8


def test_conf_low_when_distribution_wide(mock_pipeline, mock_torch):
    """Wide distribution → low conf."""
    import torch
    # Wide distribution: samples spread far
    samples = np.random.normal(100.0, 30.0, 100)
    mock_pipeline.predict.return_value = torch.tensor(samples.reshape(1, 100, 1))

    from signal_v2.chronos_predictor import ChronosPredictor
    predictor = ChronosPredictor(model_name="mock-model")
    daily = {"005930": _daily_ohlcv([100] * 60)}
    result = predictor.predict_batch(daily)
    assert result["005930"].conf < 0.3


def test_return_computed_from_price_relative_to_last_close(mock_pipeline, mock_torch):
    """Predicted price → converted to a return relative to last_close."""
    import torch
    samples = np.full((100,), 110.0)  # predict 110
    mock_pipeline.predict.return_value = torch.tensor(samples.reshape(1, 100, 1))

    from signal_v2.chronos_predictor import ChronosPredictor
    predictor = ChronosPredictor(model_name="mock-model")
    # last_close = 100 → return = +10%
    daily = {"005930": _daily_ohlcv(list(range(41, 101)))}  # last value = 100
    result = predictor.predict_batch(daily)
    assert abs(result["005930"].median - 0.10) < 0.001
  • Step 2: Run tests to verify FAIL
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_chronos_predictor.py -v 2>&1 | tail -10

Expected: ImportError (signal_v2.chronos_predictor not yet exists).

  • Step 3: Implement chronos_predictor.py

Create web-ai/signal_v2/chronos_predictor.py:

"""Chronos-2 zero-shot forecaster wrapper."""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import datetime
from zoneinfo import ZoneInfo

import numpy as np

logger = logging.getLogger(__name__)
KST = ZoneInfo("Asia/Seoul")


@dataclass
class ChronosPrediction:
    median: float
    q10: float
    q90: float
    conf: float
    as_of: str


class ChronosPredictor:
    """HuggingFace Chronos-2 zero-shot forecaster."""

    def __init__(self, model_name: str = "amazon/chronos-2", device: str | None = None):
        from chronos import ChronosPipeline
        import torch

        self._device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        logger.info("Loading Chronos pipeline: %s on %s", model_name, self._device)
        self._pipeline = ChronosPipeline.from_pretrained(
            model_name,
            device_map=self._device,
            torch_dtype=torch.float16 if self._device == "cuda" else torch.float32,
        )
        logger.info("Chronos pipeline loaded.")

    def predict_batch(
        self,
        daily_ohlcv_dict: dict[str, list[dict]],
        prediction_length: int = 1,
        num_samples: int = 100,
    ) -> dict[str, ChronosPrediction]:
        """Predict the 1-day return distribution per ticker."""
        import torch

        tickers = list(daily_ohlcv_dict.keys())
        if not tickers:
            return {}

        contexts = [
            torch.tensor([bar["close"] for bar in daily_ohlcv_dict[t]], dtype=torch.float32)
            for t in tickers
        ]
        forecasts = self._pipeline.predict(
            context=contexts,
            prediction_length=prediction_length,
            num_samples=num_samples,
        )
        forecasts_np = forecasts.numpy() if hasattr(forecasts, "numpy") else np.asarray(forecasts)

        now_iso = datetime.now(KST).isoformat()
        results: dict[str, ChronosPrediction] = {}
        for i, ticker in enumerate(tickers):
            samples = forecasts_np[i, :, 0]
            last_close = daily_ohlcv_dict[ticker][-1]["close"]
            returns = (samples - last_close) / last_close
            median = float(np.quantile(returns, 0.5))
            q10 = float(np.quantile(returns, 0.1))
            q90 = float(np.quantile(returns, 0.9))
            spread = (q90 - q10) / max(abs(median), 0.001)
            conf = float(max(0.0, min(1.0, 1.0 - spread / 2.0)))
            results[ticker] = ChronosPrediction(
                median=median, q10=q10, q90=q90, conf=conf, as_of=now_iso,
            )
        return results
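The confidence score is easiest to judge on concrete numbers. The sketch below lifts the formula from predict_batch into a hypothetical standalone function, confidence, and applies it to a constant, a narrow, and a wide sample set. The seeded generator and the sample count of 1000 are choices made for this illustration only:

```python
import numpy as np


def confidence(returns: np.ndarray) -> float:
    """Hypothetical standalone copy of the conf formula in predict_batch."""
    median = float(np.quantile(returns, 0.5))
    q10 = float(np.quantile(returns, 0.1))
    q90 = float(np.quantile(returns, 0.9))
    # Spread of the central 80% interval, relative to the size of the median move.
    spread = (q90 - q10) / max(abs(median), 0.001)
    return float(max(0.0, min(1.0, 1.0 - spread / 2.0)))


rng = np.random.default_rng(0)  # seeded so the illustration is repeatable
last_close = 100.0

constant = np.full(100, 0.02)
narrow = (rng.normal(102.0, 0.1, 1000) - last_close) / last_close
wide = (rng.normal(100.0, 30.0, 1000) - last_close) / last_close

conf_constant = confidence(constant)
conf_narrow = confidence(narrow)
conf_wide = confidence(wide)
```

Because the spread is divided by |median|, the score depends on the size of the predicted move as well as on the width of the distribution: a forecast centred near zero scores low even when its interval is tight in absolute terms.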
  • Step 4: Run tests to verify PASS
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_chronos_predictor.py -v 2>&1 | tail -15

Expected: 4 passed.

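If the chronos-forecasting import fails (because the pip install in Task 1 failed), these tests will also fail at import time. In that case the implementer should either mock the chronos module at the sys.modules level, or skip and report DONE_WITH_CONCERNS; the user installs the package manually in Task 7. Because the pipeline is mocked, these tests do not exercise real model loading, so whether ChronosPipeline in the installed chronos-forecasting version can load settings.chronos_model should be checked in the Task 7 manual smoke test.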

Full suite:

python -m pytest signal_v2/tests -q 2>&1 | tail -3

Expected: 44 passed.

  • Step 5: Commit
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/chronos_predictor.py signal_v2/tests/test_chronos_predictor.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3b): chronos_predictor + 4 mock tests

ChronosPredictor wraps HuggingFace ChronosPipeline. Batch predict
returns ChronosPrediction(median, q10, q90, conf, as_of) per ticker.
Confidence = 1 - clamp(spread/2, 0, 1) where spread = (q90-q10) / |median|.
Lazy import of chronos lib (heavy). GPU auto-detect with FP16.

44 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"

Task 5: pull_worker post-close cycle + scheduler trigger + 1 test

Files:

  • Modify: web-ai/signal_v2/scheduler.py

  • Modify: web-ai/signal_v2/pull_worker.py

  • Modify: web-ai/signal_v2/tests/test_pull_worker.py

  • Step 1: Write failing test

Append to web-ai/signal_v2/tests/test_pull_worker.py:

async def test_post_close_cycle_updates_chronos_predictions():
    """mock kis + mock chronos → state.chronos_predictions and state.daily_ohlcv are updated."""
    from unittest.mock import AsyncMock, MagicMock
    from signal_v2.pull_worker import _run_post_close_cycle
    from signal_v2.chronos_predictor import ChronosPrediction
    from signal_v2.state import PollState

    state = PollState()
    state.portfolio = {"holdings": [{"ticker": "005930"}]}
    state.screener_preview = {"items": [{"ticker": "000660"}]}

    kis_mock = MagicMock()
    daily_005930 = [{"datetime": f"2026-05-{i+1:02d}", "open": 100, "high": 105,
                     "low": 95, "close": 100 + i, "volume": 1000} for i in range(60)]
    daily_000660 = [{"datetime": f"2026-05-{i+1:02d}", "open": 200, "high": 210,
                     "low": 190, "close": 200 + i, "volume": 2000} for i in range(60)]
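    # Key the mock by ticker: the cycle builds its ticker list from a set,
    # so the call order is not fixed.
    daily_by_ticker = {"005930": daily_005930, "000660": daily_000660}
    kis_mock.get_daily_ohlcv = AsyncMock(
        side_effect=lambda ticker, days=60: daily_by_ticker[ticker]
    )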

    chronos_mock = MagicMock()
    chronos_mock.predict_batch = MagicMock(return_value={
        "005930": ChronosPrediction(0.02, -0.01, 0.04, 0.85, "2026-05-18T16:00:00+09:00"),
        "000660": ChronosPrediction(0.03, -0.02, 0.06, 0.75, "2026-05-18T16:00:00+09:00"),
    })

    await _run_post_close_cycle(kis_mock, chronos_mock, state)

    assert "005930" in state.chronos_predictions
    assert "000660" in state.chronos_predictions
    assert state.chronos_predictions["005930"]["median"] == 0.02
    assert state.chronos_predictions["005930"]["conf"] == 0.85
    assert "005930" in state.daily_ohlcv
    assert "chronos/005930" in state.last_updated
  • Step 2: Run test to verify FAIL
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_pull_worker.py::test_post_close_cycle_updates_chronos_predictions -v 2>&1 | tail -10

Expected: ImportError or AttributeError.

  • Step 3: Update scheduler.py with _is_post_close_trigger

Append to web-ai/signal_v2/scheduler.py:

def _is_post_close_trigger(now: datetime) -> bool:
    """One-minute window starting at 16:00 KST (post-close cycle trigger). Market days only."""
    if not _is_market_day(now):
        return False
    t = now.time()
    return time(16, 0) <= t < time(16, 1)
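The time comparison is a half-open interval, [16:00:00, 16:01:00). A short check of the boundaries, using a hypothetical copy of the time test only (the market-day check is left out, and naive datetimes stand in for KST-aware ones):

```python
from datetime import datetime, time


def in_post_close_window(now: datetime) -> bool:
    """Hypothetical copy of the time check in _is_post_close_trigger."""
    return time(16, 0) <= now.time() < time(16, 1)


before = in_post_close_window(datetime(2026, 5, 18, 15, 59, 59))
start = in_post_close_window(datetime(2026, 5, 18, 16, 0, 0))
inside = in_post_close_window(datetime(2026, 5, 18, 16, 0, 59))
after = in_post_close_window(datetime(2026, 5, 18, 16, 1, 0))
```

Since the window is 60 seconds wide, the calling loop has to wake at least once inside it for the trigger to fire, and a loop that wakes more than once inside it will see the trigger more than once.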
  • Step 4: Update pull_worker.py with _run_post_close_cycle + update_minute_momentum_for_all

Read current web-ai/signal_v2/pull_worker.py. Add at the end of the file:

async def _run_post_close_cycle(kis_client, chronos, state) -> None:
    """Once after the close at 16:00 KST: daily fetch + chronos predict."""
    tickers = list(set(_portfolio_tickers(state)) | set(_screener_tickers(state)))
    if not tickers:
        return

    daily_results = await asyncio.gather(*[
        kis_client.get_daily_ohlcv(t, days=60) for t in tickers
    ], return_exceptions=True)
    daily_dict = {}
    for ticker, result in zip(tickers, daily_results):
        if isinstance(result, list) and len(result) >= 30:
            daily_dict[ticker] = result
            state.daily_ohlcv[ticker] = result
        elif isinstance(result, Exception):
            state.fetch_errors[f"daily_ohlcv/{ticker}"] = (
                state.fetch_errors.get(f"daily_ohlcv/{ticker}", 0) + 1
            )

    if daily_dict and chronos is not None:
        try:
            predictions = chronos.predict_batch(daily_dict)
        except Exception:
            logger.exception("chronos predict_batch failed")
            return
        for ticker, pred in predictions.items():
            state.chronos_predictions[ticker] = {
                "median": pred.median,
                "q10": pred.q10,
                "q90": pred.q90,
                "conf": pred.conf,
                "as_of": pred.as_of,
            }
            state.last_updated[f"chronos/{ticker}"] = pred.as_of


def update_minute_momentum_for_all(state) -> None:
    """Called after every minute-bar cycle: refresh momentum for all tickers."""
    from signal_v2.momentum_classifier import classify_minute_momentum
    now_iso = datetime.now(KST).isoformat()
    for ticker, bars in state.minute_bars.items():
        state.minute_momentum[ticker] = classify_minute_momentum(bars)
        state.last_updated[f"momentum/{ticker}"] = now_iso
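predict_batch is a synchronous call inside an async function, so the event loop is blocked while inference runs. Whether that matters depends on how long inference takes on the target machine. One option, sketched here with a hypothetical stand-in function rather than the real predictor, is to hand the call to a worker thread with asyncio.to_thread:

```python
import asyncio
import time


def blocking_predict(batch: dict[str, list[float]]) -> dict[str, float]:
    """Hypothetical stand-in for a slow, synchronous predict_batch call."""
    time.sleep(0.2)
    return {ticker: closes[-1] for ticker, closes in batch.items()}


async def heartbeat(ticks: list[float]) -> None:
    """Records timestamps; it only advances while the event loop is free."""
    for _ in range(3):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def main() -> tuple[dict[str, float], int]:
    ticks: list[float] = []
    hb = asyncio.create_task(heartbeat(ticks))
    # Run the blocking call in a worker thread so the loop keeps serving other tasks.
    result = await asyncio.to_thread(blocking_predict, {"005930": [100.0, 101.0]})
    ticks_during = len(ticks)
    await hb
    return result, ticks_during


result, ticks_during = asyncio.run(main())
```

The heartbeat task keeps ticking while the blocking call runs, which it would not do if blocking_predict were called directly from the coroutine.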

Also update the poll_loop signature to accept an optional chronos param (the _run_polling_cycle call in the code below is unchanged):

async def poll_loop(
    client, state, shutdown,
    kis_client=None, chronos=None,
) -> None:
    """...existing..."""
    logger.info("poll_loop started")
    last_post_close_date = None  # date on which the post-close cycle last ran
    while not shutdown.is_set():
        now = datetime.now(KST)
        if _is_market_day(now) and _is_polling_window(now):
            try:
                await _run_polling_cycle(client, state, kis_client=kis_client)
            except Exception:
                logger.exception("poll cycle failed")
            # Refresh minute momentum (every cycle)
            try:
                update_minute_momentum_for_all(state)
            except Exception:
                logger.exception("minute momentum update failed")
        # Post-close trigger (16:00 KST). Checked outside the polling-window branch
        # so it still fires if that window closes before 16:00, and at most once per day.
        if (
            _is_post_close_trigger(now)
            and last_post_close_date != now.date()
            and chronos is not None and kis_client is not None
        ):
            last_post_close_date = now.date()
            try:
                await _run_post_close_cycle(kis_client, chronos, state)
            except Exception:
                logger.exception("post-close cycle failed")
        interval = _next_interval(now)
        try:
            await asyncio.wait_for(shutdown.wait(), timeout=interval)
            break
        except asyncio.TimeoutError:
            continue
    logger.info("poll_loop ended")
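The tail of the loop uses asyncio.wait_for on shutdown.wait() as a sleep that can be interrupted: a timeout means "interval elapsed, run the next cycle", while a normal return means shutdown was requested. The pattern in isolation, with a hypothetical helper name:

```python
import asyncio


async def sleep_or_shutdown(shutdown: asyncio.Event, interval: float) -> bool:
    """Hypothetical helper: True if shutdown was requested during the wait."""
    try:
        await asyncio.wait_for(shutdown.wait(), timeout=interval)
        return True
    except asyncio.TimeoutError:
        return False


async def demo() -> tuple[bool, bool]:
    shutdown = asyncio.Event()
    # No shutdown requested: behaves like a plain sleep and times out.
    timed_out = await sleep_or_shutdown(shutdown, 0.01)
    # Shutdown set shortly after: the long wait returns early.
    asyncio.get_running_loop().call_later(0.01, shutdown.set)
    interrupted = await sleep_or_shutdown(shutdown, 5.0)
    return timed_out, interrupted


timed_out, interrupted = asyncio.run(demo())
```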

Add _is_post_close_trigger to the scheduler import block at the top of pull_worker.py:

from signal_v2.scheduler import (
    KST, _is_market_day, _is_polling_window, _next_interval, _is_post_close_trigger,
)
  • Step 5: Run tests to verify PASS
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_pull_worker.py -v 2>&1 | tail -10

Expected: 3 passed (2 existing + 1 new).

Full suite:

python -m pytest signal_v2/tests -q 2>&1 | tail -3

Expected: 45 passed.

  • Step 6: Commit
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/scheduler.py signal_v2/pull_worker.py signal_v2/tests/test_pull_worker.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3b): post-close cycle + minute momentum update

scheduler._is_post_close_trigger: detects the one-minute window starting at
16:00 KST (market days only).
pull_worker:
- _run_post_close_cycle: daily fetch (60 days) + chronos batch predict →
  state.chronos_predictions + state.daily_ohlcv.
- update_minute_momentum_for_all: refreshes state.minute_momentum on every cycle.
- poll_loop signature extended (chronos optional).

45 tests pass (44 → 45).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"

Task 6: main.py lifespan ChronosPredictor

Files:

  • Modify: web-ai/signal_v2/main.py

  • Step 1: Update main.py

Read current web-ai/signal_v2/main.py. Update lifespan to load ChronosPredictor and pass to poll_loop.

Add import (with the existing imports):

from signal_v2.chronos_predictor import ChronosPredictor

Extend AppContext:

class AppContext:
    client: StockClient | None = None
    dedup: SignalDedup | None = None
    shutdown: asyncio.Event | None = None
    poll_task: asyncio.Task | None = None
    kis_client: KISClient | None = None
    kis_ws: KISWebSocket | None = None
    chronos: ChronosPredictor | None = None

Inside lifespan, after _ctx.kis_ws setup, add chronos initialization (only if kis_app_key set):

    if settings.kis_app_key:
        # ... existing KISClient + KISWebSocket setup ...

        # Load Chronos (heavy: ~1GB model download first time)
        try:
            _ctx.chronos = ChronosPredictor(model_name=settings.chronos_model)
        except Exception:
            logger.exception("ChronosPredictor load failed — continuing without chronos predictions")
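
Constructing the predictor directly inside lifespan blocks the event loop while the model loads. One option is to run the constructor in a worker thread and keep the same fall-back-to-None behaviour. A minimal sketch, where `load_optional` is a hypothetical helper and not part of signal_v2:

```python
import asyncio
import logging
from typing import Callable, Optional, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)


async def load_optional(factory: Callable[[], T], name: str) -> Optional[T]:
    """Run a blocking constructor in a worker thread; return None if it raises."""
    try:
        return await asyncio.to_thread(factory)
    except Exception:
        logger.exception("%s load failed; continuing without it", name)
        return None
```

Inside lifespan this could be called as `_ctx.chronos = await load_optional(lambda: ChronosPredictor(model_name=settings.chronos_model), "ChronosPredictor")`, assuming the constructor is safe to run off the main thread.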

Update poll_task creation to pass chronos:

    _ctx.poll_task = asyncio.create_task(
        poll_loop(
            _ctx.client, state_mod.state, _ctx.shutdown,
            kis_client=_ctx.kis_client,
            chronos=_ctx.chronos,
        )
    )

No new tests for this task (lifespan is tested implicitly by existing test_main.py).

  • Step 2: Run full test suite
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests -q 2>&1 | tail -3

Expected: 45 passed.

  • Step 3: Commit
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/main.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3b): main.py lifespan loads ChronosPredictor

AppContext.chronos field. lifespan: if KIS_APP_KEY set, load
ChronosPredictor(model_name=settings.chronos_model). Exceptions
during load logged + signal_v2 continues without chronos (other
endpoints unaffected). poll_loop receives chronos param.

45 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"

Task 7: Manual user steps (pip install + .env + manual smoke + push)

This task requires user action.

  • Step 1: pip install (if needed)

If some of the Task 1 pip installs failed (chronos-forecasting, torch CUDA, etc.), the user installs them manually:

cd C:\Users\jaeoh\Desktop\workspace\web-ai
# Activate the V1 venv (if it already exists)
# .\signal_v1\.venv\Scripts\Activate.ps1

# Install transformers + chronos-forecasting
# (quote the specifiers so the shell does not treat ">" as output redirection)
pip install "transformers>=4.40" "chronos-forecasting>=1.4"

# torch (CUDA 12.x): skip if V1's PyTorch is already installed
# pip install torch --index-url https://download.pytorch.org/whl/cu124
  • Step 2: .env (optional)

If you keep the CHRONOS_MODEL default of amazon/chronos-2, no .env change is needed. To try a different model:

CHRONOS_MODEL=amazon/chronos-bolt-base
  • Step 3: Start signal_v2
cd C:\Users\jaeoh\Desktop\workspace\web-ai\signal_v2
.\start.bat

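⚠️ The first start downloads the ~1 GB Chronos model (takes tens of seconds). The console shows:

  • Loading Chronos pipeline: amazon/chronos-2 on cuda (or cpu)
  • Chronos pipeline loaded.

If the download fails or the load runs out of memory, a ChronosPredictor load failed log appears. signal_v2 keeps running without chronos (all other features work normally).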

  • Step 4: /health smoke
curl http://localhost:8001/health
  • Step 5: Verify the post-close cycle (next 16:00 KST)

At 16:00 KST on a weekday (or via manual trigger):

  • Confirm state.chronos_predictions is updated
  • Call /health again: last_poll should show a chronos/<ticker> timestamp
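
The /health check can be scripted. This sketch assumes the response is JSON with a `last_poll` mapping whose keys include `chronos/<ticker>`. That shape is inferred from this step, not confirmed against main.py, and `chronos_keys` / `fetch_health` are hypothetical names:

```python
import json
from urllib.request import urlopen


def chronos_keys(health: dict) -> list[str]:
    """Return the last_poll keys that start with 'chronos/' (assumed payload shape)."""
    last_poll = health.get("last_poll") or {}
    return sorted(k for k in last_poll if k.startswith("chronos/"))


def fetch_health(url: str = "http://localhost:8001/health") -> dict:
    """GET the health endpoint and decode its JSON body."""
    with urlopen(url, timeout=5) as resp:
        return json.load(resp)
```

With signal_v2 running, `chronos_keys(fetch_health())` should be non-empty after the post-close cycle if the payload matches the assumed shape.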

Off-hours verification (manual):

cd C:\Users\jaeoh\Desktop\workspace\web-ai
python -c "
import asyncio
from signal_v2.config import get_settings
from signal_v2.kis_client import KISClient
from signal_v2.chronos_predictor import ChronosPredictor
from signal_v2.state import PollState
from signal_v2.pull_worker import _run_post_close_cycle

async def main():
    s = get_settings()
    kc = KISClient(app_key=s.kis_app_key, app_secret=s.kis_app_secret,
                   account=s.kis_account, is_virtual=s.kis_is_virtual,
                   v1_token_path=s.v1_token_path)
    chr_p = ChronosPredictor(model_name=s.chronos_model)
    state = PollState()
    state.portfolio = {'holdings': [{'ticker': '005930'}]}
    state.screener_preview = {'items': []}
    try:
        await _run_post_close_cycle(kc, chr_p, state)
        print(state.chronos_predictions)
    finally:
        await kc.close()

asyncio.run(main())
"

Expected: {'005930': {'median': ..., 'q10': ..., 'q90': ..., 'conf': ..., 'as_of': ...}}
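
A quick sanity check on each entry can catch a malformed result before it is reported. This is a minimal sketch: the five keys come from the expected output above, the ordering q10 ≤ median ≤ q90 is an assumption about well-behaved quantile forecasts, and `check_prediction` is a hypothetical helper:

```python
import math


def check_prediction(pred: dict) -> list[str]:
    """Return a list of problems found in one prediction dict (empty means OK)."""
    problems = [f"missing {key}"
                for key in ("median", "q10", "q90", "conf", "as_of")
                if key not in pred]
    if problems:
        return problems
    nums = [pred["q10"], pred["median"], pred["q90"]]
    if not all(isinstance(v, (int, float)) and math.isfinite(v) for v in nums):
        problems.append("non-finite quantile")
    elif not (pred["q10"] <= pred["median"] <= pred["q90"]):
        problems.append("quantiles out of order")
    return problems
```

In the smoke script it could be applied as `{t: check_prediction(p) for t, p in state.chronos_predictions.items()}`, assuming each value is a plain dict as shown in the expected output.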

  • Step 6: No impact on V1

The V1 bot runs normally, Telegram /status responds, and there is no GPU OOM.

  • Step 7: push
cd C:\Users\jaeoh\Desktop\workspace\web-ai
git push
  • Step 8: Report results

  • Step 3 (signal_v2 start + Chronos load): PASS / FAIL (include the error message)

  • Step 4 (/health): PASS / FAIL

  • Step 5 (post-close verification): PASS / FAIL (share the state.chronos_predictions output)

  • Step 6 (no V1 impact): PASS / FAIL

  • Step 7 (push): PASS / FAIL

If every step passes, Phase 3b is complete → move on to Phase 4 (Signal Generator) brainstorming.


Self-Review

1. Spec coverage:

| Spec § | Requirement | Plan task |
|---|---|---|
| §2 in scope ① | kis_client.get_daily_ohlcv | Task 2 |
| §2 in scope ② | chronos_predictor | Task 4 |
| §2 in scope ③ | momentum_classifier | Task 3 |
| §2 in scope ④ | pull_worker post-close + momentum | Task 5 |
| §2 in scope ⑤ | scheduler _is_post_close_trigger | Task 5 |
| §2 in scope ⑥ | state.py 3 fields | Task 1 |
| §2 in scope ⑦ | main.py lifespan chronos | Task 6 |
| §2 in scope ⑧ | config CHRONOS_MODEL | Task 1 |
| §2 in scope ⑨ | requirements 3 deps | Task 1 |
| §8 | 12 new tests | Task 2 (1) + Task 3 (6) + Task 4 (4) + Task 5 (1) = 12 |
| §11 | DoD, 13 items | Tasks 1-7 combined |

No gaps.

2. Placeholder scan: No "TBD" / "implement later". Manual smoke (Task 7) has user-action steps clearly labeled, not placeholders.

3. Type consistency:

  • ChronosPredictor(model_name, device=None) consistent Task 4 + Task 6
  • ChronosPrediction(median, q10, q90, conf, as_of) consistent across tests + impl + state
  • classify_minute_momentum(minute_bars: deque) -> str consistent Task 3 + Task 5
  • aggregate_1min_to_5min(minute_bars: list[dict]) -> list[dict] consistent
  • _run_post_close_cycle(kis_client, chronos, state) consistent Task 5 + Task 6
  • _is_post_close_trigger(now: datetime) -> bool consistent Task 5
  • State fields (daily_ohlcv / chronos_predictions / minute_momentum) consistent Task 1 + Task 5
  • env names (CHRONOS_MODEL) consistent Task 1 + Task 6
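
To make the `list[dict] -> list[dict]` contract of the aggregation step concrete, here is an illustrative sketch. It assumes each bar is a dict with `ts` (datetime), `open`, `high`, `low`, `close`, and `volume` keys and that bars arrive sorted by time. These key names and the function name `aggregate_5min_sketch` are hypothetical; the Task 3 implementation is authoritative:

```python
from datetime import datetime


def aggregate_5min_sketch(minute_bars: list[dict]) -> list[dict]:
    """Group 1-minute bars into 5-minute OHLCV buckets (bars assumed sorted by ts)."""
    buckets: dict[datetime, dict] = {}
    for bar in minute_bars:
        ts = bar["ts"]
        # Floor the timestamp to the start of its 5-minute bucket.
        start = ts.replace(minute=ts.minute - ts.minute % 5, second=0, microsecond=0)
        agg = buckets.get(start)
        if agg is None:
            buckets[start] = {
                "ts": start, "open": bar["open"], "high": bar["high"],
                "low": bar["low"], "close": bar["close"], "volume": bar["volume"],
            }
        else:
            agg["high"] = max(agg["high"], bar["high"])
            agg["low"] = min(agg["low"], bar["low"])
            agg["close"] = bar["close"]  # last bar in the bucket wins
            agg["volume"] += bar["volume"]
    return list(buckets.values())  # dicts preserve insertion order
```

The trailing bucket may be partial (fewer than five 1-minute bars). Whether to keep or drop it before classification is a design choice this sketch does not settle.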

Plan passes self-review.