Files
web-page/docs/superpowers/specs/2026-05-16-signal-v2-phase3a-kis-data-collection.md
gahusb 1813db761f docs(signal-v2): Phase 3a KIS data collection spec
KIS REST client (minute OHLCV + asking price polling, V1 token
read-only share) + KIS WebSocket client (approval_key + portfolio
asking_price realtime subscribe) + PollState extension + scheduler
NXT windows (20:00-23:30 / 04:30-07:00). 13 new tests.

brainstorming 6 decisions: scope=B(3a/3b split) / data=B(REST minute +
WS asking) / auth=A(V1 token share) / subscribe=A(portfolio WS +
screener REST) / NXT=C(scheduler extend) / test=A(respx + WS mock).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 04:50:16 +09:00

18 KiB
Raw Blame History

Confidence Signal Pipeline V2 — Phase 3a: KIS Data Collection Design

작성일: 2026-05-16 작성자: gahusb 상태: Approved for implementation 선행 spec:

  • Phase 0 architecture (2026-05-15-confidence-signal-pipeline-v2-architecture.md)
  • Phase 1 stock WebAI API (2026-05-15-signal-v2-phase1-webai-api.md)
  • signal_v1 rename (2026-05-16-web-ai-v1-rename-to-signal-v1.md)
  • Phase 2 web-ai pull worker (2026-05-16-signal-v2-phase2-web-ai-pull-worker.md)

Phase 3 분해: Phase 0 spec 의 Phase 3 (KIS WebSocket + NXT + Chronos-2 + 분봉 모멘텀) 를 2 sub-phase 로 분해:

  • Phase 3a (본 spec): KIS 데이터 수집 (분봉 REST + 호가 WebSocket + scheduler NXT 확장)
  • Phase 3b (별도 spec): Chronos-2 추론 + 분봉 모멘텀 분류기

브레인스토밍 결정 6개:

  • scope = B (3a / 3b 분해)
  • 데이터 수집 = B (분봉 REST + 호가 WebSocket)
  • KIS 인증 = A (V1 토큰 read-only 공유)
  • 구독 범위 = A (portfolio WebSocket + screener REST polling)
  • NXT 처리 = C (stock 자동 처리 + scheduler 의 NXT 시간대 폴링 추가)
  • 테스트 = A (respx REST mock + WebSocket mock + tmp sqlite)

1. 목표

signal_v2 가 신호 판단에 필요한 KIS 실시간/준실시간 데이터 (분봉 OHLCV + 호가 매수세) 를 수집해 PollState 에 채워 넣는다. Phase 3b (Chronos-2 추론) + Phase 4 (signal generator) 가 이 위에 동작.

Why: Phase 0 §3 "web-ai = 시점 분석" 책임의 데이터 수집 부분. KIS REST 의 분봉/호가 + KIS WebSocket 의 실시간 호가가 매수/매도 룰의 핵심 입력.


2. 범위

포함 (6 항목)

  • KIS REST client (signal_v2/kis_client.py) — 분봉 polling + screener Top-N 호가 polling. V1 토큰 파일 (signal_v1/data/kis_token.json) read-only 공유.
  • KIS WebSocket client (signal_v2/kis_websocket.py) — approval_key 신규 발급 + portfolio 보유 종목 호가 실시간 구독 + reconnect with exponential backoff.
  • pull_worker.py 확장 — 분봉 1분 polling cycle 추가 + WebSocket 메시지 처리 task.
  • PollState 확장minute_bars: dict[ticker, deque(maxlen=60)], asking_price: dict[ticker, dict], last_updated["minute_bars"] / ["asking_price"].
  • scheduler.py 수정 — NXT 시간대 폴링 (20:00-23:30 / 04:30-07:00) 5분 cron 추가.
  • 테스트 13 신규 (KIS REST 4 + WebSocket 4 + scheduler NXT 3 + pull_worker 2). 기존 19 + 신규 13 = 32 total.

