Compare commits

..

15 Commits

Author SHA1 Message Date
71ef959310 docs(web-ai): rewrite CLAUDE.md with Phase 0-4 complete context
Replaces Phase-2-era placeholder. Adds:
- V1 (signal_v1 :8000 LSTM bot) vs V2 (signal_v2 :8001 Confidence Pipeline) split
- start.bat invocation for each + KIS rate limit warning (do NOT run both)
- Phase 0-7 status table, Phase 4 completed 2026-05-17
- signal_v2/ module-level inventory + new test count (56)
- Phase 4 buy/sell rule summary (absolute spread amendment included)
- 11 known traps + Phase 7 backlog
- Cross-repo workflow note (code in web-ai, spec/plan in web-ui)
2026-05-17 14:00:52 +09:00
2aa9f48ea3 feat(signal_v2-phase4): add emit/skip logging to signal_generator
logger was declared but unused. Operational visibility was zero —
trader debugging 'why no signal?' had to step through code mentally.

- INFO on emit: '[signal emit] 005930 buy conf=0.823 rank=3' / sell with reason
- DEBUG on each skip path: same-cycle sell, hard gate, low confidence,
  dedup 24h (buy and sell)

Per final reviewer recommendation. 56 tests still pass.
2026-05-17 13:35:29 +09:00
cc6310d72f feat(signal_v2-phase4-task3): integrate signal_generator into poll_loop
poll_loop now accepts dedup + settings kwargs (backwards-compatible defaults).
After each in-window cycle (stock pull + minute momentum + optional post-close),
generate_signals is called to populate state.signals for downstream Phase 5
pickup. main.py lifespan wires _ctx.dedup + settings into the poll_loop task.

1 integration test added (anomaly-free stop_loss path via direct generate_signals
call, exercises the same code path that poll_loop runs).

56 tests pass.
2026-05-17 13:24:47 +09:00
e574074ca8 fix(signal_v2-phase4-task2): code review fixes — sell-first ordering + anomaly test + defensive .get
- generate_signals now evaluates sell before buy; buy candidates with a same-cycle
  sell signal are skipped (resolves silent overwrite of state.signals[ticker]).
- Added test_sell_signal_triggers_on_anomaly_path covering _try_anomaly path
  (previously 0% covered).
- Fixed stale test comment referencing deprecated relative spread formula.
- _check_buy_hard_gate uses dict.get(..., 0) for defense against partial upstream state.
- _compute_buy_confidence clamps screener_norm to >= 0 for future Top-N changes.
2026-05-17 13:18:22 +09:00
b9def06993 feat(signal_v2-phase4): signal_generator + 9 unit tests
generate_signals(state, dedup, settings) → state mutating:
- Buy: screener Top-N + portfolio. Hard gate (chronos median > 0 +
  spread < 0.6 + momentum strong_up + bid_ratio >= 0.6) + soft
  confidence (chronos*0.5 + minute*0.3 + screener*0.2) > 0.7.
- Sell: portfolio only. Priority stop_loss > anomaly > take_profit.
  Stop loss confidence 1.0, take_profit 0.6 (review alert).
- SignalDedup 24h via dedup.is_recent/record per (ticker, action).
- State signal dict matches Phase 0 spec §5.2 schema.

54 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 13:03:29 +09:00
05ab2846bb feat(signal_v2-phase4): foundation — 6 env thresholds + state.signals
config.py: STOP_LOSS_PCT / TAKE_PROFIT_PCT / CHRONOS_SPREAD_THRESHOLD /
ASKING_BID_RATIO_THRESHOLD / CONFIDENCE_THRESHOLD / MIN_MOMENTUM_FOR_BUY
env vars with sensible defaults (Phase 0 spec §6.1-§6.2 values).

state.py: PollState.signals dict[ticker, signal_body] for Phase 5 input.

45 existing tests still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 12:55:15 +09:00
760f914d3b fix(signal_v2-phase3b): force FP32 + predict_quantiles positional args
ChronosBoltPipeline.predict_quantiles takes `inputs` positional, not
`context` keyword. Use positional with TypeError fallback for older
chronos versions.

FP16 caused inf overflow on Korean stock prices (e.g. 280,000원 >
FP16 max 65,504). Force FP32 for prices to avoid this. Chronos model
itself handles internal scaling.

Verified end-to-end: 60-day daily fetch → Chronos predict → quantile
output. Example 005930: median=-0.59%, q10=-8.9%, q90=+6.4%, conf=0.0
(low conf is mathematically correct when median is near zero relative
to distribution width).

45/45 tests still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 09:12:10 +09:00
8eefe9d79d fix(signal_v2-phase3b): ChronosBolt predict_quantiles API support
ChronosBoltPipeline.predict() does not accept `context` kwarg; it
uses positional-only and is deterministic (no num_samples). Switch
to predict_quantiles(context, prediction_length, quantile_levels)
which returns (quantiles_tensor, mean_tensor).

Implementation: if hasattr(pipeline, "predict_quantiles") → modern
quantile branch. Else fall back to legacy sample-based predict (T5).

Tests: switch to predict_quantiles mock returning (quantiles, None)
with shape [1, 1, 3] for q10/q50/q90 directly.

45/45 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 09:07:11 +09:00
91de16675b fix(signal_v2-phase3b): use BaseChronosPipeline for new model architectures
ChronosPipeline (legacy T5) does not support amazon/chronos-2 or
chronos-bolt-* (input_patch_size). Switch to BaseChronosPipeline
which auto-detects variant and returns the appropriate sub-pipeline
(ChronosBoltPipeline / Chronos2Pipeline / ChronosPipeline).

Also handle the dtype kwarg deprecation: try newer `dtype=` first,
fall back to `torch_dtype=` for older versions.

Test mock_pipeline fixture updated to patch BaseChronosPipeline.

45/45 tests pass. Verified amazon/chronos-bolt-base loads on CUDA.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 08:57:22 +09:00
44888d6ede feat(signal_v2-phase3b): main.py lifespan loads ChronosPredictor
AppContext.chronos field. lifespan: if KIS_APP_KEY set, load
ChronosPredictor(model_name=settings.chronos_model). Exceptions
during load logged + signal_v2 continues without chronos (other
endpoints unaffected). poll_loop receives chronos param.

45 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 18:11:50 +09:00
9e5fecb369 feat(signal_v2-phase3b): post-close cycle + minute momentum update
scheduler._is_post_close_trigger: 16:00 KST ±1min detection (market day).
pull_worker:
- _run_post_close_cycle: daily fetch (60일) + chronos batch predict →
  state.chronos_predictions + state.daily_ohlcv.
- update_minute_momentum_for_all: 매 cycle 마다 state.minute_momentum 갱신.
- poll_loop signature 확장 (chronos optional).

45 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 18:04:32 +09:00
28f9c8c3a6 feat(signal_v2-phase3b): chronos_predictor + 4 mock tests
ChronosPredictor wraps HuggingFace ChronosPipeline. Batch predict
returns ChronosPrediction(median, q10, q90, conf, as_of) per ticker.
Confidence = 1 - clamp(spread/2, 0, 1) where spread = (q90-q10) / |median|.
Lazy import of chronos lib (heavy). GPU auto-detect with FP16.

44 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 18:00:46 +09:00
c5a88fab66 feat(signal_v2-phase3b): momentum_classifier + 6 unit tests
aggregate_1min_to_5min: 1분봉 5개 → 5분봉 1개 (open=첫, close=마지막,
high=max, low=min, volume=sum). classify_minute_momentum: 직전 5개
5분봉 양봉 개수 + 거래량 60분 multiplier → 5-level
(strong_up/weak_up/neutral/weak_down/strong_down).

40 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 17:55:34 +09:00
7056cf2fa6 feat(signal_v2-phase3b): kis_client.get_daily_ohlcv (60 daily bars)
TR_ID FHKST03010100 (수정주가 일봉). KIS returns descending; client
reverses to ascending and trims to last N days.

1 new test, 34 total.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 17:49:06 +09:00
4ac7da8670 feat(signal_v2-phase3b): foundation — config + state + requirements
- config.py: CHRONOS_MODEL env (default amazon/chronos-2)
- state.py: PollState extended with daily_ohlcv + chronos_predictions
  + minute_momentum
- requirements.txt: transformers + chronos-forecasting

33 existing tests still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 17:46:09 +09:00
16 changed files with 1177 additions and 16 deletions

143
CLAUDE.md
View File

