web-page/docs/superpowers/plans/2026-05-16-signal-v2-phase3a-kis-data-collection.md
gahusb 78b77e2691 docs(signal-v2): Phase 3a implementation plan — 7 tasks TDD
Task 1: config + state + websockets dep
Task 2: scheduler NXT windows + 3 tests
Task 3: kis_client REST + 4 tests
Task 4: kis_websocket + 4 tests (most heavy)
Task 5: pull_worker minute cycle + 2 tests
Task 6: main.py KIS lifespan + 1 test
Task 7: user manual .env + smoke + push

13 new tests, total 32 signal_v2 tests. ~1 week.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 05:00:43 +09:00


# Signal V2 Phase 3a — KIS Data Collection Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Add to signal_v2 a KIS REST client (minute-bar and asking-price polling, sharing the V1 token read-only), a KIS WebSocket client (approval_key plus real-time asking-price subscription for portfolio tickers), scheduler support for the NXT time windows, and `minute_bars` / `asking_price` fields on PollState. This is the starting point for Phase 3b (Chronos-2 inference).

**Architecture:** The access_token in V1's `signal_v1/data/kis_token.json` is shared read-only. The WebSocket approval_key is issued by V2 itself. Only portfolio holdings (~11 tickers) subscribe to asking prices over WebSocket; the screener Top-N is polled over REST every minute. During the NXT windows (20:00-23:30 / 04:30-07:00) the scheduler also keeps polling active on a 5-minute cron.

**Tech Stack:** httpx async / websockets / pytest-asyncio + respx / sqlite3 / Python 3.11+ / KIS OpenAPI

**Spec:** `web-ui/docs/superpowers/specs/2026-05-16-signal-v2-phase3a-kis-data-collection.md`

**V1 reference code** (recommended reading during implementation):
- `web-ai/signal_v1/modules/services/kis.py`: V1 KIS REST client (TR_ID, response keys, token file format)
- `web-ai/signal_v1/KIS_SETUP.md`: KIS authentication setup guide

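The split described under Architecture (portfolio holdings on the WebSocket, screener-only tickers on REST polling) can be sketched as a pure function. This is a minimal illustration rather than the plan's implementation: `route_tickers` is a hypothetical name, and the `holdings` / `items` / `ticker` keys are assumed from the payload shapes used in the Task 5 tests.

```python
from __future__ import annotations


def route_tickers(portfolio: dict | None, screener: dict | None) -> tuple[list[str], list[str]]:
    """Return (websocket_tickers, rest_only_tickers), de-duplicated in input order."""
    # Assumed payload shapes: {"holdings": [{"ticker": ...}]} and {"items": [{"ticker": ...}]}.
    held = [h["ticker"] for h in (portfolio or {}).get("holdings", [])]
    screened = [i["ticker"] for i in (screener or {}).get("items", [])]
    ws_tickers = list(dict.fromkeys(held))
    held_set = set(ws_tickers)
    # A ticker that is both held and screened stays on the WebSocket only.
    rest_only = [t for t in dict.fromkeys(screened) if t not in held_set]
    return ws_tickers, rest_only


ws, rest = route_tickers(
    {"holdings": [{"ticker": "005930"}, {"ticker": "000660"}]},
    {"items": [{"ticker": "005930"}, {"ticker": "035720"}]},
)
```

Keeping the routing pure makes it easy to test separately from the network clients.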
---
## File structure

| File | Responsibility |
|------|------|
| `web-ai/signal_v2/config.py` | (modify) add 4 KIS env vars + `V1_TOKEN_PATH` |
| `web-ai/signal_v2/state.py` | (modify) add `minute_bars: dict[str, deque]` + `asking_price: dict[str, dict]` to `PollState` |
| `web-ai/signal_v2/scheduler.py` | (modify) 5-minute cron branch for the NXT windows (20:00-23:30 / 04:30-07:00) |
| `web-ai/signal_v2/kis_client.py` | (new) REST KISClient: minute bars + asking price + V1 token read |
| `web-ai/signal_v2/kis_websocket.py` | (new) WebSocket KISWebSocket: approval_key + real-time asking price for the portfolio |
| `web-ai/signal_v2/pull_worker.py` | (modify) minute-bar cycle + WebSocket subscription sync |
| `web-ai/signal_v2/main.py` | (modify) KISClient/KISWebSocket startup/shutdown in lifespan |
| `web-ai/signal_v2/requirements.txt` | (modify) add `websockets>=12` |
| `web-ai/signal_v2/tests/test_scheduler.py` | (modify) add 3 NXT cases |
| `web-ai/signal_v2/tests/test_kis_client.py` | (new) 4 cases |
| `web-ai/signal_v2/tests/test_kis_websocket.py` | (new) 4 cases |
| `web-ai/signal_v2/tests/test_pull_worker.py` | (new) 2 cases |
| `web-ai/signal_v2/tests/test_main.py` | (modify) add 1 KIS lifespan case |
| `web-ai/.env` | (modify, by the user in Task 7) 4 KIS env vars + `V1_TOKEN_PATH` |

14 files changed in total, 14 new tests (3 + 4 + 4 + 2 + 1).

---
## Task order

```
Task 1: config + state + requirements (foundation extension)
Task 2: scheduler NXT windows + 3 tests (TDD)
Task 3: kis_client.py REST + 4 integration tests (TDD)
Task 4: kis_websocket.py WebSocket + 4 integration tests (TDD)
Task 5: pull_worker extension + 2 integration tests (TDD)
Task 6: main.py KIS lifespan integration + 1 case
Task 7: manual user step: update .env + manual smoke + push
```
---
### Task 1: config + state + requirements extension
**Files:**
- Modify: `web-ai/signal_v2/config.py`
- Modify: `web-ai/signal_v2/state.py`
- Modify: `web-ai/signal_v2/requirements.txt`
- [ ] **Step 1: Update config.py with KIS env + V1_TOKEN_PATH**
Use Read tool to inspect current `web-ai/signal_v2/config.py`, then Edit to add new fields. Final state:
```python
"""Signal V2 environment variable loading."""
import os
from dataclasses import dataclass, field
from pathlib import Path

from dotenv import load_dotenv

load_dotenv(Path(__file__).parent.parent / ".env")


@dataclass(frozen=True)
class Settings:
    stock_api_url: str = field(
        default_factory=lambda: os.getenv("STOCK_API_URL", "").rstrip("/")
    )
    webai_api_key: str = field(
        default_factory=lambda: os.getenv("WEBAI_API_KEY", "").strip()
    )
    port: int = field(default_factory=lambda: int(os.getenv("SIGNAL_V2_PORT", "8001")))
    db_path: Path = field(
        default_factory=lambda: Path(__file__).parent / "data" / "signal_v2.db"
    )
    # Phase 3a additions
    kis_app_key: str = field(default_factory=lambda: os.getenv("KIS_APP_KEY", "").strip())
    kis_app_secret: str = field(default_factory=lambda: os.getenv("KIS_APP_SECRET", "").strip())
    kis_account: str = field(default_factory=lambda: os.getenv("KIS_ACCOUNT", "").strip())
    kis_is_virtual: bool = field(
        default_factory=lambda: os.getenv("KIS_IS_VIRTUAL", "true").lower() == "true"
    )
    v1_token_path: Path = field(
        default_factory=lambda: Path(
            os.getenv(
                "V1_TOKEN_PATH",
                str(Path(__file__).parent.parent / "signal_v1" / "data" / "kis_token.json"),
            )
        )
    )


def get_settings() -> Settings:
    return Settings()
```
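One detail of the `kis_is_virtual` field above: only the literal string `true` (case-insensitive) selects the virtual environment, so values such as `1` or `yes` silently resolve to `False`, which selects the real-trading endpoint. A stricter parser is one way to surface that. The sketch below is an optional alternative, not part of the plan; `parse_bool_env` is a hypothetical helper.

```python
from __future__ import annotations


def parse_bool_env(raw: str | None, default: bool) -> bool:
    """Parse a boolean env value, raising on anything unrecognised (hypothetical helper)."""
    if raw is None or raw.strip() == "":
        return default
    value = raw.strip().lower()
    if value in {"true", "1", "yes", "on"}:
        return True
    if value in {"false", "0", "no", "off"}:
        return False
    raise ValueError(f"unrecognised boolean value: {raw!r}")


# The plan's expression treats everything except "true" as False:
plan_style = "1".lower() == "true"
# The stricter helper accepts common spellings and rejects typos:
strict_style = parse_bool_env("1", default=True)
```

Failing loudly on a typo is arguably safer here, because the fallback in the plan's expression is the live endpoint.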
- [ ] **Step 2: Update state.py with minute_bars + asking_price fields**
Edit `web-ai/signal_v2/state.py`:
```python
"""PollState: process-wide singleton."""
from collections import deque
from dataclasses import dataclass, field


@dataclass
class PollState:
    portfolio: dict | None = None
    news_sentiment: dict | None = None
    screener_preview: dict | None = None
    # Phase 3a additions
    minute_bars: dict[str, deque] = field(default_factory=dict)  # {ticker: deque(maxlen=60)}
    asking_price: dict[str, dict] = field(default_factory=dict)  # {ticker: {bid_total, ask_total, bid_ratio, ...}}
    last_updated: dict[str, str] = field(default_factory=dict)
    fetch_errors: dict[str, int] = field(default_factory=dict)


state = PollState()
```
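How `minute_bars` is expected to behave once populated: each ticker maps to a `deque(maxlen=60)`, so appending a 61st bar drops the oldest. A minimal sketch, assuming bars arrive in ascending time order and may overlap between polls; `merge_bars` and the de-duplication by `datetime` are illustrative assumptions, not the plan's code.

```python
from __future__ import annotations

from collections import deque


def merge_bars(store: dict[str, deque], ticker: str, bars: list[dict], maxlen: int = 60) -> None:
    """Append bars newer than the last stored one; the deque caps history at maxlen."""
    window = store.setdefault(ticker, deque(maxlen=maxlen))
    # ISO-8601 strings with the same UTC offset compare chronologically.
    last_seen = window[-1]["datetime"] if window else ""
    for bar in bars:
        if bar["datetime"] > last_seen:
            window.append(bar)
            last_seen = bar["datetime"]


store: dict[str, deque] = {}
first = [{"datetime": f"2026-05-18T09:{m:02d}:00+09:00", "close": 78000 + m} for m in range(30)]
second = [{"datetime": f"2026-05-18T09:{m:02d}:00+09:00", "close": 78000 + m} for m in range(20, 60)]
third = [{"datetime": f"2026-05-18T10:{m:02d}:00+09:00", "close": 79000 + m} for m in range(5)]
merge_bars(store, "005930", first)
merge_bars(store, "005930", second)  # minutes 20-29 overlap and are skipped
merge_bars(store, "005930", third)   # pushes the window past 60 bars
```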
- [ ] **Step 3: Add websockets to requirements.txt**
Read current `web-ai/requirements.txt`. Append (if not present):
```
websockets>=12
```
- [ ] **Step 4: pip install + smoke import**
```bash
cd /c/Users/jaeoh/Desktop/workspace/web-ai
pip install -r requirements.txt 2>&1 | tail -5
python -c "from signal_v2.config import get_settings; from signal_v2.state import state; print(get_settings().kis_is_virtual); print(state)"
```
Expected: `pip install` succeeds, `get_settings().kis_is_virtual` is printed, and `state` is printed (with `minute_bars` / `asking_price` as empty dicts).
- [ ] **Step 5: Run existing tests — no regression**
```bash
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests -q 2>&1 | tail -3
```
Expected: 19 passed (the existing Phase 2 tests, unchanged).
- [ ] **Step 6: Commit**
```bash
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/config.py signal_v2/state.py requirements.txt
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3a): config + state extensions for KIS data

- config.py: KIS_APP_KEY/SECRET/ACCOUNT/IS_VIRTUAL + V1_TOKEN_PATH env
- state.py: PollState extended with minute_bars (deque maxlen 60) and
  asking_price (per-ticker dict)
- requirements.txt: websockets>=12 (KIS WebSocket client)

19 existing tests still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
### Task 2: Scheduler NXT windows + 3 tests
**Files:**
- Modify: `web-ai/signal_v2/scheduler.py`
- Modify: `web-ai/signal_v2/tests/test_scheduler.py`
- [ ] **Step 1: Write 3 failing tests for NXT windows**
Append to `web-ai/signal_v2/tests/test_scheduler.py`:
```python
def test_next_interval_nxt_evening_5min():
    """Weekday 22:00 (NXT evening) → 300 (5 minutes)."""
    now = _kst(2026, 5, 18, 22, 0)
    assert _next_interval(now) == 300


def test_next_interval_nxt_dawn_5min():
    """Weekday 05:30 (NXT dawn) → 300 (5 minutes)."""
    now = _kst(2026, 5, 18, 5, 30)
    assert _next_interval(now) == 300


def test_next_interval_dead_zone_skip():
    """Weekday 02:00 (dead zone) → wait until the next 04:30 (~9000s)."""
    now = _kst(2026, 5, 18, 2, 0)
    interval = _next_interval(now)
    # 02:00 → 04:30 = 2.5h = 9000s
    assert 9000 - 60 < interval < 9000 + 60
```
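These tests rely on `_kst` and `_next_interval` already being available in `test_scheduler.py` from Phase 2. If `_kst` ever needs to be recreated, a minimal version might look like the following. The signature is inferred from the calls above and is an assumption; a fixed UTC+9 offset stands in for `ZoneInfo("Asia/Seoul")`, which is equivalent because Korea observes no DST.

```python
from datetime import datetime, timedelta, timezone

KST = timezone(timedelta(hours=9), "KST")


def _kst(year: int, month: int, day: int, hour: int, minute: int) -> datetime:
    """Build a timezone-aware KST datetime for scheduler tests (assumed helper)."""
    return datetime(year, month, day, hour, minute, tzinfo=KST)


monday_evening = _kst(2026, 5, 18, 22, 0)
```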
- [ ] **Step 2: Run tests to verify FAIL**
```bash
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_scheduler.py -v -k "nxt or dead_zone" 2>&1 | tail -10
```
Expected: 3 failed (the scheduler currently treats 22:00, 05:30, and 02:00 all as overnight skip).
- [ ] **Step 3: Update scheduler.py**
Edit `web-ai/signal_v2/scheduler.py` — replace the time-window constants block + `_next_interval` + `_is_polling_window` with:
```python
# Market windows (regular session)
_PRE_OPEN = time(7, 0)
_OPEN = time(9, 0)
_CLOSE = time(15, 30)
_POST_END = time(20, 0)
# NXT windows (after-hours)
_NXT_PRE_END = time(23, 30)
_NXT_POST_OPEN = time(4, 30)
# 23:30 - 04:30 (dead zone) is skipped


def _is_market_day(now: datetime) -> bool:
    """A weekday that is not a market holiday."""
    if now.weekday() >= 5:
        return False
    return now.strftime("%Y-%m-%d") not in _HOLIDAYS


def _is_polling_window(now: datetime) -> bool:
    """Polling window: 07:00-23:30 plus 04:30-07:00."""
    t = now.time()
    return (
        (_PRE_OPEN <= t < _NXT_PRE_END)
        or (_NXT_POST_OPEN <= t < _PRE_OPEN)
    )


def _next_interval(now: datetime) -> float:
    if not _is_market_day(now):
        return _seconds_until_next_market_open(now)
    t = now.time()
    if _PRE_OPEN <= t < _OPEN:
        return 300.0  # pre-open: 5 minutes
    elif _OPEN <= t < _CLOSE:
        return 60.0  # regular session: 1 minute
    elif _CLOSE <= t < _POST_END:
        return 300.0  # post-close: 5 minutes
    elif _POST_END <= t < _NXT_PRE_END:
        return 300.0  # NXT evening: 5 minutes
    elif _NXT_POST_OPEN <= t < _PRE_OPEN:
        return 300.0  # NXT dawn: 5 minutes
    else:
        # Dead zone (23:30-04:30): wait until 04:30
        return _seconds_until_nxt_or_market_open(now)


def _seconds_until_nxt_or_market_open(now: datetime) -> float:
    """Seconds until the next 04:30 (start of the NXT dawn window) on a market day."""
    candidate = now.replace(hour=4, minute=30, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    for _ in range(14):
        if _is_market_day(candidate):
            return (candidate - now).total_seconds()
        candidate += timedelta(days=1)
    logger.warning("could not find next market day within 14 days")
    return 86400.0
```
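Note: the existing `_seconds_until_next_market_open` computes 07:00 on the next business day after a weekend or holiday; keep it unchanged. The new `_seconds_until_nxt_or_market_open` computes the next 04:30 from inside the dead zone (23:30-04:30): 04:30 on the candidate day if that day is a business day, otherwise 04:30 on the next business day.

Whether `_seconds_until_next_market_open` should target hour=7 or hour=4, minute=30 was considered: after a holiday or weekend, keep **07:00** on the next business day (the first business day after a closure starts from the regular pre-market). NXT operates only on business days, so resuming at 07:00 on the next business day after a weekend skip is fine.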
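As a worked check of the window boundaries, the sketch below reimplements the interval selection in isolation, ignoring holidays and weekends. It is a simplified stand-in for `_next_interval`, not the module itself: `interval_for` and `WINDOWS` are hypothetical names, and a fixed UTC+9 offset substitutes for `ZoneInfo("Asia/Seoul")`.

```python
from datetime import datetime, time, timedelta, timezone

KST = timezone(timedelta(hours=9))  # fixed offset; Korea has no DST

# (start, end, seconds) for each polling window on a market day
WINDOWS = [
    (time(4, 30), time(7, 0), 300.0),    # NXT dawn
    (time(7, 0), time(9, 0), 300.0),     # pre-open
    (time(9, 0), time(15, 30), 60.0),    # regular session
    (time(15, 30), time(20, 0), 300.0),  # post-close
    (time(20, 0), time(23, 30), 300.0),  # NXT evening
]


def interval_for(now: datetime) -> float:
    """Seconds until the next poll, assuming `now` falls on a market day."""
    t = now.time()
    for start, end, seconds in WINDOWS:
        if start <= t < end:
            return seconds
    # Dead zone 23:30-04:30: sleep until the next 04:30.
    target = now.replace(hour=4, minute=30, second=0, microsecond=0)
    if target <= now:
        target += timedelta(days=1)
    return (target - now).total_seconds()


dead_zone_wait = interval_for(datetime(2026, 5, 18, 2, 0, tzinfo=KST))
late_night_wait = interval_for(datetime(2026, 5, 18, 23, 45, tzinfo=KST))
```

At 02:00 the wait is 2.5 hours (9000 s); at 23:45 the target rolls over to 04:30 the next day, giving 4 h 45 min (17100 s).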
- [ ] **Step 4: Run tests to verify PASS**
```bash
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_scheduler.py -v 2>&1 | tail -15
```
Expected: 11 passed (8 existing + 3 new).
Also full suite:
```bash
python -m pytest signal_v2/tests -q 2>&1 | tail -3
```
Expected: 22 passed (19 + 3).
- [ ] **Step 5: Commit**
```bash
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/scheduler.py signal_v2/tests/test_scheduler.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3a): scheduler NXT windows (20:00-23:30 / 04:30-07:00)

Enable 5-minute cron polling during the NXT after-hours trading windows as
well. 23:30-04:30 is a dead zone (KIS maintenance window) → skip until 04:30.

3 new tests (22 total).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
### Task 3: KIS REST client + 4 integration tests

**Files:**
- Create: `web-ai/signal_v2/kis_client.py`
- Create: `web-ai/signal_v2/tests/test_kis_client.py`

This task requires the exact KIS API TR_IDs and response keys. Consulting the V1 code (`web-ai/signal_v1/modules/services/kis.py`) is recommended:
- Minute-bar TR_ID: `FHKST03010200` (domestic stock intraday minute bars), with the response in the `output2` array
- Asking-price TR_ID: `FHKST01010200` (domestic stock asking price), with the response in the `output1` object
- Token file (`signal_v1/data/kis_token.json`) format: `{"access_token": "...", "token_expired": "YYYY-MM-DD HH:MM:SS"}`

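The token file format above includes `token_expired`, which a reader can use to detect a stale token before calling KIS. A minimal sketch, assuming the timestamp is KST wall-clock time in the format shown; `load_v1_token` is a hypothetical helper, and treating the timestamp as KST is an assumption to confirm against the V1 code.

```python
from __future__ import annotations

import json
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path

KST = timezone(timedelta(hours=9))  # fixed-offset stand-in for Asia/Seoul


def load_v1_token(path: Path, now: datetime | None = None) -> str:
    """Read access_token from the V1 token file, refusing an expired token."""
    data = json.loads(path.read_text(encoding="utf-8"))
    token = data.get("access_token", "")
    if not token:
        raise RuntimeError("V1 token file has no access_token")
    # Assumption: token_expired is KST wall-clock time.
    expires = datetime.strptime(data["token_expired"], "%Y-%m-%d %H:%M:%S").replace(tzinfo=KST)
    if expires <= (now or datetime.now(KST)):
        raise RuntimeError(f"V1 token expired at {expires.isoformat()}")
    return token


with tempfile.TemporaryDirectory() as tmp:
    token_file = Path(tmp) / "kis_token.json"
    token_file.write_text(
        json.dumps({"access_token": "abc", "token_expired": "2099-12-31 23:59:59"}),
        encoding="utf-8",
    )
    token = load_v1_token(token_file)
```

Since V2 only reads the file, an expired token cannot be refreshed here; raising early at least makes the cause visible in logs.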
- [ ] **Step 1: Write 4 failing tests**
Create `web-ai/signal_v2/tests/test_kis_client.py`:
```python
"""Tests for KISClient (REST)."""
import json
import time
from pathlib import Path

import httpx
import pytest
import respx

from signal_v2.kis_client import KISClient


@pytest.fixture
def fake_v1_token(tmp_path):
    """V1 token file fixture."""
    token_file = tmp_path / "kis_token.json"
    token_file.write_text(json.dumps({
        "access_token": "test-kis-token-abc123",
        "token_expired": "2099-12-31 23:59:59",
    }))
    return token_file


@pytest.fixture
def kis_client_factory(fake_v1_token):
    def _make():
        return KISClient(
            app_key="test-app-key",
            app_secret="test-app-secret",
            account="50000000-01",
            is_virtual=True,
            v1_token_path=fake_v1_token,
        )
    return _make


@respx.mock
async def test_get_minute_ohlcv_normal_returns_30_bars(kis_client_factory):
    """Normal 200 response → list of 30 minute bars."""
    sample_output2 = [
        {
            "stck_bsop_date": "20260518",
            "stck_cntg_hour": f"{h:02d}{m:02d}00",
            "stck_oprc": "78000", "stck_hgpr": "78500",
            "stck_lwpr": "77800", "stck_prpr": "78300",
            "cntg_vol": "12345",
        }
        for h in range(9, 10) for m in range(0, 30)  # 9:00-9:29 = 30 bars
    ]
    respx.get(
        "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
    ).mock(
        return_value=httpx.Response(200, json={"output2": sample_output2})
    )
    client = kis_client_factory()
    try:
        bars = await client.get_minute_ohlcv("005930")
        assert len(bars) == 30
        assert bars[0]["close"] == 78300
        assert "datetime" in bars[0]
    finally:
        await client.close()


@respx.mock
async def test_get_minute_ohlcv_429_retry_then_success(kis_client_factory, monkeypatch):
    """429 → exponential backoff → 200."""
    sleep_calls = []

    async def fake_sleep(s):
        sleep_calls.append(s)

    monkeypatch.setattr("asyncio.sleep", fake_sleep)
    respx.get(
        "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
    ).mock(side_effect=[
        httpx.Response(429, text="rate limit"),
        httpx.Response(200, json={"output2": []}),
    ])
    client = kis_client_factory()
    try:
        result = await client.get_minute_ohlcv("005930")
        assert result == []
        # 1 retry sleep
        assert 1 in sleep_calls
    finally:
        await client.close()


async def test_get_minute_ohlcv_uses_v1_token(kis_client_factory, fake_v1_token):
    """KIS request headers carry the access_token from the V1 token file."""
    captured_headers = {}

    @respx.mock
    async def _do_request():
        route = respx.get(
            "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
        ).mock(return_value=httpx.Response(200, json={"output2": []}))
        client = kis_client_factory()
        try:
            await client.get_minute_ohlcv("005930")
            # The respx route records the request, so inspect it
            assert route.called
            req = route.calls.last.request
            captured_headers.update(req.headers)
        finally:
            await client.close()

    await _do_request()
    # The KIS auth header is `authorization: Bearer <token>`
    assert "authorization" in {k.lower() for k in captured_headers.keys()}
    auth_value = captured_headers.get("authorization") or captured_headers.get("Authorization")
    assert "test-kis-token-abc123" in auth_value


@respx.mock
async def test_get_asking_price_computes_bid_ratio(kis_client_factory):
    """Asking-price response → bid_ratio = bid_total / (bid + ask)."""
    # Quantity fields in KIS output1: total_askp_rsqn (total ask quantity),
    # total_bidp_rsqn (total bid quantity)
    respx.get(
        "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn"
    ).mock(return_value=httpx.Response(200, json={
        "output1": {
            "total_bidp_rsqn": "600",
            "total_askp_rsqn": "400",
            "stck_prpr": "78500",
        }
    }))
    client = kis_client_factory()
    try:
        data = await client.get_asking_price("005930")
        assert data["bid_total"] == 600
        assert data["ask_total"] == 400
        assert abs(data["bid_ratio"] - 0.6) < 1e-9
        assert data["current_price"] == 78500
        assert "as_of" in data
    finally:
        await client.close()
```
- [ ] **Step 2: Run tests to verify FAIL**
```bash
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_kis_client.py -v 2>&1 | tail -10
```
Expected: ImportError (signal_v2.kis_client missing).
- [ ] **Step 3: Implement kis_client.py**
Create `web-ai/signal_v2/kis_client.py`:
```python
"""KIS REST API client: minute bars + asking price. Shares the V1 token read-only."""
from __future__ import annotations

import asyncio
import json
import logging
import time
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo

import httpx

logger = logging.getLogger(__name__)
KST = ZoneInfo("Asia/Seoul")
_MAX_ATTEMPTS = 3
_THROTTLE_INTERVAL = 0.5  # limit of 2 requests per second


class KISClient:
    """KIS REST (minute bars + asking price). Reads the V1 token file read-only."""

    def __init__(
        self,
        app_key: str, app_secret: str, account: str, is_virtual: bool,
        v1_token_path: Path,
        timeout: float = 10.0,
    ):
        self._app_key = app_key
        self._app_secret = app_secret
        self._account = account
        self._is_virtual = is_virtual
        self._v1_token_path = Path(v1_token_path)
        self._base_url = (
            "https://openapivts.koreainvestment.com:29443" if is_virtual
            else "https://openapi.koreainvestment.com:9443"
        )
        self._client = httpx.AsyncClient(timeout=timeout)
        self._token_cache: tuple[str, float] | None = None  # (token, file_mtime)
        self._last_throttle_at = 0.0

    async def close(self) -> None:
        await self._client.aclose()

    def _read_v1_token(self) -> str:
        if not self._v1_token_path.exists():
            raise RuntimeError(f"V1 token file missing: {self._v1_token_path}")
        mtime = self._v1_token_path.stat().st_mtime
        # Re-read the file only when its mtime changes
        if self._token_cache and self._token_cache[1] == mtime:
            return self._token_cache[0]
        data = json.loads(self._v1_token_path.read_text(encoding="utf-8"))
        token = data.get("access_token", "")
        if not token:
            raise RuntimeError("V1 token file has no access_token")
        self._token_cache = (token, mtime)
        return token

    async def _throttle(self) -> None:
        elapsed = time.monotonic() - self._last_throttle_at
        if elapsed < _THROTTLE_INTERVAL:
            await asyncio.sleep(_THROTTLE_INTERVAL - elapsed)
        self._last_throttle_at = time.monotonic()

    def _common_headers(self, tr_id: str) -> dict[str, str]:
        token = self._read_v1_token()
        return {
            "authorization": f"Bearer {token}",
            "appkey": self._app_key,
            "appsecret": self._app_secret,
            "tr_id": tr_id,
            "custtype": "P",
        }

    async def _request_with_retry(
        self, method: str, path: str, tr_id: str, **kwargs,
    ) -> dict:
        url = f"{self._base_url}{path}"
        headers = self._common_headers(tr_id)
        for attempt in range(_MAX_ATTEMPTS):
            await self._throttle()
            try:
                response = await self._client.request(
                    method, url, headers=headers, **kwargs
                )
                # Retry 429 with exponential backoff (1s, then 2s); on the
                # final attempt fall through and raise.
                if response.status_code == 429 and attempt < _MAX_ATTEMPTS - 1:
                    await asyncio.sleep(2**attempt)
                    continue
                response.raise_for_status()
                return response.json()
            except httpx.TimeoutException:
                if attempt < _MAX_ATTEMPTS - 1:
                    await asyncio.sleep(2**attempt)
                    continue
                raise
        raise RuntimeError("retry exhausted")

    async def get_minute_ohlcv(self, ticker: str) -> list[dict]:
        """Last 30 one-minute OHLCV bars before now (TR_ID FHKST03010200).

        Returns: [{datetime, open, high, low, close, volume}, ...] in ascending time order.
        """
        path = "/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
        params = {
            "FID_ETC_CLS_CODE": "",
            "FID_COND_MRKT_DIV_CODE": "J",
            "FID_INPUT_ISCD": ticker,
            "FID_INPUT_HOUR_1": datetime.now(KST).strftime("%H%M%S"),
            "FID_PW_DATA_INCU_YN": "N",
        }
        raw = await self._request_with_retry(
            "GET", path, tr_id="FHKST03010200", params=params,
        )
        output2 = raw.get("output2", [])
        bars = []
        for row in output2:
            try:
                date = row["stck_bsop_date"]
                hhmmss = row["stck_cntg_hour"]
                dt = datetime.strptime(f"{date} {hhmmss}", "%Y%m%d %H%M%S").replace(tzinfo=KST)
                bars.append({
                    "datetime": dt.isoformat(),
                    "open": int(row["stck_oprc"]),
                    "high": int(row["stck_hgpr"]),
                    "low": int(row["stck_lwpr"]),
                    "close": int(row["stck_prpr"]),
                    "volume": int(row["cntg_vol"]),
                })
            except (KeyError, ValueError) as e:
                logger.warning("skip malformed bar for %s: %r", ticker, e)
        # KIS returns descending; reverse to ascending
        bars.reverse()
        return bars

    async def get_asking_price(self, ticker: str) -> dict:
        """Current asking price + total bid/ask quantities (TR_ID FHKST01010200)."""
        path = "/uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn"
        params = {
            "FID_COND_MRKT_DIV_CODE": "J",
            "FID_INPUT_ISCD": ticker,
        }
        raw = await self._request_with_retry(
            "GET", path, tr_id="FHKST01010200", params=params,
        )
        output1 = raw.get("output1", {})
        bid_total = int(output1.get("total_bidp_rsqn", 0))
        ask_total = int(output1.get("total_askp_rsqn", 0))
        total = bid_total + ask_total
        bid_ratio = bid_total / total if total > 0 else 0.0
        current_price = int(output1.get("stck_prpr", 0))
        return {
            "bid_total": bid_total,
            "ask_total": ask_total,
            "bid_ratio": bid_ratio,
            "current_price": current_price,
            "as_of": datetime.now(KST).isoformat(),
        }
```
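The row-to-bar conversion inside `get_minute_ohlcv` can be exercised without any HTTP by pulling it into a pure function. A minimal sketch using the field names from the implementation above; `row_to_bar` is a hypothetical name, the sample rows are synthetic, and a fixed UTC+9 offset stands in for `ZoneInfo("Asia/Seoul")`.

```python
from datetime import datetime, timedelta, timezone

KST = timezone(timedelta(hours=9))  # fixed-offset stand-in for Asia/Seoul


def row_to_bar(row: dict) -> dict:
    """Convert one KIS output2 row (string fields) into a typed OHLCV bar."""
    stamp = f"{row['stck_bsop_date']} {row['stck_cntg_hour']}"
    dt = datetime.strptime(stamp, "%Y%m%d %H%M%S").replace(tzinfo=KST)
    return {
        "datetime": dt.isoformat(),
        "open": int(row["stck_oprc"]),
        "high": int(row["stck_hgpr"]),
        "low": int(row["stck_lwpr"]),
        "close": int(row["stck_prpr"]),
        "volume": int(row["cntg_vol"]),
    }


# Synthetic rows, newest first (the ordering the implementation assumes).
rows = [
    {"stck_bsop_date": "20260518", "stck_cntg_hour": "090100", "stck_oprc": "78300",
     "stck_hgpr": "78600", "stck_lwpr": "78200", "stck_prpr": "78500", "cntg_vol": "900"},
    {"stck_bsop_date": "20260518", "stck_cntg_hour": "090000", "stck_oprc": "78000",
     "stck_hgpr": "78500", "stck_lwpr": "77800", "stck_prpr": "78300", "cntg_vol": "12345"},
]
bars = [row_to_bar(r) for r in rows]
bars.reverse()  # ascending time order, as in get_minute_ohlcv
```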
- [ ] **Step 4: Run tests to verify PASS**
```bash
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_kis_client.py -v 2>&1 | tail -15
```
Expected: 4 passed.
Full suite:
```bash
python -m pytest signal_v2/tests -q 2>&1 | tail -3
```
Expected: 26 passed (22 + 4).
If failures relate to KIS TR_ID or field names, refer to V1 `signal_v1/modules/services/kis.py` and adjust.
- [ ] **Step 5: Commit**
```bash
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/kis_client.py signal_v2/tests/test_kis_client.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3a): kis_client REST + 4 integration tests

KISClient: async REST for minute bars (FHKST03010200) + asking price
(FHKST01010200). Shares the V1 token file (signal_v1/data/kis_token.json)
read-only, with an mtime cache. Throttled to 2 requests per second.
Exponential retry.

26 tests pass (Phase 2 19 + Task 2 NXT 3 + Task 3 KIS REST 4).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
### Task 4: KIS WebSocket client + 4 integration tests

**Files:**
- Create: `web-ai/signal_v2/kis_websocket.py`
- Create: `web-ai/signal_v2/tests/test_kis_websocket.py`

This is the heaviest task. The WebSocket can be mocked with `serve` from the `websockets` library or with `AsyncMock`. KIS asking-price messages are delimited by `|` and `^`.

Raw format of a KIS asking-price message (the real one is longer; only the key fields are shown):
```
0|H0STASP0|001|005930^091500^78500^...^total_ask^total_bid^...
```
- 0 = received data
- H0STASP0 = TR_ID (real-time asking price)
- 001 = data count
- Then the `^`-separated fields: [0] ticker code, [1] time HHMMSS, [2] current price, ...; for the bid/ask quantity indices, see the official KIS documentation (KIS_SETUP.md / V1 code).

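The frame layout described above can be split in two passes: `|` for the envelope, then `^` for the payload fields. A minimal sketch; `Frame` and `split_frame` are hypothetical names, and the sample frame is synthetic rather than captured from KIS.

```python
from __future__ import annotations

from typing import NamedTuple


class Frame(NamedTuple):
    flag: str          # leading "0" in the example above
    tr_id: str         # e.g. "H0STASP0"
    count: int         # data count, e.g. 1 for "001"
    fields: list[str]  # "^"-separated payload


def split_frame(raw: str) -> Frame | None:
    """Split a raw frame; return None for anything without the 4-part envelope."""
    parts = raw.split("|", 3)
    if len(parts) != 4:
        return None
    flag, tr_id, count, payload = parts
    if not count.isdigit():
        return None
    return Frame(flag, tr_id, int(count), payload.split("^"))


frame = split_frame("0|H0STASP0|001|005930^091500^78500")
not_a_frame = split_frame('{"header": {"tr_id": "H0STASP0"}}')
```

Returning `None` for non-matching input mirrors how `_parse_asking_price` in this plan skips messages it cannot interpret.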
- [ ] **Step 1: Write 4 failing tests**
Create `web-ai/signal_v2/tests/test_kis_websocket.py`:
```python
"""Tests for KISWebSocket."""
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock

import httpx
import pytest
import respx

from signal_v2.kis_websocket import KISWebSocket

BASE_REST = "https://openapivts.koreainvestment.com:29443"


@respx.mock
async def test_fetch_approval_key_via_oauth_endpoint():
    """POST /oauth2/Approval → extract approval_key."""
    respx.post(f"{BASE_REST}/oauth2/Approval").mock(
        return_value=httpx.Response(200, json={"approval_key": "test-approval-key-xyz"})
    )
    ws = KISWebSocket(app_key="k", app_secret="s", is_virtual=True)
    key = await ws._fetch_approval_key()
    assert key == "test-approval-key-xyz"
    assert ws._approval_key == "test-approval-key-xyz"


async def test_subscribe_sends_h0stasp0_message(monkeypatch):
    """subscribe() → sends an H0STASP0 subscribe message over the WebSocket."""
    sent_messages = []
    mock_ws = AsyncMock()
    mock_ws.send = AsyncMock(side_effect=lambda m: sent_messages.append(m))
    ws = KISWebSocket(app_key="k", app_secret="s", is_virtual=True)
    ws._approval_key = "test-key"
    ws._ws = mock_ws
    await ws.subscribe("005930")
    assert ws._subscriptions == {"005930"}
    assert len(sent_messages) == 1
    # The subscribe message is a JSON structure (per the KIS spec)
    msg = json.loads(sent_messages[0])
    assert msg["header"]["tr_type"] == "1"  # subscribe
    assert msg["body"]["input"]["tr_id"] == "H0STASP0"
    assert msg["body"]["input"]["tr_key"] == "005930"


def test_parse_asking_price_extracts_bid_ask_totals():
    """KIS raw '0|H0STASP0|001|...' → (ticker, dict)."""
    ws = KISWebSocket(app_key="k", app_secret="s", is_virtual=True)
    # Mock asking-price message (the real format is more complex; see KIS_SETUP.md).
    # H0STASP0 response: '0|H0STASP0|001|<data>'
    # data: ticker^time^current_price^... (more fields)
    # The implementer is expected to align the indices with the KIS spec.
    # This test only verifies the shape of the output dict.
    sample_raw = (
        "0|H0STASP0|001|005930^091500^78500^"
        + "^".join(["0"] * 40)  # padding (the real message has 50+ fields)
        + "^1000^2000"  # last two fields: total_askp_rsqn / total_bidp_rsqn (example)
    )
    # Parsing succeeds when the indices match the layout of the message
    result = ws._parse_asking_price(sample_raw)
    if result is None:
        # Index mismatch: the implementer should fix it against the KIS spec
        pytest.fail("parse_asking_price returned None — check field indices vs KIS spec")
    ticker, data = result
    assert ticker == "005930"
    assert "bid_total" in data
    assert "ask_total" in data
    assert "bid_ratio" in data


async def test_reconnect_on_disconnect_with_backoff(monkeypatch):
    """Connection failure → retry with exponential backoff."""
    sleep_calls = []

    async def fake_sleep(s):
        sleep_calls.append(s)

    monkeypatch.setattr("asyncio.sleep", fake_sleep)
    # Implementer: when adding reconnect logic inside ws, verify that
    # _connect_with_backoff() (or an internal reconnect loop) sleeps
    # exponentially: 1s → 2s → 4s.
    ws = KISWebSocket(app_key="k", app_secret="s", is_virtual=True)
    # Mock _connect to fail twice then succeed
    call_count = [0]

    async def fake_connect():
        call_count[0] += 1
        if call_count[0] < 3:
            raise ConnectionError("fake disconnect")
        return AsyncMock()

    monkeypatch.setattr(ws, "_connect", fake_connect, raising=False)
    result = await ws._connect_with_backoff()
    assert call_count[0] == 3  # 2 fails + 1 success
    assert sleep_calls[:2] == [1, 2]  # exponential
```
- [ ] **Step 2: Run tests to verify FAIL**
```bash
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_kis_websocket.py -v 2>&1 | tail -10
```
Expected: ImportError.
- [ ] **Step 3: Implement kis_websocket.py**
Create `web-ai/signal_v2/kis_websocket.py`:
```python
"""KIS WebSocket: approval_key + real-time asking-price subscription."""
from __future__ import annotations

import asyncio
import json
import logging
from datetime import datetime
from typing import Callable
from zoneinfo import ZoneInfo

import httpx
import websockets

logger = logging.getLogger(__name__)
KST = ZoneInfo("Asia/Seoul")

# Field indices in the KIS asking-price message (see the official KIS spec;
# the real indices must be confirmed in the production environment).
# H0STASP0 response: ticker | time | current_price | ... | total_ask_rsqn | total_bid_rsqn | ...
# Confirm the exact index of each field in production (KIS docs; the V1 code
# cannot be reused for this).
_ASKING_TICKER_IDX = 0
_ASKING_TIME_IDX = 1
_ASKING_CURRENT_PRICE_IDX = 2
_ASKING_TOTAL_ASK_IDX = -2  # second from the end (assumption)
_ASKING_TOTAL_BID_IDX = -1  # last (assumption)


class KISWebSocket:
    """KIS WebSocket client. Issues the approval_key and streams asking prices."""

    def __init__(self, app_key: str, app_secret: str, is_virtual: bool):
        self._app_key = app_key
        self._app_secret = app_secret
        self._is_virtual = is_virtual
        self._base_rest = (
            "https://openapivts.koreainvestment.com:29443" if is_virtual
            else "https://openapi.koreainvestment.com:9443"
        )
        self._ws_url = (
            "ws://ops.koreainvestment.com:31000" if is_virtual
            else "ws://ops.koreainvestment.com:21000"
        )
        self._approval_key: str | None = None
        self._ws: websockets.WebSocketClientProtocol | None = None
        self._subscriptions: set[str] = set()
        self._on_asking_price: Callable[[str, dict], None] | None = None
        self._recv_task: asyncio.Task | None = None
        self._shutdown = asyncio.Event()

    async def _fetch_approval_key(self) -> str:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                f"{self._base_rest}/oauth2/Approval",
                json={
                    "grant_type": "client_credentials",
                    "appkey": self._app_key,
                    "secretkey": self._app_secret,
                },
            )
            response.raise_for_status()
            data = response.json()
            self._approval_key = data["approval_key"]
            return self._approval_key

    async def _connect(self) -> websockets.WebSocketClientProtocol:
        return await websockets.connect(self._ws_url)

    async def _connect_with_backoff(self) -> websockets.WebSocketClientProtocol:
        """Connect with exponential backoff (1s → 2s → 4s → max 30s)."""
        for attempt in range(10):
            try:
                ws = await self._connect()
                return ws
            except Exception as e:
                wait = min(2**attempt, 30)
                logger.warning(
                    "KIS WebSocket connect failed (attempt %d): %r, retrying in %ds",
                    attempt + 1, e, wait,
                )
                await asyncio.sleep(wait)
        raise RuntimeError("KIS WebSocket connect exhausted retries")

    async def start(
        self, tickers: list[str],
        on_asking_price: Callable[[str, dict], None],
    ) -> None:
        if self._approval_key is None:
            await self._fetch_approval_key()
        self._on_asking_price = on_asking_price
        self._ws = await self._connect_with_backoff()
        for ticker in tickers:
            await self.subscribe(ticker)
        self._recv_task = asyncio.create_task(self._receive_loop())

    async def subscribe(self, ticker: str) -> None:
        if self._ws is None or self._approval_key is None:
            raise RuntimeError("KIS WebSocket not started")
        msg = json.dumps({
            "header": {
                "approval_key": self._approval_key,
                "custtype": "P",
                "tr_type": "1",  # subscribe
                "content-type": "utf-8",
            },
            "body": {
                "input": {"tr_id": "H0STASP0", "tr_key": ticker},
            },
        })
        await self._ws.send(msg)
        self._subscriptions.add(ticker)

    async def unsubscribe(self, ticker: str) -> None:
        if self._ws is None or self._approval_key is None:
            return
        msg = json.dumps({
            "header": {
                "approval_key": self._approval_key,
                "custtype": "P",
                "tr_type": "2",  # unsubscribe
                "content-type": "utf-8",
            },
            "body": {
                "input": {"tr_id": "H0STASP0", "tr_key": ticker},
            },
        })
        await self._ws.send(msg)
        self._subscriptions.discard(ticker)

    async def close(self) -> None:
        self._shutdown.set()
        if self._recv_task is not None:
            self._recv_task.cancel()
            try:
                await self._recv_task
            except asyncio.CancelledError:
                pass
        if self._ws is not None:
            await self._ws.close()

    async def _receive_loop(self) -> None:
        while not self._shutdown.is_set():
            try:
                raw = await self._ws.recv()
            except websockets.ConnectionClosed:
                logger.warning("KIS WebSocket closed, reconnecting")
                self._ws = await self._connect_with_backoff()
                # Re-register every subscription on the new connection
                for ticker in list(self._subscriptions):
                    await self.subscribe(ticker)
                continue
            if not isinstance(raw, str):
                continue
            parsed = self._parse_asking_price(raw)
            if parsed is not None and self._on_asking_price is not None:
                ticker, data = parsed
                try:
                    self._on_asking_price(ticker, data)
                except Exception:
                    logger.exception("on_asking_price callback failed")

    def _parse_asking_price(self, raw: str) -> tuple[str, dict] | None:
        """KIS H0STASP0 raw → (ticker, asking_price dict)."""
        try:
            parts = raw.split("|")
            if len(parts) < 4 or parts[1] != "H0STASP0":
                return None
            fields = parts[3].split("^")
            ticker = fields[_ASKING_TICKER_IDX]
            price_raw = fields[_ASKING_CURRENT_PRICE_IDX]
            ask_raw = fields[_ASKING_TOTAL_ASK_IDX]
            bid_raw = fields[_ASKING_TOTAL_BID_IDX]
            # Non-numeric fields fall back to 0 instead of raising
            current_price = int(price_raw) if price_raw.isdigit() else 0
            ask_total = int(ask_raw) if ask_raw.lstrip("-").isdigit() else 0
            bid_total = int(bid_raw) if bid_raw.lstrip("-").isdigit() else 0
            total = bid_total + ask_total
            return ticker, {
                "bid_total": bid_total,
                "ask_total": ask_total,
                "bid_ratio": bid_total / total if total > 0 else 0.0,
                "current_price": current_price,
                "as_of": datetime.now(KST).isoformat(),
            }
        except (IndexError, ValueError) as e:
            logger.warning("parse_asking_price failed: %r", e)
            return None
```
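For reference, the wait sequence produced by `min(2**attempt, 30)` over the ten attempts in `_connect_with_backoff` can be computed directly. This is plain arithmetic on the constants shown above; `MAX_ATTEMPTS` and `CAP_SECONDS` are names introduced only for this illustration.

```python
MAX_ATTEMPTS = 10  # range(10) in _connect_with_backoff
CAP_SECONDS = 30   # upper bound applied by min(..., 30)

# One sleep follows every failed attempt, including the last one.
waits = [min(2**attempt, CAP_SECONDS) for attempt in range(MAX_ATTEMPTS)]
total_wait = sum(waits)
```

In the worst case the loop therefore sleeps for about three minutes in total before raising, with the final 30 s sleep happening after the tenth failure.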
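**Important note**: the `_ASKING_*_IDX` indices above are **estimates**. The exact indices must be confirmed in the production environment, using either:
- the official KIS documentation (`https://apiportal.koreainvestment.com/`)
- or raw message logs captured in production, mapped to field indices

The tests in this plan assume indices -2/-1. A real KIS asking-price message has 50+ fields, so the exact indices need production verification. On a parse failure, log a WARNING and skip (as stated in the risk matrix, §9).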
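One way to do the raw-log mapping mentioned above is to list each `^`-separated field next to its positive and negative index for a captured frame, then locate the total bid/ask quantities by comparing against known values. A minimal sketch; `dump_fields` is a hypothetical helper, and the sample frame is the synthetic one from the test in this task, not real KIS output.

```python
def dump_fields(raw: str) -> list[tuple[int, int, str]]:
    """Return (index, negative_index, value) for each payload field of a data frame."""
    payload = raw.split("|", 3)[3]
    fields = payload.split("^")
    n = len(fields)
    return [(i, i - n, value) for i, value in enumerate(fields)]


sample = (
    "0|H0STASP0|001|005930^091500^78500^"
    + "^".join(["0"] * 40)
    + "^1000^2000"
)
table = dump_fields(sample)
for index, neg_index, value in table[-2:]:
    print(index, neg_index, value)
```

Running the same helper on a frame captured from the live feed shows whether the -2/-1 assumption holds or whether absolute indices are needed.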
- [ ] **Step 4: Run tests to verify PASS**
```bash
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_kis_websocket.py -v 2>&1 | tail -15
```
Expected: 4 passed.
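If `test_parse_asking_price_extracts_bid_ask_totals` fails, the index assumption and the amount of padding in the sample raw message do not match. Two options:
- a) adjust the `["0"]*40` part of the sample raw message to match the indices (fix the test only)
- b) adjust the indices in `_parse_asking_price` after checking the spec (fix the code)

The choice is the implementer's; either works.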
Full suite:
```bash
python -m pytest signal_v2/tests -q 2>&1 | tail -3
```
Expected: 30 passed (26 + 4).
- [ ] **Step 5: Commit**
```bash
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/kis_websocket.py signal_v2/tests/test_kis_websocket.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3a): kis_websocket + 4 integration tests

KISWebSocket: approval_key (POST /oauth2/Approval) + real-time H0STASP0
asking-price subscribe + receive loop with reconnect. Exponential backoff
(1s→2s→4s→max 30s) + subscription re-registration.

The field indices in _parse_asking_price need adjusting after production
verification (official KIS spec or raw log mapping).

30 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
### Task 5: pull_worker extension + 2 integration tests
**Files:**
- Modify: `web-ai/signal_v2/pull_worker.py`
- Create: `web-ai/signal_v2/tests/test_pull_worker.py`
- [ ] **Step 1: Write 2 failing tests**
Create `web-ai/signal_v2/tests/test_pull_worker.py`:
```python
"""Tests for pull_worker (Phase 3a additions)."""
from collections import deque
from unittest.mock import AsyncMock, MagicMock

import pytest

from signal_v2.state import PollState


async def test_minute_polling_cycle_updates_state_minute_bars():
    """Minute bars from the mocked KIS REST client land in the state.minute_bars[ticker] deque."""
    from signal_v2.pull_worker import _run_kis_minute_cycle

    state = PollState()
    state.portfolio = {"holdings": [{"ticker": "005930"}, {"ticker": "000660"}]}
    state.screener_preview = {
        "items": [{"ticker": "005930"}, {"ticker": "035720"}]  # 005930 dup
    }
    kis_client_mock = MagicMock()
    kis_client_mock.get_minute_ohlcv = AsyncMock(side_effect=[
        [{"datetime": "2026-05-18T09:00:00+09:00", "open": 78000,
          "high": 78500, "low": 77900, "close": 78300, "volume": 12345}],
        [{"datetime": "2026-05-18T09:00:00+09:00", "open": 180000,
          "high": 181000, "low": 179800, "close": 180500, "volume": 5000}],
        [{"datetime": "2026-05-18T09:00:00+09:00", "open": 51000,
          "high": 51200, "low": 50800, "close": 51100, "volume": 8000}],
    ])
    kis_client_mock.get_asking_price = AsyncMock(return_value={
        "bid_total": 600, "ask_total": 400, "bid_ratio": 0.6,
        "current_price": 51100, "as_of": "2026-05-18T09:00:30+09:00",
    })
    await _run_kis_minute_cycle(kis_client_mock, state)
    # 3 unique tickers (005930, 000660, 035720)
    assert "005930" in state.minute_bars
    assert "000660" in state.minute_bars
    assert "035720" in state.minute_bars
    assert len(state.minute_bars["005930"]) >= 1
    # Only the screener-only ticker (035720) gets a REST asking price
    # (portfolio tickers are covered by the WebSocket)
    assert "035720" in state.asking_price


def test_websocket_message_updates_state_asking_price():
    """The WebSocket callback updates state.asking_price."""
    from signal_v2.pull_worker import make_asking_price_callback

    state = PollState()
    cb = make_asking_price_callback(state)
    cb("005930", {"bid_total": 1000, "ask_total": 800, "bid_ratio": 0.555,
                  "current_price": 78500, "as_of": "2026-05-18T10:00:00+09:00"})
    assert state.asking_price["005930"]["bid_total"] == 1000
    assert "asking_price/005930" in state.last_updated
```
- [ ] **Step 2: Run tests to verify FAIL**
```bash
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_pull_worker.py -v 2>&1 | tail -10
```
Expected: ImportError or AttributeError.
- [ ] **Step 3: Implement pull_worker.py additions**
Edit `web-ai/signal_v2/pull_worker.py` — keep existing `poll_loop` and `_run_polling_cycle`, then ADD:
```python
from collections import deque

from signal_v2.kis_client import KISClient  # new import


async def _run_kis_minute_cycle(kis_client: KISClient, state: PollState) -> None:
    """Fetch KIS minute bars + asking prices and update state.

    - Minute bars: every ticker in the union of portfolio + screener Top-N
    - Asking price (REST): screener-only tickers (portfolio arrives via WebSocket)
    """
    portfolio_tickers = _portfolio_tickers(state)
    screener_tickers = _screener_tickers(state)
    all_tickers = list(set(portfolio_tickers) | set(screener_tickers))

    # Fetch minute bars (in parallel)
    minute_results = await asyncio.gather(*[
        kis_client.get_minute_ohlcv(t) for t in all_tickers
    ], return_exceptions=True)

    now_iso = datetime.now(KST).isoformat()
    for ticker, result in zip(all_tickers, minute_results):
        if isinstance(result, list):
            buf = state.minute_bars.setdefault(ticker, deque(maxlen=60))
            buf.extend(result)
            state.last_updated[f"minute_bars/{ticker}"] = now_iso
        else:
            state.fetch_errors[f"minute_bars/{ticker}"] = (
                state.fetch_errors.get(f"minute_bars/{ticker}", 0) + 1
            )

    # Fetch asking prices (REST), screener-only tickers
    screener_only = list(set(screener_tickers) - set(portfolio_tickers))
    asking_results = await asyncio.gather(*[
        kis_client.get_asking_price(t) for t in screener_only
    ], return_exceptions=True)
    for ticker, result in zip(screener_only, asking_results):
        if isinstance(result, dict):
            state.asking_price[ticker] = result
            state.last_updated[f"asking_price/{ticker}"] = now_iso
        else:
            # Count failures the same way as for minute bars.
            state.fetch_errors[f"asking_price/{ticker}"] = (
                state.fetch_errors.get(f"asking_price/{ticker}", 0) + 1
            )


def make_asking_price_callback(state: PollState):
    """KIS WebSocket on_asking_price callback factory."""
    def _cb(ticker: str, data: dict) -> None:
        state.asking_price[ticker] = data
        state.last_updated[f"asking_price/{ticker}"] = datetime.now(KST).isoformat()
    return _cb


def _portfolio_tickers(state: PollState) -> list[str]:
    if state.portfolio is None:
        return []
    return [h["ticker"] for h in state.portfolio.get("holdings", []) if "ticker" in h]


def _screener_tickers(state: PollState) -> list[str]:
    if state.screener_preview is None:
        return []
    return [i["ticker"] for i in state.screener_preview.get("items", []) if "ticker" in i]
```
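One point worth checking during implementation: if `get_minute_ohlcv` returns bars that overlap with the previous cycle, a plain `buf.extend(result)` would store the same minute more than once. A minimal sketch of a de-duplicating append, assuming each bar carries a unique `datetime` key as in the test fixtures; `extend_unique` is a hypothetical helper and not part of the plan's code.

```python
from collections import deque


def extend_unique(buf: deque, bars: list[dict], key: str = "datetime") -> int:
    """Append only bars whose `key` is not already in `buf`; return the count added.

    Assumption: a bar already in the buffer is final. A still-forming bar
    for the current minute would need replace-in-place logic instead.
    """
    seen = {bar[key] for bar in buf}
    added = 0
    for bar in bars:
        if bar[key] not in seen:
            buf.append(bar)
            seen.add(bar[key])
            added += 1
    return added


if __name__ == "__main__":
    buf = deque(maxlen=60)
    extend_unique(buf, [{"datetime": "09:00"}, {"datetime": "09:01"}])
    extend_unique(buf, [{"datetime": "09:01"}, {"datetime": "09:02"}])
    print([bar["datetime"] for bar in buf])
```

Whether this is needed depends on what the KIS endpoint returns per call, which should be confirmed against a real response.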
Also ADD to `_run_polling_cycle` signature an optional `kis_client` param:
```python
async def _run_polling_cycle(
    client: StockClient, state: PollState,
    kis_client: KISClient | None = None,
) -> None:
    # ... existing 3-endpoint stock fetch ...

    # KIS minute bars + asking prices (only when kis_client is provided)
    if kis_client is not None:
        try:
            await _run_kis_minute_cycle(kis_client, state)
        except Exception:
            logger.exception("kis minute cycle failed")
```
And update `poll_loop` signature to accept `kis_client`:
```python
async def poll_loop(
    client: StockClient, state: PollState, shutdown: asyncio.Event,
    kis_client: KISClient | None = None,
) -> None:
    # ... existing while loop ...
    #     call _run_polling_cycle(client, state, kis_client=kis_client)
```
- [ ] **Step 4: Run tests to verify PASS**
```bash
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_pull_worker.py -v 2>&1 | tail -10
```
Expected: 2 passed.
Full suite:
```bash
python -m pytest signal_v2/tests -q 2>&1 | tail -3
```
Expected: 32 passed (30 + 2).
- [ ] **Step 5: Commit**
```bash
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/pull_worker.py signal_v2/tests/test_pull_worker.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3a): pull_worker KIS minute cycle + WS callback

_run_kis_minute_cycle: minute-bar fetch for the portfolio + screener
union, plus REST asking-price fetch for screener-only tickers.
WebSocket callback factory.

poll_loop / _run_polling_cycle gain an optional kis_client param
(None keeps working through Phase 5).

32 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
### Task 6: main.py KIS lifespan integration + 1 test case
**Files:**
- Modify: `web-ai/signal_v2/main.py`
- Modify: `web-ai/signal_v2/tests/test_main.py`
- [ ] **Step 1: Write 1 failing test for KIS lifespan**
Append to `web-ai/signal_v2/tests/test_main.py`:
```python
def test_startup_warns_if_kis_app_key_missing(monkeypatch, caplog):
    """Startup logs a WARNING when KIS_APP_KEY is unset (KIS calls disabled)."""
    monkeypatch.setenv("STOCK_API_URL", "https://test.stock.local")
    monkeypatch.setenv("WEBAI_API_KEY", "test-secret")
    monkeypatch.delenv("KIS_APP_KEY", raising=False)

    import importlib
    from signal_v2 import config as cfg
    importlib.reload(cfg)
    from signal_v2 import main as main_mod
    importlib.reload(main_mod)

    with caplog.at_level("WARNING", logger="signal_v2.main"):
        with TestClient(main_mod.app) as client:
            client.get("/health")

    assert any("KIS_APP_KEY" in rec.message for rec in caplog.records)
```
- [ ] **Step 2: Run test to verify FAIL**
```bash
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_main.py::test_startup_warns_if_kis_app_key_missing -v 2>&1 | tail -10
```
Expected: FAIL (no such warning yet).
- [ ] **Step 3: Update main.py to integrate KIS clients**
Edit `web-ai/signal_v2/main.py` — augment `AppContext` + `lifespan`:
```python
from signal_v2.kis_client import KISClient
from signal_v2.kis_websocket import KISWebSocket
from signal_v2.pull_worker import make_asking_price_callback


class AppContext:
    client: StockClient | None = None
    dedup: SignalDedup | None = None
    shutdown: asyncio.Event | None = None
    poll_task: asyncio.Task | None = None
    kis_client: KISClient | None = None
    kis_ws: KISWebSocket | None = None


# (keep _ctx = AppContext() at module level)


@asynccontextmanager
async def lifespan(app: FastAPI):
    settings = get_settings()
    if not settings.webai_api_key:
        logger.warning("WEBAI_API_KEY not configured — stock API calls will fail with 401")
    if not settings.kis_app_key:
        logger.warning("KIS_APP_KEY not configured — KIS REST/WebSocket disabled")

    _ctx.client = StockClient(settings.stock_api_url, settings.webai_api_key)
    _ctx.dedup = SignalDedup(settings.db_path)
    _ctx.shutdown = asyncio.Event()

    # KIS only if app_key configured
    if settings.kis_app_key:
        _ctx.kis_client = KISClient(
            app_key=settings.kis_app_key,
            app_secret=settings.kis_app_secret,
            account=settings.kis_account,
            is_virtual=settings.kis_is_virtual,
            v1_token_path=settings.v1_token_path,
        )
        _ctx.kis_ws = KISWebSocket(
            app_key=settings.kis_app_key,
            app_secret=settings.kis_app_secret,
            is_virtual=settings.kis_is_virtual,
        )
        # Subscribe portfolio holdings (if any)
        try:
            portfolio = await _ctx.client.get_portfolio()
            tickers = [h["ticker"] for h in portfolio.get("holdings", []) if "ticker" in h]
            cb = make_asking_price_callback(state_mod.state)
            await _ctx.kis_ws.start(tickers, cb)
        except Exception:
            logger.exception("KIS WebSocket startup failed — continuing without realtime asking_price")

    _ctx.poll_task = asyncio.create_task(
        poll_loop(
            _ctx.client, state_mod.state, _ctx.shutdown,
            kis_client=_ctx.kis_client,
        )
    )

    yield

    # Shutdown
    if _ctx.shutdown is not None:
        _ctx.shutdown.set()
    if _ctx.poll_task is not None:
        try:
            await asyncio.wait_for(_ctx.poll_task, timeout=5.0)
        except asyncio.TimeoutError:
            _ctx.poll_task.cancel()
            try:
                await _ctx.poll_task
            except asyncio.CancelledError:
                pass
    if _ctx.kis_ws is not None:
        await _ctx.kis_ws.close()
    if _ctx.kis_client is not None:
        await _ctx.kis_client.close()
    if _ctx.client is not None:
        await _ctx.client.close()
```
- [ ] **Step 4: Run tests to verify PASS**
```bash
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_main.py -v 2>&1 | tail -15
```
Expected: 3 passed (2 existing + 1 new).
Full suite:
```bash
python -m pytest signal_v2/tests -q 2>&1 | tail -3
```
Expected: 33 passed (32 + 1).
- [ ] **Step 5: Commit**
```bash
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/main.py signal_v2/tests/test_main.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3a): main.py lifespan integrates KIS client + WS

AppContext extended with kis_client + kis_ws. lifespan:
- If KIS_APP_KEY set: create KISClient + KISWebSocket, fetch portfolio,
  subscribe WebSocket H0STASP0 for holdings.
- If unset: WARNING log, signal_v2 still serves /health (no KIS data).
- Shutdown closes kis_ws → kis_client → stock client in order.

33 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
### Task 7: Manual user steps: .env update + manual smoke + push
**This task requires user action.**
- [ ] **Step 1: Update `.env`**
User adds to `C:\Users\jaeoh\Desktop\workspace\web-ai\.env`:
```
# KIS API (Phase 3a, 2026-05-16)
KIS_APP_KEY=<from KIS portal>
KIS_APP_SECRET=<from KIS portal>
KIS_ACCOUNT=<8 digits-2 digits format, e.g. 50000000-01>
KIS_IS_VIRTUAL=true
# V1 token path (default: ../signal_v1/data/kis_token.json)
# V1_TOKEN_PATH=
```
User reference: see `web-ai/signal_v1/KIS_SETUP.md` for the exact key issuance steps and account number format.
- [ ] **Step 2: Confirm the V1 bot is running normally (it is responsible for issuing the token)**
```powershell
cd C:\Users\jaeoh\Desktop\workspace\web-ai
.\start.bat
```
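Confirm in the console log that KIS auth succeeded and that `signal_v1/data/kis_token.json` was refreshed.
The V1 bot can keep running or be stopped once the token is issued. signal_v2 only reads the token file.
- [ ] **Step 3: Start signal_v2**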
```powershell
cd C:\Users\jaeoh\Desktop\workspace\web-ai\signal_v2
.\start.bat
```
Expected output (a few dozen lines), including:
- `Uvicorn running on http://0.0.0.0:8001`
- `poll_loop started`
- `KIS WebSocket connected` (when the V1 token loads successfully)

If a `V1 token file missing` error appears, check the V1 token file path, start the V1 bot, then retry.
- [ ] **Step 4: /health smoke**
```powershell
curl http://localhost:8001/health
```
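Expected: 200 JSON. Initially `last_poll` has no minute_bars / asking_price entries; wait until the first cycle completes.
- [ ] **Step 5: Verify minute bars + asking prices (during market hours or an NXT window)**
After market hours or an NXT window begins, wait 60-300 seconds, then call `/health` again: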
```powershell
curl http://localhost:8001/health
```
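Expected `last_poll` in the response:
- per-ticker timestamps such as `minute_bars/005930`
- `asking_price/<screener-only-ticker>` entries as well

Outside trading hours:
- KIS REST calls still work, but the minute-bar data returned is the last bar of the previous trading day
- WebSocket asking prices do not arrive outside business hours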
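The `/health` check above can be made less error-prone with a small summary helper. This is a minimal sketch, assuming `last_poll` is a flat mapping from keys such as `minute_bars/005930` to timestamps (mirroring `state.last_updated`); the exact response shape is an assumption and `summarize_last_poll` is a hypothetical helper.

```python
def summarize_last_poll(health: dict) -> dict[str, list[str]]:
    """Group `last_poll` keys by prefix and return sorted ticker lists.

    Assumption: keys look like "<kind>/<ticker>"; other keys are ignored.
    """
    last_poll = health.get("last_poll") or {}
    summary: dict[str, list[str]] = {"minute_bars": [], "asking_price": []}
    for key in last_poll:
        kind, _, ticker = key.partition("/")
        if kind in summary and ticker:
            summary[kind].append(ticker)
    for tickers in summary.values():
        tickers.sort()
    return summary


if __name__ == "__main__":
    # Synthetic response for illustration only.
    sample = {"last_poll": {
        "portfolio": "2026-05-18T09:00:00+09:00",
        "minute_bars/005930": "2026-05-18T09:01:00+09:00",
        "asking_price/035720": "2026-05-18T09:01:00+09:00",
    }}
    print(summarize_last_poll(sample))
```

An empty `minute_bars` list after the wait period suggests the KIS cycle has not run yet or every fetch failed.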
- [ ] **Step 6: Verify V1 is unaffected**
In the V1 bot console, confirm normal operation and that Telegram /status responds. V2 only reads the token file, so V1 should be unaffected.
- [ ] **Step 7: git push**
```powershell
cd C:\Users\jaeoh\Desktop\workspace\web-ai
git push
```
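If Gitea asks for credentials, the user enters them manually.
- [ ] **Step 8: Report results**
- Step 3 (signal_v2 start): PASS / FAIL, sharing the error message on failure
- Step 4 (/health): PASS / FAIL
- Step 5 (minute bars + asking prices): PASS (state update confirmed) / FAIL, sharing the KIS response or parse issue
- Step 6 (V1 unaffected): PASS / FAIL
- Step 7 (push): PASS / FAIL

If asking-price parsing fails in Step 5, the cause is the field indices in `_parse_asking_price`. Capture a sample raw KIS message and adjust the indices (as a separate fix slice).
When every step passes, Phase 3a is complete; move on to brainstorming Phase 3b (Chronos-2 inference + minute-bar momentum).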
---
## Self-Review
**1. Spec coverage:**
| Spec § | Requirement | Plan task |
|--------|-------------|-----------|
| §2 In scope ① | KIS REST client | Task 3 ✅ |
| §2 In scope ② | KIS WebSocket | Task 4 ✅ |
| §2 In scope ③ | pull_worker extension | Task 5 ✅ |
| §2 In scope ④ | PollState extension | Task 1 ✅ |
| §2 In scope ⑤ | scheduler NXT | Task 2 ✅ |
| §2 In scope ⑥ | 13 tests | Task 2 (3) + Task 3 (4) + Task 4 (4) + Task 5 (2) = 13 ✅ (Task 6 adds 1 further test) |
| §3 | Change matrix, 14 files | Tasks 1-7 combined ✅ |
| §4 | KISClient interface | Task 3 Step 3 ✅ |
| §5 | KISWebSocket interface | Task 4 Step 3 ✅ |
| §6 | PollState extension + pull_worker | Task 1 + Task 5 ✅ |
| §7 | scheduler NXT | Task 2 ✅ |
| §8 | 13 test cases | Tasks 2-5 ✅ |
| §11 | DoD, 13 items | Tasks 1-7 ✅ |
No gaps.
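**2. Placeholder scan:** No "TBD". The field indices in the KIS WebSocket `_parse_asking_price` are marked "adjust after production verification"; the implementer finalizes them from a production sample. This is not a placeholder but a parameter that depends on the production environment.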
**3. Type consistency:**
- `KISClient.__init__(app_key, app_secret, account, is_virtual, v1_token_path, timeout=10.0)` consistent ✅
- `KISClient.get_minute_ohlcv(ticker) -> list[dict]` / `get_asking_price(ticker) -> dict` / `close()` consistent ✅
- `KISWebSocket.__init__(app_key, app_secret, is_virtual)` consistent ✅
- `KISWebSocket.start(tickers, on_asking_price)` / `subscribe(ticker)` / `unsubscribe(ticker)` / `close()` consistent ✅
- `PollState` fields (minute_bars + asking_price) consistent across Task 1 / 5 / 6 ✅
- `_next_interval(now) -> float` / `_is_market_day(now) -> bool` / `_is_polling_window(now) -> bool` consistent ✅
- env var names (`KIS_APP_KEY` / `KIS_APP_SECRET` / `KIS_ACCOUNT` / `KIS_IS_VIRTUAL` / `V1_TOKEN_PATH`) consistent ✅
Plan passes self-review.