Files
web-page/docs/superpowers/specs/2026-05-16-signal-v2-phase3a-kis-data-collection.md
gahusb 1813db761f docs(signal-v2): Phase 3a KIS data collection spec
KIS REST client (minute OHLCV + asking price polling, V1 token
read-only share) + KIS WebSocket client (approval_key + portfolio
asking_price realtime subscribe) + PollState extension + scheduler
NXT windows (20:00-23:30 / 04:30-07:00). 13 new tests.

brainstorming 6 decisions: scope=B(3a/3b split) / data=B(REST minute +
WS asking) / auth=A(V1 token share) / subscribe=A(portfolio WS +
screener REST) / NXT=C(scheduler extend) / test=A(respx + WS mock).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 04:50:16 +09:00

444 lines
18 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Confidence Signal Pipeline V2 — Phase 3a: KIS Data Collection Design
**작성일**: 2026-05-16
**작성자**: gahusb
**상태**: Approved for implementation
**선행 spec**:
- Phase 0 architecture (`2026-05-15-confidence-signal-pipeline-v2-architecture.md`)
- Phase 1 stock WebAI API (`2026-05-15-signal-v2-phase1-webai-api.md`)
- signal_v1 rename (`2026-05-16-web-ai-v1-rename-to-signal-v1.md`)
- Phase 2 web-ai pull worker (`2026-05-16-signal-v2-phase2-web-ai-pull-worker.md`)
**Phase 3 분해**: Phase 0 spec 의 Phase 3 (KIS WebSocket + NXT + Chronos-2 + 분봉 모멘텀) 를 2 sub-phase 로 분해:
- **Phase 3a (본 spec)**: KIS 데이터 수집 (분봉 REST + 호가 WebSocket + scheduler NXT 확장)
- **Phase 3b (별도 spec)**: Chronos-2 추론 + 분봉 모멘텀 분류기
**브레인스토밍 결정 6개**:
- scope = B (3a / 3b 분해)
- 데이터 수집 = B (분봉 REST + 호가 WebSocket)
- KIS 인증 = A (V1 토큰 read-only 공유)
- 구독 범위 = A (portfolio WebSocket + screener REST polling)
- NXT 처리 = C (stock 자동 처리 + scheduler 의 NXT 시간대 폴링 추가)
- 테스트 = A (respx REST mock + WebSocket mock + tmp sqlite)
---
## 1. 목표
signal_v2 가 신호 판단에 필요한 KIS 실시간/준실시간 데이터 (분봉 OHLCV + 호가 매수세) 를 수집해 `PollState` 에 채워 넣는다. Phase 3b (Chronos-2 추론) + Phase 4 (signal generator) 가 이 위에 동작.
**Why**: Phase 0 §3 "web-ai = 시점 분석" 책임의 데이터 수집 부분. KIS REST 의 분봉/호가 + KIS WebSocket 의 실시간 호가가 매수/매도 룰의 핵심 입력.
---
## 2. 범위
### 포함 (6 항목)
-**KIS REST client** (`signal_v2/kis_client.py`) — 분봉 polling + screener Top-N 호가 polling. V1 토큰 파일 (`signal_v1/data/kis_token.json`) read-only 공유.
-**KIS WebSocket client** (`signal_v2/kis_websocket.py`) — approval_key 신규 발급 + portfolio 보유 종목 호가 실시간 구독 + reconnect with exponential backoff.
-**`pull_worker.py` 확장** — 분봉 1분 polling cycle 추가 + WebSocket 메시지 처리 task.
-**`PollState` 확장** — `minute_bars: dict[ticker, deque(maxlen=60)]`, `asking_price: dict[ticker, dict]`, `last_updated["minute_bars"]` / `["asking_price"]`.
-**`scheduler.py` 수정** — NXT 시간대 폴링 (20:00-23:30 / 04:30-07:00) 5분 cron 추가.
-**테스트 13 신규** (KIS REST 4 + WebSocket 4 + scheduler NXT 3 + pull_worker 2). 기존 19 + 신규 13 = 32 total.
### 범위 외 (NOT)
- Chronos-2 모델 로드 + 추론 (Phase 3b)
- 분봉 모멘텀 분류기 (Phase 3b — 5분봉 aggregate + 5연속 양봉 룰)
- Signal generator 매수/매도 룰 (Phase 4)
- NXT 자체 API 호출 — V2 가 별도 NXT API client 없음. stock 측 `price_fetcher` 가 NXT 시간대 가격 자동 반환 (`price_session` 필드)
- WebSocket 동적 subscribe 갱신 — portfolio 변동 시 다음 cycle 에서 일괄 갱신
- 분봉 daily aggregate — 60 분봉 sliding window 만
- 분봉 영속 저장 — 메모리만, 재기동 시 reset
- V2 자체 KIS 토큰 발급 — Phase 6 deprecation 까지 V1 단독 갱신 책임
---
## 3. 파일 구조 + 변경 매트릭스
### 3.1 신규 / 수정
| 파일 | 작업 | 라인 |
|------|------|------|
| `signal_v2/kis_client.py` | 신규 | ~150 |
| `signal_v2/kis_websocket.py` | 신규 | ~180 |
| `signal_v2/state.py` | 필드 2개 추가 | +5 |
| `signal_v2/pull_worker.py` | 분봉 cycle + WebSocket task | +60 |
| `signal_v2/scheduler.py` | NXT 시간대 분기 | +15 |
| `signal_v2/main.py` | KIS lifespan 통합 | +20 |
| `signal_v2/config.py` | KIS env 5개 + V1 token path | +10 |
| `signal_v2/tests/test_kis_client.py` | 신규 4 케이스 | ~150 |
| `signal_v2/tests/test_kis_websocket.py` | 신규 4 케이스 | ~170 |
| `signal_v2/tests/test_pull_worker.py` | 신규 2 케이스 | ~80 |
| `signal_v2/tests/test_scheduler.py` | NXT 3 케이스 추가 | +30 |
| `signal_v2/tests/test_main.py` | KIS lifespan 케이스 | +20 |
| `signal_v2/requirements.txt` | `websockets>=12` | +1 |
| `web-ai/.env` | KIS env 5 + V1_TOKEN_PATH (사용자 수동) | +6 |
### 3.2 외부 의존성 신규
- `websockets>=12` (KIS WebSocket client)
### 3.3 V1 공유 / 무영향
- **공유** (read-only): `signal_v1/data/kis_token.json` — V1 의 단독 갱신 책임. V2 는 mtime 캐시 + read.
- **무영향**: V1 의 main_server.py / modules / 자동매매 봇 — Phase 6 까지 분리 유지.
---
## 4. KIS REST client (`kis_client.py`)
```python
class KISClient:
"""KIS REST API (분봉 + 호가). V1 토큰 read-only 공유."""
def __init__(
self,
app_key: str, app_secret: str, account: str, is_virtual: bool,
v1_token_path: Path,
timeout: float = 10.0,
):
self._app_key = app_key
self._app_secret = app_secret
self._account = account
self._is_virtual = is_virtual
self._v1_token_path = Path(v1_token_path)
self._base_url = (
"https://openapivts.koreainvestment.com:29443" if is_virtual
else "https://openapi.koreainvestment.com:9443"
)
self._client = httpx.AsyncClient(timeout=timeout)
self._token_cache: tuple[str, float] | None = None # (token, file_mtime)
self._last_throttle_at = 0.0 # 초당 2회 제한
async def get_minute_ohlcv(self, ticker: str) -> list[dict]:
"""현재 시점 직전 30개 1분봉 OHLCV (TR_ID: FHKST03010200).
Returns: [{"datetime", "open", "high", "low", "close", "volume"}, ...] (시간 오름차순)
"""
async def get_asking_price(self, ticker: str) -> dict:
"""현재 호가 5단계 + 매수/매도 잔량 (TR_ID: FHKST01010200).
Returns: {
"bid_total": int,
"ask_total": int,
"bid_ratio": float,
"current_price": int,
"as_of": str (ISO),
}
"""
async def close(self) -> None: ...
# internal
def _read_v1_token(self) -> str:
"""signal_v1/data/kis_token.json 읽기. mtime 캐시 — 갱신 시 자동 재로드."""
async def _throttle(self) -> None:
"""V1 패턴 — 초당 2회 제한 (0.5s sleep)."""
def _common_headers(self, tr_id: str) -> dict:
"""authorization, appkey, appsecret, tr_id."""
```
### 4.1 토큰 공유 디자인
- `_v1_token_path` env `V1_TOKEN_PATH` 에서 로드. 기본값 `../signal_v1/data/kis_token.json`.
- 첫 호출 시 파일 read + mtime 캐시.
- 매 호출 전 mtime 비교 — 변경 시 재로드. 캐시 hit 시 빠른 통과.
- 파일 미존재 / 만료 시: WARNING log + `HTTPException` (Phase 6 까지 V1 단독 책임 명시).
### 4.2 분봉 응답 정규화
KIS API 의 분봉 raw 응답 (`output2` 배열) → 표준 dict 리스트로 변환. 시간 오름차순, 거래량 0 인 분봉 skip.
---
## 5. KIS WebSocket client (`kis_websocket.py`)
```python
class KISWebSocket:
"""KIS WebSocket — approval_key 발급 + 호가 실시간 구독."""
def __init__(self, app_key: str, app_secret: str, is_virtual: bool):
self._app_key = app_key
self._app_secret = app_secret
self._ws_url = (
"wss://openapivts.koreainvestment.com:29443/tryitout" if is_virtual
else "wss://openapi.koreainvestment.com:9443/tryitout"
)
self._approval_key: str | None = None
self._ws: WebSocketClientProtocol | None = None
self._subscriptions: set[str] = set()
self._on_asking_price: Callable[[str, dict], None] | None = None
self._recv_task: asyncio.Task | None = None
self._shutdown = asyncio.Event()
async def start(
self, tickers: list[str],
on_asking_price: Callable[[str, dict], None],
) -> None:
"""approval_key 발급 + WebSocket 연결 + 종목 호가 구독 + receive loop 시작."""
async def subscribe(self, ticker: str) -> None:
"""동적 구독 추가."""
async def unsubscribe(self, ticker: str) -> None: ...
async def close(self) -> None:
"""unsubscribe all + shutdown event + close socket."""
# internal
async def _fetch_approval_key(self) -> str:
"""POST {base_rest}/oauth2/Approval — approval_key 발급."""
async def _send_subscription(self, ticker: str, tr_id: str = "H0STASP0") -> None:
"""tr_id H0STASP0 = 실시간 호가."""
async def _receive_loop(self) -> None:
"""메시지 receive loop. PING/PONG 30초 + 호가 message parse → callback.
끊김 감지 → exponential backoff (1s→2s→4s→max 30s) + reconnect + subscribe 재등록."""
def _parse_asking_price(self, raw: str) -> tuple[str, dict] | None:
"""KIS 호가 raw string '0|H0STASP0|...|005930^...' 파싱.
Returns: (ticker, {bid_total, ask_total, bid_ratio, current_price, as_of})
또는 None (parse fail).
"""
```
### 5.1 메시지 형식 (KIS 공식 문서)
호가 메시지 raw 예시 (실제는 더 긴 `^` 구분 필드):
```
0|H0STASP0|001|005930^091500^78500^...^bid_total^ask_total^...
```
파싱 키 (필드 인덱스 기반):
- ticker = 4번째 필드의 종목코드 부분
- as_of = 5번째 필드 (HHMMSS)
- bid_total / ask_total = 정해진 인덱스 (KIS 문서 참조)
### 5.2 Reconnect 정책
- websockets 의 `ConnectionClosed` 캐치
- exponential backoff: 1s → 2s → 4s → 8s → 16s → max 30s
- 재연결 후 `_subscriptions` 의 모든 ticker 재구독
- 5분 이상 연결 실패 시 ERROR log + shutdown event 발생 (운영자 알림은 Phase 7)
---
## 6. PollState 확장 + pull_worker
### 6.1 PollState 추가 필드
```python
@dataclass
class PollState:
portfolio: dict | None = None
news_sentiment: dict | None = None
screener_preview: dict | None = None
# 신규 (Phase 3a)
minute_bars: dict[str, deque] = field(default_factory=dict) # {ticker: deque(maxlen=60)}
asking_price: dict[str, dict] = field(default_factory=dict) # {ticker: {bid_total, ask_total, bid_ratio, ...}}
last_updated: dict[str, str] = field(default_factory=dict)
fetch_errors: dict[str, int] = field(default_factory=dict)
```
### 6.2 pull_worker 확장
```python
async def _run_polling_cycle(client, state, kis_client):
"""기존 3 endpoint (stock) + 분봉 (KIS REST) 4 fetch 병렬."""
portfolio, sentiment, screener = await asyncio.gather(
client.get_portfolio(),
client.get_news_sentiment(),
client.run_screener_preview(),
return_exceptions=True,
)
# ... (기존 state 갱신)
# 분봉 갱신 — portfolio + screener top-N 종목 대상
tickers = _collect_tickers(state) # portfolio + screener Top-N union
minute_results = await asyncio.gather(*[
kis_client.get_minute_ohlcv(t) for t in tickers
], return_exceptions=True)
for ticker, result in zip(tickers, minute_results):
if isinstance(result, list):
state.minute_bars.setdefault(ticker, deque(maxlen=60)).extend(result)
state.last_updated[f"minute_bars/{ticker}"] = now_iso
# 호가 갱신 (screener Top-N 만, portfolio 는 WebSocket 으로 들어옴)
screener_only = _screener_tickers_excluding_portfolio(state)
asking_results = await asyncio.gather(*[
kis_client.get_asking_price(t) for t in screener_only
], return_exceptions=True)
for ticker, result in zip(screener_only, asking_results):
if isinstance(result, dict):
state.asking_price[ticker] = result
state.last_updated[f"asking_price/{ticker}"] = now_iso
def on_websocket_asking_price(ticker: str, data: dict):
"""KIS WebSocket callback — portfolio 호가 실시간 갱신."""
state.asking_price[ticker] = data
state.last_updated[f"asking_price/{ticker}"] = datetime.now(KST).isoformat()
```
### 6.3 종목 동기화
매 cycle 후 `state.portfolio.holdings` 의 ticker 목록과 `kis_websocket._subscriptions` 비교 → 신규 추가 / 제거 ticker 별로 `subscribe()` / `unsubscribe()` 호출.
---
## 7. Scheduler NXT 시간대
```python
# Market windows (기존)
_PRE_OPEN = time(7, 0)
_OPEN = time(9, 0)
_CLOSE = time(15, 30)
_POST_END = time(20, 0)
# NXT windows (신규)
_NXT_PRE_END = time(23, 30)
_NXT_POST_OPEN = time(4, 30)
# 23:30 - 04:30 (새벽) skip
def _next_interval(now: datetime) -> float:
if not _is_market_day(now):
return _seconds_until_next_market_open(now)
t = now.time()
if _PRE_OPEN <= t < _OPEN:
return 300.0
elif _OPEN <= t < _CLOSE:
return 60.0
elif _CLOSE <= t < _POST_END:
return 300.0
elif _POST_END <= t < _NXT_PRE_END:
return 300.0 # NXT 야간 5분 (신규)
elif _NXT_POST_OPEN <= t < _PRE_OPEN:
return 300.0 # NXT 새벽 5분 (신규)
else:
return _seconds_until_next_market_open(now)
def _is_polling_window(now: datetime) -> bool:
"""이제 야간 NXT 도 포함."""
t = now.time()
return (
(_PRE_OPEN <= t < _NXT_PRE_END)
or (_NXT_POST_OPEN <= t < _PRE_OPEN)
)
```
---
## 8. 테스트 (13 신규)
### 8.1 `test_kis_client.py` (4)
- `test_get_minute_ohlcv_normal_returns_30_bars` — respx 200 → list[30 dict]
- `test_get_minute_ohlcv_429_retry_then_success` — 429 → 1s backoff → 200
- `test_get_minute_ohlcv_uses_v1_token` — v1_token_path fixture → token in header
- `test_get_asking_price_computes_bid_ratio` — bid_total=600/ask_total=400 → bid_ratio=0.6
### 8.2 `test_kis_websocket.py` (4)
- `test_fetch_approval_key_via_oauth_endpoint` — respx POST /oauth2/Approval → approval_key 추출
- `test_subscribe_sends_h0stasp0_message` — fake WebSocket server → 종목 구독 메시지 전송 검증
- `test_parse_asking_price_extracts_bid_ask_totals` — KIS raw string fixture → (ticker, dict)
- `test_reconnect_on_disconnect_with_backoff` — fake server close → exponential retry
### 8.3 `test_scheduler.py` 추가 (3)
- `test_next_interval_nxt_evening_5min` — now=22:00 평일 → 300
- `test_next_interval_nxt_dawn_5min` — now=05:30 평일 → 300
- `test_next_interval_dead_zone_skip` — now=02:00 평일 → 다음 04:30 까지
### 8.4 `test_pull_worker.py` (2)
- `test_minute_polling_cycle_updates_state_minute_bars` — KIS mock → state.minute_bars[ticker] deque 갱신
- `test_websocket_message_updates_state_asking_price` — WebSocket callback → state.asking_price[ticker] dict
**합계**: 4 + 4 + 3 + 2 = **13 신규**. 기존 19 + 13 = **32 total signal_v2 tests**.
---
## 9. 위험 및 완화
| 위험 | 완화 |
|------|------|
| V1 토큰 파일 미존재 (V1 미가동) | startup ERROR log + KIS REST 호출 fail. Phase 6 까지 V1 단독 책임 |
| KIS WebSocket 연결 끊김 | exponential backoff (1s→2s→4s→max 30s) + subscription 재등록 |
| KIS WebSocket 호가 메시지 형식 변경 | `_parse_asking_price` parse fail → WARNING log + skip. KIS API 변경 시 spec 갱신 |
| V1 토큰 갱신 race (V1 갱신 중 V2 read) | mtime 캐시 + 짧은 fail 허용 (다음 호출에서 새 token 사용) |
| approval_key 만료 | 매 reconnect 시 재발급 |
| KIS REST rate limit (초당 2회) | `_throttle()` 0.5s sleep (V1 패턴) |
| 분봉 buffer 메모리 누수 | `deque(maxlen=60)` 자동 cap. ticker ~40 → ~200KB |
| websockets 라이브러리 호환 | `websockets>=12` 명시 |
| WebSocket subscription / portfolio drift | pull_worker 가 매 cycle 후 비교 + 동적 subscribe/unsubscribe |
| NXT 시간대 polling 시 stock API 부하 | 5분 cron × portfolio 11 종목 → 분당 ~2 call 무시 가능 |
| 분봉 데이터 누락 (network 단절) | retry 3회 + cache. 누락 분봉 skip + WARNING |
| KIS API 점검 시간대 | KIS 점검 (보통 새벽 02:00-04:30) 은 dead zone 시간대와 일치 — 영향 없음 |
---
## 10. 운영 영향
| 항목 | 영향 |
|------|------|
| 다운타임 | 0 (signal_v2 재기동만, V1 무영향) |
| 사용자 영향 | 없음 (Phase 3a 데이터 수집만, 신호 발송은 Phase 5) |
| `.env` 갱신 | 사용자 1회 (KIS_APP_KEY/SECRET/ACCOUNT/IS_VIRTUAL + V1_TOKEN_PATH) |
| V1 영향 | 0 (read-only 토큰 공유) |
| stock NAS 부하 | 무관 |
| KIS API 부하 | 매 분봉 cycle 분당 ~20 종목 × 2 call (분봉+호가) = 40 call/min ≈ 초당 0.67 < 2 한도 |
| WebSocket 세션 | 1 세션 / portfolio 보유 종목 (~11) 구독 |
---
## 11. Phase 3a 완료 조건 (DoD)
- [ ] `signal_v2/kis_client.py` 신규 (REST 분봉 + 호가)
- [ ] `signal_v2/kis_websocket.py` 신규 (WebSocket approval_key + 호가)
- [ ] `signal_v2/state.py` `PollState` 확장 (minute_bars + asking_price)
- [ ] `signal_v2/pull_worker.py` 분봉 cycle + WebSocket task 추가
- [ ] `signal_v2/scheduler.py` NXT 시간대 추가
- [ ] `signal_v2/main.py` lifespan 에 KISClient/KISWebSocket 통합
- [ ] `signal_v2/config.py` KIS env + V1_TOKEN_PATH
- [ ] `requirements.txt``websockets>=12`
- [ ] 13 신규 테스트 PASS (총 32)
- [ ] `.env` 갱신 (사용자 1회)
- [ ] 운영 smoke: signal_v2 시작 → KIS WebSocket 연결 → portfolio 호가 1건 수신 → `state.asking_price` 갱신 → 분봉 1회 fetch → `state.minute_bars` 갱신
- [ ] V1 봇 무영향 (토큰 read-only 공유 동작)
- [ ] git push (web-ai repo)
---
## 12. Phase 3b 와의 관계
본 Phase 3a 완료 후 즉시 **Phase 3b (Chronos-2 + 분봉 모멘텀)** brainstorming. 의존성:
```
[Phase 3a spec/plan/실행] → [Phase 3b spec/plan/실행]
1주 1주
```
Phase 3b 의 입력 = 본 spec 의 `state.minute_bars` + `state.asking_price`. Phase 3b 산출 = `state.chronos_predictions` + `state.minute_momentum` (Phase 4 가 사용).
---
## 13. Backlog (본 spec NOT)
- WebSocket 동적 subscribe (현재 매 cycle 일괄, 즉시 갱신 안 됨)
- KIS 분봉 60+ 보관 (장기 추세 분석용)
- 체결 데이터 (`H0STCNT0`) 추가 — 자체 분봉 builder 가능성
- KIS API 응답 시간 모니터링 (Phase 7)
- V2 자체 KIS 토큰 갱신 (Phase 6 deprecation 시)
- WebSocket session 멀티 (41 종목 한도 초과 시)
- approval_key 만료 자동 감지 (현재는 reconnect 시점)