@@ -1,24 +1,141 @@
# web-ai — Workspace 가이드 # web-ai — Workspace 가이드
Windows AI 머신 (AMD 9800X3D + RTX 5070 Ti) 의 두 시그널 파이프라인 컨테이너. Windows AI 머신 (AMD 9800X3D + RTX 5070 Ti 16GB) 의 두 신호 파이프라인.
**Confidence Signal Pipeline V2 의 Windows-side 구현체** (NAS stock 백엔드와 HTTP 연동).
상위 워크스페이스 컨텍스트는 `../CLAUDE.md` 참조.
---
## 디렉토리 구조 ## 디렉토리 구조
| 경로 | 역할 | 상태 | | 경로 | 역할 | 포트 | 상태 |
|------|------|------| |------|------|------|------|
| `signal_v1/` | V1 자체 자동매매 시스템 (main_server.py + Trading Bot + Telegram Bot + LSTM + Ollama + KIS 자동주문) | 운영 중. Confidence Signal Pipeline V2 Phase 6 에서 deprecation 예정 | | `signal_v1/` | 레거시 자동매매 시스템 (LSTM 7-features + Gemini Flash + Telegram Bot + KIS 자동주문) | `:8000` | 운영 중. **V2 Phase 6 에서 deprecation 예정** |
| `signal_v2/` | V2 신호 파이프라인 (stock pull worker + Chronos-2 + signal API client) | Phase 2 에서 신설 | | `signal_v2/` | Confidence Signal Pipeline V2 (Chronos-bolt + 분봉 모멘텀 + KIS WebSocket + 신호 생성) | `:8001` | **Phase 4 완료 (2026-05-17)**, Phase 5 대기 |
| `.env` | V1 + V2 환경변수 공유 | KIS_*, TELEGRAM_*, STOCK_API_URL, WEBAI_API_KEY | | `.env` | V1 + V2 환경변수 공유 | — | `KIS_REAL_*`, `TELEGRAM_*`, `STOCK_API_URL`, `WEBAI_API_KEY`, `LOG_LEVEL` |
| `start.bat` | V1 진입 (signal_v1 디렉토리 안 main_server.py 실행) | V2 별도 start 스크립트는 signal_v2/start.bat | | `start.bat` | V1 진입점 | — | `signal_v1/main_server.py` 실행 |
| `signal_v2/start.bat` | V2 진입점 | — | `signal_v2/main.py` uvicorn 실행 |
| `requirements.txt` | 공용 의존성 | — | torch, chronos-forecasting, fastapi, httpx, websockets 등 |
## 운영 가이드 `.venv`**구조적으로 깨짐**: `pyvenv.cfg` 가 한글 사용자 경로(`C:\Users\박재오\...`) 를 포함하여 콘솔 코드페이지가 roundtrip 못함. 테스트는 시스템 Python 으로 실행: `C:\Users\jaeoh\AppData\Local\Programs\Python\Python312\python.exe -m pytest signal_v2/tests -q`.
- V1 시작: `start.bat` 또는 `cd signal_v1 && python main_server.py` ---
- V2 시작 (Phase 2 이후): `cd signal_v2 && python -m uvicorn main:app --port 8001`
- 둘 다 동시 실행 가능 (포트 분리: V1=8000, V2=8001) ## 서버 시작 방식
### V1 단독 (운영 기본)
```bat
cd C:\Users\jaeoh\Desktop\workspace\web-ai
.\start.bat
```
기대 로그: `[Bot] Cycle Start ...`, `[AI] 005930: NN epochs ...`, `[Ensemble] tech=... news=... lstm=...`, `Score: 0.xx [HOLD]`
### V2 단독 (smoke/검증)
```bat
cd C:\Users\jaeoh\Desktop\workspace\web-ai\signal_v2
.\start.bat
```
기대 로그: `Uvicorn running on http://0.0.0.0:8001`, `poll_loop started`, `[KIS] minute bars ... OK`, `[Chronos] predicted N tickers`, `signal emit XXXXXX buy conf=0.xxx`.
휴장일/장 외 시간엔 `poll_loop` 만 idle. `Application startup complete` 만 보이면 정상.
### V1 + V2 동시 실행 — **권장 안 함**
**KIS app_key 초당 2회 한도 (EGW00201)** 충돌. V1 cycle + V2 분봉 cron 이 같은 KIS app_key 로 동시 호출하면 rate limit. 채택 해결책: V2 임시 종료 (Phase 3a 결정), Phase 6 V1 deprecation 시 자연 해소. 별도 app_key 발급은 옵션 B.
---
## Phase 진행 상태 (Confidence Signal Pipeline V2) ## Phase 진행 상태 (Confidence Signal Pipeline V2)
`web-ui/docs/superpowers/specs/2026-05-15-confidence-signal-pipeline-v2-architecture.md` 참조. | Phase | 내용 | 상태 |
|-------|------|------|
| 0 | Architecture & contract spec | ✅ Chronos-2 + Qwen3 14B 채택 |
| 1 | stock 백엔드 WebAI API 보강 (NAS) | ✅ 102/102 tests, 운영 배포 |
| 1.5 | V1 → `signal_v1/` rename | ✅ V1 정상 기동 |
| 2 | signal_v2 pull worker + signal API client + scheduler | ✅ 19/19 tests, `:8001` 기동 |
| 3a | KIS REST 분봉 + WebSocket 호가 + NXT 스케줄 | ✅ 33/33 tests |
| 3b | Chronos-bolt-base 추론 + 5분봉 모멘텀 분류기 | ✅ 45/45 tests, 실 KIS+Chronos chain 검증 |
| 4 | Signal Generator (매수/매도 룰) + pull_worker 통합 + 로깅 | ✅ **2026-05-17 완료, 56/56 tests, push 완료** |
| 5 | agent-office `/signal` + Ollama Qwen3 14B + 이중 텔레그램 | ⏳ 2주 예상 |
| 6 | signal_v1 deprecation | ⏳ 1주 |
| 7 | 운영 모니터링 + 4주 IC 검증 | ⏳ 1주 + 4주 |
자세한 V1 가이드는 `signal_v1/CLAUDE.md` 참조. 상세 spec/plan: `../web-ui/docs/superpowers/specs/``../web-ui/docs/superpowers/plans/` (web-ui repo 안에 보관됨 — V2 자체 코드와 분리 보관).
---
## signal_v2 디렉토리 내부
| 파일 | 역할 |
|------|------|
| `main.py` | FastAPI app + lifespan (StockClient + KISClient + KISWebSocket + ChronosPredictor + SignalDedup 초기화). poll_loop task 생성 |
| `config.py` | Settings dataclass — 환경변수 로드. Phase 4 추가 6 필드: `stop_loss_pct`, `take_profit_pct`, `chronos_spread_threshold`, `asking_bid_ratio_threshold`, `confidence_threshold`, `min_momentum_for_buy` |
| `state.py` | PollState (process-wide singleton) — portfolio, screener_preview, news_sentiment, chronos_predictions, minute_bars, asking_price, **signals** (Phase 4) |
| `stock_client.py` | NAS stock 백엔드 pull (X-WebAI-Key + 메모리 cache 60s/300s/60s + retry) |
| `kis_client.py` | KIS REST 분봉/호가 — V1 토큰 read-only 공유 (mtime cache) + 초당 2회 throttle + 지수 backoff |
| `kis_websocket.py` | KIS WebSocket H0STASP0 호가 + approval_key + 재연결 (1→2→4→max 30s) |
| `chronos_predictor.py` | `amazon/chronos-bolt-base` zero-shot quantile (FP32 강제 — FP16 overflow 회피) |
| `minute_momentum.py` | 5분봉 → strong_up/weak_up/neutral/weak_down/strong_down 5단계 분류 |
| `signal_generator.py` | **Phase 4 — 매수/매도 룰 엔진**. `generate_signals(state, dedup, settings)` 진입. sell-first → buy 순서. 신호 emit/skip INFO/DEBUG 로그 |
| `pull_worker.py` | asyncio cron — 장전 5분 / 장중 1분 / 장후 5분 / NXT / dead zone skip. cycle 끝에 `generate_signals` 호출 |
| `scheduler.py` | polling window 판정 (KST 캘린더 + 휴장일) |
| `rate_limit.py` | 초당 N회 token bucket |
| `dedup.py` | SignalDedup SQLite WAL — `(ticker, action)` PK 24h |
| `tests/` | 56 tests (pytest + respx HTTP mock + monkeypatch) |
| `data/` | dedup.db (SQLite WAL) + `holidays.json` (NAS stock 에서 manual copy) |
| `start.bat` | V2 진입 |
---
## 신호 룰 요약 (Phase 4)
### 매수 (screener Top-N + portfolio, sell 신호 받은 종목은 skip)
모두 충족:
1. `chronos.median > 0`
2. **`chronos.q90 - chronos.q10 < 0.6`** (absolute spread — 2026-05-17 spec amend, 기존 relative formula 가 zero-shot median≈0 빈번에서 모든 신호 거부)
3. `minute_momentum == strong_up` (env 로 조정 가능)
4. `asking_price.bid_ratio >= 0.6`
종합 confidence = `chronos_conf * 0.5 + minute_score * 0.3 + screener_norm * 0.2`. `> 0.7` 시 emit.
### 매도 (portfolio only, 우선순위 stop_loss → anomaly → take_profit)
- **stop_loss**: `pnl_pct < -7%` 즉시 (confidence=1.0)
- **anomaly**: `chronos.median < -1%` + `strong_down` + `bid_ratio < 0.4` + 종합 conf > 0.7
- **take_profit**: `pnl_pct > 15%` 검토 (confidence=0.6)
---
## 알려진 함정 / Phase 7 백로그
1. **KIS rate limit (EGW00201)** — V1+V2 동시 실행 시 충돌. Phase 6 자연 해소
2. **`.venv` 한글 경로 깨짐** — 시스템 Python 사용
3. **Chronos FP16 overflow** — 한국 주가 5만+ 시 inf. FP32 강제 (`chronos_predictor.py:39-41`)
4. **`predict_quantiles` positional `inputs`** — ChronosBolt API 새 변경. `try/except TypeError` fallback 처리됨
5. **`state.signals` consumer-drain protocol 미정의** — Phase 5 prereq. dict 무한 누적 위험 (실제로는 bounded by unique ticker count)
6. **integration test 가 poll_loop 실제 호출 안 함**`test_pull_worker.py:test_poll_loop_calls_generate_signals_after_cycle``generate_signals` 직접 호출. Phase 7 hardening 시 mock-iteration 으로 강화
7. **KIS WebSocket URL `ws://ops.koreainvestment.com:21000/31000`** — 첫 운영 시 실제 KIS API docs 와 대조 필요
8. **`_parse_asking_price` 필드 인덱스** — 마지막 2 필드 가정. 실 운영 raw 메시지 캡처 후 매핑 검증 필요
9. **`holidays.json` 자동 동기화 부재** — NAS stock 의 `holidays.json` 을 수동 copy
10. **schema rename** — Phase 0 §5.2 의 `lstm_pred_*`, `news_top[]``chronos_pred_*`, `news_reason(string)` 으로 변경됨. Phase 5 prompt 작성 시 반영
11. **6개 env 필드가 `.env` 에 미기재** — 기본값으로 동작 가능하나 discoverability 위해 `.env.example` 또는 commented block 추가 권장
---
## 다음 단계 (Phase 5 진입 시 brainstorming 주제)
- `state.signals` consumer 패턴: pop vs leave + Phase 5 자체 dedup
- agent-office 의 `/signal` endpoint 설계 — POST 페이로드 schema
- Ollama Qwen3 14B Q4 로컬 호출 — 타임아웃, retry, VRAM 공존 (Chronos + Qwen3 동시 메모리 9.3GB / 15.5GB 가용)
- 이중 텔레그램 (본인 풀 / 아내 lite) — context augmentation 단일 호출에서 양쪽 메시지 생성
- LLM 비용: ₩0 목표 유지 (로컬)
---
## 양쪽 디렉토리 (web-ui ↔ web-ai) 작업 시 주의
- **코드**: signal_v2 는 web-ai/, spec/plan/메모리는 web-ui/
- **커밋**: `web-ai``web-ui`**별도 Gitea 저장소**. 각각 경로에서만 `git add/commit/push`
- **메모리**: Claude Code 의 auto-memory 는 디렉토리별 격리. 핵심 reference 는 양쪽에 미러됨 (`./memory-mirror/` 또는 `~/.claude/projects/C--Users-jaeoh-Desktop-workspace-web-ai/memory/`)
- **spec amendment 발생 시**: 코드는 `web-ai` 에 commit, spec 갱신은 `web-ui/docs/superpowers/specs/` 에 commit (Phase 4 spread formula 변경 사례 = web-ui commit `534ded5`)
자세한 V1 가이드는 `signal_v1/CLAUDE.md` 참조 (있다면).

