Files
web-page/docs/superpowers/specs/2026-05-16-signal-v2-phase3b-chronos-momentum.md
gahusb 6eb4ab1204 docs(signal-v2): Phase 3b Chronos-2 + minute momentum spec
KIS daily OHLCV fetch (kis_client.get_daily_ohlcv, FHKST03010100) +
ChronosPredictor (HuggingFace amazon/chronos-2 zero-shot, env-configurable
model, always-loaded) + minute momentum classifier (5-level rule:
strong_up/weak_up/neutral/weak_down/strong_down) + post-close cycle
trigger (16:00 KST). 12 new tests (33 → 45 total).

brainstorming 7 decisions: daily=B(KIS REST) / freq=A(post-close 1x) /
model=A(env CHRONOS_MODEL) / momentum=A(5-level rule) / state=B(median+
q10+q90+conf+as_of) / test=A(mock+pure) / scope=integrated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 17:37:17 +09:00

17 KiB
Raw Permalink Blame History

Confidence Signal Pipeline V2 — Phase 3b: Chronos-2 + Minute Momentum Design

작성일: 2026-05-16 작성자: gahusb 상태: Approved for implementation 선행 spec:

  • Phase 0 architecture (2026-05-15-confidence-signal-pipeline-v2-architecture.md)
  • Phase 1 stock WebAI API (2026-05-15-signal-v2-phase1-webai-api.md)
  • Phase 2 web-ai pull worker (2026-05-16-signal-v2-phase2-web-ai-pull-worker.md)
  • Phase 3a KIS data collection (2026-05-16-signal-v2-phase3a-kis-data-collection.md)

브레인스토밍 결정 7개:

  • daily data 소스 = B (KIS REST kis_client.get_daily_ohlcv)
  • 추론 빈도 = A (종가 후 1회 + 메모리 보관)
  • 모델 = A (env CHRONOS_MODEL 외부화, 기본 amazon/chronos-2, 항상 로드)
  • 분봉 모멘텀 = A (5-level 룰 기반)
  • State output = B (median + q10 + q90 + conf + as_of)
  • 테스트 = A (모델 mock + 순수 함수)
  • scope = 통합 9 항목 (Phase 3a 와 같은 1주 단위)

1. 목표

Phase 3a 의 데이터 위에 추론 레이어 추가. Chronos-2 zero-shot 으로 다음날 가격 분포 예측 + 1분봉 → 5분봉 aggregate 후 5-level 모멘텀 분류. Phase 4 (signal generator) 가 두 출력 + Phase 3a 의 호가/분봉 + Phase 2 의 portfolio/news_sentiment 를 종합해 매수/매도 신호 룰 적용.

Why: Phase 0 §3 "web-ai = 시점 분석" 책임의 추론 부분. Chronos-2 의 zero-shot quantile 분포 + 분봉 모멘텀 5-level 이 매수/매도 룰의 핵심 입력.


2. 범위

포함 (9 항목)

  • kis_client.get_daily_ohlcv(ticker, days=60) — KIS REST TR_ID FHKST03010100
  • chronos_predictor.py 신규 — ChronosPredictor (HuggingFace 모델 + batch predict)
  • momentum_classifier.py 신규 — aggregate_1min_to_5min + classify_minute_momentum
  • pull_worker.py 확장 — _run_post_close_cycle + update_minute_momentum_for_all
  • scheduler.py 확장 — _is_post_close_trigger (16:00 KST)
  • state.py 확장 — daily_ohlcv + chronos_predictions + minute_momentum
  • main.py 확장 — lifespan 에 ChronosPredictor 로드
  • config.py 확장 — CHRONOS_MODEL env
  • requirements.txttransformers>=4.40, chronos-forecasting>=1.4, torch>=2.0

범위 외 (NOT)

  • Signal generator 매수/매도 룰 (Phase 4)
  • agent-office /signal 호출 (Phase 5)
  • 모델 재학습/fine-tune — zero-shot only
  • 다중 horizon 예측 — 1-day median 만, 다른 horizon Phase 7
  • 외부 데이터 (yfinance/FDR) — KIS REST 만
  • Chronos lazy load — 항상 로드 (Phase 7 모니터링 후 검토)
  • 분봉 모멘텀 ML 모델 — 룰 기반만 (Phase 7 백테스트 후 ML 검토)
  • WebSocket 동적 subscribe (Phase 3a backlog 그대로)

