KIS REST client (minute OHLCV + asking price polling, V1 token read-only share) + KIS WebSocket client (approval_key + portfolio asking_price realtime subscribe) + PollState extension + scheduler NXT windows (20:00-23:30 / 04:30-07:00). 13 new tests. brainstorming 6 decisions: scope=B(3a/3b split) / data=B(REST minute + WS asking) / auth=A(V1 token share) / subscribe=A(portfolio WS + screener REST) / NXT=C(scheduler extend) / test=A(respx + WS mock). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
18 KiB
Confidence Signal Pipeline V2 — Phase 3a: KIS Data Collection Design
작성일: 2026-05-16 작성자: gahusb 상태: Approved for implementation 선행 spec:
- Phase 0 architecture (
2026-05-15-confidence-signal-pipeline-v2-architecture.md) - Phase 1 stock WebAI API (
2026-05-15-signal-v2-phase1-webai-api.md) - signal_v1 rename (
2026-05-16-web-ai-v1-rename-to-signal-v1.md) - Phase 2 web-ai pull worker (
2026-05-16-signal-v2-phase2-web-ai-pull-worker.md)
Phase 3 분해: Phase 0 spec 의 Phase 3 (KIS WebSocket + NXT + Chronos-2 + 분봉 모멘텀) 를 2 sub-phase 로 분해:
- Phase 3a (본 spec): KIS 데이터 수집 (분봉 REST + 호가 WebSocket + scheduler NXT 확장)
- Phase 3b (별도 spec): Chronos-2 추론 + 분봉 모멘텀 분류기
브레인스토밍 결정 6개:
- scope = B (3a / 3b 분해)
- 데이터 수집 = B (분봉 REST + 호가 WebSocket)
- KIS 인증 = A (V1 토큰 read-only 공유)
- 구독 범위 = A (portfolio WebSocket + screener REST polling)
- NXT 처리 = C (stock 자동 처리 + scheduler 의 NXT 시간대 폴링 추가)
- 테스트 = A (respx REST mock + WebSocket mock + tmp sqlite)
1. 목표
signal_v2 가 신호 판단에 필요한 KIS 실시간/준실시간 데이터 (분봉 OHLCV + 호가 매수세) 를 수집해 PollState 에 채워 넣는다. Phase 3b (Chronos-2 추론) + Phase 4 (signal generator) 가 이 위에 동작.
Why: Phase 0 §3 "web-ai = 시점 분석" 책임의 데이터 수집 부분. KIS REST 의 분봉/호가 + KIS WebSocket 의 실시간 호가가 매수/매도 룰의 핵심 입력.
2. 범위
포함 (6 항목)
- ① KIS REST client (
signal_v2/kis_client.py) — 분봉 polling + screener Top-N 호가 polling. V1 토큰 파일 (signal_v1/data/kis_token.json) read-only 공유. - ② KIS WebSocket client (
signal_v2/kis_websocket.py) — approval_key 신규 발급 + portfolio 보유 종목 호가 실시간 구독 + reconnect with exponential backoff. - ③
pull_worker.py확장 — 분봉 1분 polling cycle 추가 + WebSocket 메시지 처리 task. - ④
PollState확장 —minute_bars: dict[ticker, deque(maxlen=60)],asking_price: dict[ticker, dict],last_updated["minute_bars"]/["asking_price"]. - ⑤
scheduler.py수정 — NXT 시간대 폴링 (20:00-23:30 / 04:30-07:00) 5분 cron 추가. - ⑥ 테스트 13 신규 (KIS REST 4 + WebSocket 4 + scheduler NXT 3 + pull_worker 2). 기존 19 + 신규 13 = 32 total.
범위 외 (NOT)
- Chronos-2 모델 로드 + 추론 (Phase 3b)
- 분봉 모멘텀 분류기 (Phase 3b — 5분봉 aggregate + 5연속 양봉 룰)
- Signal generator 매수/매도 룰 (Phase 4)
- NXT 자체 API 호출 — V2 가 별도 NXT API client 없음. stock 측
price_fetcher가 NXT 시간대 가격 자동 반환 (price_session필드) - WebSocket 동적 subscribe 갱신 — portfolio 변동 시 다음 cycle 에서 일괄 갱신
- 분봉 daily aggregate — 60 분봉 sliding window 만
- 분봉 영속 저장 — 메모리만, 재기동 시 reset
- V2 자체 KIS 토큰 발급 — Phase 6 deprecation 까지 V1 단독 갱신 책임
3. 파일 구조 + 변경 매트릭스
3.1 신규 / 수정
| 파일 | 작업 | 라인 |
|---|---|---|
signal_v2/kis_client.py |
신규 | ~150 |
signal_v2/kis_websocket.py |
신규 | ~180 |
signal_v2/state.py |
필드 2개 추가 | +5 |
signal_v2/pull_worker.py |
분봉 cycle + WebSocket task | +60 |
signal_v2/scheduler.py |
NXT 시간대 분기 | +15 |
signal_v2/main.py |
KIS lifespan 통합 | +20 |
signal_v2/config.py |
KIS env 5개 + V1 token path | +10 |
signal_v2/tests/test_kis_client.py |
신규 4 케이스 | ~150 |
signal_v2/tests/test_kis_websocket.py |
신규 4 케이스 | ~170 |
signal_v2/tests/test_pull_worker.py |
신규 2 케이스 | ~80 |
signal_v2/tests/test_scheduler.py |
NXT 3 케이스 추가 | +30 |
signal_v2/tests/test_main.py |
KIS lifespan 케이스 | +20 |
signal_v2/requirements.txt |
websockets>=12 |
+1 |
web-ai/.env |
KIS env 5 + V1_TOKEN_PATH (사용자 수동) | +6 |
3.2 외부 의존성 신규
websockets>=12(KIS WebSocket client)
3.3 V1 공유 / 무영향
- 공유 (read-only):
signal_v1/data/kis_token.json— V1 의 단독 갱신 책임. V2 는 mtime 캐시 + read. - 무영향: V1 의 main_server.py / modules / 자동매매 봇 — Phase 6 까지 분리 유지.
4. KIS REST client (kis_client.py)
class KISClient:
"""KIS REST API (분봉 + 호가). V1 토큰 read-only 공유."""
def __init__(
self,
app_key: str, app_secret: str, account: str, is_virtual: bool,
v1_token_path: Path,
timeout: float = 10.0,
):
self._app_key = app_key
self._app_secret = app_secret
self._account = account
self._is_virtual = is_virtual
self._v1_token_path = Path(v1_token_path)
self._base_url = (
"https://openapivts.koreainvestment.com:29443" if is_virtual
else "https://openapi.koreainvestment.com:9443"
)
self._client = httpx.AsyncClient(timeout=timeout)
self._token_cache: tuple[str, float] | None = None # (token, file_mtime)
self._last_throttle_at = 0.0 # 초당 2회 제한
async def get_minute_ohlcv(self, ticker: str) -> list[dict]:
"""현재 시점 직전 30개 1분봉 OHLCV (TR_ID: FHKST03010200).
Returns: [{"datetime", "open", "high", "low", "close", "volume"}, ...] (시간 오름차순)
"""
async def get_asking_price(self, ticker: str) -> dict:
"""현재 호가 5단계 + 매수/매도 잔량 (TR_ID: FHKST01010200).
Returns: {
"bid_total": int,
"ask_total": int,
"bid_ratio": float,
"current_price": int,
"as_of": str (ISO),
}
"""
async def close(self) -> None: ...
# internal
def _read_v1_token(self) -> str:
"""signal_v1/data/kis_token.json 읽기. mtime 캐시 — 갱신 시 자동 재로드."""
async def _throttle(self) -> None:
"""V1 패턴 — 초당 2회 제한 (0.5s sleep)."""
def _common_headers(self, tr_id: str) -> dict:
"""authorization, appkey, appsecret, tr_id."""
4.1 토큰 공유 디자인
_v1_token_pathenvV1_TOKEN_PATH에서 로드. 기본값../signal_v1/data/kis_token.json.- 첫 호출 시 파일 read + mtime 캐시.
- 매 호출 전 mtime 비교 — 변경 시 재로드. 캐시 hit 시 빠른 통과.
- 파일 미존재 / 만료 시: WARNING log +
HTTPException(Phase 6 까지 V1 단독 책임 명시).
4.2 분봉 응답 정규화
KIS API 의 분봉 raw 응답 (output2 배열) → 표준 dict 리스트로 변환. 시간 오름차순, 거래량 0 인 분봉 skip.
5. KIS WebSocket client (kis_websocket.py)
class KISWebSocket:
"""KIS WebSocket — approval_key 발급 + 호가 실시간 구독."""
def __init__(self, app_key: str, app_secret: str, is_virtual: bool):
self._app_key = app_key
self._app_secret = app_secret
self._ws_url = (
"wss://openapivts.koreainvestment.com:29443/tryitout" if is_virtual
else "wss://openapi.koreainvestment.com:9443/tryitout"
)
self._approval_key: str | None = None
self._ws: WebSocketClientProtocol | None = None
self._subscriptions: set[str] = set()
self._on_asking_price: Callable[[str, dict], None] | None = None
self._recv_task: asyncio.Task | None = None
self._shutdown = asyncio.Event()
async def start(
self, tickers: list[str],
on_asking_price: Callable[[str, dict], None],
) -> None:
"""approval_key 발급 + WebSocket 연결 + 종목 호가 구독 + receive loop 시작."""
async def subscribe(self, ticker: str) -> None:
"""동적 구독 추가."""
async def unsubscribe(self, ticker: str) -> None: ...
async def close(self) -> None:
"""unsubscribe all + shutdown event + close socket."""
# internal
async def _fetch_approval_key(self) -> str:
"""POST {base_rest}/oauth2/Approval — approval_key 발급."""
async def _send_subscription(self, ticker: str, tr_id: str = "H0STASP0") -> None:
"""tr_id H0STASP0 = 실시간 호가."""
async def _receive_loop(self) -> None:
"""메시지 receive loop. PING/PONG 30초 + 호가 message parse → callback.
끊김 감지 → exponential backoff (1s→2s→4s→max 30s) + reconnect + subscribe 재등록."""
def _parse_asking_price(self, raw: str) -> tuple[str, dict] | None:
"""KIS 호가 raw string '0|H0STASP0|...|005930^...' 파싱.
Returns: (ticker, {bid_total, ask_total, bid_ratio, current_price, as_of})
또는 None (parse fail).
"""
5.1 메시지 형식 (KIS 공식 문서)
호가 메시지 raw 예시 (실제는 더 긴 ^ 구분 필드):
0|H0STASP0|001|005930^091500^78500^...^bid_total^ask_total^...
파싱 키 (필드 인덱스 기반):
- ticker = 4번째 필드의 종목코드 부분
- as_of = 5번째 필드 (HHMMSS)
- bid_total / ask_total = 정해진 인덱스 (KIS 문서 참조)
5.2 Reconnect 정책
- websockets 의
ConnectionClosed캐치 - exponential backoff: 1s → 2s → 4s → 8s → 16s → max 30s
- 재연결 후
_subscriptions의 모든 ticker 재구독 - 5분 이상 연결 실패 시 ERROR log + shutdown event 발생 (운영자 알림은 Phase 7)
6. PollState 확장 + pull_worker
6.1 PollState 추가 필드
@dataclass
class PollState:
portfolio: dict | None = None
news_sentiment: dict | None = None
screener_preview: dict | None = None
# 신규 (Phase 3a)
minute_bars: dict[str, deque] = field(default_factory=dict) # {ticker: deque(maxlen=60)}
asking_price: dict[str, dict] = field(default_factory=dict) # {ticker: {bid_total, ask_total, bid_ratio, ...}}
last_updated: dict[str, str] = field(default_factory=dict)
fetch_errors: dict[str, int] = field(default_factory=dict)
6.2 pull_worker 확장
async def _run_polling_cycle(client, state, kis_client):
"""기존 3 endpoint (stock) + 분봉 (KIS REST) 4 fetch 병렬."""
portfolio, sentiment, screener = await asyncio.gather(
client.get_portfolio(),
client.get_news_sentiment(),
client.run_screener_preview(),
return_exceptions=True,
)
# ... (기존 state 갱신)
# 분봉 갱신 — portfolio + screener top-N 종목 대상
tickers = _collect_tickers(state) # portfolio + screener Top-N union
minute_results = await asyncio.gather(*[
kis_client.get_minute_ohlcv(t) for t in tickers
], return_exceptions=True)
for ticker, result in zip(tickers, minute_results):
if isinstance(result, list):
state.minute_bars.setdefault(ticker, deque(maxlen=60)).extend(result)
state.last_updated[f"minute_bars/{ticker}"] = now_iso
# 호가 갱신 (screener Top-N 만, portfolio 는 WebSocket 으로 들어옴)
screener_only = _screener_tickers_excluding_portfolio(state)
asking_results = await asyncio.gather(*[
kis_client.get_asking_price(t) for t in screener_only
], return_exceptions=True)
for ticker, result in zip(screener_only, asking_results):
if isinstance(result, dict):
state.asking_price[ticker] = result
state.last_updated[f"asking_price/{ticker}"] = now_iso
def on_websocket_asking_price(ticker: str, data: dict):
"""KIS WebSocket callback — portfolio 호가 실시간 갱신."""
state.asking_price[ticker] = data
state.last_updated[f"asking_price/{ticker}"] = datetime.now(KST).isoformat()
6.3 종목 동기화
매 cycle 후 state.portfolio.holdings 의 ticker 목록과 kis_websocket._subscriptions 비교 → 신규 추가 / 제거 ticker 별로 subscribe() / unsubscribe() 호출.
7. Scheduler NXT 시간대
# Market windows (기존)
_PRE_OPEN = time(7, 0)
_OPEN = time(9, 0)
_CLOSE = time(15, 30)
_POST_END = time(20, 0)
# NXT windows (신규)
_NXT_PRE_END = time(23, 30)
_NXT_POST_OPEN = time(4, 30)
# 23:30 - 04:30 (새벽) skip
def _next_interval(now: datetime) -> float:
if not _is_market_day(now):
return _seconds_until_next_market_open(now)
t = now.time()
if _PRE_OPEN <= t < _OPEN:
return 300.0
elif _OPEN <= t < _CLOSE:
return 60.0
elif _CLOSE <= t < _POST_END:
return 300.0
elif _POST_END <= t < _NXT_PRE_END:
return 300.0 # NXT 야간 5분 (신규)
elif _NXT_POST_OPEN <= t < _PRE_OPEN:
return 300.0 # NXT 새벽 5분 (신규)
else:
return _seconds_until_next_market_open(now)
def _is_polling_window(now: datetime) -> bool:
"""이제 야간 NXT 도 포함."""
t = now.time()
return (
(_PRE_OPEN <= t < _NXT_PRE_END)
or (_NXT_POST_OPEN <= t < _PRE_OPEN)
)
8. 테스트 (13 신규)
8.1 test_kis_client.py (4)
test_get_minute_ohlcv_normal_returns_30_bars— respx 200 → list[30 dict]test_get_minute_ohlcv_429_retry_then_success— 429 → 1s backoff → 200test_get_minute_ohlcv_uses_v1_token— v1_token_path fixture → token in headertest_get_asking_price_computes_bid_ratio— bid_total=600/ask_total=400 → bid_ratio=0.6
8.2 test_kis_websocket.py (4)
test_fetch_approval_key_via_oauth_endpoint— respx POST /oauth2/Approval → approval_key 추출test_subscribe_sends_h0stasp0_message— fake WebSocket server → 종목 구독 메시지 전송 검증test_parse_asking_price_extracts_bid_ask_totals— KIS raw string fixture → (ticker, dict)test_reconnect_on_disconnect_with_backoff— fake server close → exponential retry
8.3 test_scheduler.py 추가 (3)
test_next_interval_nxt_evening_5min— now=22:00 평일 → 300test_next_interval_nxt_dawn_5min— now=05:30 평일 → 300test_next_interval_dead_zone_skip— now=02:00 평일 → 다음 04:30 까지
8.4 test_pull_worker.py (2)
test_minute_polling_cycle_updates_state_minute_bars— KIS mock → state.minute_bars[ticker] deque 갱신test_websocket_message_updates_state_asking_price— WebSocket callback → state.asking_price[ticker] dict
합계: 4 + 4 + 3 + 2 = 13 신규. 기존 19 + 13 = 32 total signal_v2 tests.
9. 위험 및 완화
| 위험 | 완화 |
|---|---|
| V1 토큰 파일 미존재 (V1 미가동) | startup ERROR log + KIS REST 호출 fail. Phase 6 까지 V1 단독 책임 |
| KIS WebSocket 연결 끊김 | exponential backoff (1s→2s→4s→max 30s) + subscription 재등록 |
| KIS WebSocket 호가 메시지 형식 변경 | _parse_asking_price parse fail → WARNING log + skip. KIS API 변경 시 spec 갱신 |
| V1 토큰 갱신 race (V1 갱신 중 V2 read) | mtime 캐시 + 짧은 fail 허용 (다음 호출에서 새 token 사용) |
| approval_key 만료 | 매 reconnect 시 재발급 |
| KIS REST rate limit (초당 2회) | _throttle() 0.5s sleep (V1 패턴) |
| 분봉 buffer 메모리 누수 | deque(maxlen=60) 자동 cap. ticker ~40 → ~200KB |
| websockets 라이브러리 호환 | websockets>=12 명시 |
| WebSocket subscription / portfolio drift | pull_worker 가 매 cycle 후 비교 + 동적 subscribe/unsubscribe |
| NXT 시간대 polling 시 stock API 부하 | 5분 cron × portfolio 11 종목 → 분당 ~2 call 무시 가능 |
| 분봉 데이터 누락 (network 단절) | retry 3회 + cache. 누락 분봉 skip + WARNING |
| KIS API 점검 시간대 | KIS 점검 (보통 새벽 02:00-04:30) 은 dead zone 시간대와 일치 — 영향 없음 |
10. 운영 영향
| 항목 | 영향 |
|---|---|
| 다운타임 | 0 (signal_v2 재기동만, V1 무영향) |
| 사용자 영향 | 없음 (Phase 3a 데이터 수집만, 신호 발송은 Phase 5) |
.env 갱신 |
사용자 1회 (KIS_APP_KEY/SECRET/ACCOUNT/IS_VIRTUAL + V1_TOKEN_PATH) |
| V1 영향 | 0 (read-only 토큰 공유) |
| stock NAS 부하 | 무관 |
| KIS API 부하 | 매 분봉 cycle 분당 ~20 종목 × 2 call (분봉+호가) = 40 call/min ≈ 초당 0.67 < 2 한도 |
| WebSocket 세션 | 1 세션 / portfolio 보유 종목 (~11) 구독 |
11. Phase 3a 완료 조건 (DoD)
signal_v2/kis_client.py신규 (REST 분봉 + 호가)signal_v2/kis_websocket.py신규 (WebSocket approval_key + 호가)signal_v2/state.pyPollState확장 (minute_bars + asking_price)signal_v2/pull_worker.py분봉 cycle + WebSocket task 추가signal_v2/scheduler.pyNXT 시간대 추가signal_v2/main.pylifespan 에 KISClient/KISWebSocket 통합signal_v2/config.pyKIS env + V1_TOKEN_PATHrequirements.txt에websockets>=12- 13 신규 테스트 PASS (총 32)
.env갱신 (사용자 1회)- 운영 smoke: signal_v2 시작 → KIS WebSocket 연결 → portfolio 호가 1건 수신 →
state.asking_price갱신 → 분봉 1회 fetch →state.minute_bars갱신 - V1 봇 무영향 (토큰 read-only 공유 동작)
- git push (web-ai repo)
12. Phase 3b 와의 관계
본 Phase 3a 완료 후 즉시 Phase 3b (Chronos-2 + 분봉 모멘텀) brainstorming. 의존성:
[Phase 3a spec/plan/실행] → [Phase 3b spec/plan/실행]
1주 1주
Phase 3b 의 입력 = 본 spec 의 state.minute_bars + state.asking_price. Phase 3b 산출 = state.chronos_predictions + state.minute_momentum (Phase 4 가 사용).
13. Backlog (본 spec NOT)
- WebSocket 동적 subscribe (현재 매 cycle 일괄, 즉시 갱신 안 됨)
- KIS 분봉 60+ 보관 (장기 추세 분석용)
- 체결 데이터 (
H0STCNT0) 추가 — 자체 분봉 builder 가능성 - KIS API 응답 시간 모니터링 (Phase 7)
- V2 자체 KIS 토큰 갱신 (Phase 6 deprecation 시)
- WebSocket session 멀티 (41 종목 한도 초과 시)
- approval_key 만료 자동 감지 (현재는 reconnect 시점)