View File

@@ -7,3 +7,7 @@ pytest>=8.0
pytest-asyncio>=0.23 pytest-asyncio>=0.23
respx>=0.21 respx>=0.21
websockets>=12 websockets>=12
# Phase 3b dependencies (Chronos-2 + ML)
transformers>=4.40
chronos-forecasting>=1.4
# torch: typically already installed via V1 venv; if not, install with CUDA support manually

View File

@@ -0,0 +1,132 @@
"""Chronos-2 zero-shot forecaster wrapper."""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import datetime
from zoneinfo import ZoneInfo
import numpy as np
logger = logging.getLogger(__name__)
KST = ZoneInfo("Asia/Seoul")
@dataclass
class ChronosPrediction:
median: float
q10: float
q90: float
conf: float
as_of: str
class ChronosPredictor:
"""HuggingFace Chronos-2 zero-shot forecaster."""
def __init__(self, model_name: str = "amazon/chronos-2", device: str | None = None):
# BaseChronosPipeline auto-detects model variant (Chronos / ChronosBolt / Chronos-2)
# and returns the appropriate sub-pipeline. ChronosPipeline only supports legacy T5.
import torch
try:
from chronos import BaseChronosPipeline
pipeline_cls = BaseChronosPipeline
except ImportError:
from chronos import ChronosPipeline
pipeline_cls = ChronosPipeline
self._device = device or ("cuda" if torch.cuda.is_available() else "cpu")
# Always use float32 — Korean stock prices (e.g. 280,000원) exceed FP16 max (~65,504)
# causing inf in quantile output. FP32 is safe for typical price magnitudes.
dtype = torch.float32
logger.info("Loading Chronos pipeline: %s on %s (cls=%s)",
model_name, self._device, pipeline_cls.__name__)
# Try `dtype` (newer API) first, fall back to `torch_dtype` (older)
try:
self._pipeline = pipeline_cls.from_pretrained(
model_name, device_map=self._device, dtype=dtype,
)
except TypeError:
self._pipeline = pipeline_cls.from_pretrained(
model_name, device_map=self._device, torch_dtype=dtype,
)
logger.info("Chronos pipeline loaded.")
def predict_batch(
self,
daily_ohlcv_dict: dict[str, list[dict]],
prediction_length: int = 1,
num_samples: int = 100,
) -> dict[str, ChronosPrediction]:
"""종목별 1-day return 분포 예측.
ChronosBolt / Chronos-2 등 신모델은 predict_quantiles 사용 (deterministic).
Legacy ChronosPipeline (T5) 는 sample-based predict.
"""
import torch
tickers = list(daily_ohlcv_dict.keys())
if not tickers:
return {}
contexts = [
torch.tensor([bar["close"] for bar in daily_ohlcv_dict[t]], dtype=torch.float32)
for t in tickers
]
now_iso = datetime.now(KST).isoformat()
results: dict[str, ChronosPrediction] = {}
# Modern API: predict_quantiles (ChronosBolt / Chronos-2)
if hasattr(self._pipeline, "predict_quantiles"):
quantile_levels = [0.1, 0.5, 0.9]
# ChronosBolt API: positional `inputs` (first arg). Older variants use `context`.
try:
quantiles_tensor, _ = self._pipeline.predict_quantiles(
contexts,
prediction_length=prediction_length,
quantile_levels=quantile_levels,
)
except TypeError:
quantiles_tensor, _ = self._pipeline.predict_quantiles(
context=contexts,
prediction_length=prediction_length,
quantile_levels=quantile_levels,
)
quantiles_np = (
quantiles_tensor.cpu().numpy()
if hasattr(quantiles_tensor, "cpu")
else np.asarray(quantiles_tensor)
)
# shape: [num_series, prediction_length, 3]
for i, ticker in enumerate(tickers):
q10_price, q50_price, q90_price = quantiles_np[i, 0, :]
last_close = daily_ohlcv_dict[ticker][-1]["close"]
median = float((q50_price - last_close) / last_close)
q10 = float((q10_price - last_close) / last_close)
q90 = float((q90_price - last_close) / last_close)
spread = (q90 - q10) / max(abs(median), 0.001)
conf = float(max(0.0, min(1.0, 1.0 - spread / 2.0)))
results[ticker] = ChronosPrediction(
median=median, q10=q10, q90=q90, conf=conf, as_of=now_iso,
)
return results
# Legacy API: sample-based predict (ChronosPipeline T5)
forecasts = self._pipeline.predict(
context=contexts,
prediction_length=prediction_length,
num_samples=num_samples,
)
forecasts_np = forecasts.numpy() if hasattr(forecasts, "numpy") else np.asarray(forecasts)
for i, ticker in enumerate(tickers):
samples = forecasts_np[i, :, 0]
last_close = daily_ohlcv_dict[ticker][-1]["close"]
returns = (samples - last_close) / last_close
median = float(np.quantile(returns, 0.5))
q10 = float(np.quantile(returns, 0.1))
q90 = float(np.quantile(returns, 0.9))
spread = (q90 - q10) / max(abs(median), 0.001)
conf = float(max(0.0, min(1.0, 1.0 - spread / 2.0)))
results[ticker] = ChronosPrediction(
median=median, q10=q10, q90=q90, conf=conf, as_of=now_iso,
)
return results