3. 파일 구조 + 변경 매트릭스

파일 작업 라인
signal_v2/kis_client.py get_daily_ohlcv 메서드 추가 +50
signal_v2/chronos_predictor.py 신규 ~120
signal_v2/momentum_classifier.py 신규 ~80
signal_v2/pull_worker.py post-close cycle + momentum 갱신 +50
signal_v2/scheduler.py _is_post_close_trigger 헬퍼 +20
signal_v2/state.py 3 필드 추가 +5
signal_v2/main.py lifespan ChronosPredictor 로드 +15
signal_v2/config.py chronos_model 필드 +3
signal_v2/requirements.txt 3 의존성 +3
signal_v2/tests/test_kis_client.py daily 1 케이스 +30
signal_v2/tests/test_chronos_predictor.py 신규 4 케이스 ~120
signal_v2/tests/test_momentum_classifier.py 신규 6 케이스 ~150
signal_v2/tests/test_pull_worker.py post-close 1 케이스 +50

합계: 13 파일 변경 (8 코드 + 4 테스트 + 1 requirements), 12 신규 테스트 (33 → 45 total).

외부 의존성 신규

  • transformers>=4.40
  • chronos-forecasting>=1.4
  • torch>=2.0 (CUDA 12.x 빌드, V1 venv 공유 시 재설치 불필요)

모델 다운로드

amazon/chronos-2 HuggingFace 모델 첫 로드 시 ~1GB 다운로드 (~수십 초). ~/.cache/huggingface/ 캐시 후 무영향. Task 7 manual smoke 에 시간 예상 명시.


4. KIS Daily OHLCV (kis_client.get_daily_ohlcv)

async def get_daily_ohlcv(self, ticker: str, days: int = 60) -> list[dict]:
    """KRX 일봉 OHLCV (TR_ID FHKST03010100).

    Args:
        ticker: 6자리 종목코드
        days: 최근 N영업일 (KIS 한도 100영업일)

    Returns:
        [{"datetime": "2026-05-15", "open": int, "high": int, "low": int,
          "close": int, "volume": int}, ...]
        시간 오름차순 (가장 최근이 마지막).
    """
    path = "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
    today = datetime.now(KST).strftime("%Y%m%d")
    start_date = (datetime.now(KST) - timedelta(days=days * 2)).strftime("%Y%m%d")
    params = {
        "FID_COND_MRKT_DIV_CODE": "J",
        "FID_INPUT_ISCD": ticker,
        "FID_INPUT_DATE_1": start_date,
        "FID_INPUT_DATE_2": today,
        "FID_PERIOD_DIV_CODE": "D",
        "FID_ORG_ADJ_PRC": "1",
    }
    raw = await self._request_with_retry(
        "GET", path, tr_id="FHKST03010100", params=params,
    )
    output2 = raw.get("output2", [])
    bars = []
    for row in output2:
        try:
            date = row["stck_bsop_date"]
            bars.append({
                "datetime": f"{date[:4]}-{date[4:6]}-{date[6:]}",
                "open": int(row["stck_oprc"]),
                "high": int(row["stck_hgpr"]),
                "low": int(row["stck_lwpr"]),
                "close": int(row["stck_clpr"]),
                "volume": int(row["acml_vol"]),
            })
        except (KeyError, ValueError):
            continue
    bars.reverse()  # KIS descending → ascending
    return bars[-days:]

핵심:

  • TR_ID FHKST03010100 (V1 패턴)
  • 수정주가 (FID_ORG_ADJ_PRC=1)
  • start_date 를 days*2 로 → 휴장일 + 주말 고려 → [-days:] 트리밍

5. ChronosPredictor

@dataclass
class ChronosPrediction:
    median: float
    q10: float
    q90: float
    conf: float
    as_of: str


