Task 1: config + state + websockets dep
Task 2: scheduler NXT windows + 3 tests
Task 3: kis_client REST + 4 tests
Task 4: kis_websocket + 4 tests (heaviest)
Task 5: pull_worker minute cycle + 2 tests
Task 6: main.py KIS lifespan + 1 test
Task 7: user manual .env + smoke + push
13 new tests, total 32 signal_v2 tests. ~1 week.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signal V2 Phase 3a — KIS Data Collection Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
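Goal: Add to signal_v2 a KIS REST client (minute-bar and asking-price polling, sharing the V1 token read-only), a KIS WebSocket client (approval_key plus real-time asking-price subscription for portfolio tickers), an extension of the scheduler to NXT hours, and integration of minute_bars/asking_price into PollState. This is the starting point for Phase 3b (Chronos-2 inference).
Architecture: The access_token in V1's signal_v1/data/kis_token.json is shared read-only. V2 issues its own approval_key for the WebSocket. Only portfolio holdings (~11 tickers) are subscribed to asking-price updates over WebSocket; the screener Top-N is polled over REST every minute. During NXT hours (20:00-23:30 / 04:30-07:00) the scheduler also enables polling on a 5-minute cron.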
Tech Stack: httpx async / websockets / pytest-asyncio + respx / sqlite3 / Python 3.11+ / KIS OpenAPI
Spec: web-ui/docs/superpowers/specs/2026-05-16-signal-v2-phase3a-kis-data-collection.md
V1 reference code (recommended reading during implementation):
- web-ai/signal_v1/modules/services/kis.py: V1 KIS REST client (TR_ID, response keys, token file format)
- web-ai/signal_v1/KIS_SETUP.md: KIS authentication setup guide
File structure
| File | Responsibility |
|---|---|
| web-ai/signal_v2/config.py | (modify) Add 5 KIS env vars + V1_TOKEN_PATH |
| web-ai/signal_v2/state.py | (modify) Add minute_bars: dict[str, deque] + asking_price: dict[str, dict] to PollState |
| web-ai/signal_v2/scheduler.py | (modify) 5-minute cron branch for NXT hours (20:00-23:30 / 04:30-07:00) |
| web-ai/signal_v2/kis_client.py | (new) REST KISClient: minute bars + asking price + V1 token read |
| web-ai/signal_v2/kis_websocket.py | (new) WebSocket KISWebSocket: approval_key + real-time asking price for portfolio tickers |
| web-ai/signal_v2/pull_worker.py | (modify) Minute-bar cycle + WebSocket subscription sync |
| web-ai/signal_v2/main.py | (modify) KISClient/KISWebSocket startup/shutdown in lifespan |
| web-ai/signal_v2/requirements.txt | (modify) Add websockets>=12 |
| web-ai/signal_v2/tests/test_scheduler.py | (modify) Add 3 NXT cases |
| web-ai/signal_v2/tests/test_kis_client.py | (new) 4 cases |
| web-ai/signal_v2/tests/test_kis_websocket.py | (new) 4 cases |
| web-ai/signal_v2/tests/test_pull_worker.py | (new) 2 cases |
| web-ai/signal_v2/tests/test_main.py | (modify) Add 1 KIS lifespan case |
| web-ai/.env | (modify, user Task 7) 5 KIS env vars + V1_TOKEN_PATH |

14 files changed in total, 13 new tests.
Task order
Task 1: config + state + requirements (foundation extension)
Task 2: scheduler NXT windows + 3 tests (TDD)
Task 3: kis_client.py REST + 4 integration tests (TDD)
Task 4: kis_websocket.py WebSocket + 4 integration tests (TDD)
Task 5: pull_worker extension + 2 integration tests (TDD)
Task 6: main.py KIS lifespan integration + 1 case
Task 7: manual user steps: update .env + manual smoke test + push
Task 1: Extend config + state + requirements
Files:
- Modify: web-ai/signal_v2/config.py
- Modify: web-ai/signal_v2/state.py
- Modify: web-ai/signal_v2/requirements.txt

- Step 1: Update config.py with KIS env + V1_TOKEN_PATH
Use Read tool to inspect current web-ai/signal_v2/config.py, then Edit to add new fields. Final state:
"""Signal V2 environment variable loading."""
import os
from dataclasses import dataclass, field
from pathlib import Path

from dotenv import load_dotenv

load_dotenv(Path(__file__).parent.parent / ".env")


@dataclass(frozen=True)
class Settings:
    stock_api_url: str = field(
        default_factory=lambda: os.getenv("STOCK_API_URL", "").rstrip("/")
    )
    webai_api_key: str = field(
        default_factory=lambda: os.getenv("WEBAI_API_KEY", "").strip()
    )
    port: int = field(default_factory=lambda: int(os.getenv("SIGNAL_V2_PORT", "8001")))
    db_path: Path = field(
        default_factory=lambda: Path(__file__).parent / "data" / "signal_v2.db"
    )
    # Phase 3a additions
    kis_app_key: str = field(default_factory=lambda: os.getenv("KIS_APP_KEY", "").strip())
    kis_app_secret: str = field(default_factory=lambda: os.getenv("KIS_APP_SECRET", "").strip())
    kis_account: str = field(default_factory=lambda: os.getenv("KIS_ACCOUNT", "").strip())
    kis_is_virtual: bool = field(
        default_factory=lambda: os.getenv("KIS_IS_VIRTUAL", "true").lower() == "true"
    )
    v1_token_path: Path = field(
        default_factory=lambda: Path(
            os.getenv("V1_TOKEN_PATH",
                      str(Path(__file__).parent.parent / "signal_v1" / "data" / "kis_token.json"))
        )
    )


def get_settings() -> Settings:
    return Settings()
- Step 2: Update state.py with minute_bars + asking_price fields
Edit web-ai/signal_v2/state.py:
"""PollState: process-wide singleton."""
from collections import deque
from dataclasses import dataclass, field


@dataclass
class PollState:
    portfolio: dict | None = None
    news_sentiment: dict | None = None
    screener_preview: dict | None = None
    # Phase 3a additions
    minute_bars: dict[str, deque] = field(default_factory=dict)  # {ticker: deque(maxlen=60)}
    asking_price: dict[str, dict] = field(default_factory=dict)  # {ticker: {bid_total, ask_total, bid_ratio, ...}}
    last_updated: dict[str, str] = field(default_factory=dict)
    fetch_errors: dict[str, int] = field(default_factory=dict)


state = PollState()
- Step 3: Add websockets to requirements.txt
Read current web-ai/requirements.txt. Append (if not present):
websockets>=12
- Step 4: pip install + smoke import
cd /c/Users/jaeoh/Desktop/workspace/web-ai
pip install -r requirements.txt 2>&1 | tail -5
python -c "from signal_v2.config import get_settings; from signal_v2.state import state; print(get_settings().kis_is_virtual); print(state)"
Expected: pip install succeeds, get_settings().kis_is_virtual is printed, and state is printed (minute_bars / asking_price are empty dicts).
- Step 5: Run existing tests — no regression
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests -q 2>&1 | tail -3
Expected: 19 passed (the existing Phase 2 tests, unchanged).
- Step 6: Commit
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/config.py signal_v2/state.py requirements.txt
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3a): config + state extensions for KIS data

- config.py: KIS_APP_KEY/SECRET/ACCOUNT/IS_VIRTUAL + V1_TOKEN_PATH env
- state.py: PollState extended with minute_bars (deque maxlen 60) and
  asking_price (per-ticker dict)
- requirements.txt: websockets>=12 (KIS WebSocket client)

19 existing tests still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
Task 2: Scheduler NXT windows + 3 tests
Files:
- Modify: web-ai/signal_v2/scheduler.py
- Modify: web-ai/signal_v2/tests/test_scheduler.py

- Step 1: Write 3 failing tests for NXT windows
Append to web-ai/signal_v2/tests/test_scheduler.py:
def test_next_interval_nxt_evening_5min():
    """22:00 on a weekday (NXT evening) → 300 (5 min)."""
    now = _kst(2026, 5, 18, 22, 0)
    assert _next_interval(now) == 300


def test_next_interval_nxt_dawn_5min():
    """05:30 on a weekday (NXT dawn) → 300 (5 min)."""
    now = _kst(2026, 5, 18, 5, 30)
    assert _next_interval(now) == 300


def test_next_interval_dead_zone_skip():
    """02:00 on a weekday (dead zone) → wait until the next 04:30 (~9000s)."""
    now = _kst(2026, 5, 18, 2, 0)
    interval = _next_interval(now)
    # 02:00 → 04:30 = 2.5h = 9000s
    assert 9000 - 60 < interval < 9000 + 60
- Step 2: Run tests to verify FAIL
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_scheduler.py -v -k "nxt or dead_zone" 2>&1 | tail -10
Expected: 3 failed (the scheduler currently treats 22:00, 05:30, and 02:00 all as overnight skip).
- Step 3: Update scheduler.py
Edit web-ai/signal_v2/scheduler.py — replace the time-window constants block + _next_interval + _is_polling_window with:
# Market windows (regular session)
_PRE_OPEN = time(7, 0)
_OPEN = time(9, 0)
_CLOSE = time(15, 30)
_POST_END = time(20, 0)

# NXT windows (after hours)
_NXT_PRE_END = time(23, 30)
_NXT_POST_OPEN = time(4, 30)
# 23:30 - 04:30 (dead zone) is skipped


def _is_market_day(now: datetime) -> bool:
    """A weekday that is not a market holiday."""
    if now.weekday() >= 5:
        return False
    return now.strftime("%Y-%m-%d") not in _HOLIDAYS


def _is_polling_window(now: datetime) -> bool:
    """Polling windows: 07:00-23:30 and 04:30-07:00."""
    t = now.time()
    return (
        (_PRE_OPEN <= t < _NXT_PRE_END)
        or (_NXT_POST_OPEN <= t < _PRE_OPEN)
    )


def _next_interval(now: datetime) -> float:
    if not _is_market_day(now):
        return _seconds_until_next_market_open(now)
    t = now.time()
    if _PRE_OPEN <= t < _OPEN:
        return 300.0  # pre-open: 5 min
    elif _OPEN <= t < _CLOSE:
        return 60.0  # regular session: 1 min
    elif _CLOSE <= t < _POST_END:
        return 300.0  # post-close: 5 min
    elif _POST_END <= t < _NXT_PRE_END:
        return 300.0  # NXT evening: 5 min
    elif _NXT_POST_OPEN <= t < _PRE_OPEN:
        return 300.0  # NXT dawn: 5 min
    else:
        # Dead zone (23:30-04:30): wait until 04:30
        return _seconds_until_nxt_or_market_open(now)


def _seconds_until_nxt_or_market_open(now: datetime) -> float:
    """Seconds until the next 04:30 (start of the NXT dawn window) on a market day."""
    candidate = now.replace(hour=4, minute=30, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    for _ in range(14):
        if _is_market_day(candidate):
            return (candidate - now).total_seconds()
        candidate += timedelta(days=1)
    logger.warning("could not find next market day within 14 days")
    return 86400.0
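Note: the existing _seconds_until_next_market_open computes 07:00 on the next business day after a weekend or holiday; keep it unchanged. The newly added _seconds_until_nxt_or_market_open computes the next 04:30 from inside the dead zone (23:30-04:30): 04:30 on the candidate day if that day is a business day, otherwise 04:30 on the next business day.
When reviewing whether _seconds_until_next_market_open should use hour=7 or hour=4, minute=30: keep 07:00 for the business day that follows a holiday or weekend (that day starts from the regular pre-market). NXT runs only on business days, so skipping the weekend and resuming at 07:00 on the next business day is fine.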
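As a worked check of the windows above, the sketch below tabulates the business-day intervals and computes the dead-zone wait. It is a simplified, self-contained illustration: `WINDOWS` and `interval_for` are hypothetical names, and the sketch deliberately ignores weekends and holidays, which the real code handles through _is_market_day.

```python
from datetime import datetime, time, timedelta, timezone

KST = timezone(timedelta(hours=9))  # fixed offset; Korea observes no DST

# (start, end, interval_seconds) on a business day, per the plan's windows.
WINDOWS = [
    (time(4, 30), time(7, 0), 300.0),    # NXT dawn
    (time(7, 0), time(9, 0), 300.0),     # pre-open
    (time(9, 0), time(15, 30), 60.0),    # regular session
    (time(15, 30), time(20, 0), 300.0),  # post-close
    (time(20, 0), time(23, 30), 300.0),  # NXT evening
]


def interval_for(now: datetime) -> float:
    """Polling interval in a window; in the dead zone, seconds until the next 04:30."""
    t = now.time()
    for start, end, seconds in WINDOWS:
        if start <= t < end:
            return seconds
    # Dead zone (23:30-04:30): sleep until the next 04:30.
    resume = now.replace(hour=4, minute=30, second=0, microsecond=0)
    if resume <= now:
        resume += timedelta(days=1)
    return (resume - now).total_seconds()


print(interval_for(datetime(2026, 5, 18, 22, 0, tzinfo=KST)))   # 300.0
print(interval_for(datetime(2026, 5, 18, 2, 0, tzinfo=KST)))    # 9000.0
print(interval_for(datetime(2026, 5, 18, 23, 45, tzinfo=KST)))  # 17100.0
```

The 23:45 case shows why the helper rolls the candidate forward by a day: 04:30 on the same calendar date has already passed.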
- Step 4: Run tests to verify PASS
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_scheduler.py -v 2>&1 | tail -15
Expected: 11 passed (8 existing + 3 new).
Also full suite:
python -m pytest signal_v2/tests -q 2>&1 | tail -3
Expected: 22 passed (19 + 3).
- Step 5: Commit
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/scheduler.py signal_v2/tests/test_scheduler.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3a): scheduler NXT windows (20:00-23:30 / 04:30-07:00)

Enable 5-minute cron polling during NXT after-hours windows as well.
23:30-04:30 is a dead zone (KIS maintenance window) → skip until 04:30.

3 new tests (22 total).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
Task 3: KIS REST client + 4 integration tests
Files:
- Create: web-ai/signal_v2/kis_client.py
- Create: web-ai/signal_v2/tests/test_kis_client.py

This task requires the exact KIS API TR_IDs and response keys. Consulting the V1 code (web-ai/signal_v1/modules/services/kis.py) is recommended:
- Minute-bar TR_ID: FHKST03010200 (domestic stock intraday minute bars); the response carries an output2 array
- Asking-price TR_ID: FHKST01010200 (domestic stock asking price); the response carries an output1 object
- Token file (signal_v1/data/kis_token.json) format: {"access_token": "...", "token_expired": "YYYY-MM-DD HH:MM:SS"}

- Step 1: Write 4 failing tests
Create web-ai/signal_v2/tests/test_kis_client.py:
"""Tests for KISClient (REST)."""
import json
import time
from pathlib import Path

import httpx
import pytest
import respx

from signal_v2.kis_client import KISClient


@pytest.fixture
def fake_v1_token(tmp_path):
    """V1 token file fixture."""
    token_file = tmp_path / "kis_token.json"
    token_file.write_text(json.dumps({
        "access_token": "test-kis-token-abc123",
        "token_expired": "2099-12-31 23:59:59",
    }))
    return token_file


@pytest.fixture
def kis_client_factory(fake_v1_token):
    def _make():
        return KISClient(
            app_key="test-app-key",
            app_secret="test-app-secret",
            account="50000000-01",
            is_virtual=True,
            v1_token_path=fake_v1_token,
        )
    return _make


@respx.mock
async def test_get_minute_ohlcv_normal_returns_30_bars(kis_client_factory):
    """Normal 200 → returns a list of 30 minute bars."""
    sample_output2 = [
        {
            "stck_bsop_date": "20260518",
            "stck_cntg_hour": f"{h:02d}{m:02d}00",
            "stck_oprc": "78000", "stck_hgpr": "78500",
            "stck_lwpr": "77800", "stck_prpr": "78300",
            "cntg_vol": "12345",
        }
        for h in range(9, 10) for m in range(0, 30)  # 9:00-9:29 = 30 bars
    ]
    respx.get(
        "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
    ).mock(
        return_value=httpx.Response(200, json={"output2": sample_output2})
    )
    client = kis_client_factory()
    try:
        bars = await client.get_minute_ohlcv("005930")
        assert len(bars) == 30
        assert bars[0]["close"] == 78300
        assert "datetime" in bars[0]
    finally:
        await client.close()


@respx.mock
async def test_get_minute_ohlcv_429_retry_then_success(kis_client_factory, monkeypatch):
    """429 → exponential backoff → 200."""
    sleep_calls = []

    async def fake_sleep(s):
        sleep_calls.append(s)

    monkeypatch.setattr("asyncio.sleep", fake_sleep)
    respx.get(
        "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
    ).mock(side_effect=[
        httpx.Response(429, text="rate limit"),
        httpx.Response(200, json={"output2": []}),
    ])
    client = kis_client_factory()
    try:
        result = await client.get_minute_ohlcv("005930")
        assert result == []
        # One retry sleep of 1s (2**0); throttle sleeps may also be recorded
        assert 1 in sleep_calls
    finally:
        await client.close()


async def test_get_minute_ohlcv_uses_v1_token(kis_client_factory, fake_v1_token):
    """The KIS request headers carry the access_token from the V1 token file."""
    captured_headers = {}

    @respx.mock
    async def _do_request():
        route = respx.get(
            "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
        ).mock(return_value=httpx.Response(200, json={"output2": []}))
        client = kis_client_factory()
        try:
            await client.get_minute_ohlcv("005930")
            # The respx route records the request; inspect it
            assert route.called
            req = route.calls.last.request
            captured_headers.update(req.headers)
        finally:
            await client.close()

    await _do_request()
    # The KIS auth header is `authorization: Bearer <token>`
    assert "authorization" in {k.lower() for k in captured_headers.keys()}
    auth_value = captured_headers.get("authorization") or captured_headers.get("Authorization")
    assert "test-kis-token-abc123" in auth_value


@respx.mock
async def test_get_asking_price_computes_bid_ratio(kis_client_factory):
    """Asking-price response → bid_ratio = bid_total / (bid_total + ask_total)."""
    # Quantity fields in KIS output1: total_askp_rsqn (total ask quantity),
    # total_bidp_rsqn (total bid quantity)
    respx.get(
        "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn"
    ).mock(return_value=httpx.Response(200, json={
        "output1": {
            "total_bidp_rsqn": "600",
            "total_askp_rsqn": "400",
            "stck_prpr": "78500",
        }
    }))
    client = kis_client_factory()
    try:
        data = await client.get_asking_price("005930")
        assert data["bid_total"] == 600
        assert data["ask_total"] == 400
        assert abs(data["bid_ratio"] - 0.6) < 1e-9
        assert data["current_price"] == 78500
        assert "as_of" in data
    finally:
        await client.close()
- Step 2: Run tests to verify FAIL
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_kis_client.py -v 2>&1 | tail -10
Expected: ImportError (signal_v2.kis_client missing).
- Step 3: Implement kis_client.py
Create web-ai/signal_v2/kis_client.py:
"""KIS REST API client: minute bars + asking price. Shares the V1 token read-only."""
from __future__ import annotations

import asyncio
import json
import logging
import time
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo

import httpx

logger = logging.getLogger(__name__)
KST = ZoneInfo("Asia/Seoul")
_MAX_ATTEMPTS = 3
_THROTTLE_INTERVAL = 0.5  # limit to 2 requests per second


class KISClient:
    """KIS REST (minute bars + asking price). Reads the V1 token file read-only."""

    def __init__(
        self,
        app_key: str, app_secret: str, account: str, is_virtual: bool,
        v1_token_path: Path,
        timeout: float = 10.0,
    ):
        self._app_key = app_key
        self._app_secret = app_secret
        self._account = account
        self._is_virtual = is_virtual
        self._v1_token_path = Path(v1_token_path)
        self._base_url = (
            "https://openapivts.koreainvestment.com:29443" if is_virtual
            else "https://openapi.koreainvestment.com:9443"
        )
        self._client = httpx.AsyncClient(timeout=timeout)
        self._token_cache: tuple[str, float] | None = None  # (token, file_mtime)
        self._last_throttle_at = 0.0

    async def close(self) -> None:
        await self._client.aclose()

    def _read_v1_token(self) -> str:
        if not self._v1_token_path.exists():
            raise RuntimeError(f"V1 token file missing: {self._v1_token_path}")
        mtime = self._v1_token_path.stat().st_mtime
        # Re-read the file only when its mtime changed (V1 refreshed the token)
        if self._token_cache and self._token_cache[1] == mtime:
            return self._token_cache[0]
        data = json.loads(self._v1_token_path.read_text(encoding="utf-8"))
        token = data.get("access_token", "")
        if not token:
            raise RuntimeError("V1 token file has no access_token")
        self._token_cache = (token, mtime)
        return token

    async def _throttle(self) -> None:
        elapsed = time.monotonic() - self._last_throttle_at
        if elapsed < _THROTTLE_INTERVAL:
            await asyncio.sleep(_THROTTLE_INTERVAL - elapsed)
        self._last_throttle_at = time.monotonic()

    def _common_headers(self, tr_id: str) -> dict[str, str]:
        token = self._read_v1_token()
        return {
            "authorization": f"Bearer {token}",
            "appkey": self._app_key,
            "appsecret": self._app_secret,
            "tr_id": tr_id,
            "custtype": "P",
        }

    async def _request_with_retry(
        self, method: str, path: str, tr_id: str, **kwargs,
    ) -> dict:
        url = f"{self._base_url}{path}"
        headers = self._common_headers(tr_id)
        for attempt in range(_MAX_ATTEMPTS):
            await self._throttle()
            try:
                response = await self._client.request(
                    method, url, headers=headers, **kwargs
                )
                # Retry 429 with exponential backoff (1s, 2s); on the last
                # attempt fall through so raise_for_status() surfaces the error.
                if response.status_code == 429 and attempt < _MAX_ATTEMPTS - 1:
                    await asyncio.sleep(2**attempt)
                    continue
                response.raise_for_status()
                return response.json()
            except httpx.TimeoutException:
                if attempt < _MAX_ATTEMPTS - 1:
                    await asyncio.sleep(2**attempt)
                    continue
                raise
        raise RuntimeError("retry exhausted")

    async def get_minute_ohlcv(self, ticker: str) -> list[dict]:
        """The 30 most recent 1-minute OHLCV bars before now (TR_ID FHKST03010200).

        Returns: [{datetime, open, high, low, close, volume}, ...] in ascending time order.
        """
        path = "/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
        params = {
            "FID_ETC_CLS_CODE": "",
            "FID_COND_MRKT_DIV_CODE": "J",
            "FID_INPUT_ISCD": ticker,
            "FID_INPUT_HOUR_1": datetime.now(KST).strftime("%H%M%S"),
            "FID_PW_DATA_INCU_YN": "N",
        }
        raw = await self._request_with_retry(
            "GET", path, tr_id="FHKST03010200", params=params,
        )
        output2 = raw.get("output2", [])
        bars = []
        for row in output2:
            try:
                date = row["stck_bsop_date"]
                hhmmss = row["stck_cntg_hour"]
                dt = datetime.strptime(f"{date} {hhmmss}", "%Y%m%d %H%M%S").replace(tzinfo=KST)
                bars.append({
                    "datetime": dt.isoformat(),
                    "open": int(row["stck_oprc"]),
                    "high": int(row["stck_hgpr"]),
                    "low": int(row["stck_lwpr"]),
                    "close": int(row["stck_prpr"]),
                    "volume": int(row["cntg_vol"]),
                })
            except (KeyError, ValueError) as e:
                logger.warning("skip malformed bar for %s: %r", ticker, e)
        # KIS returns descending; reverse to ascending
        bars.reverse()
        return bars

    async def get_asking_price(self, ticker: str) -> dict:
        """Current asking price + total bid/ask quantities (TR_ID FHKST01010200)."""
        path = "/uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn"
        params = {
            "FID_COND_MRKT_DIV_CODE": "J",
            "FID_INPUT_ISCD": ticker,
        }
        raw = await self._request_with_retry(
            "GET", path, tr_id="FHKST01010200", params=params,
        )
        output1 = raw.get("output1", {})
        bid_total = int(output1.get("total_bidp_rsqn", 0))
        ask_total = int(output1.get("total_askp_rsqn", 0))
        total = bid_total + ask_total
        bid_ratio = bid_total / total if total > 0 else 0.0
        current_price = int(output1.get("stck_prpr", 0))
        return {
            "bid_total": bid_total,
            "ask_total": ask_total,
            "bid_ratio": bid_ratio,
            "current_price": current_price,
            "as_of": datetime.now(KST).isoformat(),
        }
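_read_v1_token above returns the access_token without inspecting the token_expired field of the token file. If a staleness check is wanted, one possible shape is sketched below. This is not part of the plan's code: `token_is_fresh` and its `margin` parameter are hypothetical, and the sketch assumes token_expired is a KST wall-clock string in the YYYY-MM-DD HH:MM:SS format described earlier.

```python
from datetime import datetime, timedelta, timezone

KST = timezone(timedelta(hours=9))  # fixed offset; Korea observes no DST


def token_is_fresh(
    token_data: dict,
    now: datetime,
    margin: timedelta = timedelta(minutes=5),
) -> bool:
    """True if token_expired lies more than `margin` after `now`.

    Assumption: token_expired is KST local time, "YYYY-MM-DD HH:MM:SS".
    """
    expires = datetime.strptime(
        token_data["token_expired"], "%Y-%m-%d %H:%M:%S"
    ).replace(tzinfo=KST)
    return expires - now > margin


sample = {"access_token": "...", "token_expired": "2026-05-18 09:00:00"}
print(token_is_fresh(sample, datetime(2026, 5, 18, 8, 0, tzinfo=KST)))   # True
print(token_is_fresh(sample, datetime(2026, 5, 18, 8, 58, tzinfo=KST)))  # False
```

Because V2 only reads the file, a stale result would most naturally be logged and counted in fetch_errors rather than trigger a refresh; refreshing remains V1's job.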
- Step 4: Run tests to verify PASS
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_kis_client.py -v 2>&1 | tail -15
Expected: 4 passed.
Full suite:
python -m pytest signal_v2/tests -q 2>&1 | tail -3
Expected: 26 passed (22 + 4).
If failures relate to KIS TR_ID or field names, refer to V1 signal_v1/modules/services/kis.py and adjust.
- Step 5: Commit
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/kis_client.py signal_v2/tests/test_kis_client.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3a): kis_client REST + 4 integration tests

KISClient: async REST for minute bars (FHKST03010200) + asking price
(FHKST01010200). Shares the V1 token file (signal_v1/data/kis_token.json)
read-only with an mtime cache. Throttled to 2 requests per second.
Exponential retry.

26 tests pass (Phase 2 19 + Task 2 NXT 3 + Task 3 KIS REST 4).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
Task 4: KIS WebSocket client + 4 integration tests
Files:
- Create: web-ai/signal_v2/kis_websocket.py
- Create: web-ai/signal_v2/tests/test_kis_websocket.py

This is the heaviest task. Mock the WebSocket either with the websockets library's serve or with AsyncMock. KIS asking-price messages are delimited by | and ^.
Raw format of a KIS asking-price message (the real message is longer; only the key fields are shown):
0|H0STASP0|001|005930^091500^78500^...^total_ask^total_bid^...
- 0 = received data
- H0STASP0 = TR_ID (real-time asking price)
- 001 = number of data records
- The ^-delimited fields that follow: [0] ticker code, [1] time HHMMSS, [2] current price, ...; for the indices of the bid/ask quantities, see the official KIS documentation (KIS_SETUP.md / V1 code).

- Step 1: Write 4 failing tests
Create web-ai/signal_v2/tests/test_kis_websocket.py:
"""Tests for KISWebSocket."""
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock

import httpx
import pytest
import respx

from signal_v2.kis_websocket import KISWebSocket

BASE_REST = "https://openapivts.koreainvestment.com:29443"


@respx.mock
async def test_fetch_approval_key_via_oauth_endpoint():
    """POST /oauth2/Approval → extract approval_key."""
    respx.post(f"{BASE_REST}/oauth2/Approval").mock(
        return_value=httpx.Response(200, json={"approval_key": "test-approval-key-xyz"})
    )
    ws = KISWebSocket(app_key="k", app_secret="s", is_virtual=True)
    key = await ws._fetch_approval_key()
    assert key == "test-approval-key-xyz"
    assert ws._approval_key == "test-approval-key-xyz"


async def test_subscribe_sends_h0stasp0_message(monkeypatch):
    """subscribe() → sends an H0STASP0 subscribe message over the WebSocket."""
    sent_messages = []
    mock_ws = AsyncMock()
    mock_ws.send = AsyncMock(side_effect=lambda m: sent_messages.append(m))
    ws = KISWebSocket(app_key="k", app_secret="s", is_virtual=True)
    ws._approval_key = "test-key"
    ws._ws = mock_ws
    await ws.subscribe("005930")
    assert ws._subscriptions == {"005930"}
    assert len(sent_messages) == 1
    # The subscribe message is a JSON structure (per the KIS spec)
    msg = json.loads(sent_messages[0])
    assert msg["header"]["tr_type"] == "1"  # subscribe
    assert msg["body"]["input"]["tr_id"] == "H0STASP0"
    assert msg["body"]["input"]["tr_key"] == "005930"


def test_parse_asking_price_extracts_bid_ask_totals():
    """KIS raw '0|H0STASP0|001|...' → (ticker, dict)."""
    ws = KISWebSocket(app_key="k", app_secret="s", is_virtual=True)
    # Mock asking-price message (the real format is more complex; see KIS_SETUP.md).
    # H0STASP0 response: '0|H0STASP0|001|<data>'
    # data: ticker^time^current_price^... (more fields)
    # The implementer is expected to align the indices with the KIS spec;
    # this test only verifies the shape of the output dict.
    sample_raw = (
        "0|H0STASP0|001|005930^091500^78500^"
        + "^".join(["0"] * 40)  # padding (the real message has 50+ fields)
        + "^1000^2000"  # last two fields: total_askp_rsqn / total_bidp_rsqn (example)
    )
    # Parsing succeeds only when the indices match the layout used here
    result = ws._parse_asking_price(sample_raw)
    if result is None:
        # Index mismatch: the implementer should fix it against the KIS spec
        pytest.fail("parse_asking_price returned None — check field indices vs KIS spec")
    ticker, data = result
    assert ticker == "005930"
    assert "bid_total" in data
    assert "ask_total" in data
    assert "bid_ratio" in data


async def test_reconnect_on_disconnect_with_backoff(monkeypatch):
    """Connection failure → retry with exponential backoff."""
    sleep_calls = []

    async def fake_sleep(s):
        sleep_calls.append(s)

    monkeypatch.setattr("asyncio.sleep", fake_sleep)
    # Implementer: when adding reconnect logic, verify that
    # _connect_with_backoff() (or the internal reconnect loop)
    # sleeps with exponential waits of 1s → 2s → 4s.
    ws = KISWebSocket(app_key="k", app_secret="s", is_virtual=True)
    # Mock _connect to fail twice then succeed
    call_count = [0]

    async def fake_connect():
        call_count[0] += 1
        if call_count[0] < 3:
            raise ConnectionError("fake disconnect")
        return AsyncMock()

    monkeypatch.setattr(ws, "_connect", fake_connect, raising=False)
    result = await ws._connect_with_backoff()
    assert call_count[0] == 3  # 2 fails + 1 success
    assert sleep_calls[:2] == [1, 2]  # exponential
- Step 2: Run tests to verify FAIL
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_kis_websocket.py -v 2>&1 | tail -10
Expected: ImportError.
- Step 3: Implement kis_websocket.py
Create web-ai/signal_v2/kis_websocket.py:
"""KIS WebSocket: approval_key + real-time asking-price subscription."""
from __future__ import annotations

import asyncio
import json
import logging
from datetime import datetime
from typing import Callable
from zoneinfo import ZoneInfo

import httpx
import websockets

logger = logging.getLogger(__name__)
KST = ZoneInfo("Asia/Seoul")

# Field indices of the KIS asking-price message (see the official KIS spec;
# the actual indices must be confirmed in the production environment).
# H0STASP0 response: ticker | time | current_price | ... | total_ask_rsqn | total_bid_rsqn | ...
# Confirm the exact field indices in production (KIS docs; V1 cannot be reused for this).
_ASKING_TICKER_IDX = 0
_ASKING_TIME_IDX = 1
_ASKING_CURRENT_PRICE_IDX = 2
_ASKING_TOTAL_ASK_IDX = -2  # second from the end (assumption)
_ASKING_TOTAL_BID_IDX = -1  # last (assumption)


class KISWebSocket:
    """KIS WebSocket client. Issues an approval_key and streams asking prices."""

    def __init__(self, app_key: str, app_secret: str, is_virtual: bool):
        self._app_key = app_key
        self._app_secret = app_secret
        self._is_virtual = is_virtual
        self._base_rest = (
            "https://openapivts.koreainvestment.com:29443" if is_virtual
            else "https://openapi.koreainvestment.com:9443"
        )
        self._ws_url = (
            "ws://ops.koreainvestment.com:31000" if is_virtual
            else "ws://ops.koreainvestment.com:21000"
        )
        self._approval_key: str | None = None
        self._ws: websockets.WebSocketClientProtocol | None = None
        self._subscriptions: set[str] = set()
        self._on_asking_price: Callable[[str, dict], None] | None = None
        self._recv_task: asyncio.Task | None = None
        self._shutdown = asyncio.Event()

    async def _fetch_approval_key(self) -> str:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                f"{self._base_rest}/oauth2/Approval",
                json={
                    "grant_type": "client_credentials",
                    "appkey": self._app_key,
                    "secretkey": self._app_secret,
                },
            )
            response.raise_for_status()
            data = response.json()
            self._approval_key = data["approval_key"]
            return self._approval_key

    async def _connect(self) -> websockets.WebSocketClientProtocol:
        return await websockets.connect(self._ws_url)

    async def _connect_with_backoff(self) -> websockets.WebSocketClientProtocol:
        """Try to connect with exponential backoff (1s → 2s → 4s → max 30s)."""
        for attempt in range(10):
            try:
                ws = await self._connect()
                return ws
            except Exception as e:
                wait = min(2**attempt, 30)
                logger.warning("KIS WebSocket connect failed (attempt %d): %r; retrying in %ds", attempt + 1, e, wait)
                await asyncio.sleep(wait)
        raise RuntimeError("KIS WebSocket connect exhausted retries")

    async def start(
        self, tickers: list[str],
        on_asking_price: Callable[[str, dict], None],
    ) -> None:
        if self._approval_key is None:
            await self._fetch_approval_key()
        self._on_asking_price = on_asking_price
        self._ws = await self._connect_with_backoff()
        for ticker in tickers:
            await self.subscribe(ticker)
        self._recv_task = asyncio.create_task(self._receive_loop())

    async def subscribe(self, ticker: str) -> None:
        if self._ws is None or self._approval_key is None:
            raise RuntimeError("KIS WebSocket not started")
        msg = json.dumps({
            "header": {
                "approval_key": self._approval_key,
                "custtype": "P",
                "tr_type": "1",  # subscribe
                "content-type": "utf-8",
            },
            "body": {
                "input": {"tr_id": "H0STASP0", "tr_key": ticker},
            },
        })
        await self._ws.send(msg)
        self._subscriptions.add(ticker)

    async def unsubscribe(self, ticker: str) -> None:
        if self._ws is None or self._approval_key is None:
            return
        msg = json.dumps({
            "header": {
                "approval_key": self._approval_key,
                "custtype": "P",
                "tr_type": "2",  # unsubscribe
                "content-type": "utf-8",
            },
            "body": {
                "input": {"tr_id": "H0STASP0", "tr_key": ticker},
            },
        })
        await self._ws.send(msg)
        self._subscriptions.discard(ticker)

    async def close(self) -> None:
        self._shutdown.set()
        if self._recv_task is not None:
            self._recv_task.cancel()
            try:
                await self._recv_task
            except asyncio.CancelledError:
                pass
        if self._ws is not None:
            await self._ws.close()

    async def _receive_loop(self) -> None:
        while not self._shutdown.is_set():
            try:
                raw = await self._ws.recv()
            except websockets.ConnectionClosed:
                logger.warning("KIS WebSocket closed; reconnecting")
                self._ws = await self._connect_with_backoff()
                # Re-register every active subscription on the new connection
                for ticker in list(self._subscriptions):
                    await self.subscribe(ticker)
                continue
            if not isinstance(raw, str):
                continue
            parsed = self._parse_asking_price(raw)
            if parsed is not None and self._on_asking_price is not None:
                ticker, data = parsed
                try:
                    self._on_asking_price(ticker, data)
                except Exception:
                    logger.exception("on_asking_price callback failed")

    def _parse_asking_price(self, raw: str) -> tuple[str, dict] | None:
        """KIS H0STASP0 raw → (ticker, asking_price dict)."""
        try:
            parts = raw.split("|")
            if len(parts) < 4 or parts[1] != "H0STASP0":
                return None
            fields = parts[3].split("^")
            ticker = fields[_ASKING_TICKER_IDX]
            current_price = int(fields[_ASKING_CURRENT_PRICE_IDX]) if fields[_ASKING_CURRENT_PRICE_IDX].isdigit() else 0
            ask_total = int(fields[_ASKING_TOTAL_ASK_IDX]) if fields[_ASKING_TOTAL_ASK_IDX].lstrip("-").isdigit() else 0
            bid_total = int(fields[_ASKING_TOTAL_BID_IDX]) if fields[_ASKING_TOTAL_BID_IDX].lstrip("-").isdigit() else 0
            total = bid_total + ask_total
            return ticker, {
                "bid_total": bid_total,
                "ask_total": ask_total,
                "bid_ratio": bid_total / total if total > 0 else 0.0,
                "current_price": current_price,
                "as_of": datetime.now(KST).isoformat(),
            }
        except (IndexError, ValueError) as e:
            logger.warning("parse_asking_price failed: %r", e)
            return None
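Important note: the _ASKING_*_IDX indices above are estimates. The exact indices must be confirmed against the production environment, using either:
- the official KIS documentation (https://apiportal.koreainvestment.com/), or
- raw message logs captured in production, mapped to field indices.

The tests in this plan assume indices -2/-1. A real KIS asking-price message has 50+ fields, so the exact indices need to be verified in production. On a parse failure, log a WARNING and skip the message (as stated in risk matrix §9).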
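To map field indices from a captured raw message, a small helper that enumerates the ^-delimited payload can be enough. The sketch below is illustrative only: `index_fields` is a hypothetical name, and the sample frame is the synthetic one from the test above, not a real KIS message.

```python
def index_fields(raw: str) -> dict[int, str]:
    """Map field index -> value for the payload of a '0|TR_ID|count|payload' frame."""
    parts = raw.split("|")
    if len(parts) < 4:
        raise ValueError("not a data frame")
    return dict(enumerate(parts[3].split("^")))


# Synthetic frame (same layout as the plan's parse test), NOT a real KIS message.
sample = "0|H0STASP0|001|005930^091500^78500^" + "^".join(["0"] * 40) + "^1000^2000"
fields = index_fields(sample)
print(len(fields))             # 45
print(fields[43], fields[44])  # 1000 2000
```

Logging this mapping once for a real frame, next to the field list in the KIS documentation, gives absolute indices that can replace the negative-index assumption.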
- Step 4: Run tests to verify PASS
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_kis_websocket.py -v 2>&1 | tail -15
Expected: 4 passed.
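If test_parse_asking_price_extracts_bid_ask_totals fails, the index assumption and the padding count in the sample raw message do not match. There are two options:
- a) adjust the ["0"]*40 part of the sample raw message to match the indices (fix the test only)
- b) adjust the indices in _parse_asking_price after checking the spec (fix the code)

The choice is left to the implementer; either works.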
Full suite:
python -m pytest signal_v2/tests -q 2>&1 | tail -3
Expected: 30 passed (26 + 4).
- Step 5: Commit
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/kis_websocket.py signal_v2/tests/test_kis_websocket.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3a): kis_websocket + 4 integration tests

KISWebSocket: approval_key (POST /oauth2/Approval) + real-time H0STASP0
asking-price subscribe + receive loop with reconnect. Exponential backoff
(1s→2s→4s→max 30s) + subscription re-registration.

The field indices in _parse_asking_price need adjusting after production
verification (official KIS spec or raw log mapping).

30 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
Task 5: pull_worker extension + 2 integration tests
Files:
- Modify: web-ai/signal_v2/pull_worker.py
- Create: web-ai/signal_v2/tests/test_pull_worker.py

- Step 1: Write 2 failing tests
Create web-ai/signal_v2/tests/test_pull_worker.py:
"""Tests for pull_worker (Phase 3a additions)."""
from collections import deque
from unittest.mock import AsyncMock, MagicMock

import pytest

from signal_v2.state import PollState


async def test_minute_polling_cycle_updates_state_minute_bars():
    """Minute bars from the mocked KIS REST client land in the state.minute_bars[ticker] deque."""
    from signal_v2.pull_worker import _run_kis_minute_cycle

    state = PollState()
    state.portfolio = {"holdings": [{"ticker": "005930"}, {"ticker": "000660"}]}
    state.screener_preview = {
        "items": [{"ticker": "005930"}, {"ticker": "035720"}]  # 005930 dup
    }
    kis_client_mock = MagicMock()
    kis_client_mock.get_minute_ohlcv = AsyncMock(side_effect=[
        [{"datetime": "2026-05-18T09:00:00+09:00", "open": 78000,
          "high": 78500, "low": 77900, "close": 78300, "volume": 12345}],
        [{"datetime": "2026-05-18T09:00:00+09:00", "open": 180000,
          "high": 181000, "low": 179800, "close": 180500, "volume": 5000}],
        [{"datetime": "2026-05-18T09:00:00+09:00", "open": 51000,
          "high": 51200, "low": 50800, "close": 51100, "volume": 8000}],
    ])
    kis_client_mock.get_asking_price = AsyncMock(return_value={
        "bid_total": 600, "ask_total": 400, "bid_ratio": 0.6,
        "current_price": 51100, "as_of": "2026-05-18T09:00:30+09:00",
    })
    await _run_kis_minute_cycle(kis_client_mock, state)
    # 3 unique tickers (005930, 000660, 035720)
    assert "005930" in state.minute_bars
    assert "000660" in state.minute_bars
    assert "035720" in state.minute_bars
    assert len(state.minute_bars["005930"]) >= 1
    # Only the screener-only ticker (035720) gets a REST asking price
    # (portfolio tickers are served by the WebSocket)
    assert "035720" in state.asking_price


def test_websocket_message_updates_state_asking_price():
    """The WebSocket callback updates state.asking_price."""
    from signal_v2.pull_worker import make_asking_price_callback

    state = PollState()
    cb = make_asking_price_callback(state)
    cb("005930", {"bid_total": 1000, "ask_total": 800, "bid_ratio": 0.555,
                  "current_price": 78500, "as_of": "2026-05-18T10:00:00+09:00"})
    assert state.asking_price["005930"]["bid_total"] == 1000
    assert "asking_price/005930" in state.last_updated
- Step 2: Run tests to verify FAIL
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_pull_worker.py -v 2>&1 | tail -10
Expected: ImportError or AttributeError.
- Step 3: Implement pull_worker.py additions
Edit web-ai/signal_v2/pull_worker.py — keep existing poll_loop and _run_polling_cycle, then ADD:
from collections import deque

from signal_v2.kis_client import KISClient  # added import


async def _run_kis_minute_cycle(kis_client: KISClient, state: PollState) -> None:
    """Fetch KIS minute bars + asking prices and update state.

    - Minute bars: every ticker in the union of portfolio + screener Top-N
    - Asking price (REST): screener-only tickers (portfolio arrives via WebSocket)
    """
    portfolio_tickers = _portfolio_tickers(state)
    screener_tickers = _screener_tickers(state)
    all_tickers = list(set(portfolio_tickers) | set(screener_tickers))

    # Fetch minute bars (in parallel)
    minute_results = await asyncio.gather(*[
        kis_client.get_minute_ohlcv(t) for t in all_tickers
    ], return_exceptions=True)
    now_iso = datetime.now(KST).isoformat()
    for ticker, result in zip(all_tickers, minute_results):
        if isinstance(result, list):
            buf = state.minute_bars.setdefault(ticker, deque(maxlen=60))
            buf.extend(result)
            state.last_updated[f"minute_bars/{ticker}"] = now_iso
        else:
            state.fetch_errors[f"minute_bars/{ticker}"] = (
                state.fetch_errors.get(f"minute_bars/{ticker}", 0) + 1
            )

    # Fetch asking prices (REST), screener-only tickers
    screener_only = list(set(screener_tickers) - set(portfolio_tickers))
    asking_results = await asyncio.gather(*[
        kis_client.get_asking_price(t) for t in screener_only
    ], return_exceptions=True)
    for ticker, result in zip(screener_only, asking_results):
        if isinstance(result, dict):
            state.asking_price[ticker] = result
            state.last_updated[f"asking_price/{ticker}"] = now_iso


def make_asking_price_callback(state: PollState):
    """KIS WebSocket on_asking_price callback factory."""
    def _cb(ticker: str, data: dict) -> None:
        state.asking_price[ticker] = data
        state.last_updated[f"asking_price/{ticker}"] = datetime.now(KST).isoformat()
    return _cb


def _portfolio_tickers(state: PollState) -> list[str]:
    if state.portfolio is None:
        return []
    return [h["ticker"] for h in state.portfolio.get("holdings", []) if "ticker" in h]


def _screener_tickers(state: PollState) -> list[str]:
    if state.screener_preview is None:
        return []
    return [i["ticker"] for i in state.screener_preview.get("items", []) if "ticker" in i]
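One thing to watch in the buffer logic above: if get_minute_ohlcv returns bars that overlap with the previous cycle, buf.extend(result) would store the same minute twice. Whether that happens depends on the KIS response, which this plan does not pin down. A possible guard is sketched below with a hypothetical helper `extend_unique`, keyed on the `datetime` field used in the test fixtures.

```python
from collections import deque


def extend_unique(buf: deque, bars: list[dict]) -> int:
    """Append only bars whose `datetime` is not already buffered; return the count added."""
    seen = {bar["datetime"] for bar in buf}
    added = 0
    for bar in bars:
        if bar["datetime"] not in seen:
            buf.append(bar)
            seen.add(bar["datetime"])
            added += 1
    return added


buf = deque(maxlen=60)
extend_unique(buf, [{"datetime": "2026-05-18T09:00:00+09:00", "close": 78300}])
# Second cycle: one overlapping bar and one new bar
added = extend_unique(buf, [
    {"datetime": "2026-05-18T09:00:00+09:00", "close": 78300},
    {"datetime": "2026-05-18T09:01:00+09:00", "close": 78400},
])
```

The `seen` set is rebuilt on every call, so bars already evicted by `maxlen=60` do not block newer data.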
Also ADD to _run_polling_cycle signature an optional kis_client param:
async def _run_polling_cycle(
    client: StockClient, state: PollState,
    kis_client: KISClient | None = None,
) -> None:
    # ... existing 3-endpoint stock fetch ...
    # KIS minute bars + asking prices (only when kis_client is provided)
    if kis_client is not None:
        try:
            await _run_kis_minute_cycle(kis_client, state)
        except Exception:
            logger.exception("kis minute cycle failed")
And update poll_loop signature to accept kis_client:
async def poll_loop(
    client: StockClient, state: PollState, shutdown: asyncio.Event,
    kis_client: KISClient | None = None,
) -> None:
    # ... existing while loop ...
    # call _run_polling_cycle(client, state, kis_client=kis_client)
- Step 4: Run tests to verify PASS
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_pull_worker.py -v 2>&1 | tail -10
Expected: 2 passed.
Full suite:
python -m pytest signal_v2/tests -q 2>&1 | tail -3
Expected: 32 passed (30 + 2).
- Step 5: Commit
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/pull_worker.py signal_v2/tests/test_pull_worker.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3a): pull_worker KIS minute cycle + WS callback

_run_kis_minute_cycle: fetch minute bars for the portfolio + screener union
and REST asking prices for screener-only tickers. WebSocket callback factory.
Add an optional kis_client param to poll_loop / _run_polling_cycle
(passing None keeps working through Phase 5).

32 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
Task 6: main.py KIS lifespan integration + 1 test case
Files:
- Modify: web-ai/signal_v2/main.py
- Modify: web-ai/signal_v2/tests/test_main.py
- Step 1: Write 1 failing test for KIS lifespan
Append to web-ai/signal_v2/tests/test_main.py:
def test_startup_warns_if_kis_app_key_missing(monkeypatch, caplog):
    """Startup logs a WARNING when KIS_APP_KEY is unset (KIS calls disabled)."""
    monkeypatch.setenv("STOCK_API_URL", "https://test.stock.local")
    monkeypatch.setenv("WEBAI_API_KEY", "test-secret")
    monkeypatch.delenv("KIS_APP_KEY", raising=False)

    import importlib
    from signal_v2 import config as cfg
    importlib.reload(cfg)
    from signal_v2 import main as main_mod
    importlib.reload(main_mod)

    with caplog.at_level("WARNING", logger="signal_v2.main"):
        with TestClient(main_mod.app) as client:
            client.get("/health")
    assert any("KIS_APP_KEY" in rec.message for rec in caplog.records)
- Step 2: Run test to verify FAIL
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_main.py::test_startup_warns_if_kis_app_key_missing -v 2>&1 | tail -10
Expected: FAIL (no such warning yet).
- Step 3: Update main.py to integrate KIS clients
Edit web-ai/signal_v2/main.py — augment AppContext + lifespan:
from signal_v2.kis_client import KISClient
from signal_v2.kis_websocket import KISWebSocket
from signal_v2.pull_worker import make_asking_price_callback


class AppContext:
    client: StockClient | None = None
    dedup: SignalDedup | None = None
    shutdown: asyncio.Event | None = None
    poll_task: asyncio.Task | None = None
    kis_client: KISClient | None = None
    kis_ws: KISWebSocket | None = None

# (keep _ctx = AppContext() at module level)


@asynccontextmanager
async def lifespan(app: FastAPI):
    settings = get_settings()
    if not settings.webai_api_key:
        logger.warning("WEBAI_API_KEY not configured — stock API calls will fail with 401")
    if not settings.kis_app_key:
        logger.warning("KIS_APP_KEY not configured — KIS REST/WebSocket disabled")

    _ctx.client = StockClient(settings.stock_api_url, settings.webai_api_key)
    _ctx.dedup = SignalDedup(settings.db_path)
    _ctx.shutdown = asyncio.Event()

    # KIS only if app_key configured
    if settings.kis_app_key:
        _ctx.kis_client = KISClient(
            app_key=settings.kis_app_key,
            app_secret=settings.kis_app_secret,
            account=settings.kis_account,
            is_virtual=settings.kis_is_virtual,
            v1_token_path=settings.v1_token_path,
        )
        _ctx.kis_ws = KISWebSocket(
            app_key=settings.kis_app_key,
            app_secret=settings.kis_app_secret,
            is_virtual=settings.kis_is_virtual,
        )
        # Subscribe portfolio holdings (if any)
        try:
            portfolio = await _ctx.client.get_portfolio()
            tickers = [h["ticker"] for h in portfolio.get("holdings", []) if "ticker" in h]
            cb = make_asking_price_callback(state_mod.state)
            await _ctx.kis_ws.start(tickers, cb)
        except Exception:
            logger.exception("KIS WebSocket startup failed — continuing without realtime asking_price")

    _ctx.poll_task = asyncio.create_task(
        poll_loop(
            _ctx.client, state_mod.state, _ctx.shutdown,
            kis_client=_ctx.kis_client,
        )
    )

    yield

    # Shutdown
    if _ctx.shutdown is not None:
        _ctx.shutdown.set()
    if _ctx.poll_task is not None:
        try:
            await asyncio.wait_for(_ctx.poll_task, timeout=5.0)
        except asyncio.TimeoutError:
            _ctx.poll_task.cancel()
            try:
                await _ctx.poll_task
            except asyncio.CancelledError:
                pass
    if _ctx.kis_ws is not None:
        await _ctx.kis_ws.close()
    if _ctx.kis_client is not None:
        await _ctx.kis_client.close()
    if _ctx.client is not None:
        await _ctx.client.close()
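The shutdown half of the lifespan follows a common asyncio pattern: signal the loop, wait briefly, then fall back to cancellation. Below is a self-contained sketch of that pattern with a stand-in `worker` coroutine in place of the real poll_loop. Both `worker` and `stop_task` are illustrative names, and the timeouts are shortened for demonstration.

```python
import asyncio


async def worker(shutdown: asyncio.Event, tick: float) -> str:
    """Stand-in for poll_loop: sleeps in ticks until shutdown is set."""
    while not shutdown.is_set():
        await asyncio.sleep(tick)
    return "clean"


async def stop_task(task: asyncio.Task, shutdown: asyncio.Event, timeout: float) -> str:
    """Set the event, wait up to `timeout`, then fall back to cancellation."""
    shutdown.set()
    try:
        return await asyncio.wait_for(task, timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the task on timeout; awaiting it collects the cancellation
        try:
            await task
        except asyncio.CancelledError:
            pass
        return "cancelled"


async def demo() -> tuple[str, str]:
    fast_event, slow_event = asyncio.Event(), asyncio.Event()
    fast = asyncio.create_task(worker(fast_event, tick=0.01))
    slow = asyncio.create_task(worker(slow_event, tick=5.0))
    await asyncio.sleep(0.02)  # let both workers start
    return (
        await stop_task(fast, fast_event, timeout=1.0),
        await stop_task(slow, slow_event, timeout=0.05),
    )


results = asyncio.run(demo())
```

The fast worker notices the event within one tick and exits cleanly, while the slow one is still sleeping when the timeout fires and ends up cancelled. A poll loop with long sleeps between cycles would behave like the slow worker unless its sleep is interruptible.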
- Step 4: Run tests to verify PASS
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_main.py -v 2>&1 | tail -15
Expected: 3 passed (2 existing + 1 new).
Full suite:
python -m pytest signal_v2/tests -q 2>&1 | tail -3
Expected: 33 passed (32 + 1).
- Step 5: Commit
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/main.py signal_v2/tests/test_main.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3a): main.py lifespan integrates KIS client + WS
AppContext extended with kis_client + kis_ws. lifespan:
- If KIS_APP_KEY set: create KISClient + KISWebSocket, fetch portfolio,
subscribe WebSocket H0STASP0 for holdings.
- If unset: WARNING log, signal_v2 still serves /health (no KIS data).
- Shutdown closes kis_ws → kis_client → stock client in order.
33 tests pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
Task 7: Manual user work (.env update + manual smoke + push)
This task requires user action.
- Step 1: Update .env
User adds to C:\Users\jaeoh\Desktop\workspace\web-ai\.env:
# KIS API (Phase 3a, 2026-05-16)
KIS_APP_KEY=<from KIS portal>
KIS_APP_SECRET=<from KIS portal>
KIS_ACCOUNT=<8 digits-2 digits format, e.g. 50000000-01>
KIS_IS_VIRTUAL=true
# V1 token path (default: ../signal_v1/data/kis_token.json)
# V1_TOKEN_PATH=
User reference: see web-ai/signal_v1/KIS_SETUP.md for the exact key issuance procedure and account number format.
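For orientation, here is one way the variables above could be read on the Python side. This is a hedged sketch only: `KisEnv` and `load_kis_env` are hypothetical names, the `true` default for KIS_IS_VIRTUAL is an assumption, and the real loading logic is whatever Task 1 put into signal_v2/config.py.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class KisEnv:
    app_key: str
    app_secret: str
    account: str
    is_virtual: bool
    v1_token_path: str


def load_kis_env(environ=os.environ) -> KisEnv:
    """Read the KIS variables; missing values become empty strings."""
    raw_virtual = environ.get("KIS_IS_VIRTUAL", "true")  # default is an assumption
    return KisEnv(
        app_key=environ.get("KIS_APP_KEY", ""),
        app_secret=environ.get("KIS_APP_SECRET", ""),
        account=environ.get("KIS_ACCOUNT", ""),
        # "true", "1", "yes" (any case) count as True; anything else is False
        is_virtual=raw_virtual.strip().lower() in {"true", "1", "yes"},
        # Fallback path taken from the comment in the .env block
        v1_token_path=environ.get("V1_TOKEN_PATH") or "../signal_v1/data/kis_token.json",
    )


sample = load_kis_env({"KIS_APP_KEY": "k", "KIS_IS_VIRTUAL": "False"})
```

An empty `app_key` is what the lifespan check in Task 6 treats as "KIS disabled".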
- Step 2: Confirm the V1 bot is running normally (it is responsible for issuing the token)
cd C:\Users\jaeoh\Desktop\workspace\web-ai
.\start.bat
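Confirm in the console log that KIS auth succeeded and that signal_v1/data/kis_token.json was refreshed.
The V1 bot can keep running or be stopped once the token is issued. signal_v2 only reads the token file.
- Step 3: Start signal_v2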
cd C:\Users\jaeoh\Desktop\workspace\web-ai\signal_v2
.\start.bat
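Expected output (a few dozen lines), including:
- Uvicorn running on http://0.0.0.0:8001
- poll_loop started
- KIS WebSocket connected (when the V1 token loads successfully)
If a "V1 token file missing" error appears, check the V1 token file path, start the V1 bot, and retry.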
- Step 4: /health smoke
curl http://localhost:8001/health
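Expected: 200 JSON. At first last_poll has no minute_bars / asking_price entries; wait for the first cycle.
- Step 5: Verify minute bars + asking prices (during market hours or the NXT window)
Once market hours or the NXT window begin, wait 60-300 seconds, then call /health again: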
curl http://localhost:8001/health
Expected last_poll in the response:
- minute_bars/005930 and similar keys show a per-ticker timestamp
- asking_price/<screener-only-ticker> is shown as well
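If outside market hours:
- KIS REST calls still work, but the minute-bar endpoint returns the last minute bar of the previous trading day
- No WebSocket asking-price data arrives outside business hours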
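The manual check in this step can also be scripted. The sketch below assumes /health returns a JSON object whose `last_poll` field maps keys such as `minute_bars/005930` to timestamps, as described above. The exact response shape is not shown in this plan, so `summarize_last_poll` and `fetch_health` are illustrative helpers, not existing code.

```python
import json
import urllib.request


def summarize_last_poll(health: dict) -> dict[str, list[str]]:
    """Group last_poll keys by prefix, e.g. {'minute_bars': ['005930'], ...}."""
    groups: dict[str, list[str]] = {"minute_bars": [], "asking_price": []}
    for key in health.get("last_poll") or {}:
        prefix, _, ticker = key.partition("/")
        if prefix in groups and ticker:
            groups[prefix].append(ticker)
    return {name: sorted(tickers) for name, tickers in groups.items()}


def fetch_health(url: str = "http://localhost:8001/health") -> dict:
    """GET /health and decode the JSON body."""
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.load(resp)


# Offline example with a hand-written payload (not real output)
example = summarize_last_poll({
    "last_poll": {
        "portfolio": "2026-05-18T09:00:00+09:00",
        "minute_bars/005930": "2026-05-18T09:01:00+09:00",
        "asking_price/035720": "2026-05-18T09:01:00+09:00",
    }
})
```

Against a running instance, `summarize_last_poll(fetch_health())` should list at least one ticker under each group once the first KIS cycle has completed.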
- Step 6: Verify V1 is unaffected
Confirm in the V1 bot console that it operates normally and that Telegram /status responds. Because V2 only reads the token file, V1 should be unaffected.
- Step 7: git push
cd C:\Users\jaeoh\Desktop\workspace\web-ai
git push
If Gitea prompts for credentials, the user enters them manually.
- Step 8: Report results
  - Step 3 (start signal_v2): PASS / FAIL (share the error message)
  - Step 4 (/health): PASS / FAIL
  - Step 5 (minute bars + asking prices): PASS (state update confirmed) / FAIL (share the KIS response or parse issue)
  - Step 6 (V1 unaffected): PASS / FAIL
  - Step 7 (push): PASS / FAIL
If the asking-price parse in Step 5 fails, the likely cause is the field indices in _parse_asking_price. Capture a sample of raw KIS messages and adjust the indices (as a separate fix slice).
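To make that index mapping easier, a captured frame can be split and printed with its positions. The sketch assumes the commonly documented KIS realtime layout, where a data frame looks like `0|H0STASP0|001|f0^f1^...` (pipe-separated header, caret-separated fields). Verify this against the official spec or your own raw logs before relying on it. `index_fields` is a hypothetical helper and the sample frame is synthetic.

```python
def index_fields(raw: str) -> tuple[str, list[tuple[int, str]]]:
    """Split a raw frame into (tr_id, [(index, value), ...]).

    Assumed layout: encrypted_flag|tr_id|count|field0^field1^...
    """
    parts = raw.split("|", 3)
    if len(parts) != 4:
        raise ValueError("not a pipe-delimited data frame")
    _flag, tr_id, _count, payload = parts
    return tr_id, list(enumerate(payload.split("^")))


# Synthetic frame for illustration only; real frames carry many more fields
tr_id, fields = index_fields("0|H0STASP0|001|005930^100000^0^78500^78400")
for idx, value in fields:
    print(idx, value)
```

Comparing the printed positions with the field list in the KIS specification gives the indices that _parse_asking_price should read.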
If everything passes, Phase 3a is complete; move on to Phase 3b (Chronos-2 inference + minute-bar momentum) brainstorming.
Self-Review
1. Spec coverage:
| Spec § | Requirement | Plan task |
|---|---|---|
| §2 | In scope ① KIS REST client | Task 3 ✅ |
| §2 | In scope ② KIS WebSocket | Task 4 ✅ |
| §2 | In scope ③ pull_worker extension | Task 5 ✅ |
| §2 | In scope ④ PollState extension | Task 1 ✅ |
| §2 | In scope ⑤ scheduler NXT | Task 2 ✅ |
| §2 | In scope ⑥ 13 tests | Task 2 (3) + Task 3 (4) + Task 4 (4) + Task 5 (2) = 13 ✅ |
| §3 | Change matrix, 14 files | Tasks 1-7 combined ✅ |
| §4 | KISClient interface | Task 3 Step 3 ✅ |
| §5 | KISWebSocket interface | Task 4 Step 3 ✅ |
| §6 | PollState extension + pull_worker | Task 1 + Task 5 ✅ |
| §7 | scheduler NXT | Task 2 ✅ |
| §8 | 13 test cases | Tasks 2-5 ✅ |
| §11 | DoD, 13 items | Tasks 1-7 ✅ |
No gaps.
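2. Placeholder scan: No "TBD". The field indices in the KIS WebSocket _parse_asking_price are explicitly marked "adjust after production verification"; the implementer finalizes them from a production sample. This is not a placeholder but a parameter that depends on the production environment.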
3. Type consistency:
- KISClient.__init__(app_key, app_secret, account, is_virtual, v1_token_path, timeout=10.0) consistent ✅
- KISClient.get_minute_ohlcv(ticker) -> list[dict] / get_asking_price(ticker) -> dict / close() consistent ✅
- KISWebSocket.__init__(app_key, app_secret, is_virtual) consistent ✅
- KISWebSocket.start(tickers, on_asking_price) / subscribe(ticker) / unsubscribe(ticker) / close() consistent ✅
- PollState fields (minute_bars + asking_price) consistent across Task 1 / 5 / 6 ✅
- _next_interval(now) -> float / _is_market_day(now) -> bool / _is_polling_window(now) -> bool consistent ✅
- env var names (KIS_APP_KEY / KIS_APP_SECRET / KIS_ACCOUNT / KIS_IS_VIRTUAL / V1_TOKEN_PATH) consistent ✅
Plan passes self-review.