View File

@@ -34,6 +34,25 @@ class Settings:
str(Path(__file__).parent.parent / "signal_v1" / "data" / "kis_token.json")) str(Path(__file__).parent.parent / "signal_v1" / "data" / "kis_token.json"))
) )
) )
chronos_model: str = field(default_factory=lambda: os.getenv("CHRONOS_MODEL", "amazon/chronos-2"))
stop_loss_pct: float = field(
default_factory=lambda: float(os.getenv("STOP_LOSS_PCT", "-0.07"))
)
take_profit_pct: float = field(
default_factory=lambda: float(os.getenv("TAKE_PROFIT_PCT", "0.15"))
)
chronos_spread_threshold: float = field(
default_factory=lambda: float(os.getenv("CHRONOS_SPREAD_THRESHOLD", "0.6"))
)
asking_bid_ratio_threshold: float = field(
default_factory=lambda: float(os.getenv("ASKING_BID_RATIO_THRESHOLD", "0.6"))
)
confidence_threshold: float = field(
default_factory=lambda: float(os.getenv("CONFIDENCE_THRESHOLD", "0.7"))
)
min_momentum_for_buy: str = field(
default_factory=lambda: os.getenv("MIN_MOMENTUM_FOR_BUY", "strong_up")
)
@property @property
def kis_is_virtual(self) -> bool: def kis_is_virtual(self) -> bool:

View File

@@ -4,7 +4,7 @@ import asyncio
import json import json
import logging import logging
import time import time
from datetime import datetime from datetime import datetime, timedelta
from pathlib import Path from pathlib import Path
from zoneinfo import ZoneInfo from zoneinfo import ZoneInfo
@@ -153,3 +153,41 @@ class KISClient:
"current_price": current_price, "current_price": current_price,
"as_of": datetime.now(KST).isoformat(), "as_of": datetime.now(KST).isoformat(),
} }
async def get_daily_ohlcv(self, ticker: str, days: int = 60) -> list[dict]:
"""KRX 일봉 OHLCV (TR_ID FHKST03010100).
Returns: [{"datetime", "open", "high", "low", "close", "volume"}, ...]
시간 오름차순.
"""
path = "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
today = datetime.now(KST).strftime("%Y%m%d")
start_date = (datetime.now(KST) - timedelta(days=days * 2)).strftime("%Y%m%d")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": ticker,
"FID_INPUT_DATE_1": start_date,
"FID_INPUT_DATE_2": today,
"FID_PERIOD_DIV_CODE": "D",
"FID_ORG_ADJ_PRC": "1",
}
raw = await self._request_with_retry(
"GET", path, tr_id="FHKST03010100", params=params,
)
output2 = raw.get("output2", [])
bars = []
for row in output2:
try:
date = row["stck_bsop_date"]
bars.append({
"datetime": f"{date[:4]}-{date[4:6]}-{date[6:]}",
"open": int(row["stck_oprc"]),
"high": int(row["stck_hgpr"]),
"low": int(row["stck_lwpr"]),
"close": int(row["stck_clpr"]),
"volume": int(row["acml_vol"]),
})
except (KeyError, ValueError):
continue
bars.reverse()
return bars[-days:]

View File

@@ -7,6 +7,7 @@ from contextlib import asynccontextmanager
from fastapi import FastAPI from fastapi import FastAPI
from signal_v2 import state as state_mod from signal_v2 import state as state_mod
from signal_v2.chronos_predictor import ChronosPredictor
from signal_v2.config import get_settings from signal_v2.config import get_settings
from signal_v2.kis_client import KISClient from signal_v2.kis_client import KISClient
from signal_v2.kis_websocket import KISWebSocket from signal_v2.kis_websocket import KISWebSocket
@@ -24,6 +25,7 @@ class AppContext:
poll_task: asyncio.Task | None = None poll_task: asyncio.Task | None = None
kis_client: KISClient | None = None kis_client: KISClient | None = None
kis_ws: KISWebSocket | None = None kis_ws: KISWebSocket | None = None
chronos: ChronosPredictor | None = None
_ctx = AppContext() _ctx = AppContext()
@@ -69,10 +71,19 @@ async def lifespan(app: FastAPI):
except Exception: except Exception:
logger.exception("KIS WebSocket startup failed — continuing without realtime asking_price") logger.exception("KIS WebSocket startup failed — continuing without realtime asking_price")
# Load Chronos (heavy: ~1GB model download first time)
try:
_ctx.chronos = ChronosPredictor(model_name=settings.chronos_model)
except Exception:
logger.exception("ChronosPredictor load failed — continuing without chronos predictions")
_ctx.poll_task = asyncio.create_task( _ctx.poll_task = asyncio.create_task(
poll_loop( poll_loop(
_ctx.client, state_mod.state, _ctx.shutdown, _ctx.client, state_mod.state, _ctx.shutdown,
kis_client=_ctx.kis_client, kis_client=_ctx.kis_client,
chronos=_ctx.chronos,
dedup=_ctx.dedup,
settings=settings,
) )
) )

View File

@@ -0,0 +1,69 @@
"""분봉 OHLCV → 5-level 모멘텀 분류."""
from __future__ import annotations
from collections import deque
# 분류 카테고리
STRONG_UP = "strong_up"
WEAK_UP = "weak_up"
NEUTRAL = "neutral"
WEAK_DOWN = "weak_down"
STRONG_DOWN = "strong_down"
_BARS_PER_5MIN = 5
_LOOKBACK_5MIN_BARS = 5
_VOLUME_AVG_WINDOW = 12 # 60분 = 5분봉 12개
def aggregate_1min_to_5min(minute_bars: list[dict]) -> list[dict]:
"""1분봉 N개 → 5분봉 floor(N/5) 개. 시간 오름차순.
각 5분봉: open=첫 1분봉 open, high=max, low=min, close=마지막 close, volume=sum.
"""
bars_5min = []
chunks = len(minute_bars) // _BARS_PER_5MIN
for i in range(chunks):
chunk = minute_bars[i * _BARS_PER_5MIN : (i + 1) * _BARS_PER_5MIN]
bars_5min.append({
"datetime": chunk[0]["datetime"],
"open": chunk[0]["open"],
"high": max(b["high"] for b in chunk),
"low": min(b["low"] for b in chunk),
"close": chunk[-1]["close"],
"volume": sum(b["volume"] for b in chunk),
})
return bars_5min
def classify_minute_momentum(minute_bars: deque) -> str:
"""1분봉 deque → 5-level 모멘텀 분류.
Returns: STRONG_UP / WEAK_UP / NEUTRAL / WEAK_DOWN / STRONG_DOWN
"""
minute_list = list(minute_bars)
if len(minute_list) < _BARS_PER_5MIN * _LOOKBACK_5MIN_BARS:
return NEUTRAL # 데이터 부족
bars_5min = aggregate_1min_to_5min(minute_list)
if len(bars_5min) < _LOOKBACK_5MIN_BARS:
return NEUTRAL
recent = bars_5min[-_LOOKBACK_5MIN_BARS:]
up_count = sum(1 for b in recent if b["close"] > b["open"])
# 거래량 multiplier: recent 5 avg vs 60분 avg
recent_vol_avg = sum(b["volume"] for b in recent) / len(recent)
long_window = bars_5min[-_VOLUME_AVG_WINDOW:]
long_vol_avg = sum(b["volume"] for b in long_window) / len(long_window)
vol_mult = recent_vol_avg / long_vol_avg if long_vol_avg > 0 else 1.0
# 5-level 분류
if up_count == 5 and vol_mult >= 1.5:
return STRONG_UP
elif up_count >= 3 and vol_mult >= 1.0:
return WEAK_UP
elif up_count == 0 and vol_mult >= 1.5:
return STRONG_DOWN
elif up_count <= 2 and vol_mult < 1.0:
return WEAK_DOWN
else:
return NEUTRAL

View File