class ChronosPredictor:
    """HuggingFace Chronos-2 zero-shot forecaster."""

    def __init__(self, model_name: str = "amazon/chronos-2", device: str | None = None):
        from chronos import ChronosPipeline
        import torch

        self._device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        logger.info("Loading Chronos pipeline: %s on %s", model_name, self._device)
        self._pipeline = ChronosPipeline.from_pretrained(
            model_name,
            device_map=self._device,
            torch_dtype=torch.float16 if self._device == "cuda" else torch.float32,
        )

    def predict_batch(
        self,
        daily_ohlcv_dict: dict[str, list[dict]],
        prediction_length: int = 1,
        num_samples: int = 100,
    ) -> dict[str, ChronosPrediction]:
        """종목별 1-day return 분포 예측."""
        import torch
        import numpy as np

        tickers = list(daily_ohlcv_dict.keys())
        contexts = [
            torch.tensor([bar["close"] for bar in daily_ohlcv_dict[t]], dtype=torch.float32)
            for t in tickers
        ]
        forecasts = self._pipeline.predict(
            context=contexts, prediction_length=prediction_length, num_samples=num_samples,
        )

        from datetime import datetime
        now_iso = datetime.now(KST).isoformat()
        results: dict[str, ChronosPrediction] = {}
        for i, ticker in enumerate(tickers):
            samples = forecasts[i, :, 0].numpy()
            last_close = daily_ohlcv_dict[ticker][-1]["close"]
            returns = (samples - last_close) / last_close
            median = float(np.quantile(returns, 0.5))
            q10 = float(np.quantile(returns, 0.1))
            q90 = float(np.quantile(returns, 0.9))
            spread = (q90 - q10) / max(abs(median), 0.001)
            conf = float(max(0.0, min(1.0, 1.0 - spread / 2.0)))
            results[ticker] = ChronosPrediction(median, q10, q90, conf, now_iso)
        return results

핵심:

  • Lazy import (chronos-forecasting 무거움)
  • GPU 자동 감지 + FP16 (CUDA) / FP32 (CPU)
  • Batch predict — 30+ 종목 동시 ~1-2초
  • Price → return 변환
  • Confidence — 분포 폭 기반 (좁을수록 1)

6. 분봉 모멘텀 분류기

6.1 1분봉 → 5분봉 aggregate

def aggregate_1min_to_5min(minute_bars: list[dict]) -> list[dict]:
    """1분봉 N개 → 5분봉 floor(N/5) 개. 시간 오름차순."""
    bars_5min = []
    chunks = len(minute_bars) // 5
    for i in range(chunks):
        chunk = minute_bars[i * 5 : (i + 1) * 5]
        bars_5min.append({
            "datetime": chunk[0]["datetime"],
            "open": chunk[0]["open"],
            "high": max(b["high"] for b in chunk),
            "low": min(b["low"] for b in chunk),
            "close": chunk[-1]["close"],
            "volume": sum(b["volume"] for b in chunk),
        })
    return bars_5min

6.2 5-level 분류

def classify_minute_momentum(minute_bars: deque) -> str:
    """1분봉 deque → strong_up / weak_up / neutral / weak_down / strong_down."""
    minute_list = list(minute_bars)
    if len(minute_list) < 5 * 5:  # 25 bars minimum
        return NEUTRAL

    bars_5min = aggregate_1min_to_5min(minute_list)
    if len(bars_5min) < 5:
        return NEUTRAL

    recent = bars_5min[-5:]  # 직전 5개 5분봉
    up_count = sum(1 for b in recent if b["close"] > b["open"])

    # 거래량 multiplier — recent 5 vs 60분 평균
    recent_vol_avg = sum(b["volume"] for b in recent) / len(recent)
    long_window = bars_5min[-12:]  # 60분 = 5분봉 12개
    long_vol_avg = sum(b["volume"] for b in long_window) / len(long_window)
    vol_mult = recent_vol_avg / long_vol_avg if long_vol_avg > 0 else 1.0

    if up_count == 5 and vol_mult >= 1.5:
        return STRONG_UP
    elif up_count >= 3 and vol_mult >= 1.0:
        return WEAK_UP
    elif up_count == 0 and vol_mult >= 1.5:
        return STRONG_DOWN
    elif up_count <= 2 and vol_mult < 1.0:
        return WEAK_DOWN
    else:
        return NEUTRAL