범위 외 (NOT)

  • Chronos-2 모델 로드 + 추론 (Phase 3b)
  • 분봉 모멘텀 분류기 (Phase 3b — 5분봉 aggregate + 5연속 양봉 룰)
  • Signal generator 매수/매도 룰 (Phase 4)
  • NXT 자체 API 호출 — V2 가 별도 NXT API client 없음. stock 측 price_fetcher 가 NXT 시간대 가격 자동 반환 (price_session 필드)
  • WebSocket 동적 subscribe 갱신 — portfolio 변동 시 다음 cycle 에서 일괄 갱신
  • 분봉 daily aggregate — 60 분봉 sliding window 만
  • 분봉 영속 저장 — 메모리만, 재기동 시 reset
  • V2 자체 KIS 토큰 발급 — Phase 6 deprecation 까지 V1 단독 갱신 책임

3. 파일 구조 + 변경 매트릭스

3.1 신규 / 수정

파일 작업 라인
signal_v2/kis_client.py 신규 ~150
signal_v2/kis_websocket.py 신규 ~180
signal_v2/state.py 필드 2개 추가 +5
signal_v2/pull_worker.py 분봉 cycle + WebSocket task +60
signal_v2/scheduler.py NXT 시간대 분기 +15
signal_v2/main.py KIS lifespan 통합 +20
signal_v2/config.py KIS env 5개 + V1 token path +10
signal_v2/tests/test_kis_client.py 신규 4 케이스 ~150
signal_v2/tests/test_kis_websocket.py 신규 4 케이스 ~170
signal_v2/tests/test_pull_worker.py 신규 2 케이스 ~80
signal_v2/tests/test_scheduler.py NXT 3 케이스 추가 +30
signal_v2/tests/test_main.py KIS lifespan 케이스 +20
signal_v2/requirements.txt websockets>=12 +1
web-ai/.env KIS env 5 + V1_TOKEN_PATH (사용자 수동) +6

3.2 외부 의존성 신규

  • websockets>=12 (KIS WebSocket client)

3.3 V1 공유 / 무영향

  • 공유 (read-only): signal_v1/data/kis_token.json — V1 의 단독 갱신 책임. V2 는 mtime 캐시 + read.
  • 무영향: V1 의 main_server.py / modules / 자동매매 봇 — Phase 6 까지 분리 유지.

4. KIS REST client (kis_client.py)

class KISClient:
    """KIS REST API (분봉 + 호가). V1 토큰 read-only 공유."""

    def __init__(
        self,
        app_key: str, app_secret: str, account: str, is_virtual: bool,
        v1_token_path: Path,
        timeout: float = 10.0,
    ):
        self._app_key = app_key
        self._app_secret = app_secret
        self._account = account
        self._is_virtual = is_virtual
        self._v1_token_path = Path(v1_token_path)
        self._base_url = (
            "https://openapivts.koreainvestment.com:29443" if is_virtual
            else "https://openapi.koreainvestment.com:9443"
        )
        self._client = httpx.AsyncClient(timeout=timeout)
        self._token_cache: tuple[str, float] | None = None  # (token, file_mtime)
        self._last_throttle_at = 0.0  # 초당 2회 제한

    async def get_minute_ohlcv(self, ticker: str) -> list[dict]:
        """현재 시점 직전 30개 1분봉 OHLCV (TR_ID: FHKST03010200).

        Returns: [{"datetime", "open", "high", "low", "close", "volume"}, ...] (시간 오름차순)
        """

    async def get_asking_price(self, ticker: str) -> dict:
        """현재 호가 5단계 + 매수/매도 잔량 (TR_ID: FHKST01010200).

        Returns: {
            "bid_total": int,
            "ask_total": int,
            "bid_ratio": float,
            "current_price": int,
            "as_of": str (ISO),
        }
        """

    async def close(self) -> None: ...

    # internal
    def _read_v1_token(self) -> str:
        """signal_v1/data/kis_token.json 읽기. mtime 캐시 — 갱신 시 자동 재로드."""

    async def _throttle(self) -> None:
        """V1 패턴 — 초당 2회 제한 (0.5s sleep)."""

    def _common_headers(self, tr_id: str) -> dict:
        """authorization, appkey, appsecret, tr_id."""

4.1 토큰 공유 디자인

  • _v1_token_path env V1_TOKEN_PATH 에서 로드. 기본값 ../signal_v1/data/kis_token.json.
  • 첫 호출 시 파일 read + mtime 캐시.
  • 매 호출 전 mtime 비교 — 변경 시 재로드. 캐시 hit 시 빠른 통과.
  • 파일 미존재 / 만료 시: WARNING log + HTTPException (Phase 6 까지 V1 단독 책임 명시).

4.2 분봉 응답 정규화