@@ -7,7 +7,7 @@ from datetime import datetime
from signal_v2.kis_client import KISClient from signal_v2.kis_client import KISClient
from signal_v2.scheduler import ( from signal_v2.scheduler import (
KST, _is_market_day, _is_polling_window, _next_interval, KST, _is_market_day, _is_polling_window, _next_interval, _is_post_close_trigger,
) )
from signal_v2.state import PollState from signal_v2.state import PollState
from signal_v2.stock_client import StockClient from signal_v2.stock_client import StockClient
@@ -18,6 +18,9 @@ logger = logging.getLogger(__name__)
async def poll_loop( async def poll_loop(
client: StockClient, state: PollState, shutdown: asyncio.Event, client: StockClient, state: PollState, shutdown: asyncio.Event,
kis_client: KISClient | None = None, kis_client: KISClient | None = None,
chronos=None,
dedup=None,
settings=None,
) -> None: ) -> None:
"""FastAPI lifespan 에서 asyncio.create_task 로 시작.""" """FastAPI lifespan 에서 asyncio.create_task 로 시작."""
logger.info("poll_loop started") logger.info("poll_loop started")
@@ -28,6 +31,24 @@ async def poll_loop(
await _run_polling_cycle(client, state, kis_client=kis_client) await _run_polling_cycle(client, state, kis_client=kis_client)
except Exception: except Exception:
logger.exception("poll cycle failed") logger.exception("poll cycle failed")
# Minute momentum 갱신 (매 cycle)
try:
update_minute_momentum_for_all(state)
except Exception:
logger.exception("minute momentum update failed")
# Post-close trigger (16:00 KST)
if _is_post_close_trigger(now) and chronos is not None and kis_client is not None:
try:
await _run_post_close_cycle(kis_client, chronos, state)
except Exception:
logger.exception("post-close cycle failed")
# Phase 4: generate signals
if dedup is not None and settings is not None:
try:
from signal_v2.signal_generator import generate_signals
generate_signals(state, dedup, settings)
except Exception:
logger.exception("generate_signals failed")
interval = _next_interval(now) interval = _next_interval(now)
try: try:
await asyncio.wait_for(shutdown.wait(), timeout=interval) await asyncio.wait_for(shutdown.wait(), timeout=interval)
@@ -125,3 +146,48 @@ def _screener_tickers(state: PollState) -> list[str]:
if state.screener_preview is None: if state.screener_preview is None:
return [] return []
return [i["ticker"] for i in state.screener_preview.get("items", []) if "ticker" in i] return [i["ticker"] for i in state.screener_preview.get("items", []) if "ticker" in i]
async def _run_post_close_cycle(kis_client, chronos, state) -> None:
"""16:00 KST 종가 후 1회: daily fetch + chronos predict."""
tickers = list(set(_portfolio_tickers(state)) | set(_screener_tickers(state)))
if not tickers:
return
daily_results = await asyncio.gather(*[
kis_client.get_daily_ohlcv(t, days=60) for t in tickers
], return_exceptions=True)
daily_dict = {}
for ticker, result in zip(tickers, daily_results):
if isinstance(result, list) and len(result) >= 30:
daily_dict[ticker] = result
state.daily_ohlcv[ticker] = result
elif isinstance(result, Exception):
state.fetch_errors[f"daily_ohlcv/{ticker}"] = (
state.fetch_errors.get(f"daily_ohlcv/{ticker}", 0) + 1
)
if daily_dict and chronos is not None:
try:
predictions = chronos.predict_batch(daily_dict)
except Exception:
logger.exception("chronos predict_batch failed")
return
for ticker, pred in predictions.items():
state.chronos_predictions[ticker] = {
"median": pred.median,
"q10": pred.q10,
"q90": pred.q90,
"conf": pred.conf,
"as_of": pred.as_of,
}
state.last_updated[f"chronos/{ticker}"] = pred.as_of
def update_minute_momentum_for_all(state) -> None:
"""매 분봉 cycle 후 호출 — 모든 종목 모멘텀 갱신."""
from signal_v2.momentum_classifier import classify_minute_momentum
now_iso = datetime.now(KST).isoformat()
for ticker, bars in state.minute_bars.items():
state.minute_momentum[ticker] = classify_minute_momentum(bars)
state.last_updated[f"momentum/{ticker}"] = now_iso

View File

@@ -76,6 +76,14 @@ def _seconds_until_nxt_or_market_open(now: datetime) -> float:
return 86400.0 return 86400.0
def _is_post_close_trigger(now: datetime) -> bool:
"""16:00 KST ±1분 (post-close cycle 트리거). 평일/영업일만."""
if not _is_market_day(now):
return False
t = now.time()
return time(16, 0) <= t < time(16, 1)
def _seconds_until_next_market_open(now: datetime) -> float: def _seconds_until_next_market_open(now: datetime) -> float:
"""다음 영업일의 07:00 KST 까지 초수 (휴장일/주말용).""" """다음 영업일의 07:00 KST 까지 초수 (휴장일/주말용)."""
candidate = now.replace(hour=7, minute=0, second=0, microsecond=0) candidate = now.replace(hour=7, minute=0, second=0, microsecond=0)

View File

@@ -0,0 +1,228 @@
"""Phase 4 — 매수/매도 신호 생성.
순수 함수 generate_signals(state, dedup, settings). state 를 mutate.
"""
from __future__ import annotations
import logging
from datetime import datetime
from zoneinfo import ZoneInfo
logger = logging.getLogger(__name__)
KST = ZoneInfo("Asia/Seoul")
MOMENTUM_SCORES = {
"strong_up": 1.0,
"weak_up": 0.7,
"neutral": 0.5,
"weak_down": 0.3,
"strong_down": 0.0,
}
def generate_signals(state, dedup, settings) -> None:
"""Phase 4 entry — state-mutating. Evaluation order: sell first (priority), then buy. A ticker receiving a sell signal in this cycle is excluded from buy evaluation to avoid silent overwrite."""
_evaluate_sell_signals(state, dedup, settings)
_evaluate_buy_signals(state, dedup, settings)
# ----- 매수 -----
def _evaluate_buy_signals(state, dedup, settings) -> None:
candidates = _buy_candidates(state)
for ticker, name, rank in candidates:
existing = state.signals.get(ticker)
if existing is not None and existing.get("action") == "sell":
logger.debug("buy %s skipped: same-cycle sell precedence", ticker)
continue
if not _check_buy_hard_gate(state, ticker, settings):
logger.debug("buy %s skipped: hard gate failed", ticker)
continue
confidence = _compute_buy_confidence(state, ticker, rank)
if confidence <= settings.confidence_threshold:
logger.debug("buy %s skipped: confidence %.3f <= %.3f",
ticker, confidence, settings.confidence_threshold)
continue
if dedup.is_recent(ticker, "buy", within_hours=24):
logger.debug("buy %s skipped: dedup 24h", ticker)
continue
state.signals[ticker] = _build_buy_signal(state, ticker, name, rank, confidence)
dedup.record(ticker, "buy", confidence=confidence)
logger.info("signal emit %s buy conf=%.3f rank=%s", ticker, confidence, rank)
def _buy_candidates(state) -> list[tuple[str, str, int | None]]:
"""screener Top-N (rank 1..N) + portfolio (rank=None)."""
candidates: list[tuple[str, str, int | None]] = []
seen: set[str] = set()
if state.screener_preview is not None:
for i, item in enumerate(state.screener_preview.get("items", [])):
ticker = item.get("ticker")
if not ticker or ticker in seen:
continue
seen.add(ticker)
name = item.get("name", ticker)
candidates.append((ticker, name, i + 1))
if state.portfolio is not None:
for h in state.portfolio.get("holdings", []):
ticker = h.get("ticker")
if not ticker or ticker in seen:
continue
seen.add(ticker)
candidates.append((ticker, h.get("name", ticker), None))
return candidates
def _check_buy_hard_gate(state, ticker: str, settings) -> bool:
pred = state.chronos_predictions.get(ticker)
if pred is None or pred.get("median", 0) <= 0:
return False
spread = pred.get("q90", 0) - pred.get("q10", 0)
if spread >= settings.chronos_spread_threshold:
return False
momentum = state.minute_momentum.get(ticker)
if momentum != settings.min_momentum_for_buy:
return False
ap = state.asking_price.get(ticker)
if ap is None or ap.get("bid_ratio", 0) < settings.asking_bid_ratio_threshold:
return False
return True
def _compute_buy_confidence(state, ticker: str, rank: int | None) -> float:
pred = state.chronos_predictions[ticker]
chronos_conf = pred["conf"]
minute_score = MOMENTUM_SCORES.get(state.minute_momentum.get(ticker, "neutral"), 0.5)
screener_norm = max(0.0, 1 - (rank - 1) / 20) if rank is not None else 0.0
return chronos_conf * 0.5 + minute_score * 0.3 + screener_norm * 0.2
def _build_buy_signal(state, ticker: str, name: str, rank: int | None, confidence: float) -> dict:
ap = state.asking_price[ticker]
return {
"ticker": ticker,
"name": name,
"action": "buy",
"confidence_webai": confidence,
"current_price": ap["current_price"],
"avg_price": None,
"pnl_pct": None,
"context": _build_context(state, ticker, rank),
"as_of": datetime.now(KST).isoformat(),
}
# ----- 매도 -----
def _evaluate_sell_signals(state, dedup, settings) -> None:
if state.portfolio is None:
return
for holding in state.portfolio.get("holdings", []):
ticker = holding.get("ticker")
if not ticker:
continue
sell = _try_stop_loss(state, holding, settings)
if sell is None:
sell = _try_anomaly(state, holding, settings)
if sell is None:
sell = _try_take_profit(state, holding, settings)
if sell is None:
continue
if dedup.is_recent(ticker, "sell", within_hours=24):
logger.debug("sell %s skipped: dedup 24h", ticker)
continue
state.signals[ticker] = sell
dedup.record(ticker, "sell", confidence=sell["confidence_webai"])
logger.info("signal emit %s sell conf=%.3f reason=%s",
ticker, sell["confidence_webai"],
sell.get("context", {}).get("sell_reason"))
def _try_stop_loss(state, holding: dict, settings) -> dict | None:
pnl = holding.get("pnl_pct")
if pnl is None or pnl >= settings.stop_loss_pct:
return None
return _build_sell_signal(state, holding, confidence=1.0, reason="stop_loss")
def _try_take_profit(state, holding: dict, settings) -> dict | None:
pnl = holding.get("pnl_pct")
if pnl is None or pnl <= settings.take_profit_pct:
return None
return _build_sell_signal(state, holding, confidence=0.6, reason="take_profit")
def _try_anomaly(state, holding: dict, settings) -> dict | None:
ticker = holding["ticker"]
pred = state.chronos_predictions.get(ticker)
if pred is None or pred["median"] >= -0.01:
return None
momentum = state.minute_momentum.get(ticker)
if momentum != "strong_down":
return None
ap = state.asking_price.get(ticker)
if ap is None:
return None
if ap["bid_ratio"] > (1 - settings.asking_bid_ratio_threshold):
return None
minute_score = 1.0 - MOMENTUM_SCORES.get(momentum, 0.5)
confidence = pred["conf"] * 0.5 + minute_score * 0.3 + 1.0 * 0.2
if confidence <= settings.confidence_threshold:
return None
return _build_sell_signal(state, holding, confidence=confidence, reason="anomaly")
def _build_sell_signal(state, holding: dict, confidence: float, reason: str) -> dict:
ticker = holding["ticker"]
return {
"ticker": ticker,
"name": holding.get("name", ticker),
"action": "sell",
"confidence_webai": confidence,
"current_price": holding.get("current_price"),
"avg_price": holding.get("avg_price"),
"pnl_pct": holding.get("pnl_pct"),
"context": _build_context(state, ticker, rank=None, sell_reason=reason),
"as_of": datetime.now(KST).isoformat(),
}
# ----- Context -----
def _build_context(state, ticker: str, rank: int | None, sell_reason: str | None = None) -> dict:
pred = state.chronos_predictions.get(ticker) or {}
ap = state.asking_price.get(ticker) or {}
news_item = _find_news_sentiment(state, ticker)
screener_scores = _find_screener_scores(state, ticker)
context: dict = {
"chronos_pred_1d": pred.get("median"),
"chronos_pred_conf": pred.get("conf"),
"chronos_q10": pred.get("q10"),
"chronos_q90": pred.get("q90"),
"screener_rank": rank,
"screener_scores": screener_scores,
"minute_momentum": state.minute_momentum.get(ticker),
"asking_bid_ratio": ap.get("bid_ratio"),
"news_sentiment": news_item.get("score") if news_item else None,
"news_reason": news_item.get("reason") if news_item else None,
}
if sell_reason is not None:
context["sell_reason"] = sell_reason
return context
def _find_news_sentiment(state, ticker: str) -> dict | None:
if state.news_sentiment is None:
return None
for item in state.news_sentiment.get("items", []):
if item.get("ticker") == ticker:
return item
return None
def _find_screener_scores(state, ticker: str) -> dict | None:
if state.screener_preview is None:
return None
for item in state.screener_preview.get("items", []):
if item.get("ticker") == ticker:
return item.get("scores")
return None

View File

@@ -8,9 +8,13 @@ class PollState:
portfolio: dict | None = None portfolio: dict | None = None
news_sentiment: dict | None = None news_sentiment: dict | None = None
screener_preview: dict | None = None screener_preview: dict | None = None
# Phase 3a additions
minute_bars: dict[str, deque] = field(default_factory=dict) minute_bars: dict[str, deque] = field(default_factory=dict)
asking_price: dict[str, dict] = field(default_factory=dict) asking_price: dict[str, dict] = field(default_factory=dict)
# Phase 3b additions
daily_ohlcv: dict[str, list[dict]] = field(default_factory=dict)
chronos_predictions: dict[str, dict] = field(default_factory=dict)
minute_momentum: dict[str, str] = field(default_factory=dict)
signals: dict[str, dict] = field(default_factory=dict)
last_updated: dict[str, str] = field(default_factory=dict) last_updated: dict[str, str] = field(default_factory=dict)
fetch_errors: dict[str, int] = field(default_factory=dict) fetch_errors: dict[str, int] = field(default_factory=dict)

View File

@@ -0,0 +1,92 @@
"""Tests for ChronosPredictor (model mock)."""
from unittest.mock import MagicMock, patch
import numpy as np
import pytest
@pytest.fixture
def mock_pipeline():
"""Mock BaseChronosPipeline.from_pretrained returning a mock pipeline object."""
with patch("chronos.BaseChronosPipeline") as cls:
cls.__name__ = "BaseChronosPipeline"
instance = MagicMock()
# ChronosBolt API: predict_quantiles returns (quantiles_tensor, mean_tensor)
# Modern (predict_quantiles) branch will be used since hasattr(MagicMock, "predict_quantiles") is True.
cls.from_pretrained.return_value = instance
yield instance
@pytest.fixture
def mock_torch_cpu():
with patch("torch.cuda.is_available", return_value=False):
yield
def _daily_ohlcv(close_seq):
return [{"datetime": f"2026-05-{i+1:02d}", "open": c, "high": c, "low": c,
"close": c, "volume": 1000} for i, c in enumerate(close_seq)]
def _mk_quantiles_tensor(q10_price: float, q50_price: float, q90_price: float):
"""Helper: build predict_quantiles return tensor shape [1, 1, 3]."""
import torch
return torch.tensor([[[q10_price, q50_price, q90_price]]], dtype=torch.float32)
def test_predict_batch_returns_prediction_dict(mock_pipeline, mock_torch_cpu):
"""mock predict_quantiles → dict[ticker, ChronosPrediction]. last_close=100, q50=102 → median≈+2%."""
quantiles = _mk_quantiles_tensor(101.5, 102.0, 102.5) # narrow around 102
mock_pipeline.predict_quantiles.return_value = (quantiles, None)
from signal_v2.chronos_predictor import ChronosPredictor, ChronosPrediction
predictor = ChronosPredictor(model_name="mock-model")
daily = {"005930": _daily_ohlcv([100] * 60)}
result = predictor.predict_batch(daily)
assert "005930" in result
pred = result["005930"]
assert isinstance(pred, ChronosPrediction)
assert abs(pred.median - 0.02) < 0.001
def test_conf_high_when_distribution_narrow(mock_pipeline, mock_torch_cpu):
"""좁은 distribution (q90-q10 작음, median 0 아님) → conf ≈ 1."""
# last_close=100, q10=101.99, q50=102.00, q90=102.01
# returns: q10=0.0199, q50=0.02, q90=0.0201
# spread = (0.0201 - 0.0199) / max(0.02, 0.001) = 0.0002/0.02 = 0.01 → conf = 1 - 0.005 = 0.995
quantiles = _mk_quantiles_tensor(101.99, 102.0, 102.01)
mock_pipeline.predict_quantiles.return_value = (quantiles, None)
from signal_v2.chronos_predictor import ChronosPredictor
predictor = ChronosPredictor(model_name="mock-model")
daily = {"005930": _daily_ohlcv([100] * 60)}
result = predictor.predict_batch(daily)
assert result["005930"].conf > 0.8
def test_conf_low_when_distribution_wide(mock_pipeline, mock_torch_cpu):
"""넓은 distribution → conf ≈ 0."""
# last_close=100, q10=70, q50=100, q90=130
# returns: q10=-0.3, q50=0.0, q90=0.3
# spread = (0.3 - (-0.3)) / max(0.0, 0.001) = 0.6 / 0.001 = 600 → conf = max(0, 1 - 300) = 0
quantiles = _mk_quantiles_tensor(70.0, 100.0, 130.0)
mock_pipeline.predict_quantiles.return_value = (quantiles, None)
from signal_v2.chronos_predictor import ChronosPredictor
predictor = ChronosPredictor(model_name="mock-model")
daily = {"005930": _daily_ohlcv([100] * 60)}
result = predictor.predict_batch(daily)
assert result["005930"].conf < 0.3
def test_return_computed_from_price_relative_to_last_close(mock_pipeline, mock_torch_cpu):
"""price 예측 → last_close 대비 return 변환. last_close=100, q50=110 → return ≈ +10%."""
quantiles = _mk_quantiles_tensor(109.0, 110.0, 111.0)
mock_pipeline.predict_quantiles.return_value = (quantiles, None)
from signal_v2.chronos_predictor import ChronosPredictor
predictor = ChronosPredictor(model_name="mock-model")
# last close = 100
daily = {"005930": _daily_ohlcv(list(range(41, 101)))} # last = 100
result = predictor.predict_batch(daily)
assert abs(result["005930"].median - 0.10) < 0.001

View File

@@ -126,3 +126,36 @@ async def test_get_asking_price_computes_bid_ratio(kis_client_factory):
assert "as_of" in data assert "as_of" in data
finally: finally:
await client.close() await client.close()
@respx.mock
async def test_get_daily_ohlcv_returns_60_bars(kis_client_factory):
"""KIS daily endpoint returns 60 ascending bars after parsing."""
# Build 60 KIS-format daily bars (descending dates as KIS does)
sample_output2 = []
for i in range(60):
# Generate a fake date 60 days ago, descending
day = 60 - i
sample_output2.append({
"stck_bsop_date": f"2026{(((day-1)//30)+1):02d}{(((day-1)%30)+1):02d}",
"stck_oprc": "78000", "stck_hgpr": "78500",
"stck_lwpr": "77800", "stck_clpr": str(78000 + i),
"acml_vol": "12345",
})
respx.get(
"https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
).mock(return_value=httpx.Response(200, json={"output2": sample_output2}))
client = kis_client_factory()
try:
bars = await client.get_daily_ohlcv("005930", days=60)
# KIS returns descending; client reverses to ascending
assert len(bars) == 60
# Ascending order: first item has smaller datetime than last
assert bars[0]["datetime"] < bars[-1]["datetime"]
assert isinstance(bars[0]["open"], int)
assert isinstance(bars[0]["close"], int)
assert "datetime" in bars[0]
finally:
await client.close()

View File

@@ -0,0 +1,92 @@
"""Tests for minute momentum classifier."""
from collections import deque
from signal_v2.momentum_classifier import (
aggregate_1min_to_5min, classify_minute_momentum,
STRONG_UP, WEAK_UP, NEUTRAL, WEAK_DOWN, STRONG_DOWN,
)
def _bar(open_, high, low, close, volume):
return {
"datetime": "2026-05-18T09:00:00+09:00",
"open": open_, "high": high, "low": low, "close": close, "volume": volume,
}
def _make_chunks(num_chunks_up: int, num_chunks_total: int, base_vol: int = 1000):
"""num_chunks_total 개의 5-bar 청크. num_chunks_up 청크는 양봉, 나머지는 음봉.
각 청크는 5개 1분봉. 거래량 = base_vol per bar.
"""
bars = []
for i in range(num_chunks_total):
is_up = i < num_chunks_up
o, c = (100, 110) if is_up else (110, 100)
for j in range(5):
bars.append(_bar(o, max(o, c) + 5, min(o, c) - 5, c, base_vol))
return bars
def test_strong_up_5_consecutive_green_with_high_volume():
"""직전 5개 5분봉 모두 양봉 + 거래량 1.5x → STRONG_UP."""
# 60분 (12 5분봉) 데이터: 7 normal + 5 high-vol up
older = _make_chunks(num_chunks_up=3, num_chunks_total=7, base_vol=1000)
recent = _make_chunks(num_chunks_up=5, num_chunks_total=5, base_vol=2500)
minute_bars = deque(older + recent, maxlen=60)
assert classify_minute_momentum(minute_bars) == STRONG_UP
def test_weak_up_3of5_green_normal_volume():
"""직전 5개 5분봉 중 3-4개 양봉 + 거래량 ≥ 1.0x → WEAK_UP."""
older = _make_chunks(num_chunks_up=3, num_chunks_total=7, base_vol=1000)
# 5 chunks: 3 up + 2 down, normal vol
recent_up = _make_chunks(num_chunks_up=3, num_chunks_total=3, base_vol=1000)
recent_down = _make_chunks(num_chunks_up=0, num_chunks_total=2, base_vol=1000)
minute_bars = deque(older + recent_up + recent_down, maxlen=60)
assert classify_minute_momentum(minute_bars) == WEAK_UP
def test_neutral_mixed():
"""up_count=2, vol normal → NEUTRAL (rule 미해당)."""
older = _make_chunks(num_chunks_up=3, num_chunks_total=7, base_vol=1000)
recent_up = _make_chunks(num_chunks_up=2, num_chunks_total=2, base_vol=1000)
recent_down = _make_chunks(num_chunks_up=0, num_chunks_total=3, base_vol=1000)
minute_bars = deque(older + recent_up + recent_down, maxlen=60)
# up_count=2, vol_mult=1.0 → 어느 분기 조건도 만족 안 함 → NEUTRAL
assert classify_minute_momentum(minute_bars) == NEUTRAL
def test_weak_down_low_green_low_volume():
"""up_count <= 2 + vol < 1.0 → WEAK_DOWN."""
older = _make_chunks(num_chunks_up=3, num_chunks_total=7, base_vol=1000)
recent_up = _make_chunks(num_chunks_up=1, num_chunks_total=1, base_vol=500)
recent_down = _make_chunks(num_chunks_up=0, num_chunks_total=4, base_vol=500)
minute_bars = deque(older + recent_up + recent_down, maxlen=60)
# recent 5 chunks avg vol = 500, long 12 avg ≈ (7*1000 + 5*500) / 12 ≈ 791 → vol_mult ≈ 0.63
assert classify_minute_momentum(minute_bars) == WEAK_DOWN
def test_strong_down_5_consecutive_red_high_volume():
"""직전 5개 5분봉 모두 음봉 + 거래량 1.5x → STRONG_DOWN."""
older = _make_chunks(num_chunks_up=3, num_chunks_total=7, base_vol=1000)
recent = _make_chunks(num_chunks_up=0, num_chunks_total=5, base_vol=2500)
minute_bars = deque(older + recent, maxlen=60)
assert classify_minute_momentum(minute_bars) == STRONG_DOWN
def test_aggregate_1min_to_5min_correctness():
"""5 1분봉 → 1개 5분봉 — open/close/high/low/volume 정확."""
bars = [
_bar(100, 105, 99, 102, 1000),
_bar(102, 108, 101, 107, 1500),
_bar(107, 110, 105, 106, 800),
_bar(106, 109, 104, 108, 1200),
_bar(108, 112, 107, 111, 900),
]
result = aggregate_1min_to_5min(bars)
assert len(result) == 1
assert result[0]["open"] == 100 # 첫 bar
assert result[0]["close"] == 111 # 마지막 bar
assert result[0]["high"] == 112 # max
assert result[0]["low"] == 99 # min
assert result[0]["volume"] == 5400 # sum

View File

@@ -53,3 +53,79 @@ def test_websocket_message_updates_state_asking_price():
"current_price": 78500, "as_of": "2026-05-18T10:00:00+09:00"}) "current_price": 78500, "as_of": "2026-05-18T10:00:00+09:00"})
assert state.asking_price["005930"]["bid_total"] == 1000 assert state.asking_price["005930"]["bid_total"] == 1000
assert "asking_price/005930" in state.last_updated assert "asking_price/005930" in state.last_updated
async def test_post_close_cycle_updates_chronos_predictions():
"""mock kis + mock chronos → state.chronos_predictions + state.daily_ohlcv 갱신."""
from unittest.mock import AsyncMock, MagicMock
from signal_v2.pull_worker import _run_post_close_cycle
from signal_v2.chronos_predictor import ChronosPrediction
from signal_v2.state import PollState
state = PollState()
state.portfolio = {"holdings": [{"ticker": "005930"}]}
state.screener_preview = {"items": [{"ticker": "000660"}]}
kis_mock = MagicMock()
daily_005930 = [{"datetime": f"2026-05-{i+1:02d}", "open": 100, "high": 105,
"low": 95, "close": 100 + i, "volume": 1000} for i in range(60)]
daily_000660 = [{"datetime": f"2026-05-{i+1:02d}", "open": 200, "high": 210,
"low": 190, "close": 200 + i, "volume": 2000} for i in range(60)]
# _run_post_close_cycle iterates tickers and calls get_daily_ohlcv per ticker.
# Order depends on set() so use side_effect mapping if possible, otherwise list.
async def fake_daily(ticker, days=60):
if ticker == "005930":
return daily_005930
if ticker == "000660":
return daily_000660
return []
kis_mock.get_daily_ohlcv = AsyncMock(side_effect=fake_daily)
chronos_mock = MagicMock()
chronos_mock.predict_batch = MagicMock(return_value={
"005930": ChronosPrediction(0.02, -0.01, 0.04, 0.85, "2026-05-18T16:00:00+09:00"),
"000660": ChronosPrediction(0.03, -0.02, 0.06, 0.75, "2026-05-18T16:00:00+09:00"),
})
await _run_post_close_cycle(kis_mock, chronos_mock, state)
assert "005930" in state.chronos_predictions
assert "000660" in state.chronos_predictions
assert state.chronos_predictions["005930"]["median"] == 0.02
assert state.chronos_predictions["005930"]["conf"] == 0.85
assert "005930" in state.daily_ohlcv
assert "chronos/005930" in state.last_updated
def test_poll_loop_calls_generate_signals_after_cycle(monkeypatch):
"""Phase 4: generate_signals 가 cycle 후 state.signals 를 갱신한다."""
from unittest.mock import MagicMock
from signal_v2.state import PollState
from signal_v2.signal_generator import generate_signals
state = PollState()
state.portfolio = {"holdings": [{
"ticker": "005930", "name": "삼성전자",
"avg_price": 75000, "current_price": 69000,
"pnl_pct": -0.08, "profit_rate": -8.0,
"quantity": 100, "broker": "키움",
}]}
state.screener_preview = {"items": []}
dedup = MagicMock()
dedup.is_recent.return_value = False
settings = MagicMock()
settings.stop_loss_pct = -0.07
settings.take_profit_pct = 0.15
settings.chronos_spread_threshold = 0.6
settings.asking_bid_ratio_threshold = 0.6
settings.confidence_threshold = 0.7
settings.min_momentum_for_buy = "strong_up"
generate_signals(state, dedup, settings)
assert "005930" in state.signals
assert state.signals["005930"]["action"] == "sell"
assert state.signals["005930"]["confidence_webai"] == 1.0
dedup.record.assert_called_with("005930", "sell", confidence=1.0)