7. PollState 확장 + pull_worker

7.1 PollState 추가 필드

@dataclass
class PollState:
    # ... 기존 필드 ...
    # Phase 3b additions
    daily_ohlcv: dict[str, list[dict]] = field(default_factory=dict)
    chronos_predictions: dict[str, dict] = field(default_factory=dict)
    minute_momentum: dict[str, str] = field(default_factory=dict)

7.2 pull_worker 확장

async def _run_post_close_cycle(
    kis_client: KISClient, chronos: ChronosPredictor, state: PollState,
) -> None:
    """16:00 KST 종가 후 1회: daily fetch + chronos predict."""
    tickers = list(set(_portfolio_tickers(state)) | set(_screener_tickers(state)))
    daily_results = await asyncio.gather(*[
        kis_client.get_daily_ohlcv(t, days=60) for t in tickers
    ], return_exceptions=True)
    daily_dict = {}
    for ticker, result in zip(tickers, daily_results):
        if isinstance(result, list) and len(result) >= 30:
            daily_dict[ticker] = result
            state.daily_ohlcv[ticker] = result

    if daily_dict:
        predictions = chronos.predict_batch(daily_dict)
        now_iso = datetime.now(KST).isoformat()
        for ticker, pred in predictions.items():
            state.chronos_predictions[ticker] = {
                "median": pred.median, "q10": pred.q10, "q90": pred.q90,
                "conf": pred.conf, "as_of": pred.as_of,
            }
            state.last_updated[f"chronos/{ticker}"] = pred.as_of


def update_minute_momentum_for_all(state: PollState) -> None:
    """매 분봉 cycle 후 호출 — 모든 종목 모멘텀 갱신."""
    from signal_v2.momentum_classifier import classify_minute_momentum
    for ticker, bars in state.minute_bars.items():
        state.minute_momentum[ticker] = classify_minute_momentum(bars)

7.3 scheduler _is_post_close_trigger

def _is_post_close_trigger(now: datetime) -> bool:
    """16:00 KST ±1분 (post-close cycle 트리거)."""
    if not _is_market_day(now):
        return False
    t = now.time()
    return time(16, 0) <= t < time(16, 1)

poll_loop 안에서 매 cycle:

if _is_post_close_trigger(now) and chronos is not None:
    await _run_post_close_cycle(kis_client, chronos, state)

8. 테스트 (12 신규)

8.1 test_kis_client.py (1)

  • test_get_daily_ohlcv_returns_60_bars — respx mock 200 → 60 bars 시간 오름차순

8.2 test_chronos_predictor.py (4, 모델 mock)

  • test_predict_batch_returns_prediction_dict — mock pipeline → ChronosPrediction
  • test_conf_high_when_distribution_narrow — narrow → conf ≈ 1
  • test_conf_low_when_distribution_wide — wide → conf ≈ 0
  • test_return_computed_from_price_relative_to_last_close — price → return 변환

8.3 test_momentum_classifier.py (6)

  • test_strong_up_5_consecutive_green_with_high_volume
  • test_weak_up_3of5_green_normal_volume
  • test_neutral_mixed
  • test_weak_down_low_green_low_volume
  • test_strong_down_5_consecutive_red_high_volume
  • test_aggregate_1min_to_5min_correctness

8.4 test_pull_worker.py (1)

  • test_post_close_cycle_updates_chronos_predictions — mock kis + mock chronos → state 갱신

합계: 1 + 4 + 6 + 1 = 12 신규. 기존 33 + 12 = 45 total.


9. 위험 및 완화