KIS API 의 분봉 raw 응답 (output2 배열) → 표준 dict 리스트로 변환. 시간 오름차순, 거래량 0 인 분봉 skip.


5. KIS WebSocket client (kis_websocket.py)

class KISWebSocket:
    """KIS WebSocket — approval_key 발급 + 호가 실시간 구독."""

    def __init__(self, app_key: str, app_secret: str, is_virtual: bool):
        self._app_key = app_key
        self._app_secret = app_secret
        self._ws_url = (
            "wss://openapivts.koreainvestment.com:29443/tryitout" if is_virtual
            else "wss://openapi.koreainvestment.com:9443/tryitout"
        )
        self._approval_key: str | None = None
        self._ws: WebSocketClientProtocol | None = None
        self._subscriptions: set[str] = set()
        self._on_asking_price: Callable[[str, dict], None] | None = None
        self._recv_task: asyncio.Task | None = None
        self._shutdown = asyncio.Event()

    async def start(
        self, tickers: list[str],
        on_asking_price: Callable[[str, dict], None],
    ) -> None:
        """approval_key 발급 + WebSocket 연결 + 종목 호가 구독 + receive loop 시작."""

    async def subscribe(self, ticker: str) -> None:
        """동적 구독 추가."""

    async def unsubscribe(self, ticker: str) -> None: ...

    async def close(self) -> None:
        """unsubscribe all + shutdown event + close socket."""

    # internal
    async def _fetch_approval_key(self) -> str:
        """POST {base_rest}/oauth2/Approval — approval_key 발급."""

    async def _send_subscription(self, ticker: str, tr_id: str = "H0STASP0") -> None:
        """tr_id H0STASP0 = 실시간 호가."""

    async def _receive_loop(self) -> None:
        """메시지 receive loop. PING/PONG 30초 + 호가 message parse → callback.
        끊김 감지 → exponential backoff (1s→2s→4s→max 30s) + reconnect + subscribe 재등록."""

    def _parse_asking_price(self, raw: str) -> tuple[str, dict] | None:
        """KIS 호가 raw string '0|H0STASP0|...|005930^...' 파싱.

        Returns: (ticker, {bid_total, ask_total, bid_ratio, current_price, as_of})
        또는 None (parse fail).
        """

5.1 메시지 형식 (KIS 공식 문서)

호가 메시지 raw 예시 (실제는 더 긴 ^ 구분 필드):

0|H0STASP0|001|005930^091500^78500^...^bid_total^ask_total^...

파싱 키 (필드 인덱스 기반):

  • ticker = 4번째 필드의 종목코드 부분
  • as_of = 5번째 필드 (HHMMSS)
  • bid_total / ask_total = 정해진 인덱스 (KIS 문서 참조)

5.2 Reconnect 정책

  • websockets 의 ConnectionClosed 캐치
  • exponential backoff: 1s → 2s → 4s → 8s → 16s → max 30s
  • 재연결 후 _subscriptions 의 모든 ticker 재구독
  • 5분 이상 연결 실패 시 ERROR log + shutdown event 발생 (운영자 알림은 Phase 7)

6. PollState 확장 + pull_worker

6.1 PollState 추가 필드

@dataclass
class PollState:
    portfolio: dict | None = None
    news_sentiment: dict | None = None
    screener_preview: dict | None = None
    # 신규 (Phase 3a)
    minute_bars: dict[str, deque] = field(default_factory=dict)  # {ticker: deque(maxlen=60)}
    asking_price: dict[str, dict] = field(default_factory=dict)  # {ticker: {bid_total, ask_total, bid_ratio, ...}}
    last_updated: dict[str, str] = field(default_factory=dict)
    fetch_errors: dict[str, int] = field(default_factory=dict)

6.2 pull_worker 확장