View File

@@ -0,0 +1,172 @@
"""Tests for signal_generator."""
from unittest.mock import MagicMock
import pytest
from signal_v2.signal_generator import generate_signals
from signal_v2.state import PollState
def _settings(**overrides):
"""Build a Settings-like object for tests (avoid env)."""
defaults = dict(
stop_loss_pct=-0.07,
take_profit_pct=0.15,
chronos_spread_threshold=0.6,
asking_bid_ratio_threshold=0.6,
confidence_threshold=0.7,
min_momentum_for_buy="strong_up",
)
defaults.update(overrides)
m = MagicMock()
for k, v in defaults.items():
setattr(m, k, v)
return m
def _make_state_with_buy_candidate(
ticker="005930", name="삼성전자",
chronos_median=0.02, chronos_q10=-0.01, chronos_q90=0.04, chronos_conf=0.85,
momentum="strong_up", bid_ratio=0.7, current_price=78500,
):
state = PollState()
state.screener_preview = {"items": [{"ticker": ticker, "name": name}]}
state.chronos_predictions[ticker] = {
"median": chronos_median, "q10": chronos_q10, "q90": chronos_q90,
"conf": chronos_conf, "as_of": "2026-05-17T16:00:00+09:00",
}
state.minute_momentum[ticker] = momentum
state.asking_price[ticker] = {
"bid_total": int(bid_ratio * 1000),
"ask_total": int((1 - bid_ratio) * 1000),
"bid_ratio": bid_ratio,
"current_price": current_price,
"as_of": "2026-05-17T16:00:01+09:00",
}
return state
def _make_state_with_holding(
ticker="005930", name="삼성전자",
pnl_pct=0.0, avg_price=75000, current_price=75000,
):
state = PollState()
state.portfolio = {"holdings": [{
"ticker": ticker, "name": name,
"avg_price": avg_price, "current_price": current_price,
"pnl_pct": pnl_pct, "profit_rate": pnl_pct * 100,
"quantity": 100, "broker": "키움",
}]}
state.screener_preview = {"items": []}
return state
@pytest.fixture
def dedup_mock():
d = MagicMock()
d.is_recent.return_value = False
return d
def test_buy_signal_when_all_conditions_pass_and_confidence_high(dedup_mock):
state = _make_state_with_buy_candidate()
generate_signals(state, dedup_mock, _settings())
assert "005930" in state.signals
sig = state.signals["005930"]
assert sig["action"] == "buy"
assert sig["confidence_webai"] > 0.7
dedup_mock.record.assert_called()
def test_silent_when_chronos_median_negative(dedup_mock):
state = _make_state_with_buy_candidate(chronos_median=-0.01)
generate_signals(state, dedup_mock, _settings())
assert "005930" not in state.signals
def test_silent_when_distribution_spread_too_wide(dedup_mock):
# spread = q90 - q10 = 0.5 - (-0.5) = 1.0 > 0.6 → hard gate fails
state = _make_state_with_buy_candidate(
chronos_median=0.001, chronos_q10=-0.5, chronos_q90=0.5,
)
generate_signals(state, dedup_mock, _settings())
assert "005930" not in state.signals
def test_silent_when_momentum_not_strong_up(dedup_mock):
state = _make_state_with_buy_candidate(momentum="weak_up")
generate_signals(state, dedup_mock, _settings())
assert "005930" not in state.signals
def test_silent_when_bid_ratio_below_threshold(dedup_mock):
state = _make_state_with_buy_candidate(bid_ratio=0.5)
generate_signals(state, dedup_mock, _settings())
assert "005930" not in state.signals
def test_silent_when_confidence_below_threshold(dedup_mock):
# chronos_conf low + rank=20 → confidence < 0.7
state = _make_state_with_buy_candidate(chronos_conf=0.3)
# add 19 fake items to push 005930 rank to 20
state.screener_preview["items"] = (
[{"ticker": f"FAKE{i:03d}"} for i in range(19)]
+ [{"ticker": "005930", "name": "삼성전자"}]
)
generate_signals(state, dedup_mock, _settings())
# confidence_webai = 0.3*0.5 + 1.0*0.3 + 0.05*0.2 = 0.46 < 0.7
assert "005930" not in state.signals
def test_sell_signal_when_stop_loss_triggered(dedup_mock):
state = _make_state_with_holding(pnl_pct=-0.08, current_price=69000, avg_price=75000)
generate_signals(state, dedup_mock, _settings())
assert "005930" in state.signals
sig = state.signals["005930"]
assert sig["action"] == "sell"
assert sig["confidence_webai"] == 1.0
assert sig["pnl_pct"] == -0.08
def test_sell_signal_when_take_profit_triggered(dedup_mock):
state = _make_state_with_holding(pnl_pct=0.16, current_price=87000, avg_price=75000)
generate_signals(state, dedup_mock, _settings())
assert "005930" in state.signals
sig = state.signals["005930"]
assert sig["action"] == "sell"
assert sig["confidence_webai"] == 0.6
def test_silent_when_dedup_recently_sent(dedup_mock):
state = _make_state_with_buy_candidate()
dedup_mock.is_recent.return_value = True
generate_signals(state, dedup_mock, _settings())
assert "005930" not in state.signals
dedup_mock.record.assert_not_called()
def test_sell_signal_triggers_on_anomaly_path(dedup_mock):
"""Anomaly sell: median < -1%, momentum strong_down, low bid_ratio, confidence > threshold."""
state = PollState()
state.portfolio = {"holdings": [{
"ticker": "005930", "name": "삼성전자",
"avg_price": 75000, "current_price": 70000,
"pnl_pct": -0.067, # within stop_loss tolerance (default -0.07): NOT triggering stop_loss
"quantity": 100, "broker": "키움",
}]}
state.screener_preview = {"items": []}
state.chronos_predictions["005930"] = {
"median": -0.025, "q10": -0.05, "q90": 0.005, "conf": 0.85,
}
state.minute_momentum["005930"] = "strong_down"
state.asking_price["005930"] = {"current_price": 70000, "bid_ratio": 0.30}
# bid_ratio 0.30 < (1 - 0.6) = 0.4 → anomaly bid_ratio gate passes
# confidence = 0.85*0.5 + 1.0*0.3 + 1.0*0.2 = 0.425 + 0.3 + 0.2 = 0.925 > 0.7
generate_signals(state, dedup_mock, _settings())
assert "005930" in state.signals
sig = state.signals["005930"]
assert sig["action"] == "sell"
assert sig["context"]["sell_reason"] == "anomaly"
assert sig["confidence_webai"] > 0.7