From 1813db761fc41c171b3da0e9cf061eb13e2c42a4 Mon Sep 17 00:00:00 2001 From: gahusb Date: Sat, 16 May 2026 04:50:16 +0900 Subject: [PATCH] docs(signal-v2): Phase 3a KIS data collection spec KIS REST client (minute OHLCV + asking price polling, V1 token read-only share) + KIS WebSocket client (approval_key + portfolio asking_price realtime subscribe) + PollState extension + scheduler NXT windows (20:00-23:30 / 04:30-07:00). 13 new tests. brainstorming 6 decisions: scope=B(3a/3b split) / data=B(REST minute + WS asking) / auth=A(V1 token share) / subscribe=A(portfolio WS + screener REST) / NXT=C(scheduler extend) / test=A(respx + WS mock). Co-Authored-By: Claude Opus 4.7 (1M context) --- ...6-signal-v2-phase3a-kis-data-collection.md | 443 ++++++++++++++++++ 1 file changed, 443 insertions(+) create mode 100644 docs/superpowers/specs/2026-05-16-signal-v2-phase3a-kis-data-collection.md diff --git a/docs/superpowers/specs/2026-05-16-signal-v2-phase3a-kis-data-collection.md b/docs/superpowers/specs/2026-05-16-signal-v2-phase3a-kis-data-collection.md new file mode 100644 index 0000000..f640524 --- /dev/null +++ b/docs/superpowers/specs/2026-05-16-signal-v2-phase3a-kis-data-collection.md @@ -0,0 +1,443 @@ +# Confidence Signal Pipeline V2 — Phase 3a: KIS Data Collection Design + +**작성일**: 2026-05-16 +**작성자**: gahusb +**상태**: Approved for implementation +**선행 spec**: +- Phase 0 architecture (`2026-05-15-confidence-signal-pipeline-v2-architecture.md`) +- Phase 1 stock WebAI API (`2026-05-15-signal-v2-phase1-webai-api.md`) +- signal_v1 rename (`2026-05-16-web-ai-v1-rename-to-signal-v1.md`) +- Phase 2 web-ai pull worker (`2026-05-16-signal-v2-phase2-web-ai-pull-worker.md`) + +**Phase 3 분해**: Phase 0 spec 의 Phase 3 (KIS WebSocket + NXT + Chronos-2 + 분봉 모멘텀) 를 2 sub-phase 로 분해: +- **Phase 3a (본 spec)**: KIS 데이터 수집 (분봉 REST + 호가 WebSocket + scheduler NXT 확장) +- **Phase 3b (별도 spec)**: Chronos-2 추론 + 분봉 모멘텀 분류기 + +**브레인스토밍 결정 6개**: +- scope = B (3a / 3b 분해) +- 데이터 수집 = B (분봉 REST + 호가 WebSocket) +- KIS 인증 = A (V1 토큰 read-only 공유) +- 구독 범위 = A (portfolio WebSocket + screener REST polling) +- NXT 처리 = C (stock 자동 처리 + scheduler 의 NXT 시간대 폴링 추가) +- 테스트 = A (respx REST mock + WebSocket mock + tmp sqlite) + +--- + +## 1. 목표 + +signal_v2 가 신호 판단에 필요한 KIS 실시간/준실시간 데이터 (분봉 OHLCV + 호가 매수세) 를 수집해 `PollState` 에 채워 넣는다. Phase 3b (Chronos-2 추론) + Phase 4 (signal generator) 가 이 위에 동작. + +**Why**: Phase 0 §3 "web-ai = 시점 분석" 책임의 데이터 수집 부분. KIS REST 의 분봉/호가 + KIS WebSocket 의 실시간 호가가 매수/매도 룰의 핵심 입력. + +--- + +## 2. 범위 + +### 포함 (6 항목) + +- ① **KIS REST client** (`signal_v2/kis_client.py`) — 분봉 polling + screener Top-N 호가 polling. V1 토큰 파일 (`signal_v1/data/kis_token.json`) read-only 공유. +- ② **KIS WebSocket client** (`signal_v2/kis_websocket.py`) — approval_key 신규 발급 + portfolio 보유 종목 호가 실시간 구독 + reconnect with exponential backoff. +- ③ **`pull_worker.py` 확장** — 분봉 1분 polling cycle 추가 + WebSocket 메시지 처리 task. +- ④ **`PollState` 확장** — `minute_bars: dict[ticker, deque(maxlen=60)]`, `asking_price: dict[ticker, dict]`, `last_updated["minute_bars"]` / `["asking_price"]`. +- ⑤ **`scheduler.py` 수정** — NXT 시간대 폴링 (20:00-23:30 / 04:30-07:00) 5분 cron 추가. +- ⑥ **테스트 13 신규** (KIS REST 4 + WebSocket 4 + scheduler NXT 3 + pull_worker 2). 기존 19 + 신규 13 = 32 total. + +### 범위 외 (NOT) + +- Chronos-2 모델 로드 + 추론 (Phase 3b) +- 분봉 모멘텀 분류기 (Phase 3b — 5분봉 aggregate + 5연속 양봉 룰) +- Signal generator 매수/매도 룰 (Phase 4) +- NXT 자체 API 호출 — V2 가 별도 NXT API client 없음. stock 측 `price_fetcher` 가 NXT 시간대 가격 자동 반환 (`price_session` 필드) +- WebSocket 동적 subscribe 갱신 — portfolio 변동 시 다음 cycle 에서 일괄 갱신 +- 분봉 daily aggregate — 60 분봉 sliding window 만 +- 분봉 영속 저장 — 메모리만, 재기동 시 reset +- V2 자체 KIS 토큰 발급 — Phase 6 deprecation 까지 V1 단독 갱신 책임 + +--- + +## 3. 파일 구조 + 변경 매트릭스 + +### 3.1 신규 / 수정 + +| 파일 | 작업 | 라인 | +|------|------|------| +| `signal_v2/kis_client.py` | 신규 | ~150 | +| `signal_v2/kis_websocket.py` | 신규 | ~180 | +| `signal_v2/state.py` | 필드 2개 추가 | +5 | +| `signal_v2/pull_worker.py` | 분봉 cycle + WebSocket task | +60 | +| `signal_v2/scheduler.py` | NXT 시간대 분기 | +15 | +| `signal_v2/main.py` | KIS lifespan 통합 | +20 | +| `signal_v2/config.py` | KIS env 5개 + V1 token path | +10 | +| `signal_v2/tests/test_kis_client.py` | 신규 4 케이스 | ~150 | +| `signal_v2/tests/test_kis_websocket.py` | 신규 4 케이스 | ~170 | +| `signal_v2/tests/test_pull_worker.py` | 신규 2 케이스 | ~80 | +| `signal_v2/tests/test_scheduler.py` | NXT 3 케이스 추가 | +30 | +| `signal_v2/tests/test_main.py` | KIS lifespan 케이스 | +20 | +| `signal_v2/requirements.txt` | `websockets>=12` | +1 | +| `web-ai/.env` | KIS env 5 + V1_TOKEN_PATH (사용자 수동) | +6 | + +### 3.2 외부 의존성 신규 + +- `websockets>=12` (KIS WebSocket client) + +### 3.3 V1 공유 / 무영향 + +- **공유** (read-only): `signal_v1/data/kis_token.json` — V1 의 단독 갱신 책임. V2 는 mtime 캐시 + read. +- **무영향**: V1 의 main_server.py / modules / 자동매매 봇 — Phase 6 까지 분리 유지. + +--- + +## 4. KIS REST client (`kis_client.py`) + +```python +class KISClient: + """KIS REST API (분봉 + 호가). V1 토큰 read-only 공유.""" + + def __init__( + self, + app_key: str, app_secret: str, account: str, is_virtual: bool, + v1_token_path: Path, + timeout: float = 10.0, + ): + self._app_key = app_key + self._app_secret = app_secret + self._account = account + self._is_virtual = is_virtual + self._v1_token_path = Path(v1_token_path) + self._base_url = ( + "https://openapivts.koreainvestment.com:29443" if is_virtual + else "https://openapi.koreainvestment.com:9443" + ) + self._client = httpx.AsyncClient(timeout=timeout) + self._token_cache: tuple[str, float] | None = None # (token, file_mtime) + self._last_throttle_at = 0.0 # 초당 2회 제한 + + async def get_minute_ohlcv(self, ticker: str) -> list[dict]: + """현재 시점 직전 30개 1분봉 OHLCV (TR_ID: FHKST03010200). + + Returns: [{"datetime", "open", "high", "low", "close", "volume"}, ...] (시간 오름차순) + """ + + async def get_asking_price(self, ticker: str) -> dict: + """현재 호가 5단계 + 매수/매도 잔량 (TR_ID: FHKST01010200). + + Returns: { + "bid_total": int, + "ask_total": int, + "bid_ratio": float, + "current_price": int, + "as_of": str (ISO), + } + """ + + async def close(self) -> None: ... + + # internal + def _read_v1_token(self) -> str: + """signal_v1/data/kis_token.json 읽기. mtime 캐시 — 갱신 시 자동 재로드.""" + + async def _throttle(self) -> None: + """V1 패턴 — 초당 2회 제한 (0.5s sleep).""" + + def _common_headers(self, tr_id: str) -> dict: + """authorization, appkey, appsecret, tr_id.""" +``` + +### 4.1 토큰 공유 디자인 + +- `_v1_token_path` env `V1_TOKEN_PATH` 에서 로드. 기본값 `../signal_v1/data/kis_token.json`. +- 첫 호출 시 파일 read + mtime 캐시. +- 매 호출 전 mtime 비교 — 변경 시 재로드. 캐시 hit 시 빠른 통과. +- 파일 미존재 / 만료 시: WARNING log + `HTTPException` (Phase 6 까지 V1 단독 책임 명시). + +### 4.2 분봉 응답 정규화 + +KIS API 의 분봉 raw 응답 (`output2` 배열) → 표준 dict 리스트로 변환. 시간 오름차순, 거래량 0 인 분봉 skip. + +--- + +## 5. KIS WebSocket client (`kis_websocket.py`) + +```python +class KISWebSocket: + """KIS WebSocket — approval_key 발급 + 호가 실시간 구독.""" + + def __init__(self, app_key: str, app_secret: str, is_virtual: bool): + self._app_key = app_key + self._app_secret = app_secret + self._ws_url = ( + "wss://openapivts.koreainvestment.com:29443/tryitout" if is_virtual + else "wss://openapi.koreainvestment.com:9443/tryitout" + ) + self._approval_key: str | None = None + self._ws: WebSocketClientProtocol | None = None + self._subscriptions: set[str] = set() + self._on_asking_price: Callable[[str, dict], None] | None = None + self._recv_task: asyncio.Task | None = None + self._shutdown = asyncio.Event() + + async def start( + self, tickers: list[str], + on_asking_price: Callable[[str, dict], None], + ) -> None: + """approval_key 발급 + WebSocket 연결 + 종목 호가 구독 + receive loop 시작.""" + + async def subscribe(self, ticker: str) -> None: + """동적 구독 추가.""" + + async def unsubscribe(self, ticker: str) -> None: ... + + async def close(self) -> None: + """unsubscribe all + shutdown event + close socket.""" + + # internal + async def _fetch_approval_key(self) -> str: + """POST {base_rest}/oauth2/Approval — approval_key 발급.""" + + async def _send_subscription(self, ticker: str, tr_id: str = "H0STASP0") -> None: + """tr_id H0STASP0 = 실시간 호가.""" + + async def _receive_loop(self) -> None: + """메시지 receive loop. PING/PONG 30초 + 호가 message parse → callback. + 끊김 감지 → exponential backoff (1s→2s→4s→max 30s) + reconnect + subscribe 재등록.""" + + def _parse_asking_price(self, raw: str) -> tuple[str, dict] | None: + """KIS 호가 raw string '0|H0STASP0|...|005930^...' 파싱. + + Returns: (ticker, {bid_total, ask_total, bid_ratio, current_price, as_of}) + 또는 None (parse fail). + """ +``` + +### 5.1 메시지 형식 (KIS 공식 문서) + +호가 메시지 raw 예시 (실제는 더 긴 `^` 구분 필드): +``` +0|H0STASP0|001|005930^091500^78500^...^bid_total^ask_total^... +``` +파싱 키 (필드 인덱스 기반): +- ticker = 4번째 필드의 종목코드 부분 +- as_of = 5번째 필드 (HHMMSS) +- bid_total / ask_total = 정해진 인덱스 (KIS 문서 참조) + +### 5.2 Reconnect 정책 + +- websockets 의 `ConnectionClosed` 캐치 +- exponential backoff: 1s → 2s → 4s → 8s → 16s → max 30s +- 재연결 후 `_subscriptions` 의 모든 ticker 재구독 +- 5분 이상 연결 실패 시 ERROR log + shutdown event 발생 (운영자 알림은 Phase 7) + +--- + +## 6. PollState 확장 + pull_worker + +### 6.1 PollState 추가 필드 + +```python +@dataclass +class PollState: + portfolio: dict | None = None + news_sentiment: dict | None = None + screener_preview: dict | None = None + # 신규 (Phase 3a) + minute_bars: dict[str, deque] = field(default_factory=dict) # {ticker: deque(maxlen=60)} + asking_price: dict[str, dict] = field(default_factory=dict) # {ticker: {bid_total, ask_total, bid_ratio, ...}} + last_updated: dict[str, str] = field(default_factory=dict) + fetch_errors: dict[str, int] = field(default_factory=dict) +``` + +### 6.2 pull_worker 확장 + +```python +async def _run_polling_cycle(client, state, kis_client): + """기존 3 endpoint (stock) + 분봉 (KIS REST) 4 fetch 병렬.""" + portfolio, sentiment, screener = await asyncio.gather( + client.get_portfolio(), + client.get_news_sentiment(), + client.run_screener_preview(), + return_exceptions=True, + ) + # ... (기존 state 갱신) + + # 분봉 갱신 — portfolio + screener top-N 종목 대상 + tickers = _collect_tickers(state) # portfolio + screener Top-N union + minute_results = await asyncio.gather(*[ + kis_client.get_minute_ohlcv(t) for t in tickers + ], return_exceptions=True) + for ticker, result in zip(tickers, minute_results): + if isinstance(result, list): + state.minute_bars.setdefault(ticker, deque(maxlen=60)).extend(result) + state.last_updated[f"minute_bars/{ticker}"] = now_iso + + # 호가 갱신 (screener Top-N 만, portfolio 는 WebSocket 으로 들어옴) + screener_only = _screener_tickers_excluding_portfolio(state) + asking_results = await asyncio.gather(*[ + kis_client.get_asking_price(t) for t in screener_only + ], return_exceptions=True) + for ticker, result in zip(screener_only, asking_results): + if isinstance(result, dict): + state.asking_price[ticker] = result + state.last_updated[f"asking_price/{ticker}"] = now_iso + + +def on_websocket_asking_price(ticker: str, data: dict): + """KIS WebSocket callback — portfolio 호가 실시간 갱신.""" + state.asking_price[ticker] = data + state.last_updated[f"asking_price/{ticker}"] = datetime.now(KST).isoformat() +``` + +### 6.3 종목 동기화 + +매 cycle 후 `state.portfolio.holdings` 의 ticker 목록과 `kis_websocket._subscriptions` 비교 → 신규 추가 / 제거 ticker 별로 `subscribe()` / `unsubscribe()` 호출. + +--- + +## 7. Scheduler NXT 시간대 + +```python +# Market windows (기존) +_PRE_OPEN = time(7, 0) +_OPEN = time(9, 0) +_CLOSE = time(15, 30) +_POST_END = time(20, 0) + +# NXT windows (신규) +_NXT_PRE_END = time(23, 30) +_NXT_POST_OPEN = time(4, 30) +# 23:30 - 04:30 (새벽) skip + + +def _next_interval(now: datetime) -> float: + if not _is_market_day(now): + return _seconds_until_next_market_open(now) + + t = now.time() + if _PRE_OPEN <= t < _OPEN: + return 300.0 + elif _OPEN <= t < _CLOSE: + return 60.0 + elif _CLOSE <= t < _POST_END: + return 300.0 + elif _POST_END <= t < _NXT_PRE_END: + return 300.0 # NXT 야간 5분 (신규) + elif _NXT_POST_OPEN <= t < _PRE_OPEN: + return 300.0 # NXT 새벽 5분 (신규) + else: + return _seconds_until_next_market_open(now) + + +def _is_polling_window(now: datetime) -> bool: + """이제 야간 NXT 도 포함.""" + t = now.time() + return ( + (_PRE_OPEN <= t < _NXT_PRE_END) + or (_NXT_POST_OPEN <= t < _PRE_OPEN) + ) +``` + +--- + +## 8. 테스트 (13 신규) + +### 8.1 `test_kis_client.py` (4) + +- `test_get_minute_ohlcv_normal_returns_30_bars` — respx 200 → list[30 dict] +- `test_get_minute_ohlcv_429_retry_then_success` — 429 → 1s backoff → 200 +- `test_get_minute_ohlcv_uses_v1_token` — v1_token_path fixture → token in header +- `test_get_asking_price_computes_bid_ratio` — bid_total=600/ask_total=400 → bid_ratio=0.6 + +### 8.2 `test_kis_websocket.py` (4) + +- `test_fetch_approval_key_via_oauth_endpoint` — respx POST /oauth2/Approval → approval_key 추출 +- `test_subscribe_sends_h0stasp0_message` — fake WebSocket server → 종목 구독 메시지 전송 검증 +- `test_parse_asking_price_extracts_bid_ask_totals` — KIS raw string fixture → (ticker, dict) +- `test_reconnect_on_disconnect_with_backoff` — fake server close → exponential retry + +### 8.3 `test_scheduler.py` 추가 (3) + +- `test_next_interval_nxt_evening_5min` — now=22:00 평일 → 300 +- `test_next_interval_nxt_dawn_5min` — now=05:30 평일 → 300 +- `test_next_interval_dead_zone_skip` — now=02:00 평일 → 다음 04:30 까지 + +### 8.4 `test_pull_worker.py` (2) + +- `test_minute_polling_cycle_updates_state_minute_bars` — KIS mock → state.minute_bars[ticker] deque 갱신 +- `test_websocket_message_updates_state_asking_price` — WebSocket callback → state.asking_price[ticker] dict + +**합계**: 4 + 4 + 3 + 2 = **13 신규**. 기존 19 + 13 = **32 total signal_v2 tests**. + +--- + +## 9. 위험 및 완화 + +| 위험 | 완화 | +|------|------| +| V1 토큰 파일 미존재 (V1 미가동) | startup ERROR log + KIS REST 호출 fail. Phase 6 까지 V1 단독 책임 | +| KIS WebSocket 연결 끊김 | exponential backoff (1s→2s→4s→max 30s) + subscription 재등록 | +| KIS WebSocket 호가 메시지 형식 변경 | `_parse_asking_price` parse fail → WARNING log + skip. KIS API 변경 시 spec 갱신 | +| V1 토큰 갱신 race (V1 갱신 중 V2 read) | mtime 캐시 + 짧은 fail 허용 (다음 호출에서 새 token 사용) | +| approval_key 만료 | 매 reconnect 시 재발급 | +| KIS REST rate limit (초당 2회) | `_throttle()` 0.5s sleep (V1 패턴) | +| 분봉 buffer 메모리 누수 | `deque(maxlen=60)` 자동 cap. ticker ~40 → ~200KB | +| websockets 라이브러리 호환 | `websockets>=12` 명시 | +| WebSocket subscription / portfolio drift | pull_worker 가 매 cycle 후 비교 + 동적 subscribe/unsubscribe | +| NXT 시간대 polling 시 stock API 부하 | 5분 cron × portfolio 11 종목 → 분당 ~2 call 무시 가능 | +| 분봉 데이터 누락 (network 단절) | retry 3회 + cache. 누락 분봉 skip + WARNING | +| KIS API 점검 시간대 | KIS 점검 (보통 새벽 02:00-04:30) 은 dead zone 시간대와 일치 — 영향 없음 | + +--- + +## 10. 운영 영향 + +| 항목 | 영향 | +|------|------| +| 다운타임 | 0 (signal_v2 재기동만, V1 무영향) | +| 사용자 영향 | 없음 (Phase 3a 데이터 수집만, 신호 발송은 Phase 5) | +| `.env` 갱신 | 사용자 1회 (KIS_APP_KEY/SECRET/ACCOUNT/IS_VIRTUAL + V1_TOKEN_PATH) | +| V1 영향 | 0 (read-only 토큰 공유) | +| stock NAS 부하 | 무관 | +| KIS API 부하 | 매 분봉 cycle 분당 ~20 종목 × 2 call (분봉+호가) = 40 call/min ≈ 초당 0.67 < 2 한도 | +| WebSocket 세션 | 1 세션 / portfolio 보유 종목 (~11) 구독 | + +--- + +## 11. Phase 3a 완료 조건 (DoD) + +- [ ] `signal_v2/kis_client.py` 신규 (REST 분봉 + 호가) +- [ ] `signal_v2/kis_websocket.py` 신규 (WebSocket approval_key + 호가) +- [ ] `signal_v2/state.py` `PollState` 확장 (minute_bars + asking_price) +- [ ] `signal_v2/pull_worker.py` 분봉 cycle + WebSocket task 추가 +- [ ] `signal_v2/scheduler.py` NXT 시간대 추가 +- [ ] `signal_v2/main.py` lifespan 에 KISClient/KISWebSocket 통합 +- [ ] `signal_v2/config.py` KIS env + V1_TOKEN_PATH +- [ ] `requirements.txt` 에 `websockets>=12` +- [ ] 13 신규 테스트 PASS (총 32) +- [ ] `.env` 갱신 (사용자 1회) +- [ ] 운영 smoke: signal_v2 시작 → KIS WebSocket 연결 → portfolio 호가 1건 수신 → `state.asking_price` 갱신 → 분봉 1회 fetch → `state.minute_bars` 갱신 +- [ ] V1 봇 무영향 (토큰 read-only 공유 동작) +- [ ] git push (web-ai repo) + +--- + +## 12. Phase 3b 와의 관계 + +본 Phase 3a 완료 후 즉시 **Phase 3b (Chronos-2 + 분봉 모멘텀)** brainstorming. 의존성: + +``` +[Phase 3a spec/plan/실행] → [Phase 3b spec/plan/실행] + 1주 1주 +``` + +Phase 3b 의 입력 = 본 spec 의 `state.minute_bars` + `state.asking_price`. Phase 3b 산출 = `state.chronos_predictions` + `state.minute_momentum` (Phase 4 가 사용). + +--- + +## 13. Backlog (본 spec NOT) + +- WebSocket 동적 subscribe (현재 매 cycle 일괄, 즉시 갱신 안 됨) +- KIS 분봉 60+ 보관 (장기 추세 분석용) +- 체결 데이터 (`H0STCNT0`) 추가 — 자체 분봉 builder 가능성 +- KIS API 응답 시간 모니터링 (Phase 7) +- V2 자체 KIS 토큰 갱신 (Phase 6 deprecation 시) +- WebSocket session 멀티 (41 종목 한도 초과 시) +- approval_key 만료 자동 감지 (현재는 reconnect 시점)