async def _run_polling_cycle(client, state, kis_client):
    """기존 3 endpoint (stock) + 분봉 (KIS REST) 4 fetch 병렬."""
    portfolio, sentiment, screener = await asyncio.gather(
        client.get_portfolio(),
        client.get_news_sentiment(),
        client.run_screener_preview(),
        return_exceptions=True,
    )
    # ... (기존 state 갱신)

    # 분봉 갱신 — portfolio + screener top-N 종목 대상
    tickers = _collect_tickers(state)  # portfolio + screener Top-N union
    minute_results = await asyncio.gather(*[
        kis_client.get_minute_ohlcv(t) for t in tickers
    ], return_exceptions=True)
    for ticker, result in zip(tickers, minute_results):
        if isinstance(result, list):
            state.minute_bars.setdefault(ticker, deque(maxlen=60)).extend(result)
            state.last_updated[f"minute_bars/{ticker}"] = now_iso

    # 호가 갱신 (screener Top-N 만, portfolio 는 WebSocket 으로 들어옴)
    screener_only = _screener_tickers_excluding_portfolio(state)
    asking_results = await asyncio.gather(*[
        kis_client.get_asking_price(t) for t in screener_only
    ], return_exceptions=True)
    for ticker, result in zip(screener_only, asking_results):
        if isinstance(result, dict):
            state.asking_price[ticker] = result
            state.last_updated[f"asking_price/{ticker}"] = now_iso


def on_websocket_asking_price(ticker: str, data: dict):
    """KIS WebSocket callback — portfolio 호가 실시간 갱신."""
    state.asking_price[ticker] = data
    state.last_updated[f"asking_price/{ticker}"] = datetime.now(KST).isoformat()

6.3 종목 동기화

매 cycle 후 state.portfolio.holdings 의 ticker 목록과 kis_websocket._subscriptions 비교 → 신규 추가 / 제거 ticker 별로 subscribe() / unsubscribe() 호출.


7. Scheduler NXT 시간대

# Market windows (기존)
_PRE_OPEN = time(7, 0)
_OPEN = time(9, 0)
_CLOSE = time(15, 30)
_POST_END = time(20, 0)

# NXT windows (신규)
_NXT_PRE_END = time(23, 30)
_NXT_POST_OPEN = time(4, 30)
# 23:30 - 04:30 (새벽) skip


def _next_interval(now: datetime) -> float:
    if not _is_market_day(now):
        return _seconds_until_next_market_open(now)

    t = now.time()
    if _PRE_OPEN <= t < _OPEN:
        return 300.0
    elif _OPEN <= t < _CLOSE:
        return 60.0
    elif _CLOSE <= t < _POST_END:
        return 300.0
    elif _POST_END <= t < _NXT_PRE_END:
        return 300.0   # NXT 야간 5분 (신규)
    elif _NXT_POST_OPEN <= t < _PRE_OPEN:
        return 300.0   # NXT 새벽 5분 (신규)
    else:
        return _seconds_until_next_market_open(now)


def _is_polling_window(now: datetime) -> bool:
    """이제 야간 NXT 도 포함."""
    t = now.time()
    return (
        (_PRE_OPEN <= t < _NXT_PRE_END)
        or (_NXT_POST_OPEN <= t < _PRE_OPEN)
    )

8. 테스트 (13 신규)

8.1 test_kis_client.py (4)

  • test_get_minute_ohlcv_normal_returns_30_bars — respx 200 → list[30 dict]
  • test_get_minute_ohlcv_429_retry_then_success — 429 → 1s backoff → 200
  • test_get_minute_ohlcv_uses_v1_token — v1_token_path fixture → token in header
  • test_get_asking_price_computes_bid_ratio — bid_total=600/ask_total=400 → bid_ratio=0.6

8.2 test_kis_websocket.py (4)

  • test_fetch_approval_key_via_oauth_endpoint — respx POST /oauth2/Approval → approval_key 추출
  • test_subscribe_sends_h0stasp0_message — fake WebSocket server → 종목 구독 메시지 전송 검증
  • test_parse_asking_price_extracts_bid_ask_totals — KIS raw string fixture → (ticker, dict)
  • test_reconnect_on_disconnect_with_backoff — fake server close → exponential retry

8.3 test_scheduler.py 추가 (3)

  • test_next_interval_nxt_evening_5min — now=22:00 평일 → 300
  • test_next_interval_nxt_dawn_5min — now=05:30 평일 → 300
  • test_next_interval_dead_zone_skip — now=02:00 평일 → 다음 04:30 까지

8.4 test_pull_worker.py (2)

  • test_minute_polling_cycle_updates_state_minute_bars — KIS mock → state.minute_bars[ticker] deque 갱신
  • test_websocket_message_updates_state_asking_price — WebSocket callback → state.asking_price[ticker] dict

합계: 4 + 4 + 3 + 2 = 13 신규. 기존 19 + 13 = 32 total signal_v2 tests.