위험 완화
Chronos-2 첫 로드 ~1GB 다운로드 startup INFO + Task 7 smoke 시간 예상 명시
GPU OOM (Chronos + V1 Ollama 동거) FP16 ~400MB + Ollama 4GB = 5GB / 15.5GB 여유. Phase 5 Qwen3 추가 시 13.3GB. Phase 6 V1 deprecation 후 해소
chronos-forecasting 호환 (transformers 버전) 명시 버전. 운영 첫 install 검증
KIS daily fetch + V1 Macro 동시 → rate limit (EGW00201) post-close 16:00 트리거 vs V1 Trading Bot 의 장 마감 cycle 충돌 위험. 운영 검증 후 16:05 으로 조정 가능
Chronos-2 예측 정확도 불확실 Phase 7 IC 검증 + 신호 hit-rate 추적. 부족 시 model env 변경 또는 Moirai-2.0
모멘텀 룰 임계값 (1.5x / 5/5) 보수적 Phase 7 운영 후 임계값 조정
1분봉 60개 미만 (장 시작 1시간 내) NEUTRAL 폴백. 09:00-10:00 신호 발생 안 함 (운영 허용)
Chronos 모델 다운로드 네트워크 단절 startup RuntimeError + 운영자 알림 + 재시작. 캐시 후 무관
daily_ohlcv 메모리 누수 종목 ~30 × 60일 ~100B = ~180KB. 무시
Chronos 추론 시 V1 Ollama 와 동시 GPU 사용 일 1회 + 짧음 (~2초). V1 Ollama 의 GPU 점유 사이에 끼어들 가능성 → 일시 deferred. Phase 7 모니터링

10. 운영 영향

항목 영향
다운타임 signal_v2 재기동 ~30초 (첫 모델 로드)
사용자 영향 없음 (Phase 3b 도 silent, 신호 발송은 Phase 5)
.env 갱신 optional 1줄 (CHRONOS_MODEL=amazon/chronos-2 — 기본값과 동일 시 미설정 OK)
V1 영향 0 (별도 process). GPU 메모리만 공유
KIS API 부하 post-close cycle 일 1회 30 종목 daily fetch ~60 calls. 평소 분봉/호가 cycle 그대로
모델 다운로드 첫 시작 ~1GB / 캐시

11. Phase 3b 완료 조건 (DoD)

  • signal_v2/kis_client.py get_daily_ohlcv 메서드 추가
  • signal_v2/chronos_predictor.py 신규
  • signal_v2/momentum_classifier.py 신규
  • signal_v2/pull_worker.py post-close cycle + momentum 갱신
  • signal_v2/scheduler.py _is_post_close_trigger
  • signal_v2/state.py 3 필드 추가
  • signal_v2/main.py lifespan ChronosPredictor 로드
  • signal_v2/config.py CHRONOS_MODEL env
  • requirements.txt 3 의존성 추가
  • 12 신규 테스트 PASS (총 45)
  • 운영 smoke: signal_v2 시작 → Chronos 모델 로드 성공 → 16:00 post-close cycle 1회 실행 → state.chronos_predictions 갱신 확인
  • V1 무영향 (GPU OOM 없음)
  • git push

12. Phase 4 와의 관계

본 Phase 3b 완료 후 즉시 Phase 4 (Signal Generator) brainstorming. 의존성:

[Phase 3b spec/plan/실행]   →   [Phase 4 spec/plan/실행]
       1주                          1주

Phase 4 의 입력 = 본 spec 의 state.chronos_predictions + state.minute_momentum + Phase 3a 의 state.asking_price + Phase 2 의 state.portfolio + state.news_sentiment. Phase 4 산출 = state.signals[ticker] (buy/sell decision + confidence).


13. Backlog (본 spec NOT)

  • Chronos lazy load (Phase 5 Qwen3 동거 시 VRAM 압박 검토)
  • 다중 horizon (1-day + 5-day + 20-day)
  • ML 기반 분봉 모멘텀 (현재 룰 기반만)
  • Chronos model A/B (chronos-bolt-base vs chronos-2 비교 실험)
  • KIS daily fetch 의 V1 충돌 회피 — file mutex 또는 V2 별도 app_key
  • Chronos quantile 의 임의 quantile 지원 (현재 q10/q50/q90 만)
  • daily_ohlcv 영속 저장 (재기동 시 reset 회피)