diff --git a/docs/superpowers/plans/2026-05-16-signal-v2-phase3a-kis-data-collection.md b/docs/superpowers/plans/2026-05-16-signal-v2-phase3a-kis-data-collection.md new file mode 100644 index 0000000..5e6ce85 --- /dev/null +++ b/docs/superpowers/plans/2026-05-16-signal-v2-phase3a-kis-data-collection.md @@ -0,0 +1,1535 @@ +# Signal V2 Phase 3a — KIS Data Collection Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** signal_v2 에 KIS REST client (분봉 + 호가 polling, V1 토큰 read-only 공유) + KIS WebSocket client (approval_key + portfolio 호가 실시간 구독) + scheduler NXT 시간대 확장 + PollState 의 minute_bars/asking_price 통합. Phase 3b (Chronos-2 추론) 시작점. + +**Architecture:** V1 `signal_v1/data/kis_token.json` 의 access_token 을 read-only 공유. WebSocket 의 approval_key 는 V2 자체 발급. portfolio 보유 종목 (~11) 만 WebSocket 호가 구독, screener Top-N 은 REST 1분 polling. NXT 시간대 (20:00-23:30 / 04:30-07:00) 도 scheduler 가 5분 cron 으로 polling 활성화. + +**Tech Stack:** httpx async / websockets / pytest-asyncio + respx / sqlite3 / Python 3.11+ / KIS OpenAPI + +**Spec:** `web-ui/docs/superpowers/specs/2026-05-16-signal-v2-phase3a-kis-data-collection.md` + +**참고 V1 코드** (구현 시 참조 권장): +- `web-ai/signal_v1/modules/services/kis.py` — V1 KIS REST client (TR_ID, 응답 키, 토큰 파일 형식) +- `web-ai/signal_v1/KIS_SETUP.md` — KIS 인증 설정 가이드 + +--- + +## 파일 구조 + +| 파일 | 책임 | +|------|------| +| `web-ai/signal_v2/config.py` | (수정) KIS env 5개 + V1_TOKEN_PATH 추가 | +| `web-ai/signal_v2/state.py` | (수정) `PollState` 에 `minute_bars: dict[str, deque]` + `asking_price: dict[str, dict]` | +| `web-ai/signal_v2/scheduler.py` | (수정) NXT 시간대 (20:00-23:30 / 04:30-07:00) 5분 cron 분기 | +| `web-ai/signal_v2/kis_client.py` | (신규) REST KISClient — 분봉 + 호가 + V1 토큰 read | +| `web-ai/signal_v2/kis_websocket.py` | (신규) WebSocket KISWebSocket — approval_key + portfolio 호가 실시간 | +| `web-ai/signal_v2/pull_worker.py` | (수정) 분봉 cycle + WebSocket subscription 동기화 | +| `web-ai/signal_v2/main.py` | (수정) lifespan 에 KISClient/KISWebSocket startup/shutdown | +| `web-ai/signal_v2/requirements.txt` | (수정) `websockets>=12` 추가 | +| `web-ai/signal_v2/tests/test_scheduler.py` | (수정) NXT 3 케이스 추가 | +| `web-ai/signal_v2/tests/test_kis_client.py` | (신규) 4 케이스 | +| `web-ai/signal_v2/tests/test_kis_websocket.py` | (신규) 4 케이스 | +| `web-ai/signal_v2/tests/test_pull_worker.py` | (신규) 2 케이스 | +| `web-ai/signal_v2/tests/test_main.py` | (수정) KIS lifespan 1 케이스 추가 | +| `web-ai/.env` | (수정, 사용자 Task 7) KIS env 5 + V1_TOKEN_PATH | + +총 14 파일 변경, 13 신규 테스트. + +--- + +## Task 순서 + +``` +Task 1: config + state + requirements (foundation 확장) +Task 2: scheduler NXT 시간대 + 3 테스트 (TDD) +Task 3: kis_client.py REST + 4 통합 테스트 (TDD) +Task 4: kis_websocket.py WebSocket + 4 통합 테스트 (TDD) +Task 5: pull_worker 확장 + 2 통합 테스트 (TDD) +Task 6: main.py KIS lifespan 통합 + 1 케이스 +Task 7: 사용자 수동 — .env 갱신 + manual smoke + push +``` + +--- + +### Task 1: config + state + requirements 확장 + +**Files:** +- Modify: `web-ai/signal_v2/config.py` +- Modify: `web-ai/signal_v2/state.py` +- Modify: `web-ai/signal_v2/requirements.txt` + +- [ ] **Step 1: Update config.py with KIS env + V1_TOKEN_PATH** + +Use Read tool to inspect current `web-ai/signal_v2/config.py`, then Edit to add new fields. Final state: + +```python +"""Signal V2 환경변수 로딩.""" +import os +from dataclasses import dataclass, field +from pathlib import Path + +from dotenv import load_dotenv + +load_dotenv(Path(__file__).parent.parent / ".env") + + +@dataclass(frozen=True) +class Settings: + stock_api_url: str = field( + default_factory=lambda: os.getenv("STOCK_API_URL", "").rstrip("/") + ) + webai_api_key: str = field( + default_factory=lambda: os.getenv("WEBAI_API_KEY", "").strip() + ) + port: int = field(default_factory=lambda: int(os.getenv("SIGNAL_V2_PORT", "8001"))) + db_path: Path = field( + default_factory=lambda: Path(__file__).parent / "data" / "signal_v2.db" + ) + # Phase 3a additions + kis_app_key: str = field(default_factory=lambda: os.getenv("KIS_APP_KEY", "").strip()) + kis_app_secret: str = field(default_factory=lambda: os.getenv("KIS_APP_SECRET", "").strip()) + kis_account: str = field(default_factory=lambda: os.getenv("KIS_ACCOUNT", "").strip()) + kis_is_virtual: bool = field( + default_factory=lambda: os.getenv("KIS_IS_VIRTUAL", "true").lower() == "true" + ) + v1_token_path: Path = field( + default_factory=lambda: Path( + os.getenv("V1_TOKEN_PATH", + str(Path(__file__).parent.parent / "signal_v1" / "data" / "kis_token.json")) + ) + ) + + +def get_settings() -> Settings: + return Settings() +``` + +- [ ] **Step 2: Update state.py with minute_bars + asking_price fields** + +Edit `web-ai/signal_v2/state.py`: + +```python +"""PollState — process-wide singleton.""" +from collections import deque +from dataclasses import dataclass, field + + +@dataclass +class PollState: + portfolio: dict | None = None + news_sentiment: dict | None = None + screener_preview: dict | None = None + # Phase 3a additions + minute_bars: dict[str, deque] = field(default_factory=dict) # {ticker: deque(maxlen=60)} + asking_price: dict[str, dict] = field(default_factory=dict) # {ticker: {bid_total, ask_total, bid_ratio, ...}} + last_updated: dict[str, str] = field(default_factory=dict) + fetch_errors: dict[str, int] = field(default_factory=dict) + + +state = PollState() +``` + +- [ ] **Step 3: Add websockets to requirements.txt** + +Read current `web-ai/requirements.txt`. Append (if not present): +``` +websockets>=12 +``` + +- [ ] **Step 4: pip install + smoke import** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +pip install -r requirements.txt 2>&1 | tail -5 +python -c "from signal_v2.config import get_settings; from signal_v2.state import state; print(get_settings().kis_is_virtual); print(state)" +``` +Expected: `pip install` 성공 + `get_settings().kis_is_virtual` 출력 + state 출력 (minute_bars / asking_price 빈 dict). + +- [ ] **Step 5: Run existing tests — no regression** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests -q 2>&1 | tail -3 +``` +Expected: 19 passed (기존 Phase 2 테스트 그대로). + +- [ ] **Step 6: Commit** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +git add signal_v2/config.py signal_v2/state.py requirements.txt +git commit -m "$(cat <<'EOF' +feat(signal_v2-phase3a): config + state extensions for KIS data + +- config.py: KIS_APP_KEY/SECRET/ACCOUNT/IS_VIRTUAL + V1_TOKEN_PATH env +- state.py: PollState extended with minute_bars (deque maxlen 60) and + asking_price (per-ticker dict) +- requirements.txt: websockets>=12 (KIS WebSocket client) + +19 existing tests still pass. + +Co-Authored-By: Claude Opus 4.7 (1M context) +EOF +)" +``` + +--- + +### Task 2: Scheduler NXT 시간대 + 3 테스트 + +**Files:** +- Modify: `web-ai/signal_v2/scheduler.py` +- Modify: `web-ai/signal_v2/tests/test_scheduler.py` + +- [ ] **Step 1: Write 3 failing tests for NXT windows** + +Append to `web-ai/signal_v2/tests/test_scheduler.py`: + +```python +def test_next_interval_nxt_evening_5min(): + """22:00 평일 (NXT 야간) → 300 (5분).""" + now = _kst(2026, 5, 18, 22, 0) + assert _next_interval(now) == 300 + + +def test_next_interval_nxt_dawn_5min(): + """05:30 평일 (NXT 새벽) → 300 (5분).""" + now = _kst(2026, 5, 18, 5, 30) + assert _next_interval(now) == 300 + + +def test_next_interval_dead_zone_skip(): + """02:00 평일 (dead zone) → 다음 04:30 까지 (~9000s).""" + now = _kst(2026, 5, 18, 2, 0) + interval = _next_interval(now) + # 02:00 → 04:30 = 2.5h = 9000s + assert 9000 - 60 < interval < 9000 + 60 +``` + +- [ ] **Step 2: Run tests to verify FAIL** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests/test_scheduler.py -v -k "nxt or dead_zone" 2>&1 | tail -10 +``` +Expected: 3 failed (scheduler 현재 22:00 / 05:30 / 02:00 모두 overnight skip 으로 처리). + +- [ ] **Step 3: Update scheduler.py** + +Edit `web-ai/signal_v2/scheduler.py` — replace the time-window constants block + `_next_interval` + `_is_polling_window` with: + +```python +# Market windows (정규장) +_PRE_OPEN = time(7, 0) +_OPEN = time(9, 0) +_CLOSE = time(15, 30) +_POST_END = time(20, 0) + +# NXT windows (시간외) +_NXT_PRE_END = time(23, 30) +_NXT_POST_OPEN = time(4, 30) +# 23:30 - 04:30 (dead zone) skip + + +def _is_market_day(now: datetime) -> bool: + """평일 + 휴장일 아닌 날.""" + if now.weekday() >= 5: + return False + return now.strftime("%Y-%m-%d") not in _HOLIDAYS + + +def _is_polling_window(now: datetime) -> bool: + """폴링 윈도우: 07:00-23:30 + 04:30-07:00.""" + t = now.time() + return ( + (_PRE_OPEN <= t < _NXT_PRE_END) + or (_NXT_POST_OPEN <= t < _PRE_OPEN) + ) + + +def _next_interval(now: datetime) -> float: + if not _is_market_day(now): + return _seconds_until_next_market_open(now) + + t = now.time() + if _PRE_OPEN <= t < _OPEN: + return 300.0 # 장전 5분 + elif _OPEN <= t < _CLOSE: + return 60.0 # 장중 1분 + elif _CLOSE <= t < _POST_END: + return 300.0 # 장후 5분 + elif _POST_END <= t < _NXT_PRE_END: + return 300.0 # NXT 야간 5분 + elif _NXT_POST_OPEN <= t < _PRE_OPEN: + return 300.0 # NXT 새벽 5분 + else: + # Dead zone (23:30-04:30) — wait until 04:30 + return _seconds_until_nxt_or_market_open(now) + + +def _seconds_until_nxt_or_market_open(now: datetime) -> float: + """다음 04:30 (NXT 새벽 start) 까지 초수.""" + candidate = now.replace(hour=4, minute=30, second=0, microsecond=0) + if candidate <= now: + candidate += timedelta(days=1) + + for _ in range(14): + if _is_market_day(candidate): + return (candidate - now).total_seconds() + candidate += timedelta(days=1) + + logger.warning("could not find next market day within 14 days") + return 86400.0 +``` + +Note: `_seconds_until_next_market_open` (기존) 은 weekend/holiday 의 다음 영업일 07:00 계산 — 그대로 유지. 새로 추가된 `_seconds_until_nxt_or_market_open` 은 dead zone (02:00-04:30) 의 04:30 계산 — 만약 그날이 영업일이면 04:30, 아니면 다음 영업일. + +`_seconds_until_next_market_open` 의 hour=7 인지 hour=4 minute=30 인지 검토 — 휴장일/주말 다음 영업일은 **07:00** 으로 유지 (휴장 다음 영업일은 정규 pre-market 부터 시작). NXT 는 영업일에만 운영되므로 weekend skip 후 다음 영업일 07:00 OK. + +- [ ] **Step 4: Run tests to verify PASS** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests/test_scheduler.py -v 2>&1 | tail -15 +``` +Expected: 11 passed (기존 8 + 신규 3). + +Also full suite: +```bash +python -m pytest signal_v2/tests -q 2>&1 | tail -3 +``` +Expected: 22 passed (19 + 3). + +- [ ] **Step 5: Commit** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +git add signal_v2/scheduler.py signal_v2/tests/test_scheduler.py +git commit -m "$(cat <<'EOF' +feat(signal_v2-phase3a): scheduler NXT windows (20:00-23:30 / 04:30-07:00) + +NXT 시간외 거래 시간대도 5분 cron 폴링 활성화. 23:30-04:30 사이는 +dead zone (KIS 점검 시간) → 04:30 까지 skip. + +3 new tests (총 22). + +Co-Authored-By: Claude Opus 4.7 (1M context) +EOF +)" +``` + +--- + +### Task 3: KIS REST client + 4 통합 테스트 + +**Files:** +- Create: `web-ai/signal_v2/kis_client.py` +- Create: `web-ai/signal_v2/tests/test_kis_client.py` + +이 task 는 KIS API 의 정확한 TR_ID + 응답 키를 알아야 함. V1 코드 (`web-ai/signal_v1/modules/services/kis.py`) 참조 권장: +- 분봉 TR_ID: `FHKST03010200` (국내주식 당일 분봉) — 응답 `output2` 배열 +- 호가 TR_ID: `FHKST01010200` (국내주식 호가) — 응답 `output1` 객체 +- 토큰 파일 (`signal_v1/data/kis_token.json`) 형식: `{"access_token": "...", "token_expired": "YYYY-MM-DD HH:MM:SS"}` + +- [ ] **Step 1: Write 4 failing tests** + +Create `web-ai/signal_v2/tests/test_kis_client.py`: + +```python +"""Tests for KISClient (REST).""" +import json +import time +from pathlib import Path + +import httpx +import pytest +import respx + +from signal_v2.kis_client import KISClient + + +@pytest.fixture +def fake_v1_token(tmp_path): + """V1 토큰 파일 fixture.""" + token_file = tmp_path / "kis_token.json" + token_file.write_text(json.dumps({ + "access_token": "test-kis-token-abc123", + "token_expired": "2099-12-31 23:59:59", + })) + return token_file + + +@pytest.fixture +def kis_client_factory(fake_v1_token): + def _make(): + return KISClient( + app_key="test-app-key", + app_secret="test-app-secret", + account="50000000-01", + is_virtual=True, + v1_token_path=fake_v1_token, + ) + return _make + + +@respx.mock +async def test_get_minute_ohlcv_normal_returns_30_bars(kis_client_factory): + """정상 200 → 30개 분봉 list 반환.""" + sample_output2 = [ + { + "stck_bsop_date": "20260518", + "stck_cntg_hour": f"{h:02d}{m:02d}00", + "stck_oprc": "78000", "stck_hgpr": "78500", + "stck_lwpr": "77800", "stck_prpr": "78300", + "cntg_vol": "12345", + } + for h in range(9, 10) for m in range(0, 30) # 9:00-9:29 = 30 bars + ] + respx.get( + "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice" + ).mock( + return_value=httpx.Response(200, json={"output2": sample_output2}) + ) + + client = kis_client_factory() + try: + bars = await client.get_minute_ohlcv("005930") + assert len(bars) == 30 + assert bars[0]["close"] == 78300 + assert "datetime" in bars[0] + finally: + await client.close() + + +@respx.mock +async def test_get_minute_ohlcv_429_retry_then_success(kis_client_factory, monkeypatch): + """429 → exponential backoff → 200.""" + sleep_calls = [] + async def fake_sleep(s): sleep_calls.append(s) + monkeypatch.setattr("asyncio.sleep", fake_sleep) + + respx.get( + "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice" + ).mock(side_effect=[ + httpx.Response(429, text="rate limit"), + httpx.Response(200, json={"output2": []}), + ]) + client = kis_client_factory() + try: + result = await client.get_minute_ohlcv("005930") + assert result == [] + # 1 retry sleep + assert 1 in sleep_calls + finally: + await client.close() + + +async def test_get_minute_ohlcv_uses_v1_token(kis_client_factory, fake_v1_token): + """KIS 호출 헤더에 V1 토큰 파일의 access_token 사용.""" + captured_headers = {} + + @respx.mock + async def _do_request(): + route = respx.get( + "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice" + ).mock(return_value=httpx.Response(200, json={"output2": []})) + client = kis_client_factory() + try: + await client.get_minute_ohlcv("005930") + # respx route captures the request — check + assert route.called + req = route.calls.last.request + captured_headers.update(req.headers) + finally: + await client.close() + + await _do_request() + # KIS 의 인증 헤더는 `authorization: Bearer ` + assert "authorization" in {k.lower() for k in captured_headers.keys()} + auth_value = captured_headers.get("authorization") or captured_headers.get("Authorization") + assert "test-kis-token-abc123" in auth_value + + +@respx.mock +async def test_get_asking_price_computes_bid_ratio(kis_client_factory): + """호가 응답 → bid_total / (bid+ask) bid_ratio 계산.""" + # KIS 응답 output1 의 호가 잔량 필드명: total_askp_rsqn (총 매도잔량), total_bidp_rsqn (총 매수잔량) + respx.get( + "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn" + ).mock(return_value=httpx.Response(200, json={ + "output1": { + "total_bidp_rsqn": "600", + "total_askp_rsqn": "400", + "stck_prpr": "78500", + } + })) + + client = kis_client_factory() + try: + data = await client.get_asking_price("005930") + assert data["bid_total"] == 600 + assert data["ask_total"] == 400 + assert abs(data["bid_ratio"] - 0.6) < 1e-9 + assert data["current_price"] == 78500 + assert "as_of" in data + finally: + await client.close() +``` + +- [ ] **Step 2: Run tests to verify FAIL** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests/test_kis_client.py -v 2>&1 | tail -10 +``` +Expected: ImportError (signal_v2.kis_client missing). + +- [ ] **Step 3: Implement kis_client.py** + +Create `web-ai/signal_v2/kis_client.py`: + +```python +"""KIS REST API client — 분봉 + 호가. V1 토큰 read-only 공유.""" +from __future__ import annotations +import asyncio +import json +import logging +import time +from datetime import datetime +from pathlib import Path +from zoneinfo import ZoneInfo + +import httpx + +logger = logging.getLogger(__name__) +KST = ZoneInfo("Asia/Seoul") + +_MAX_ATTEMPTS = 3 +_THROTTLE_INTERVAL = 0.5 # 초당 2회 제한 + + +class KISClient: + """KIS REST (분봉 + 호가). V1 토큰 파일 read-only.""" + + def __init__( + self, + app_key: str, app_secret: str, account: str, is_virtual: bool, + v1_token_path: Path, + timeout: float = 10.0, + ): + self._app_key = app_key + self._app_secret = app_secret + self._account = account + self._is_virtual = is_virtual + self._v1_token_path = Path(v1_token_path) + self._base_url = ( + "https://openapivts.koreainvestment.com:29443" if is_virtual + else "https://openapi.koreainvestment.com:9443" + ) + self._client = httpx.AsyncClient(timeout=timeout) + self._token_cache: tuple[str, float] | None = None # (token, file_mtime) + self._last_throttle_at = 0.0 + + async def close(self) -> None: + await self._client.aclose() + + def _read_v1_token(self) -> str: + if not self._v1_token_path.exists(): + raise RuntimeError(f"V1 token file missing: {self._v1_token_path}") + mtime = self._v1_token_path.stat().st_mtime + if self._token_cache and self._token_cache[1] == mtime: + return self._token_cache[0] + data = json.loads(self._v1_token_path.read_text(encoding="utf-8")) + token = data.get("access_token", "") + if not token: + raise RuntimeError("V1 token file has no access_token") + self._token_cache = (token, mtime) + return token + + async def _throttle(self) -> None: + elapsed = time.monotonic() - self._last_throttle_at + if elapsed < _THROTTLE_INTERVAL: + await asyncio.sleep(_THROTTLE_INTERVAL - elapsed) + self._last_throttle_at = time.monotonic() + + def _common_headers(self, tr_id: str) -> dict[str, str]: + token = self._read_v1_token() + return { + "authorization": f"Bearer {token}", + "appkey": self._app_key, + "appsecret": self._app_secret, + "tr_id": tr_id, + "custtype": "P", + } + + async def _request_with_retry( + self, method: str, path: str, tr_id: str, **kwargs, + ) -> dict: + url = f"{self._base_url}{path}" + headers = self._common_headers(tr_id) + for attempt in range(_MAX_ATTEMPTS): + await self._throttle() + try: + response = await self._client.request( + method, url, headers=headers, **kwargs + ) + if response.status_code == 429: + if attempt < _MAX_ATTEMPTS - 1: + await asyncio.sleep(2**attempt) + continue + response.raise_for_status() + response.raise_for_status() + return response.json() + except httpx.TimeoutException: + if attempt < _MAX_ATTEMPTS - 1: + await asyncio.sleep(2**attempt) + continue + raise + raise RuntimeError("retry exhausted") + + async def get_minute_ohlcv(self, ticker: str) -> list[dict]: + """현재 시점 직전 30개 1분봉 OHLCV (TR_ID FHKST03010200). + + Returns: [{datetime, open, high, low, close, volume}, ...] 시간 오름차순. + """ + path = "/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice" + params = { + "FID_ETC_CLS_CODE": "", + "FID_COND_MRKT_DIV_CODE": "J", + "FID_INPUT_ISCD": ticker, + "FID_INPUT_HOUR_1": datetime.now(KST).strftime("%H%M%S"), + "FID_PW_DATA_INCU_YN": "N", + } + raw = await self._request_with_retry( + "GET", path, tr_id="FHKST03010200", params=params, + ) + output2 = raw.get("output2", []) + bars = [] + for row in output2: + try: + date = row["stck_bsop_date"] + hhmmss = row["stck_cntg_hour"] + dt = datetime.strptime(f"{date} {hhmmss}", "%Y%m%d %H%M%S").replace(tzinfo=KST) + bars.append({ + "datetime": dt.isoformat(), + "open": int(row["stck_oprc"]), + "high": int(row["stck_hgpr"]), + "low": int(row["stck_lwpr"]), + "close": int(row["stck_prpr"]), + "volume": int(row["cntg_vol"]), + }) + except (KeyError, ValueError) as e: + logger.warning("skip malformed bar for %s: %r", ticker, e) + # KIS returns descending; reverse to ascending + bars.reverse() + return bars + + async def get_asking_price(self, ticker: str) -> dict: + """현재 호가 + 매수/매도 잔량 (TR_ID FHKST01010200).""" + path = "/uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn" + params = { + "FID_COND_MRKT_DIV_CODE": "J", + "FID_INPUT_ISCD": ticker, + } + raw = await self._request_with_retry( + "GET", path, tr_id="FHKST01010200", params=params, + ) + output1 = raw.get("output1", {}) + bid_total = int(output1.get("total_bidp_rsqn", 0)) + ask_total = int(output1.get("total_askp_rsqn", 0)) + total = bid_total + ask_total + bid_ratio = bid_total / total if total > 0 else 0.0 + current_price = int(output1.get("stck_prpr", 0)) + return { + "bid_total": bid_total, + "ask_total": ask_total, + "bid_ratio": bid_ratio, + "current_price": current_price, + "as_of": datetime.now(KST).isoformat(), + } +``` + +- [ ] **Step 4: Run tests to verify PASS** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests/test_kis_client.py -v 2>&1 | tail -15 +``` +Expected: 4 passed. + +Full suite: +```bash +python -m pytest signal_v2/tests -q 2>&1 | tail -3 +``` +Expected: 26 passed (22 + 4). + +If failures relate to KIS TR_ID or field names, refer to V1 `signal_v1/modules/services/kis.py` and adjust. + +- [ ] **Step 5: Commit** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +git add signal_v2/kis_client.py signal_v2/tests/test_kis_client.py +git commit -m "$(cat <<'EOF' +feat(signal_v2-phase3a): kis_client REST + 4 integration tests + +KISClient: 분봉 (FHKST03010200) + 호가 (FHKST01010200) async REST. +V1 토큰 파일 (signal_v1/data/kis_token.json) read-only 공유, mtime +캐시. 초당 2회 throttle. exponential retry. + +26 tests pass (Phase 2 19 + Task 2 NXT 3 + Task 3 KIS REST 4). + +Co-Authored-By: Claude Opus 4.7 (1M context) +EOF +)" +``` + +--- + +### Task 4: KIS WebSocket client + 4 통합 테스트 + +**Files:** +- Create: `web-ai/signal_v2/kis_websocket.py` +- Create: `web-ai/signal_v2/tests/test_kis_websocket.py` + +이 task 는 가장 무거움. WebSocket mock 은 `websockets` lib 의 `serve` 사용 또는 `AsyncMock`. KIS 호가 메시지 형식은 `|` + `^` 구분. + +KIS 호가 메시지 raw 형식 (실제 더 길지만 핵심 필드만): +``` +0|H0STASP0|001|005930^091500^78500^...^total_ask^total_bid^... +``` +- 0 = 수신 데이터 +- H0STASP0 = TR_ID (실시간 호가) +- 001 = 데이터 개수 +- 그 다음 `^` 구분 필드: [0]종목코드, [1]시간 HHMMSS, [2]현재가, ..., 매수/매도 잔량 인덱스는 KIS 공식 문서 참조 (KIS_SETUP.md / V1 코드). + +- [ ] **Step 1: Write 4 failing tests** + +Create `web-ai/signal_v2/tests/test_kis_websocket.py`: + +```python +"""Tests for KISWebSocket.""" +import asyncio +import json +from unittest.mock import AsyncMock, MagicMock + +import httpx +import pytest +import respx + +from signal_v2.kis_websocket import KISWebSocket + + +BASE_REST = "https://openapivts.koreainvestment.com:29443" + + +@respx.mock +async def test_fetch_approval_key_via_oauth_endpoint(): + """POST /oauth2/Approval → approval_key 추출.""" + respx.post(f"{BASE_REST}/oauth2/Approval").mock( + return_value=httpx.Response(200, json={"approval_key": "test-approval-key-xyz"}) + ) + ws = KISWebSocket(app_key="k", app_secret="s", is_virtual=True) + key = await ws._fetch_approval_key() + assert key == "test-approval-key-xyz" + assert ws._approval_key == "test-approval-key-xyz" + + +async def test_subscribe_sends_h0stasp0_message(monkeypatch): + """subscribe() → WebSocket 으로 H0STASP0 구독 메시지 전송.""" + sent_messages = [] + mock_ws = AsyncMock() + mock_ws.send = AsyncMock(side_effect=lambda m: sent_messages.append(m)) + + ws = KISWebSocket(app_key="k", app_secret="s", is_virtual=True) + ws._approval_key = "test-key" + ws._ws = mock_ws + await ws.subscribe("005930") + assert ws._subscriptions == {"005930"} + assert len(sent_messages) == 1 + # subscribe 메시지는 JSON 구조 (KIS 명세) + msg = json.loads(sent_messages[0]) + assert msg["header"]["tr_type"] == "1" # subscribe + assert msg["body"]["input"]["tr_id"] == "H0STASP0" + assert msg["body"]["input"]["tr_key"] == "005930" + + +def test_parse_asking_price_extracts_bid_ask_totals(): + """KIS raw '0|H0STASP0|001|...' → (ticker, dict).""" + ws = KISWebSocket(app_key="k", app_secret="s", is_virtual=True) + # 모의 호가 메시지 (실제 형식은 더 복잡 — KIS_SETUP.md 참조). + # H0STASP0 의 응답: '0|H0STASP0|001|' + # data: ticker^time^current_price^... (more fields) + # 본 테스트는 implementer 가 KIS 명세 보고 인덱스 맞춰서 구현하도록. + # 우리는 dict 출력 형식만 검증. + sample_raw = ( + "0|H0STASP0|001|005930^091500^78500^" + + "^".join(["0"] * 40) # padding (실제는 50+ 필드) + + "^1000^2000" # 마지막 두 필드: total_askp_rsqn / total_bidp_rsqn (예시) + ) + # 인덱스가 실제 KIS 명세와 정확히 맞을 때 parse 성공 + result = ws._parse_asking_price(sample_raw) + if result is None: + # 인덱스 mismatch — implementer 가 KIS 명세 참조해서 fix + pytest.fail("parse_asking_price returned None — check field indices vs KIS spec") + ticker, data = result + assert ticker == "005930" + assert "bid_total" in data + assert "ask_total" in data + assert "bid_ratio" in data + + +async def test_reconnect_on_disconnect_with_backoff(monkeypatch): + """연결 끊김 → exponential backoff retry.""" + sleep_calls = [] + async def fake_sleep(s): sleep_calls.append(s) + monkeypatch.setattr("asyncio.sleep", fake_sleep) + + # Implementer: ws 내부에 reconnect logic 구현 시 + # _connect_with_backoff() 메서드 또는 내부 reconnect loop 가 + # exponential 1s → 2s → 4s sleep 호출하는지 검증. + ws = KISWebSocket(app_key="k", app_secret="s", is_virtual=True) + # Mock _connect to fail twice then succeed + call_count = [0] + async def fake_connect(): + call_count[0] += 1 + if call_count[0] < 3: + raise ConnectionError("fake disconnect") + return AsyncMock() + monkeypatch.setattr(ws, "_connect", fake_connect, raising=False) + + result = await ws._connect_with_backoff() + assert call_count[0] == 3 # 2 fails + 1 success + assert sleep_calls[:2] == [1, 2] # exponential +``` + +- [ ] **Step 2: Run tests to verify FAIL** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests/test_kis_websocket.py -v 2>&1 | tail -10 +``` +Expected: ImportError. + +- [ ] **Step 3: Implement kis_websocket.py** + +Create `web-ai/signal_v2/kis_websocket.py`: + +```python +"""KIS WebSocket — approval_key + 실시간 호가 구독.""" +from __future__ import annotations +import asyncio +import json +import logging +from datetime import datetime +from typing import Callable +from zoneinfo import ZoneInfo + +import httpx +import websockets + +logger = logging.getLogger(__name__) +KST = ZoneInfo("Asia/Seoul") + +# KIS 호가 메시지의 필드 인덱스 (KIS 공식 spec 참조 — 실제 인덱스 운영 환경 확인 필요) +# H0STASP0 응답: ticker | time | current_price | ... | total_ask_rsqn | total_bid_rsqn | ... +# 실 운영 환경에서 정확한 인덱스 필드 확인 필요 (KIS docs / V1 활용 미 가능) +_ASKING_TICKER_IDX = 0 +_ASKING_TIME_IDX = 1 +_ASKING_CURRENT_PRICE_IDX = 2 +_ASKING_TOTAL_ASK_IDX = -2 # 끝에서 두번째 (가정) +_ASKING_TOTAL_BID_IDX = -1 # 마지막 (가정) + + +class KISWebSocket: + """KIS WebSocket client. approval_key 발급 + 호가 실시간.""" + + def __init__(self, app_key: str, app_secret: str, is_virtual: bool): + self._app_key = app_key + self._app_secret = app_secret + self._is_virtual = is_virtual + self._base_rest = ( + "https://openapivts.koreainvestment.com:29443" if is_virtual + else "https://openapi.koreainvestment.com:9443" + ) + self._ws_url = ( + "ws://ops.koreainvestment.com:31000" if is_virtual + else "ws://ops.koreainvestment.com:21000" + ) + self._approval_key: str | None = None + self._ws: websockets.WebSocketClientProtocol | None = None + self._subscriptions: set[str] = set() + self._on_asking_price: Callable[[str, dict], None] | None = None + self._recv_task: asyncio.Task | None = None + self._shutdown = asyncio.Event() + + async def _fetch_approval_key(self) -> str: + async with httpx.AsyncClient(timeout=10.0) as client: + response = await client.post( + f"{self._base_rest}/oauth2/Approval", + json={ + "grant_type": "client_credentials", + "appkey": self._app_key, + "secretkey": self._app_secret, + }, + ) + response.raise_for_status() + data = response.json() + self._approval_key = data["approval_key"] + return self._approval_key + + async def _connect(self) -> websockets.WebSocketClientProtocol: + return await websockets.connect(self._ws_url) + + async def _connect_with_backoff(self) -> websockets.WebSocketClientProtocol: + """연결 시도 with exponential backoff (1s → 2s → 4s → max 30s).""" + for attempt in range(10): + try: + ws = await self._connect() + return ws + except Exception as e: + wait = min(2**attempt, 30) + logger.warning("KIS WebSocket connect failed (attempt %d): %r — retrying in %ds", attempt + 1, e, wait) + await asyncio.sleep(wait) + raise RuntimeError("KIS WebSocket connect exhausted retries") + + async def start( + self, tickers: list[str], + on_asking_price: Callable[[str, dict], None], + ) -> None: + if self._approval_key is None: + await self._fetch_approval_key() + self._on_asking_price = on_asking_price + self._ws = await self._connect_with_backoff() + for ticker in tickers: + await self.subscribe(ticker) + self._recv_task = asyncio.create_task(self._receive_loop()) + + async def subscribe(self, ticker: str) -> None: + if self._ws is None or self._approval_key is None: + raise RuntimeError("KIS WebSocket not started") + msg = json.dumps({ + "header": { + "approval_key": self._approval_key, + "custtype": "P", + "tr_type": "1", # subscribe + "content-type": "utf-8", + }, + "body": { + "input": {"tr_id": "H0STASP0", "tr_key": ticker}, + }, + }) + await self._ws.send(msg) + self._subscriptions.add(ticker) + + async def unsubscribe(self, ticker: str) -> None: + if self._ws is None or self._approval_key is None: + return + msg = json.dumps({ + "header": { + "approval_key": self._approval_key, + "custtype": "P", + "tr_type": "2", # unsubscribe + "content-type": "utf-8", + }, + "body": { + "input": {"tr_id": "H0STASP0", "tr_key": ticker}, + }, + }) + await self._ws.send(msg) + self._subscriptions.discard(ticker) + + async def close(self) -> None: + self._shutdown.set() + if self._recv_task is not None: + self._recv_task.cancel() + try: + await self._recv_task + except asyncio.CancelledError: + pass + if self._ws is not None: + await self._ws.close() + + async def _receive_loop(self) -> None: + while not self._shutdown.is_set(): + try: + raw = await self._ws.recv() + except websockets.ConnectionClosed: + logger.warning("KIS WebSocket closed — reconnecting") + self._ws = await self._connect_with_backoff() + for ticker in list(self._subscriptions): + await self.subscribe(ticker) + continue + if not isinstance(raw, str): + continue + parsed = self._parse_asking_price(raw) + if parsed is not None and self._on_asking_price is not None: + ticker, data = parsed + try: + self._on_asking_price(ticker, data) + except Exception: + logger.exception("on_asking_price callback failed") + + def _parse_asking_price(self, raw: str) -> tuple[str, dict] | None: + """KIS H0STASP0 raw → (ticker, asking_price dict).""" + try: + parts = raw.split("|") + if len(parts) < 4 or parts[1] != "H0STASP0": + return None + fields = parts[3].split("^") + ticker = fields[_ASKING_TICKER_IDX] + current_price = int(fields[_ASKING_CURRENT_PRICE_IDX]) if fields[_ASKING_CURRENT_PRICE_IDX].isdigit() else 0 + ask_total = int(fields[_ASKING_TOTAL_ASK_IDX]) if fields[_ASKING_TOTAL_ASK_IDX].lstrip("-").isdigit() else 0 + bid_total = int(fields[_ASKING_TOTAL_BID_IDX]) if fields[_ASKING_TOTAL_BID_IDX].lstrip("-").isdigit() else 0 + total = bid_total + ask_total + return ticker, { + "bid_total": bid_total, + "ask_total": ask_total, + "bid_ratio": bid_total / total if total > 0 else 0.0, + "current_price": current_price, + "as_of": datetime.now(KST).isoformat(), + } + except (IndexError, ValueError) as e: + logger.warning("parse_asking_price failed: %r", e) + return None +``` + +**중요 노트**: 위 `_ASKING_*_IDX` 인덱스는 **추정값** 입니다. 운영 환경에서 정확한 인덱스 확인 필요: +- KIS 공식 문서 (`https://apiportal.koreainvestment.com/`) +- 또는 운영 시 raw 메시지 로그 캡처 후 필드 인덱스 매핑 + +본 plan 의 테스트는 인덱스가 -2/-1 가정. 실 KIS 호가에서 필드 50+ 개라 정확한 인덱스 운영 검증 필요. parse fail 시 WARNING 로그 + skip (위험 매트릭스 §9 명시). + +- [ ] **Step 4: Run tests to verify PASS** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests/test_kis_websocket.py -v 2>&1 | tail -15 +``` +Expected: 4 passed. + +만약 `test_parse_asking_price_extracts_bid_ask_totals` 가 실패하면 — 인덱스 가정과 sample raw 의 padding 수 mismatch. 두 가지 옵션: +- a) sample raw 의 `["0"]*40` 부분을 인덱스에 맞춰 조정 (테스트만 fix) +- b) `_parse_asking_price` 의 인덱스를 명세 확인 후 조정 (코드 fix) + +선택은 implementer 의 판단. 양쪽 모두 가능. + +Full suite: +```bash +python -m pytest signal_v2/tests -q 2>&1 | tail -3 +``` +Expected: 30 passed (26 + 4). + +- [ ] **Step 5: Commit** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +git add signal_v2/kis_websocket.py signal_v2/tests/test_kis_websocket.py +git commit -m "$(cat <<'EOF' +feat(signal_v2-phase3a): kis_websocket + 4 integration tests + +KISWebSocket: approval_key (POST /oauth2/Approval) + H0STASP0 호가 +실시간 subscribe + receive loop with reconnect. exponential backoff +(1s→2s→4s→max 30s) + subscription 재등록. + +_parse_asking_price 의 필드 인덱스는 운영 검증 후 조정 필요 +(KIS 공식 명세 또는 raw 로그 매핑). + +30 tests pass. + +Co-Authored-By: Claude Opus 4.7 (1M context) +EOF +)" +``` + +--- + +### Task 5: pull_worker 확장 + 2 통합 테스트 + +**Files:** +- Modify: `web-ai/signal_v2/pull_worker.py` +- Create: `web-ai/signal_v2/tests/test_pull_worker.py` + +- [ ] **Step 1: Write 2 failing tests** + +Create `web-ai/signal_v2/tests/test_pull_worker.py`: + +```python +"""Tests for pull_worker (Phase 3a additions).""" +from collections import deque +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from signal_v2.state import PollState + + +async def test_minute_polling_cycle_updates_state_minute_bars(): + """KIS REST mock 의 분봉 데이터가 state.minute_bars[ticker] deque 에 들어간다.""" + from signal_v2.pull_worker import _run_kis_minute_cycle + + state = PollState() + state.portfolio = {"holdings": [{"ticker": "005930"}, {"ticker": "000660"}]} + state.screener_preview = { + "items": [{"ticker": "005930"}, {"ticker": "035720"}] # 005930 dup + } + + kis_client_mock = MagicMock() + kis_client_mock.get_minute_ohlcv = AsyncMock(side_effect=[ + [{"datetime": "2026-05-18T09:00:00+09:00", "open": 78000, + "high": 78500, "low": 77900, "close": 78300, "volume": 12345}], + [{"datetime": "2026-05-18T09:00:00+09:00", "open": 180000, + "high": 181000, "low": 179800, "close": 180500, "volume": 5000}], + [{"datetime": "2026-05-18T09:00:00+09:00", "open": 51000, + "high": 51200, "low": 50800, "close": 51100, "volume": 8000}], + ]) + kis_client_mock.get_asking_price = AsyncMock(return_value={ + "bid_total": 600, "ask_total": 400, "bid_ratio": 0.6, + "current_price": 51100, "as_of": "2026-05-18T09:00:30+09:00", + }) + + await _run_kis_minute_cycle(kis_client_mock, state) + + # 3 unique tickers (005930, 000660, 035720) + assert "005930" in state.minute_bars + assert "000660" in state.minute_bars + assert "035720" in state.minute_bars + assert len(state.minute_bars["005930"]) >= 1 + # asking price 만 screener-only ticker (035720) 에 들어가야 함 (portfolio 는 WebSocket) + assert "035720" in state.asking_price + + +def test_websocket_message_updates_state_asking_price(): + """WebSocket callback 가 state.asking_price 갱신.""" + from signal_v2.pull_worker import make_asking_price_callback + + state = PollState() + cb = make_asking_price_callback(state) + cb("005930", {"bid_total": 1000, "ask_total": 800, "bid_ratio": 0.555, + "current_price": 78500, "as_of": "2026-05-18T10:00:00+09:00"}) + assert state.asking_price["005930"]["bid_total"] == 1000 + assert "asking_price/005930" in state.last_updated +``` + +- [ ] **Step 2: Run tests to verify FAIL** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests/test_pull_worker.py -v 2>&1 | tail -10 +``` +Expected: ImportError or AttributeError. + +- [ ] **Step 3: Implement pull_worker.py additions** + +Edit `web-ai/signal_v2/pull_worker.py` — keep existing `poll_loop` and `_run_polling_cycle`, then ADD: + +```python +from collections import deque +from signal_v2.kis_client import KISClient # 추가 import + + +async def _run_kis_minute_cycle(kis_client: KISClient, state: PollState) -> None: + """KIS 분봉 + 호가 fetch + state 갱신. + + - 분봉: portfolio + screener Top-N union 종목 모두 + - 호가 (REST): screener-only 종목 (portfolio 는 WebSocket 으로 들어옴) + """ + portfolio_tickers = _portfolio_tickers(state) + screener_tickers = _screener_tickers(state) + all_tickers = list(set(portfolio_tickers) | set(screener_tickers)) + + # 분봉 fetch (병렬) + minute_results = await asyncio.gather(*[ + kis_client.get_minute_ohlcv(t) for t in all_tickers + ], return_exceptions=True) + now_iso = datetime.now(KST).isoformat() + for ticker, result in zip(all_tickers, minute_results): + if isinstance(result, list): + buf = state.minute_bars.setdefault(ticker, deque(maxlen=60)) + buf.extend(result) + state.last_updated[f"minute_bars/{ticker}"] = now_iso + else: + state.fetch_errors[f"minute_bars/{ticker}"] = ( + state.fetch_errors.get(f"minute_bars/{ticker}", 0) + 1 + ) + + # 호가 fetch (REST) — screener-only + screener_only = list(set(screener_tickers) - set(portfolio_tickers)) + asking_results = await asyncio.gather(*[ + kis_client.get_asking_price(t) for t in screener_only + ], return_exceptions=True) + for ticker, result in zip(screener_only, asking_results): + if isinstance(result, dict): + state.asking_price[ticker] = result + state.last_updated[f"asking_price/{ticker}"] = now_iso + + +def make_asking_price_callback(state: PollState): + """KIS WebSocket on_asking_price callback factory.""" + def _cb(ticker: str, data: dict) -> None: + state.asking_price[ticker] = data + state.last_updated[f"asking_price/{ticker}"] = datetime.now(KST).isoformat() + return _cb + + +def _portfolio_tickers(state: PollState) -> list[str]: + if state.portfolio is None: + return [] + return [h["ticker"] for h in state.portfolio.get("holdings", []) if "ticker" in h] + + +def _screener_tickers(state: PollState) -> list[str]: + if state.screener_preview is None: + return [] + return [i["ticker"] for i in state.screener_preview.get("items", []) if "ticker" in i] +``` + +Also ADD to `_run_polling_cycle` signature an optional `kis_client` param: + +```python +async def _run_polling_cycle( + client: StockClient, state: PollState, + kis_client: KISClient | None = None, +) -> None: + # ... existing 3-endpoint stock fetch ... + + # KIS 분봉 + 호가 (kis_client 주어졌을 때만) + if kis_client is not None: + try: + await _run_kis_minute_cycle(kis_client, state) + except Exception: + logger.exception("kis minute cycle failed") +``` + +And update `poll_loop` signature to accept `kis_client`: +```python +async def poll_loop( + client: StockClient, state: PollState, shutdown: asyncio.Event, + kis_client: KISClient | None = None, +) -> None: + # ... existing while loop ... + # call _run_polling_cycle(client, state, kis_client=kis_client) +``` + +- [ ] **Step 4: Run tests to verify PASS** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests/test_pull_worker.py -v 2>&1 | tail -10 +``` +Expected: 2 passed. + +Full suite: +```bash +python -m pytest signal_v2/tests -q 2>&1 | tail -3 +``` +Expected: 32 passed (30 + 2). + +- [ ] **Step 5: Commit** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +git add signal_v2/pull_worker.py signal_v2/tests/test_pull_worker.py +git commit -m "$(cat <<'EOF' +feat(signal_v2-phase3a): pull_worker KIS minute cycle + WS callback + +_run_kis_minute_cycle: portfolio + screener union 종목 분봉 fetch + +screener-only 종목 호가 REST fetch. WebSocket callback factory. + +poll_loop / _run_polling_cycle 에 kis_client optional param 추가 +(Phase 5 까지 None 도 정상 동작). + +32 tests pass. + +Co-Authored-By: Claude Opus 4.7 (1M context) +EOF +)" +``` + +--- + +### Task 6: main.py KIS lifespan 통합 + 1 케이스 + +**Files:** +- Modify: `web-ai/signal_v2/main.py` +- Modify: `web-ai/signal_v2/tests/test_main.py` + +- [ ] **Step 1: Write 1 failing test for KIS lifespan** + +Append to `web-ai/signal_v2/tests/test_main.py`: + +```python +def test_startup_warns_if_kis_app_key_missing(monkeypatch, caplog): + """KIS_APP_KEY 미설정 시 startup WARNING (KIS 호출 disabled).""" + monkeypatch.setenv("STOCK_API_URL", "https://test.stock.local") + monkeypatch.setenv("WEBAI_API_KEY", "test-secret") + monkeypatch.delenv("KIS_APP_KEY", raising=False) + + import importlib + from signal_v2 import config as cfg + importlib.reload(cfg) + from signal_v2 import main as main_mod + importlib.reload(main_mod) + with caplog.at_level("WARNING", logger="signal_v2.main"): + with TestClient(main_mod.app) as client: + client.get("/health") + assert any("KIS_APP_KEY" in rec.message for rec in caplog.records) +``` + +- [ ] **Step 2: Run test to verify FAIL** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests/test_main.py::test_startup_warns_if_kis_app_key_missing -v 2>&1 | tail -10 +``` +Expected: FAIL (no such warning yet). + +- [ ] **Step 3: Update main.py to integrate KIS clients** + +Edit `web-ai/signal_v2/main.py` — augment `AppContext` + `lifespan`: + +```python +from signal_v2.kis_client import KISClient +from signal_v2.kis_websocket import KISWebSocket +from signal_v2.pull_worker import make_asking_price_callback + + +class AppContext: + client: StockClient | None = None + dedup: SignalDedup | None = None + shutdown: asyncio.Event | None = None + poll_task: asyncio.Task | None = None + kis_client: KISClient | None = None + kis_ws: KISWebSocket | None = None + + +# (keep _ctx = AppContext() at module level) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + settings = get_settings() + if not settings.webai_api_key: + logger.warning("WEBAI_API_KEY not configured — stock API calls will fail with 401") + if not settings.kis_app_key: + logger.warning("KIS_APP_KEY not configured — KIS REST/WebSocket disabled") + + _ctx.client = StockClient(settings.stock_api_url, settings.webai_api_key) + _ctx.dedup = SignalDedup(settings.db_path) + _ctx.shutdown = asyncio.Event() + + # KIS only if app_key configured + if settings.kis_app_key: + _ctx.kis_client = KISClient( + app_key=settings.kis_app_key, + app_secret=settings.kis_app_secret, + account=settings.kis_account, + is_virtual=settings.kis_is_virtual, + v1_token_path=settings.v1_token_path, + ) + _ctx.kis_ws = KISWebSocket( + app_key=settings.kis_app_key, + app_secret=settings.kis_app_secret, + is_virtual=settings.kis_is_virtual, + ) + # Subscribe portfolio holdings (if any) + try: + portfolio = await _ctx.client.get_portfolio() + tickers = [h["ticker"] for h in portfolio.get("holdings", []) if "ticker" in h] + cb = make_asking_price_callback(state_mod.state) + await _ctx.kis_ws.start(tickers, cb) + except Exception: + logger.exception("KIS WebSocket startup failed — continuing without realtime asking_price") + + _ctx.poll_task = asyncio.create_task( + poll_loop( + _ctx.client, state_mod.state, _ctx.shutdown, + kis_client=_ctx.kis_client, + ) + ) + + yield + + # Shutdown + if _ctx.shutdown is not None: + _ctx.shutdown.set() + if _ctx.poll_task is not None: + try: + await asyncio.wait_for(_ctx.poll_task, timeout=5.0) + except asyncio.TimeoutError: + _ctx.poll_task.cancel() + try: + await _ctx.poll_task + except asyncio.CancelledError: + pass + if _ctx.kis_ws is not None: + await _ctx.kis_ws.close() + if _ctx.kis_client is not None: + await _ctx.kis_client.close() + if _ctx.client is not None: + await _ctx.client.close() +``` + +- [ ] **Step 4: Run tests to verify PASS** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +python -m pytest signal_v2/tests/test_main.py -v 2>&1 | tail -15 +``` +Expected: 3 passed (2 기존 + 1 신규). + +Full suite: +```bash +python -m pytest signal_v2/tests -q 2>&1 | tail -3 +``` +Expected: 33 passed (32 + 1). + +- [ ] **Step 5: Commit** + +```bash +cd /c/Users/jaeoh/Desktop/workspace/web-ai +git add signal_v2/main.py signal_v2/tests/test_main.py +git commit -m "$(cat <<'EOF' +feat(signal_v2-phase3a): main.py lifespan integrates KIS client + WS + +AppContext extended with kis_client + kis_ws. lifespan: +- If KIS_APP_KEY set: create KISClient + KISWebSocket, fetch portfolio, + subscribe WebSocket H0STASP0 for holdings. +- If unset: WARNING log, signal_v2 still serves /health (no KIS data). +- Shutdown closes kis_ws → kis_client → stock client in order. + +33 tests pass. + +Co-Authored-By: Claude Opus 4.7 (1M context) +EOF +)" +``` + +--- + +### Task 7: 사용자 수동 — .env 갱신 + manual smoke + push + +**This task requires user action.** + +- [ ] **Step 1: Update `.env`** + +User adds to `C:\Users\jaeoh\Desktop\workspace\web-ai\.env`: + +``` +# KIS API (Phase 3a, 2026-05-16) +KIS_APP_KEY= +KIS_APP_SECRET= +KIS_ACCOUNT=<8자리-2자리 형식, e.g. 50000000-01> +KIS_IS_VIRTUAL=true +# V1 token path (default: ../signal_v1/data/kis_token.json) +# V1_TOKEN_PATH= +``` + +User reference: `web-ai/signal_v1/KIS_SETUP.md` 의 정확한 키 발급 + 계좌 번호 형식. + +- [ ] **Step 2: V1 봇 정상 가동 확인 (토큰 발급 책임)** + +```powershell +cd C:\Users\jaeoh\Desktop\workspace\web-ai +.\start.bat +``` +콘솔 로그에 KIS auth 성공 + `signal_v1/data/kis_token.json` 갱신 확인. + +V1 봇은 계속 실행 또는 토큰 발급 후 종료. signal_v2 가 토큰 파일 read 만. + +- [ ] **Step 3: signal_v2 시작** + +```powershell +cd C:\Users\jaeoh\Desktop\workspace\web-ai\signal_v2 +.\start.bat +``` +기대 출력 (수십 줄): +- `Uvicorn running on http://0.0.0.0:8001` +- `poll_loop started` +- `KIS WebSocket connected` (V1 토큰 로딩 성공 시) + +만약 `V1 token file missing` 에러 → V1 토큰 파일 경로 확인 + V1 봇 가동 후 재시도. + +- [ ] **Step 4: /health smoke** + +```powershell +curl http://localhost:8001/health +``` +기대: 200 JSON. 처음에는 `last_poll` 의 minute_bars / asking_price 없음 — 첫 cycle 까지 대기. + +- [ ] **Step 5: 분봉 + 호가 검증 (장중 또는 NXT 시간대)** + +장중 / NXT 시간대 진입 후 60-300초 대기 → 다시 `/health` 호출: +```powershell +curl http://localhost:8001/health +``` +기대 응답의 `last_poll`: +- `minute_bars/005930` 등 ticker별 timestamp 표시 +- `asking_price/` 도 표시 + +장외 시간대라면: +- KIS REST 호출은 가능하나 분봉 데이터는 직전 거래일 마지막 분봉 반환 +- WebSocket 호가는 영업 시간 외 데이터 안 옴 + +- [ ] **Step 6: V1 무영향 검증** + +V1 봇 콘솔에서 정상 동작 + Telegram /status 응답 확인. V2 가 토큰 파일 read 만 했으므로 V1 무영향 검증. + +- [ ] **Step 7: git push** + +```powershell +cd C:\Users\jaeoh\Desktop\workspace\web-ai +git push +``` +Gitea 자격증명 시 사용자 수동. + +- [ ] **Step 8: 결과 보고** + +- Step 3 (signal_v2 시작): PASS / FAIL — 에러 메시지 공유 +- Step 4 (/health): PASS / FAIL +- Step 5 (분봉 + 호가): PASS (state 갱신 확인) / FAIL — KIS 응답 또는 parse 이슈 공유 +- Step 6 (V1 무영향): PASS / FAIL +- Step 7 (push): PASS / FAIL + +만약 Step 5 의 호가 parse 가 fail → `_parse_asking_price` 의 필드 인덱스 문제. KIS raw 메시지 sample 캡처 후 인덱스 조정 (별도 fix slice). + +전체 PASS 시 Phase 3a 완료 → Phase 3b (Chronos-2 추론 + 분봉 모멘텀) brainstorming. + +--- + +## Self-Review + +**1. Spec coverage:** + +| Spec § | 요구사항 | Plan task | +|--------|----------|----------| +| §2 포함 ① KIS REST client | Task 3 ✅ | +| §2 포함 ② KIS WebSocket | Task 4 ✅ | +| §2 포함 ③ pull_worker 확장 | Task 5 ✅ | +| §2 포함 ④ PollState 확장 | Task 1 ✅ | +| §2 포함 ⑤ scheduler NXT | Task 2 ✅ | +| §2 포함 ⑥ 13 테스트 | Task 2 (3) + Task 3 (4) + Task 4 (4) + Task 5 (2) = 13 ✅ | +| §3 변경 매트릭스 14 파일 | Task 1-7 합산 ✅ | +| §4 KISClient interface | Task 3 Step 3 ✅ | +| §5 KISWebSocket interface | Task 4 Step 3 ✅ | +| §6 PollState 확장 + pull_worker | Task 1 + Task 5 ✅ | +| §7 scheduler NXT | Task 2 ✅ | +| §8 13 테스트 케이스 | Task 2-5 ✅ | +| §11 DoD 13 항목 | Task 1-7 ✅ | + +No gaps. + +**2. Placeholder scan:** No "TBD". KIS WebSocket `_parse_asking_price` 의 필드 인덱스는 "운영 검증 후 조정" 으로 명시 — implementer 가 운영 sample 으로 확정. 이는 placeholder 가 아니라 운영 환경 의존 파라미터. + +**3. Type consistency:** +- `KISClient.__init__(app_key, app_secret, account, is_virtual, v1_token_path, timeout=10.0)` consistent ✅ +- `KISClient.get_minute_ohlcv(ticker) -> list[dict]` / `get_asking_price(ticker) -> dict` / `close()` consistent ✅ +- `KISWebSocket.__init__(app_key, app_secret, is_virtual)` consistent ✅ +- `KISWebSocket.start(tickers, on_asking_price)` / `subscribe(ticker)` / `unsubscribe(ticker)` / `close()` consistent ✅ +- `PollState` 필드 (minute_bars + asking_price) consistent across Task 1 / 5 / 6 ✅ +- `_next_interval(now) -> float` / `_is_market_day(now) -> bool` / `_is_polling_window(now) -> bool` consistent ✅ +- env var names (`KIS_APP_KEY` / `KIS_APP_SECRET` / `KIS_ACCOUNT` / `KIS_IS_VIRTUAL` / `V1_TOKEN_PATH`) consistent ✅ + +Plan passes self-review.