9. 위험 및 완화

위험 완화
V1 토큰 파일 미존재 (V1 미가동) startup ERROR log + KIS REST 호출 fail. Phase 6 까지 V1 단독 책임
KIS WebSocket 연결 끊김 exponential backoff (1s→2s→4s→max 30s) + subscription 재등록
KIS WebSocket 호가 메시지 형식 변경 _parse_asking_price parse fail → WARNING log + skip. KIS API 변경 시 spec 갱신
V1 토큰 갱신 race (V1 갱신 중 V2 read) mtime 캐시 + 짧은 fail 허용 (다음 호출에서 새 token 사용)
approval_key 만료 매 reconnect 시 재발급
KIS REST rate limit (초당 2회) _throttle() 0.5s sleep (V1 패턴)
분봉 buffer 메모리 누수 deque(maxlen=60) 자동 cap. ticker ~40 → ~200KB
websockets 라이브러리 호환 websockets>=12 명시
WebSocket subscription / portfolio drift pull_worker 가 매 cycle 후 비교 + 동적 subscribe/unsubscribe
NXT 시간대 polling 시 stock API 부하 5분 cron × portfolio 11 종목 → 분당 ~2 call 무시 가능
분봉 데이터 누락 (network 단절) retry 3회 + cache. 누락 분봉 skip + WARNING
KIS API 점검 시간대 KIS 점검 (보통 새벽 02:00-04:30) 은 dead zone 시간대와 일치 — 영향 없음

10. 운영 영향

항목 영향
다운타임 0 (signal_v2 재기동만, V1 무영향)
사용자 영향 없음 (Phase 3a 데이터 수집만, 신호 발송은 Phase 5)
.env 갱신 사용자 1회 (KIS_APP_KEY/SECRET/ACCOUNT/IS_VIRTUAL + V1_TOKEN_PATH)
V1 영향 0 (read-only 토큰 공유)
stock NAS 부하 무관
KIS API 부하 매 분봉 cycle 분당 ~20 종목 × 2 call (분봉+호가) = 40 call/min ≈ 초당 0.67 < 2 한도
WebSocket 세션 1 세션 / portfolio 보유 종목 (~11) 구독

11. Phase 3a 완료 조건 (DoD)

  • signal_v2/kis_client.py 신규 (REST 분봉 + 호가)
  • signal_v2/kis_websocket.py 신규 (WebSocket approval_key + 호가)
  • signal_v2/state.py PollState 확장 (minute_bars + asking_price)
  • signal_v2/pull_worker.py 분봉 cycle + WebSocket task 추가
  • signal_v2/scheduler.py NXT 시간대 추가
  • signal_v2/main.py lifespan 에 KISClient/KISWebSocket 통합
  • signal_v2/config.py KIS env + V1_TOKEN_PATH
  • requirements.txtwebsockets>=12
  • 13 신규 테스트 PASS (총 32)
  • .env 갱신 (사용자 1회)
  • 운영 smoke: signal_v2 시작 → KIS WebSocket 연결 → portfolio 호가 1건 수신 → state.asking_price 갱신 → 분봉 1회 fetch → state.minute_bars 갱신
  • V1 봇 무영향 (토큰 read-only 공유 동작)
  • git push (web-ai repo)

12. Phase 3b 와의 관계

본 Phase 3a 완료 후 즉시 Phase 3b (Chronos-2 + 분봉 모멘텀) brainstorming. 의존성:

[Phase 3a spec/plan/실행]   →   [Phase 3b spec/plan/실행]
       1주                          1주

Phase 3b 의 입력 = 본 spec 의 state.minute_bars + state.asking_price. Phase 3b 산출 = state.chronos_predictions + state.minute_momentum (Phase 4 가 사용).


13. Backlog (본 spec NOT)

  • WebSocket 동적 subscribe (현재 매 cycle 일괄, 즉시 갱신 안 됨)
  • KIS 분봉 60+ 보관 (장기 추세 분석용)
  • 체결 데이터 (H0STCNT0) 추가 — 자체 분봉 builder 가능성
  • KIS API 응답 시간 모니터링 (Phase 7)
  • V2 자체 KIS 토큰 갱신 (Phase 6 deprecation 시)
  • WebSocket session 멀티 (41 종목 한도 초과 시)
  • approval_key 만료 자동 감지 (현재는 reconnect 시점)