Files
web-page/docs/superpowers/specs/2026-05-16-signal-v2-phase3b-chronos-momentum.md
gahusb 6eb4ab1204 docs(signal-v2): Phase 3b Chronos-2 + minute momentum spec
KIS daily OHLCV fetch (kis_client.get_daily_ohlcv, FHKST03010100) +
ChronosPredictor (HuggingFace amazon/chronos-2 zero-shot, env-configurable
model, always-loaded) + minute momentum classifier (5-level rule:
strong_up/weak_up/neutral/weak_down/strong_down) + post-close cycle
trigger (16:00 KST). 12 new tests (33 → 45 total).

brainstorming 7 decisions: daily=B(KIS REST) / freq=A(post-close 1x) /
model=A(env CHRONOS_MODEL) / momentum=A(5-level rule) / state=B(median+
q10+q90+conf+as_of) / test=A(mock+pure) / scope=integrated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 17:37:17 +09:00

438 lines
17 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Confidence Signal Pipeline V2 — Phase 3b: Chronos-2 + Minute Momentum Design
**작성일**: 2026-05-16
**작성자**: gahusb
**상태**: Approved for implementation
**선행 spec**:
- Phase 0 architecture (`2026-05-15-confidence-signal-pipeline-v2-architecture.md`)
- Phase 1 stock WebAI API (`2026-05-15-signal-v2-phase1-webai-api.md`)
- Phase 2 web-ai pull worker (`2026-05-16-signal-v2-phase2-web-ai-pull-worker.md`)
- Phase 3a KIS data collection (`2026-05-16-signal-v2-phase3a-kis-data-collection.md`)
**브레인스토밍 결정 7개**:
- daily data 소스 = B (KIS REST `kis_client.get_daily_ohlcv`)
- 추론 빈도 = A (종가 후 1회 + 메모리 보관)
- 모델 = A (env `CHRONOS_MODEL` 외부화, 기본 `amazon/chronos-2`, 항상 로드)
- 분봉 모멘텀 = A (5-level 룰 기반)
- State output = B (median + q10 + q90 + conf + as_of)
- 테스트 = A (모델 mock + 순수 함수)
- scope = 통합 9 항목 (Phase 3a 와 같은 1주 단위)
---
## 1. 목표
Phase 3a 의 데이터 위에 추론 레이어 추가. Chronos-2 zero-shot 으로 다음날 가격 분포 예측 + 1분봉 → 5분봉 aggregate 후 5-level 모멘텀 분류. Phase 4 (signal generator) 가 두 출력 + Phase 3a 의 호가/분봉 + Phase 2 의 portfolio/news_sentiment 를 종합해 매수/매도 신호 룰 적용.
**Why**: Phase 0 §3 "web-ai = 시점 분석" 책임의 추론 부분. Chronos-2 의 zero-shot quantile 분포 + 분봉 모멘텀 5-level 이 매수/매도 룰의 핵심 입력.
---
## 2. 범위
### 포함 (9 항목)
-`kis_client.get_daily_ohlcv(ticker, days=60)` — KIS REST TR_ID `FHKST03010100`
-`chronos_predictor.py` 신규 — `ChronosPredictor` (HuggingFace 모델 + batch predict)
-`momentum_classifier.py` 신규 — `aggregate_1min_to_5min` + `classify_minute_momentum`
-`pull_worker.py` 확장 — `_run_post_close_cycle` + `update_minute_momentum_for_all`
-`scheduler.py` 확장 — `_is_post_close_trigger` (16:00 KST)
-`state.py` 확장 — `daily_ohlcv` + `chronos_predictions` + `minute_momentum`
-`main.py` 확장 — lifespan 에 ChronosPredictor 로드
-`config.py` 확장 — `CHRONOS_MODEL` env
-`requirements.txt``transformers>=4.40`, `chronos-forecasting>=1.4`, `torch>=2.0`
### 범위 외 (NOT)
- Signal generator 매수/매도 룰 (Phase 4)
- agent-office `/signal` 호출 (Phase 5)
- 모델 재학습/fine-tune — zero-shot only
- 다중 horizon 예측 — 1-day median 만, 다른 horizon Phase 7
- 외부 데이터 (yfinance/FDR) — KIS REST 만
- Chronos lazy load — 항상 로드 (Phase 7 모니터링 후 검토)
- 분봉 모멘텀 ML 모델 — 룰 기반만 (Phase 7 백테스트 후 ML 검토)
- WebSocket 동적 subscribe (Phase 3a backlog 그대로)
---
## 3. 파일 구조 + 변경 매트릭스
| 파일 | 작업 | 라인 |
|------|------|------|
| `signal_v2/kis_client.py` | `get_daily_ohlcv` 메서드 추가 | +50 |
| `signal_v2/chronos_predictor.py` | 신규 | ~120 |
| `signal_v2/momentum_classifier.py` | 신규 | ~80 |
| `signal_v2/pull_worker.py` | post-close cycle + momentum 갱신 | +50 |
| `signal_v2/scheduler.py` | `_is_post_close_trigger` 헬퍼 | +20 |
| `signal_v2/state.py` | 3 필드 추가 | +5 |
| `signal_v2/main.py` | lifespan ChronosPredictor 로드 | +15 |
| `signal_v2/config.py` | `chronos_model` 필드 | +3 |
| `signal_v2/requirements.txt` | 3 의존성 | +3 |
| `signal_v2/tests/test_kis_client.py` | daily 1 케이스 | +30 |
| `signal_v2/tests/test_chronos_predictor.py` | 신규 4 케이스 | ~120 |
| `signal_v2/tests/test_momentum_classifier.py` | 신규 6 케이스 | ~150 |
| `signal_v2/tests/test_pull_worker.py` | post-close 1 케이스 | +50 |
**합계**: 13 파일 변경 (8 코드 + 4 테스트 + 1 requirements), **12 신규 테스트** (33 → 45 total).
### 외부 의존성 신규
- `transformers>=4.40`
- `chronos-forecasting>=1.4`
- `torch>=2.0` (CUDA 12.x 빌드, V1 venv 공유 시 재설치 불필요)
### 모델 다운로드
`amazon/chronos-2` HuggingFace 모델 첫 로드 시 ~1GB 다운로드 (~수십 초). `~/.cache/huggingface/` 캐시 후 무영향. Task 7 manual smoke 에 시간 예상 명시.
---
## 4. KIS Daily OHLCV (`kis_client.get_daily_ohlcv`)
```python
async def get_daily_ohlcv(self, ticker: str, days: int = 60) -> list[dict]:
"""KRX 일봉 OHLCV (TR_ID FHKST03010100).
Args:
ticker: 6자리 종목코드
days: 최근 N영업일 (KIS 한도 100영업일)
Returns:
[{"datetime": "2026-05-15", "open": int, "high": int, "low": int,
"close": int, "volume": int}, ...]
시간 오름차순 (가장 최근이 마지막).
"""
path = "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
today = datetime.now(KST).strftime("%Y%m%d")
start_date = (datetime.now(KST) - timedelta(days=days * 2)).strftime("%Y%m%d")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": ticker,
"FID_INPUT_DATE_1": start_date,
"FID_INPUT_DATE_2": today,
"FID_PERIOD_DIV_CODE": "D",
"FID_ORG_ADJ_PRC": "1",
}
raw = await self._request_with_retry(
"GET", path, tr_id="FHKST03010100", params=params,
)
output2 = raw.get("output2", [])
bars = []
for row in output2:
try:
date = row["stck_bsop_date"]
bars.append({
"datetime": f"{date[:4]}-{date[4:6]}-{date[6:]}",
"open": int(row["stck_oprc"]),
"high": int(row["stck_hgpr"]),
"low": int(row["stck_lwpr"]),
"close": int(row["stck_clpr"]),
"volume": int(row["acml_vol"]),
})
except (KeyError, ValueError):
continue
bars.reverse() # KIS descending → ascending
return bars[-days:]
```
핵심:
- TR_ID `FHKST03010100` (V1 패턴)
- 수정주가 (`FID_ORG_ADJ_PRC=1`)
- start_date 를 `days*2` 로 → 휴장일 + 주말 고려 → `[-days:]` 트리밍
---
## 5. ChronosPredictor
```python
@dataclass
class ChronosPrediction:
median: float
q10: float
q90: float
conf: float
as_of: str
class ChronosPredictor:
"""HuggingFace Chronos-2 zero-shot forecaster."""
def __init__(self, model_name: str = "amazon/chronos-2", device: str | None = None):
from chronos import ChronosPipeline
import torch
self._device = device or ("cuda" if torch.cuda.is_available() else "cpu")
logger.info("Loading Chronos pipeline: %s on %s", model_name, self._device)
self._pipeline = ChronosPipeline.from_pretrained(
model_name,
device_map=self._device,
torch_dtype=torch.float16 if self._device == "cuda" else torch.float32,
)
def predict_batch(
self,
daily_ohlcv_dict: dict[str, list[dict]],
prediction_length: int = 1,
num_samples: int = 100,
) -> dict[str, ChronosPrediction]:
"""종목별 1-day return 분포 예측."""
import torch
import numpy as np
tickers = list(daily_ohlcv_dict.keys())
contexts = [
torch.tensor([bar["close"] for bar in daily_ohlcv_dict[t]], dtype=torch.float32)
for t in tickers
]
forecasts = self._pipeline.predict(
context=contexts, prediction_length=prediction_length, num_samples=num_samples,
)
from datetime import datetime
now_iso = datetime.now(KST).isoformat()
results: dict[str, ChronosPrediction] = {}
for i, ticker in enumerate(tickers):
samples = forecasts[i, :, 0].numpy()
last_close = daily_ohlcv_dict[ticker][-1]["close"]
returns = (samples - last_close) / last_close
median = float(np.quantile(returns, 0.5))
q10 = float(np.quantile(returns, 0.1))
q90 = float(np.quantile(returns, 0.9))
spread = (q90 - q10) / max(abs(median), 0.001)
conf = float(max(0.0, min(1.0, 1.0 - spread / 2.0)))
results[ticker] = ChronosPrediction(median, q10, q90, conf, now_iso)
return results
```
핵심:
- Lazy import (`chronos-forecasting` 무거움)
- GPU 자동 감지 + FP16 (CUDA) / FP32 (CPU)
- Batch predict — 30+ 종목 동시 ~1-2초
- Price → return 변환
- Confidence — 분포 폭 기반 (좁을수록 1)
---
## 6. 분봉 모멘텀 분류기
### 6.1 1분봉 → 5분봉 aggregate
```python
def aggregate_1min_to_5min(minute_bars: list[dict]) -> list[dict]:
"""1분봉 N개 → 5분봉 floor(N/5) 개. 시간 오름차순."""
bars_5min = []
chunks = len(minute_bars) // 5
for i in range(chunks):
chunk = minute_bars[i * 5 : (i + 1) * 5]
bars_5min.append({
"datetime": chunk[0]["datetime"],
"open": chunk[0]["open"],
"high": max(b["high"] for b in chunk),
"low": min(b["low"] for b in chunk),
"close": chunk[-1]["close"],
"volume": sum(b["volume"] for b in chunk),
})
return bars_5min
```
### 6.2 5-level 분류
```python
def classify_minute_momentum(minute_bars: deque) -> str:
"""1분봉 deque → strong_up / weak_up / neutral / weak_down / strong_down."""
minute_list = list(minute_bars)
if len(minute_list) < 5 * 5: # 25 bars minimum
return NEUTRAL
bars_5min = aggregate_1min_to_5min(minute_list)
if len(bars_5min) < 5:
return NEUTRAL
recent = bars_5min[-5:] # 직전 5개 5분봉
up_count = sum(1 for b in recent if b["close"] > b["open"])
# 거래량 multiplier — recent 5 vs 60분 평균
recent_vol_avg = sum(b["volume"] for b in recent) / len(recent)
long_window = bars_5min[-12:] # 60분 = 5분봉 12개
long_vol_avg = sum(b["volume"] for b in long_window) / len(long_window)
vol_mult = recent_vol_avg / long_vol_avg if long_vol_avg > 0 else 1.0
if up_count == 5 and vol_mult >= 1.5:
return STRONG_UP
elif up_count >= 3 and vol_mult >= 1.0:
return WEAK_UP
elif up_count == 0 and vol_mult >= 1.5:
return STRONG_DOWN
elif up_count <= 2 and vol_mult < 1.0:
return WEAK_DOWN
else:
return NEUTRAL
```
---
## 7. PollState 확장 + pull_worker
### 7.1 PollState 추가 필드
```python
@dataclass
class PollState:
# ... 기존 필드 ...
# Phase 3b additions
daily_ohlcv: dict[str, list[dict]] = field(default_factory=dict)
chronos_predictions: dict[str, dict] = field(default_factory=dict)
minute_momentum: dict[str, str] = field(default_factory=dict)
```
### 7.2 pull_worker 확장
```python
async def _run_post_close_cycle(
kis_client: KISClient, chronos: ChronosPredictor, state: PollState,
) -> None:
"""16:00 KST 종가 후 1회: daily fetch + chronos predict."""
tickers = list(set(_portfolio_tickers(state)) | set(_screener_tickers(state)))
daily_results = await asyncio.gather(*[
kis_client.get_daily_ohlcv(t, days=60) for t in tickers
], return_exceptions=True)
daily_dict = {}
for ticker, result in zip(tickers, daily_results):
if isinstance(result, list) and len(result) >= 30:
daily_dict[ticker] = result
state.daily_ohlcv[ticker] = result
if daily_dict:
predictions = chronos.predict_batch(daily_dict)
now_iso = datetime.now(KST).isoformat()
for ticker, pred in predictions.items():
state.chronos_predictions[ticker] = {
"median": pred.median, "q10": pred.q10, "q90": pred.q90,
"conf": pred.conf, "as_of": pred.as_of,
}
state.last_updated[f"chronos/{ticker}"] = pred.as_of
def update_minute_momentum_for_all(state: PollState) -> None:
"""매 분봉 cycle 후 호출 — 모든 종목 모멘텀 갱신."""
from signal_v2.momentum_classifier import classify_minute_momentum
for ticker, bars in state.minute_bars.items():
state.minute_momentum[ticker] = classify_minute_momentum(bars)
```
### 7.3 scheduler `_is_post_close_trigger`
```python
def _is_post_close_trigger(now: datetime) -> bool:
"""16:00 KST ±1분 (post-close cycle 트리거)."""
if not _is_market_day(now):
return False
t = now.time()
return time(16, 0) <= t < time(16, 1)
```
`poll_loop` 안에서 매 cycle:
```python
if _is_post_close_trigger(now) and chronos is not None:
await _run_post_close_cycle(kis_client, chronos, state)
```
---
## 8. 테스트 (12 신규)
### 8.1 `test_kis_client.py` (1)
- `test_get_daily_ohlcv_returns_60_bars` — respx mock 200 → 60 bars 시간 오름차순
### 8.2 `test_chronos_predictor.py` (4, 모델 mock)
- `test_predict_batch_returns_prediction_dict` — mock pipeline → ChronosPrediction
- `test_conf_high_when_distribution_narrow` — narrow → conf ≈ 1
- `test_conf_low_when_distribution_wide` — wide → conf ≈ 0
- `test_return_computed_from_price_relative_to_last_close` — price → return 변환
### 8.3 `test_momentum_classifier.py` (6)
- `test_strong_up_5_consecutive_green_with_high_volume`
- `test_weak_up_3of5_green_normal_volume`
- `test_neutral_mixed`
- `test_weak_down_low_green_low_volume`
- `test_strong_down_5_consecutive_red_high_volume`
- `test_aggregate_1min_to_5min_correctness`
### 8.4 `test_pull_worker.py` (1)
- `test_post_close_cycle_updates_chronos_predictions` — mock kis + mock chronos → state 갱신
**합계**: 1 + 4 + 6 + 1 = **12 신규**. 기존 33 + 12 = **45 total**.
---
## 9. 위험 및 완화
| 위험 | 완화 |
|------|------|
| Chronos-2 첫 로드 ~1GB 다운로드 | startup INFO + Task 7 smoke 시간 예상 명시 |
| GPU OOM (Chronos + V1 Ollama 동거) | FP16 ~400MB + Ollama 4GB = 5GB / 15.5GB 여유. Phase 5 Qwen3 추가 시 13.3GB. Phase 6 V1 deprecation 후 해소 |
| `chronos-forecasting` 호환 (transformers 버전) | 명시 버전. 운영 첫 install 검증 |
| KIS daily fetch + V1 Macro 동시 → rate limit (EGW00201) | post-close 16:00 트리거 vs V1 Trading Bot 의 장 마감 cycle 충돌 위험. 운영 검증 후 16:05 으로 조정 가능 |
| Chronos-2 예측 정확도 불확실 | Phase 7 IC 검증 + 신호 hit-rate 추적. 부족 시 model env 변경 또는 Moirai-2.0 |
| 모멘텀 룰 임계값 (1.5x / 5/5) 보수적 | Phase 7 운영 후 임계값 조정 |
| 1분봉 60개 미만 (장 시작 1시간 내) | NEUTRAL 폴백. 09:00-10:00 신호 발생 안 함 (운영 허용) |
| Chronos 모델 다운로드 네트워크 단절 | startup RuntimeError + 운영자 알림 + 재시작. 캐시 후 무관 |
| daily_ohlcv 메모리 누수 | 종목 ~30 × 60일 ~100B = ~180KB. 무시 |
| Chronos 추론 시 V1 Ollama 와 동시 GPU 사용 | 일 1회 + 짧음 (~2초). V1 Ollama 의 GPU 점유 사이에 끼어들 가능성 → 일시 deferred. Phase 7 모니터링 |
---
## 10. 운영 영향
| 항목 | 영향 |
|------|------|
| 다운타임 | signal_v2 재기동 ~30초 (첫 모델 로드) |
| 사용자 영향 | 없음 (Phase 3b 도 silent, 신호 발송은 Phase 5) |
| `.env` 갱신 | optional 1줄 (`CHRONOS_MODEL=amazon/chronos-2` — 기본값과 동일 시 미설정 OK) |
| V1 영향 | 0 (별도 process). GPU 메모리만 공유 |
| KIS API 부하 | post-close cycle 일 1회 30 종목 daily fetch ~60 calls. 평소 분봉/호가 cycle 그대로 |
| 모델 다운로드 | 첫 시작 ~1GB / 캐시 |
---
## 11. Phase 3b 완료 조건 (DoD)
- [ ] `signal_v2/kis_client.py` `get_daily_ohlcv` 메서드 추가
- [ ] `signal_v2/chronos_predictor.py` 신규
- [ ] `signal_v2/momentum_classifier.py` 신규
- [ ] `signal_v2/pull_worker.py` post-close cycle + momentum 갱신
- [ ] `signal_v2/scheduler.py` `_is_post_close_trigger`
- [ ] `signal_v2/state.py` 3 필드 추가
- [ ] `signal_v2/main.py` lifespan ChronosPredictor 로드
- [ ] `signal_v2/config.py` `CHRONOS_MODEL` env
- [ ] `requirements.txt` 3 의존성 추가
- [ ] 12 신규 테스트 PASS (총 45)
- [ ] 운영 smoke: signal_v2 시작 → Chronos 모델 로드 성공 → 16:00 post-close cycle 1회 실행 → state.chronos_predictions 갱신 확인
- [ ] V1 무영향 (GPU OOM 없음)
- [ ] git push
---
## 12. Phase 4 와의 관계
본 Phase 3b 완료 후 즉시 **Phase 4 (Signal Generator)** brainstorming. 의존성:
```
[Phase 3b spec/plan/실행] → [Phase 4 spec/plan/실행]
1주 1주
```
Phase 4 의 입력 = 본 spec 의 `state.chronos_predictions` + `state.minute_momentum` + Phase 3a 의 `state.asking_price` + Phase 2 의 `state.portfolio` + `state.news_sentiment`. Phase 4 산출 = `state.signals[ticker]` (buy/sell decision + confidence).
---
## 13. Backlog (본 spec NOT)
- Chronos lazy load (Phase 5 Qwen3 동거 시 VRAM 압박 검토)
- 다중 horizon (1-day + 5-day + 20-day)
- ML 기반 분봉 모멘텀 (현재 룰 기반만)
- Chronos model A/B (chronos-bolt-base vs chronos-2 비교 실험)
- KIS daily fetch 의 V1 충돌 회피 — file mutex 또는 V2 별도 app_key
- Chronos quantile 의 임의 quantile 지원 (현재 q10/q50/q90 만)
- daily_ohlcv 영속 저장 (재기동 시 reset 회피)