Files
web-page/docs/superpowers/plans/2026-05-16-signal-v2-phase3a-kis-data-collection.md
gahusb 78b77e2691 docs(signal-v2): Phase 3a implementation plan — 7 tasks TDD
Task 1: config + state + websockets dep
Task 2: scheduler NXT windows + 3 tests
Task 3: kis_client REST + 4 tests
Task 4: kis_websocket + 4 tests (most heavy)
Task 5: pull_worker minute cycle + 2 tests
Task 6: main.py KIS lifespan + 1 test
Task 7: user manual .env + smoke + push

13 new tests, total 32 signal_v2 tests. ~1 week.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 05:00:43 +09:00

53 KiB

Signal V2 Phase 3a — KIS Data Collection Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: signal_v2 에 KIS REST client (분봉 + 호가 polling, V1 토큰 read-only 공유) + KIS WebSocket client (approval_key + portfolio 호가 실시간 구독) + scheduler NXT 시간대 확장 + PollState 의 minute_bars/asking_price 통합. Phase 3b (Chronos-2 추론) 시작점.

Architecture: V1 signal_v1/data/kis_token.json 의 access_token 을 read-only 공유. WebSocket 의 approval_key 는 V2 자체 발급. portfolio 보유 종목 (~11) 만 WebSocket 호가 구독, screener Top-N 은 REST 1분 polling. NXT 시간대 (20:00-23:30 / 04:30-07:00) 도 scheduler 가 5분 cron 으로 polling 활성화.

Tech Stack: httpx async / websockets / pytest-asyncio + respx / sqlite3 / Python 3.11+ / KIS OpenAPI

Spec: web-ui/docs/superpowers/specs/2026-05-16-signal-v2-phase3a-kis-data-collection.md

참고 V1 코드 (구현 시 참조 권장):

  • web-ai/signal_v1/modules/services/kis.py — V1 KIS REST client (TR_ID, 응답 키, 토큰 파일 형식)
  • web-ai/signal_v1/KIS_SETUP.md — KIS 인증 설정 가이드

파일 구조

파일 책임
web-ai/signal_v2/config.py (수정) KIS env 5개 + V1_TOKEN_PATH 추가
web-ai/signal_v2/state.py (수정) PollStateminute_bars: dict[str, deque] + asking_price: dict[str, dict]
web-ai/signal_v2/scheduler.py (수정) NXT 시간대 (20:00-23:30 / 04:30-07:00) 5분 cron 분기
web-ai/signal_v2/kis_client.py (신규) REST KISClient — 분봉 + 호가 + V1 토큰 read
web-ai/signal_v2/kis_websocket.py (신규) WebSocket KISWebSocket — approval_key + portfolio 호가 실시간
web-ai/signal_v2/pull_worker.py (수정) 분봉 cycle + WebSocket subscription 동기화
web-ai/signal_v2/main.py (수정) lifespan 에 KISClient/KISWebSocket startup/shutdown
web-ai/signal_v2/requirements.txt (수정) websockets>=12 추가
web-ai/signal_v2/tests/test_scheduler.py (수정) NXT 3 케이스 추가
web-ai/signal_v2/tests/test_kis_client.py (신규) 4 케이스
web-ai/signal_v2/tests/test_kis_websocket.py (신규) 4 케이스
web-ai/signal_v2/tests/test_pull_worker.py (신규) 2 케이스
web-ai/signal_v2/tests/test_main.py (수정) KIS lifespan 1 케이스 추가
web-ai/.env (수정, 사용자 Task 7) KIS env 5 + V1_TOKEN_PATH

총 14 파일 변경, 13 신규 테스트.


Task 순서

Task 1: config + state + requirements (foundation 확장)
Task 2: scheduler NXT 시간대 + 3 테스트 (TDD)
Task 3: kis_client.py REST + 4 통합 테스트 (TDD)
Task 4: kis_websocket.py WebSocket + 4 통합 테스트 (TDD)
Task 5: pull_worker 확장 + 2 통합 테스트 (TDD)
Task 6: main.py KIS lifespan 통합 + 1 케이스
Task 7: 사용자 수동 — .env 갱신 + manual smoke + push

Task 1: config + state + requirements 확장

Files:

  • Modify: web-ai/signal_v2/config.py

  • Modify: web-ai/signal_v2/state.py

  • Modify: web-ai/signal_v2/requirements.txt

  • Step 1: Update config.py with KIS env + V1_TOKEN_PATH

Use Read tool to inspect current web-ai/signal_v2/config.py, then Edit to add new fields. Final state:

"""Signal V2 환경변수 로딩."""
import os
from dataclasses import dataclass, field
from pathlib import Path

from dotenv import load_dotenv

load_dotenv(Path(__file__).parent.parent / ".env")


@dataclass(frozen=True)
class Settings:
    stock_api_url: str = field(
        default_factory=lambda: os.getenv("STOCK_API_URL", "").rstrip("/")
    )
    webai_api_key: str = field(
        default_factory=lambda: os.getenv("WEBAI_API_KEY", "").strip()
    )
    port: int = field(default_factory=lambda: int(os.getenv("SIGNAL_V2_PORT", "8001")))
    db_path: Path = field(
        default_factory=lambda: Path(__file__).parent / "data" / "signal_v2.db"
    )
    # Phase 3a additions
    kis_app_key: str = field(default_factory=lambda: os.getenv("KIS_APP_KEY", "").strip())
    kis_app_secret: str = field(default_factory=lambda: os.getenv("KIS_APP_SECRET", "").strip())
    kis_account: str = field(default_factory=lambda: os.getenv("KIS_ACCOUNT", "").strip())
    kis_is_virtual: bool = field(
        default_factory=lambda: os.getenv("KIS_IS_VIRTUAL", "true").lower() == "true"
    )
    v1_token_path: Path = field(
        default_factory=lambda: Path(
            os.getenv("V1_TOKEN_PATH",
                      str(Path(__file__).parent.parent / "signal_v1" / "data" / "kis_token.json"))
        )
    )


def get_settings() -> Settings:
    return Settings()
  • Step 2: Update state.py with minute_bars + asking_price fields

Edit web-ai/signal_v2/state.py:

"""PollState — process-wide singleton."""
from collections import deque
from dataclasses import dataclass, field


@dataclass
class PollState:
    portfolio: dict | None = None
    news_sentiment: dict | None = None
    screener_preview: dict | None = None
    # Phase 3a additions
    minute_bars: dict[str, deque] = field(default_factory=dict)  # {ticker: deque(maxlen=60)}
    asking_price: dict[str, dict] = field(default_factory=dict)  # {ticker: {bid_total, ask_total, bid_ratio, ...}}
    last_updated: dict[str, str] = field(default_factory=dict)
    fetch_errors: dict[str, int] = field(default_factory=dict)


state = PollState()
  • Step 3: Add websockets to requirements.txt

Read current web-ai/requirements.txt. Append (if not present):

websockets>=12
  • Step 4: pip install + smoke import
cd /c/Users/jaeoh/Desktop/workspace/web-ai
pip install -r requirements.txt 2>&1 | tail -5
python -c "from signal_v2.config import get_settings; from signal_v2.state import state; print(get_settings().kis_is_virtual); print(state)"

Expected: pip install 성공 + get_settings().kis_is_virtual 출력 + state 출력 (minute_bars / asking_price 빈 dict).

  • Step 5: Run existing tests — no regression
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests -q 2>&1 | tail -3

Expected: 19 passed (기존 Phase 2 테스트 그대로).

  • Step 6: Commit
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/config.py signal_v2/state.py requirements.txt
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3a): config + state extensions for KIS data

- config.py: KIS_APP_KEY/SECRET/ACCOUNT/IS_VIRTUAL + V1_TOKEN_PATH env
- state.py: PollState extended with minute_bars (deque maxlen 60) and
  asking_price (per-ticker dict)
- requirements.txt: websockets>=12 (KIS WebSocket client)

19 existing tests still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"

Task 2: Scheduler NXT 시간대 + 3 테스트

Files:

  • Modify: web-ai/signal_v2/scheduler.py

  • Modify: web-ai/signal_v2/tests/test_scheduler.py

  • Step 1: Write 3 failing tests for NXT windows

Append to web-ai/signal_v2/tests/test_scheduler.py:

def test_next_interval_nxt_evening_5min():
    """22:00 평일 (NXT 야간) → 300 (5분)."""
    now = _kst(2026, 5, 18, 22, 0)
    assert _next_interval(now) == 300


def test_next_interval_nxt_dawn_5min():
    """05:30 평일 (NXT 새벽) → 300 (5분)."""
    now = _kst(2026, 5, 18, 5, 30)
    assert _next_interval(now) == 300


def test_next_interval_dead_zone_skip():
    """02:00 평일 (dead zone) → 다음 04:30 까지 (~9000s)."""
    now = _kst(2026, 5, 18, 2, 0)
    interval = _next_interval(now)
    # 02:00 → 04:30 = 2.5h = 9000s
    assert 9000 - 60 < interval < 9000 + 60
  • Step 2: Run tests to verify FAIL
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_scheduler.py -v -k "nxt or dead_zone" 2>&1 | tail -10

Expected: 3 failed (scheduler 현재 22:00 / 05:30 / 02:00 모두 overnight skip 으로 처리).

  • Step 3: Update scheduler.py

Edit web-ai/signal_v2/scheduler.py — replace the time-window constants block + _next_interval + _is_polling_window with:

# Market windows (정규장)
_PRE_OPEN = time(7, 0)
_OPEN = time(9, 0)
_CLOSE = time(15, 30)
_POST_END = time(20, 0)

# NXT windows (시간외)
_NXT_PRE_END = time(23, 30)
_NXT_POST_OPEN = time(4, 30)
# 23:30 - 04:30 (dead zone) skip


def _is_market_day(now: datetime) -> bool:
    """평일 + 휴장일 아닌 날."""
    if now.weekday() >= 5:
        return False
    return now.strftime("%Y-%m-%d") not in _HOLIDAYS


def _is_polling_window(now: datetime) -> bool:
    """폴링 윈도우: 07:00-23:30 + 04:30-07:00."""
    t = now.time()
    return (
        (_PRE_OPEN <= t < _NXT_PRE_END)
        or (_NXT_POST_OPEN <= t < _PRE_OPEN)
    )


def _next_interval(now: datetime) -> float:
    if not _is_market_day(now):
        return _seconds_until_next_market_open(now)

    t = now.time()
    if _PRE_OPEN <= t < _OPEN:
        return 300.0  # 장전 5분
    elif _OPEN <= t < _CLOSE:
        return 60.0   # 장중 1분
    elif _CLOSE <= t < _POST_END:
        return 300.0  # 장후 5분
    elif _POST_END <= t < _NXT_PRE_END:
        return 300.0  # NXT 야간 5분
    elif _NXT_POST_OPEN <= t < _PRE_OPEN:
        return 300.0  # NXT 새벽 5분
    else:
        # Dead zone (23:30-04:30) — wait until 04:30
        return _seconds_until_nxt_or_market_open(now)


def _seconds_until_nxt_or_market_open(now: datetime) -> float:
    """다음 04:30 (NXT 새벽 start) 까지 초수."""
    candidate = now.replace(hour=4, minute=30, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)

    for _ in range(14):
        if _is_market_day(candidate):
            return (candidate - now).total_seconds()
        candidate += timedelta(days=1)

    logger.warning("could not find next market day within 14 days")
    return 86400.0

Note: _seconds_until_next_market_open (기존) 은 weekend/holiday 의 다음 영업일 07:00 계산 — 그대로 유지. 새로 추가된 _seconds_until_nxt_or_market_open 은 dead zone (02:00-04:30) 의 04:30 계산 — 만약 그날이 영업일이면 04:30, 아니면 다음 영업일.

_seconds_until_next_market_open 의 hour=7 인지 hour=4 minute=30 인지 검토 — 휴장일/주말 다음 영업일은 07:00 으로 유지 (휴장 다음 영업일은 정규 pre-market 부터 시작). NXT 는 영업일에만 운영되므로 weekend skip 후 다음 영업일 07:00 OK.

  • Step 4: Run tests to verify PASS
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_scheduler.py -v 2>&1 | tail -15

Expected: 11 passed (기존 8 + 신규 3).

Also full suite:

python -m pytest signal_v2/tests -q 2>&1 | tail -3

Expected: 22 passed (19 + 3).

  • Step 5: Commit
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/scheduler.py signal_v2/tests/test_scheduler.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3a): scheduler NXT windows (20:00-23:30 / 04:30-07:00)

NXT 시간외 거래 시간대도 5분 cron 폴링 활성화. 23:30-04:30 사이는
dead zone (KIS 점검 시간) → 04:30 까지 skip.

3 new tests (총 22).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"

Task 3: KIS REST client + 4 통합 테스트

Files:

  • Create: web-ai/signal_v2/kis_client.py
  • Create: web-ai/signal_v2/tests/test_kis_client.py

이 task 는 KIS API 의 정확한 TR_ID + 응답 키를 알아야 함. V1 코드 (web-ai/signal_v1/modules/services/kis.py) 참조 권장:

  • 분봉 TR_ID: FHKST03010200 (국내주식 당일 분봉) — 응답 output2 배열

  • 호가 TR_ID: FHKST01010200 (국내주식 호가) — 응답 output1 객체

  • 토큰 파일 (signal_v1/data/kis_token.json) 형식: {"access_token": "...", "token_expired": "YYYY-MM-DD HH:MM:SS"}

  • Step 1: Write 4 failing tests

Create web-ai/signal_v2/tests/test_kis_client.py:

"""Tests for KISClient (REST)."""
import json
import time
from pathlib import Path

import httpx
import pytest
import respx

from signal_v2.kis_client import KISClient


@pytest.fixture
def fake_v1_token(tmp_path):
    """V1 토큰 파일 fixture."""
    token_file = tmp_path / "kis_token.json"
    token_file.write_text(json.dumps({
        "access_token": "test-kis-token-abc123",
        "token_expired": "2099-12-31 23:59:59",
    }))
    return token_file


@pytest.fixture
def kis_client_factory(fake_v1_token):
    def _make():
        return KISClient(
            app_key="test-app-key",
            app_secret="test-app-secret",
            account="50000000-01",
            is_virtual=True,
            v1_token_path=fake_v1_token,
        )
    return _make


@respx.mock
async def test_get_minute_ohlcv_normal_returns_30_bars(kis_client_factory):
    """정상 200 → 30개 분봉 list 반환."""
    sample_output2 = [
        {
            "stck_bsop_date": "20260518",
            "stck_cntg_hour": f"{h:02d}{m:02d}00",
            "stck_oprc": "78000", "stck_hgpr": "78500",
            "stck_lwpr": "77800", "stck_prpr": "78300",
            "cntg_vol": "12345",
        }
        for h in range(9, 10) for m in range(0, 30)  # 9:00-9:29 = 30 bars
    ]
    respx.get(
        "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
    ).mock(
        return_value=httpx.Response(200, json={"output2": sample_output2})
    )

    client = kis_client_factory()
    try:
        bars = await client.get_minute_ohlcv("005930")
        assert len(bars) == 30
        assert bars[0]["close"] == 78300
        assert "datetime" in bars[0]
    finally:
        await client.close()


@respx.mock
async def test_get_minute_ohlcv_429_retry_then_success(kis_client_factory, monkeypatch):
    """429 → exponential backoff → 200."""
    sleep_calls = []
    async def fake_sleep(s): sleep_calls.append(s)
    monkeypatch.setattr("asyncio.sleep", fake_sleep)

    respx.get(
        "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
    ).mock(side_effect=[
        httpx.Response(429, text="rate limit"),
        httpx.Response(200, json={"output2": []}),
    ])
    client = kis_client_factory()
    try:
        result = await client.get_minute_ohlcv("005930")
        assert result == []
        # 1 retry sleep
        assert 1 in sleep_calls
    finally:
        await client.close()


async def test_get_minute_ohlcv_uses_v1_token(kis_client_factory, fake_v1_token):
    """KIS 호출 헤더에 V1 토큰 파일의 access_token 사용."""
    captured_headers = {}

    @respx.mock
    async def _do_request():
        route = respx.get(
            "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
        ).mock(return_value=httpx.Response(200, json={"output2": []}))
        client = kis_client_factory()
        try:
            await client.get_minute_ohlcv("005930")
            # respx route captures the request — check
            assert route.called
            req = route.calls.last.request
            captured_headers.update(req.headers)
        finally:
            await client.close()

    await _do_request()
    # KIS 의 인증 헤더는 `authorization: Bearer <token>`
    assert "authorization" in {k.lower() for k in captured_headers.keys()}
    auth_value = captured_headers.get("authorization") or captured_headers.get("Authorization")
    assert "test-kis-token-abc123" in auth_value


@respx.mock
async def test_get_asking_price_computes_bid_ratio(kis_client_factory):
    """호가 응답 → bid_total / (bid+ask) bid_ratio 계산."""
    # KIS 응답 output1 의 호가 잔량 필드명: total_askp_rsqn (총 매도잔량), total_bidp_rsqn (총 매수잔량)
    respx.get(
        "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn"
    ).mock(return_value=httpx.Response(200, json={
        "output1": {
            "total_bidp_rsqn": "600",
            "total_askp_rsqn": "400",
            "stck_prpr": "78500",
        }
    }))

    client = kis_client_factory()
    try:
        data = await client.get_asking_price("005930")
        assert data["bid_total"] == 600
        assert data["ask_total"] == 400
        assert abs(data["bid_ratio"] - 0.6) < 1e-9
        assert data["current_price"] == 78500
        assert "as_of" in data
    finally:
        await client.close()
  • Step 2: Run tests to verify FAIL
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_kis_client.py -v 2>&1 | tail -10

Expected: ImportError (signal_v2.kis_client missing).

  • Step 3: Implement kis_client.py

Create web-ai/signal_v2/kis_client.py:

"""KIS REST API client — 분봉 + 호가. V1 토큰 read-only 공유."""
from __future__ import annotations
import asyncio
import json
import logging
import time
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo

import httpx

logger = logging.getLogger(__name__)
KST = ZoneInfo("Asia/Seoul")

_MAX_ATTEMPTS = 3
_THROTTLE_INTERVAL = 0.5  # 초당 2회 제한


class KISClient:
    """KIS REST (분봉 + 호가). V1 토큰 파일 read-only."""

    def __init__(
        self,
        app_key: str, app_secret: str, account: str, is_virtual: bool,
        v1_token_path: Path,
        timeout: float = 10.0,
    ):
        self._app_key = app_key
        self._app_secret = app_secret
        self._account = account
        self._is_virtual = is_virtual
        self._v1_token_path = Path(v1_token_path)
        self._base_url = (
            "https://openapivts.koreainvestment.com:29443" if is_virtual
            else "https://openapi.koreainvestment.com:9443"
        )
        self._client = httpx.AsyncClient(timeout=timeout)
        self._token_cache: tuple[str, float] | None = None  # (token, file_mtime)
        self._last_throttle_at = 0.0

    async def close(self) -> None:
        await self._client.aclose()

    def _read_v1_token(self) -> str:
        if not self._v1_token_path.exists():
            raise RuntimeError(f"V1 token file missing: {self._v1_token_path}")
        mtime = self._v1_token_path.stat().st_mtime
        if self._token_cache and self._token_cache[1] == mtime:
            return self._token_cache[0]
        data = json.loads(self._v1_token_path.read_text(encoding="utf-8"))
        token = data.get("access_token", "")
        if not token:
            raise RuntimeError("V1 token file has no access_token")
        self._token_cache = (token, mtime)
        return token

    async def _throttle(self) -> None:
        elapsed = time.monotonic() - self._last_throttle_at
        if elapsed < _THROTTLE_INTERVAL:
            await asyncio.sleep(_THROTTLE_INTERVAL - elapsed)
        self._last_throttle_at = time.monotonic()

    def _common_headers(self, tr_id: str) -> dict[str, str]:
        token = self._read_v1_token()
        return {
            "authorization": f"Bearer {token}",
            "appkey": self._app_key,
            "appsecret": self._app_secret,
            "tr_id": tr_id,
            "custtype": "P",
        }

    async def _request_with_retry(
        self, method: str, path: str, tr_id: str, **kwargs,
    ) -> dict:
        url = f"{self._base_url}{path}"
        headers = self._common_headers(tr_id)
        for attempt in range(_MAX_ATTEMPTS):
            await self._throttle()
            try:
                response = await self._client.request(
                    method, url, headers=headers, **kwargs
                )
                if response.status_code == 429:
                    if attempt < _MAX_ATTEMPTS - 1:
                        await asyncio.sleep(2**attempt)
                        continue
                    response.raise_for_status()
                response.raise_for_status()
                return response.json()
            except httpx.TimeoutException:
                if attempt < _MAX_ATTEMPTS - 1:
                    await asyncio.sleep(2**attempt)
                    continue
                raise
        raise RuntimeError("retry exhausted")

    async def get_minute_ohlcv(self, ticker: str) -> list[dict]:
        """현재 시점 직전 30개 1분봉 OHLCV (TR_ID FHKST03010200).

        Returns: [{datetime, open, high, low, close, volume}, ...] 시간 오름차순.
        """
        path = "/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
        params = {
            "FID_ETC_CLS_CODE": "",
            "FID_COND_MRKT_DIV_CODE": "J",
            "FID_INPUT_ISCD": ticker,
            "FID_INPUT_HOUR_1": datetime.now(KST).strftime("%H%M%S"),
            "FID_PW_DATA_INCU_YN": "N",
        }
        raw = await self._request_with_retry(
            "GET", path, tr_id="FHKST03010200", params=params,
        )
        output2 = raw.get("output2", [])
        bars = []
        for row in output2:
            try:
                date = row["stck_bsop_date"]
                hhmmss = row["stck_cntg_hour"]
                dt = datetime.strptime(f"{date} {hhmmss}", "%Y%m%d %H%M%S").replace(tzinfo=KST)
                bars.append({
                    "datetime": dt.isoformat(),
                    "open": int(row["stck_oprc"]),
                    "high": int(row["stck_hgpr"]),
                    "low": int(row["stck_lwpr"]),
                    "close": int(row["stck_prpr"]),
                    "volume": int(row["cntg_vol"]),
                })
            except (KeyError, ValueError) as e:
                logger.warning("skip malformed bar for %s: %r", ticker, e)
        # KIS returns descending; reverse to ascending
        bars.reverse()
        return bars

    async def get_asking_price(self, ticker: str) -> dict:
        """현재 호가 + 매수/매도 잔량 (TR_ID FHKST01010200)."""
        path = "/uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn"
        params = {
            "FID_COND_MRKT_DIV_CODE": "J",
            "FID_INPUT_ISCD": ticker,
        }
        raw = await self._request_with_retry(
            "GET", path, tr_id="FHKST01010200", params=params,
        )
        output1 = raw.get("output1", {})
        bid_total = int(output1.get("total_bidp_rsqn", 0))
        ask_total = int(output1.get("total_askp_rsqn", 0))
        total = bid_total + ask_total
        bid_ratio = bid_total / total if total > 0 else 0.0
        current_price = int(output1.get("stck_prpr", 0))
        return {
            "bid_total": bid_total,
            "ask_total": ask_total,
            "bid_ratio": bid_ratio,
            "current_price": current_price,
            "as_of": datetime.now(KST).isoformat(),
        }
  • Step 4: Run tests to verify PASS
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_kis_client.py -v 2>&1 | tail -15

Expected: 4 passed.

Full suite:

python -m pytest signal_v2/tests -q 2>&1 | tail -3

Expected: 26 passed (22 + 4).

If failures relate to KIS TR_ID or field names, refer to V1 signal_v1/modules/services/kis.py and adjust.

  • Step 5: Commit
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/kis_client.py signal_v2/tests/test_kis_client.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3a): kis_client REST + 4 integration tests

KISClient: 분봉 (FHKST03010200) + 호가 (FHKST01010200) async REST.
V1 토큰 파일 (signal_v1/data/kis_token.json) read-only 공유, mtime
캐시. 초당 2회 throttle. exponential retry.

26 tests pass (Phase 2 19 + Task 2 NXT 3 + Task 3 KIS REST 4).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"

Task 4: KIS WebSocket client + 4 통합 테스트

Files:

  • Create: web-ai/signal_v2/kis_websocket.py
  • Create: web-ai/signal_v2/tests/test_kis_websocket.py

이 task 는 가장 무거움. WebSocket mock 은 websockets lib 의 serve 사용 또는 AsyncMock. KIS 호가 메시지 형식은 | + ^ 구분.

KIS 호가 메시지 raw 형식 (실제 더 길지만 핵심 필드만):

0|H0STASP0|001|005930^091500^78500^...^total_ask^total_bid^...
  • 0 = 수신 데이터

  • H0STASP0 = TR_ID (실시간 호가)

  • 001 = 데이터 개수

  • 그 다음 ^ 구분 필드: [0]종목코드, [1]시간 HHMMSS, [2]현재가, ..., 매수/매도 잔량 인덱스는 KIS 공식 문서 참조 (KIS_SETUP.md / V1 코드).

  • Step 1: Write 4 failing tests

Create web-ai/signal_v2/tests/test_kis_websocket.py:

"""Tests for KISWebSocket."""
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock

import httpx
import pytest
import respx

from signal_v2.kis_websocket import KISWebSocket


BASE_REST = "https://openapivts.koreainvestment.com:29443"


@respx.mock
async def test_fetch_approval_key_via_oauth_endpoint():
    """POST /oauth2/Approval → approval_key 추출."""
    respx.post(f"{BASE_REST}/oauth2/Approval").mock(
        return_value=httpx.Response(200, json={"approval_key": "test-approval-key-xyz"})
    )
    ws = KISWebSocket(app_key="k", app_secret="s", is_virtual=True)
    key = await ws._fetch_approval_key()
    assert key == "test-approval-key-xyz"
    assert ws._approval_key == "test-approval-key-xyz"


async def test_subscribe_sends_h0stasp0_message(monkeypatch):
    """subscribe() → WebSocket 으로 H0STASP0 구독 메시지 전송."""
    sent_messages = []
    mock_ws = AsyncMock()
    mock_ws.send = AsyncMock(side_effect=lambda m: sent_messages.append(m))

    ws = KISWebSocket(app_key="k", app_secret="s", is_virtual=True)
    ws._approval_key = "test-key"
    ws._ws = mock_ws
    await ws.subscribe("005930")
    assert ws._subscriptions == {"005930"}
    assert len(sent_messages) == 1
    # subscribe 메시지는 JSON 구조 (KIS 명세)
    msg = json.loads(sent_messages[0])
    assert msg["header"]["tr_type"] == "1"  # subscribe
    assert msg["body"]["input"]["tr_id"] == "H0STASP0"
    assert msg["body"]["input"]["tr_key"] == "005930"


def test_parse_asking_price_extracts_bid_ask_totals():
    """KIS raw '0|H0STASP0|001|...' → (ticker, dict)."""
    ws = KISWebSocket(app_key="k", app_secret="s", is_virtual=True)
    # 모의 호가 메시지 (실제 형식은 더 복잡 — KIS_SETUP.md 참조).
    # H0STASP0 의 응답: '0|H0STASP0|001|<data>'
    # data: ticker^time^current_price^... (more fields)
    # 본 테스트는 implementer 가 KIS 명세 보고 인덱스 맞춰서 구현하도록.
    # 우리는 dict 출력 형식만 검증.
    sample_raw = (
        "0|H0STASP0|001|005930^091500^78500^"
        + "^".join(["0"] * 40)  # padding (실제는 50+ 필드)
        + "^1000^2000"  # 마지막 두 필드: total_askp_rsqn / total_bidp_rsqn (예시)
    )
    # 인덱스가 실제 KIS 명세와 정확히 맞을 때 parse 성공
    result = ws._parse_asking_price(sample_raw)
    if result is None:
        # 인덱스 mismatch — implementer 가 KIS 명세 참조해서 fix
        pytest.fail("parse_asking_price returned None — check field indices vs KIS spec")
    ticker, data = result
    assert ticker == "005930"
    assert "bid_total" in data
    assert "ask_total" in data
    assert "bid_ratio" in data


async def test_reconnect_on_disconnect_with_backoff(monkeypatch):
    """연결 끊김 → exponential backoff retry."""
    sleep_calls = []
    async def fake_sleep(s): sleep_calls.append(s)
    monkeypatch.setattr("asyncio.sleep", fake_sleep)

    # Implementer: ws 내부에 reconnect logic 구현 시
    # _connect_with_backoff() 메서드 또는 내부 reconnect loop 가
    # exponential 1s → 2s → 4s sleep 호출하는지 검증.
    ws = KISWebSocket(app_key="k", app_secret="s", is_virtual=True)
    # Mock _connect to fail twice then succeed
    call_count = [0]
    async def fake_connect():
        call_count[0] += 1
        if call_count[0] < 3:
            raise ConnectionError("fake disconnect")
        return AsyncMock()
    monkeypatch.setattr(ws, "_connect", fake_connect, raising=False)

    result = await ws._connect_with_backoff()
    assert call_count[0] == 3  # 2 fails + 1 success
    assert sleep_calls[:2] == [1, 2]  # exponential
  • Step 2: Run tests to verify FAIL
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_kis_websocket.py -v 2>&1 | tail -10

Expected: ImportError.

  • Step 3: Implement kis_websocket.py

Create web-ai/signal_v2/kis_websocket.py:

"""KIS WebSocket — approval_key + 실시간 호가 구독."""
from __future__ import annotations
import asyncio
import json
import logging
from datetime import datetime
from typing import Callable
from zoneinfo import ZoneInfo

import httpx
import websockets

logger = logging.getLogger(__name__)
KST = ZoneInfo("Asia/Seoul")

# KIS 호가 메시지의 필드 인덱스 (KIS 공식 spec 참조 — 실제 인덱스 운영 환경 확인 필요)
# H0STASP0 응답: ticker | time | current_price | ... | total_ask_rsqn | total_bid_rsqn | ...
# 실 운영 환경에서 정확한 인덱스 필드 확인 필요 (KIS docs / V1 활용 미 가능)
_ASKING_TICKER_IDX = 0
_ASKING_TIME_IDX = 1
_ASKING_CURRENT_PRICE_IDX = 2
_ASKING_TOTAL_ASK_IDX = -2   # 끝에서 두번째 (가정)
_ASKING_TOTAL_BID_IDX = -1   # 마지막 (가정)


class KISWebSocket:
    """KIS WebSocket client. approval_key 발급 + 호가 실시간."""

    def __init__(self, app_key: str, app_secret: str, is_virtual: bool):
        self._app_key = app_key
        self._app_secret = app_secret
        self._is_virtual = is_virtual
        self._base_rest = (
            "https://openapivts.koreainvestment.com:29443" if is_virtual
            else "https://openapi.koreainvestment.com:9443"
        )
        self._ws_url = (
            "ws://ops.koreainvestment.com:31000" if is_virtual
            else "ws://ops.koreainvestment.com:21000"
        )
        self._approval_key: str | None = None
        self._ws: websockets.WebSocketClientProtocol | None = None
        self._subscriptions: set[str] = set()
        self._on_asking_price: Callable[[str, dict], None] | None = None
        self._recv_task: asyncio.Task | None = None
        self._shutdown = asyncio.Event()

    async def _fetch_approval_key(self) -> str:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                f"{self._base_rest}/oauth2/Approval",
                json={
                    "grant_type": "client_credentials",
                    "appkey": self._app_key,
                    "secretkey": self._app_secret,
                },
            )
            response.raise_for_status()
            data = response.json()
            self._approval_key = data["approval_key"]
            return self._approval_key

    async def _connect(self) -> websockets.WebSocketClientProtocol:
        return await websockets.connect(self._ws_url)

    async def _connect_with_backoff(self) -> websockets.WebSocketClientProtocol:
        """연결 시도 with exponential backoff (1s → 2s → 4s → max 30s)."""
        for attempt in range(10):
            try:
                ws = await self._connect()
                return ws
            except Exception as e:
                wait = min(2**attempt, 30)
                logger.warning("KIS WebSocket connect failed (attempt %d): %r — retrying in %ds", attempt + 1, e, wait)
                await asyncio.sleep(wait)
        raise RuntimeError("KIS WebSocket connect exhausted retries")

    async def start(
        self, tickers: list[str],
        on_asking_price: Callable[[str, dict], None],
    ) -> None:
        if self._approval_key is None:
            await self._fetch_approval_key()
        self._on_asking_price = on_asking_price
        self._ws = await self._connect_with_backoff()
        for ticker in tickers:
            await self.subscribe(ticker)
        self._recv_task = asyncio.create_task(self._receive_loop())

    async def subscribe(self, ticker: str) -> None:
        if self._ws is None or self._approval_key is None:
            raise RuntimeError("KIS WebSocket not started")
        msg = json.dumps({
            "header": {
                "approval_key": self._approval_key,
                "custtype": "P",
                "tr_type": "1",  # subscribe
                "content-type": "utf-8",
            },
            "body": {
                "input": {"tr_id": "H0STASP0", "tr_key": ticker},
            },
        })
        await self._ws.send(msg)
        self._subscriptions.add(ticker)

    async def unsubscribe(self, ticker: str) -> None:
        if self._ws is None or self._approval_key is None:
            return
        msg = json.dumps({
            "header": {
                "approval_key": self._approval_key,
                "custtype": "P",
                "tr_type": "2",  # unsubscribe
                "content-type": "utf-8",
            },
            "body": {
                "input": {"tr_id": "H0STASP0", "tr_key": ticker},
            },
        })
        await self._ws.send(msg)
        self._subscriptions.discard(ticker)

    async def close(self) -> None:
        self._shutdown.set()
        if self._recv_task is not None:
            self._recv_task.cancel()
            try:
                await self._recv_task
            except asyncio.CancelledError:
                pass
        if self._ws is not None:
            await self._ws.close()

    async def _receive_loop(self) -> None:
        while not self._shutdown.is_set():
            try:
                raw = await self._ws.recv()
            except websockets.ConnectionClosed:
                logger.warning("KIS WebSocket closed — reconnecting")
                self._ws = await self._connect_with_backoff()
                for ticker in list(self._subscriptions):
                    await self.subscribe(ticker)
                continue
            if not isinstance(raw, str):
                continue
            parsed = self._parse_asking_price(raw)
            if parsed is not None and self._on_asking_price is not None:
                ticker, data = parsed
                try:
                    self._on_asking_price(ticker, data)
                except Exception:
                    logger.exception("on_asking_price callback failed")

    def _parse_asking_price(self, raw: str) -> tuple[str, dict] | None:
        """KIS H0STASP0 raw → (ticker, asking_price dict)."""
        try:
            parts = raw.split("|")
            if len(parts) < 4 or parts[1] != "H0STASP0":
                return None
            fields = parts[3].split("^")
            ticker = fields[_ASKING_TICKER_IDX]
            current_price = int(fields[_ASKING_CURRENT_PRICE_IDX]) if fields[_ASKING_CURRENT_PRICE_IDX].isdigit() else 0
            ask_total = int(fields[_ASKING_TOTAL_ASK_IDX]) if fields[_ASKING_TOTAL_ASK_IDX].lstrip("-").isdigit() else 0
            bid_total = int(fields[_ASKING_TOTAL_BID_IDX]) if fields[_ASKING_TOTAL_BID_IDX].lstrip("-").isdigit() else 0
            total = bid_total + ask_total
            return ticker, {
                "bid_total": bid_total,
                "ask_total": ask_total,
                "bid_ratio": bid_total / total if total > 0 else 0.0,
                "current_price": current_price,
                "as_of": datetime.now(KST).isoformat(),
            }
        except (IndexError, ValueError) as e:
            logger.warning("parse_asking_price failed: %r", e)
            return None

중요 노트: 위 _ASKING_*_IDX 인덱스는 추정값 입니다. 운영 환경에서 정확한 인덱스 확인 필요:

  • KIS 공식 문서 (https://apiportal.koreainvestment.com/)
  • 또는 운영 시 raw 메시지 로그 캡처 후 필드 인덱스 매핑

본 plan 의 테스트는 인덱스가 -2/-1 가정. 실 KIS 호가에서 필드 50+ 개라 정확한 인덱스 운영 검증 필요. parse fail 시 WARNING 로그 + skip (위험 매트릭스 §9 명시).

  • Step 4: Run tests to verify PASS
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_kis_websocket.py -v 2>&1 | tail -15

Expected: 4 passed.

만약 test_parse_asking_price_extracts_bid_ask_totals 가 실패하면 — 인덱스 가정과 sample raw 의 padding 수 mismatch. 두 가지 옵션:

  • a) sample raw 의 ["0"]*40 부분을 인덱스에 맞춰 조정 (테스트만 fix)
  • b) _parse_asking_price 의 인덱스를 명세 확인 후 조정 (코드 fix)

선택은 implementer 의 판단. 양쪽 모두 가능.

Full suite:

python -m pytest signal_v2/tests -q 2>&1 | tail -3

Expected: 30 passed (26 + 4).

  • Step 5: Commit
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/kis_websocket.py signal_v2/tests/test_kis_websocket.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3a): kis_websocket + 4 integration tests

KISWebSocket: approval_key (POST /oauth2/Approval) + H0STASP0 호가
실시간 subscribe + receive loop with reconnect. exponential backoff
(1s→2s→4s→max 30s) + subscription 재등록.

_parse_asking_price 의 필드 인덱스는 운영 검증 후 조정 필요
(KIS 공식 명세 또는 raw 로그 매핑).

30 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"

Task 5: pull_worker 확장 + 2 통합 테스트

Files:

  • Modify: web-ai/signal_v2/pull_worker.py

  • Create: web-ai/signal_v2/tests/test_pull_worker.py

  • Step 1: Write 2 failing tests

Create web-ai/signal_v2/tests/test_pull_worker.py:

"""Tests for pull_worker (Phase 3a additions)."""
from collections import deque
from unittest.mock import AsyncMock, MagicMock

import pytest

from signal_v2.state import PollState


async def test_minute_polling_cycle_updates_state_minute_bars():
    """KIS REST mock 의 분봉 데이터가 state.minute_bars[ticker] deque 에 들어간다."""
    from signal_v2.pull_worker import _run_kis_minute_cycle

    state = PollState()
    state.portfolio = {"holdings": [{"ticker": "005930"}, {"ticker": "000660"}]}
    state.screener_preview = {
        "items": [{"ticker": "005930"}, {"ticker": "035720"}]  # 005930 dup
    }

    kis_client_mock = MagicMock()
    kis_client_mock.get_minute_ohlcv = AsyncMock(side_effect=[
        [{"datetime": "2026-05-18T09:00:00+09:00", "open": 78000,
          "high": 78500, "low": 77900, "close": 78300, "volume": 12345}],
        [{"datetime": "2026-05-18T09:00:00+09:00", "open": 180000,
          "high": 181000, "low": 179800, "close": 180500, "volume": 5000}],
        [{"datetime": "2026-05-18T09:00:00+09:00", "open": 51000,
          "high": 51200, "low": 50800, "close": 51100, "volume": 8000}],
    ])
    kis_client_mock.get_asking_price = AsyncMock(return_value={
        "bid_total": 600, "ask_total": 400, "bid_ratio": 0.6,
        "current_price": 51100, "as_of": "2026-05-18T09:00:30+09:00",
    })

    await _run_kis_minute_cycle(kis_client_mock, state)

    # 3 unique tickers (005930, 000660, 035720)
    assert "005930" in state.minute_bars
    assert "000660" in state.minute_bars
    assert "035720" in state.minute_bars
    assert len(state.minute_bars["005930"]) >= 1
    # asking price 만 screener-only ticker (035720) 에 들어가야 함 (portfolio 는 WebSocket)
    assert "035720" in state.asking_price


def test_websocket_message_updates_state_asking_price():
    """WebSocket callback 가 state.asking_price 갱신."""
    from signal_v2.pull_worker import make_asking_price_callback

    state = PollState()
    cb = make_asking_price_callback(state)
    cb("005930", {"bid_total": 1000, "ask_total": 800, "bid_ratio": 0.555,
                  "current_price": 78500, "as_of": "2026-05-18T10:00:00+09:00"})
    assert state.asking_price["005930"]["bid_total"] == 1000
    assert "asking_price/005930" in state.last_updated
  • Step 2: Run tests to verify FAIL
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_pull_worker.py -v 2>&1 | tail -10

Expected: ImportError or AttributeError.

  • Step 3: Implement pull_worker.py additions

Edit web-ai/signal_v2/pull_worker.py — keep existing poll_loop and _run_polling_cycle, then ADD:

from collections import deque
from signal_v2.kis_client import KISClient  # 추가 import


async def _run_kis_minute_cycle(kis_client: KISClient, state: PollState) -> None:
    """KIS 분봉 + 호가 fetch + state 갱신.

    - 분봉: portfolio + screener Top-N union 종목 모두
    - 호가 (REST): screener-only 종목 (portfolio 는 WebSocket 으로 들어옴)
    """
    portfolio_tickers = _portfolio_tickers(state)
    screener_tickers = _screener_tickers(state)
    all_tickers = list(set(portfolio_tickers) | set(screener_tickers))

    # 분봉 fetch (병렬)
    minute_results = await asyncio.gather(*[
        kis_client.get_minute_ohlcv(t) for t in all_tickers
    ], return_exceptions=True)
    now_iso = datetime.now(KST).isoformat()
    for ticker, result in zip(all_tickers, minute_results):
        if isinstance(result, list):
            buf = state.minute_bars.setdefault(ticker, deque(maxlen=60))
            buf.extend(result)
            state.last_updated[f"minute_bars/{ticker}"] = now_iso
        else:
            state.fetch_errors[f"minute_bars/{ticker}"] = (
                state.fetch_errors.get(f"minute_bars/{ticker}", 0) + 1
            )

    # 호가 fetch (REST) — screener-only
    screener_only = list(set(screener_tickers) - set(portfolio_tickers))
    asking_results = await asyncio.gather(*[
        kis_client.get_asking_price(t) for t in screener_only
    ], return_exceptions=True)
    for ticker, result in zip(screener_only, asking_results):
        if isinstance(result, dict):
            state.asking_price[ticker] = result
            state.last_updated[f"asking_price/{ticker}"] = now_iso


def make_asking_price_callback(state: PollState):
    """KIS WebSocket on_asking_price callback factory."""
    def _cb(ticker: str, data: dict) -> None:
        state.asking_price[ticker] = data
        state.last_updated[f"asking_price/{ticker}"] = datetime.now(KST).isoformat()
    return _cb


def _portfolio_tickers(state: PollState) -> list[str]:
    if state.portfolio is None:
        return []
    return [h["ticker"] for h in state.portfolio.get("holdings", []) if "ticker" in h]


def _screener_tickers(state: PollState) -> list[str]:
    if state.screener_preview is None:
        return []
    return [i["ticker"] for i in state.screener_preview.get("items", []) if "ticker" in i]

Also ADD to _run_polling_cycle signature an optional kis_client param:

async def _run_polling_cycle(
    client: StockClient, state: PollState,
    kis_client: KISClient | None = None,
) -> None:
    # ... existing 3-endpoint stock fetch ...

    # KIS 분봉 + 호가 (kis_client 주어졌을 때만)
    if kis_client is not None:
        try:
            await _run_kis_minute_cycle(kis_client, state)
        except Exception:
            logger.exception("kis minute cycle failed")

And update poll_loop signature to accept kis_client:

async def poll_loop(
    client: StockClient, state: PollState, shutdown: asyncio.Event,
    kis_client: KISClient | None = None,
) -> None:
    # ... existing while loop ...
    # call _run_polling_cycle(client, state, kis_client=kis_client)
  • Step 4: Run tests to verify PASS
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_pull_worker.py -v 2>&1 | tail -10

Expected: 2 passed.

Full suite:

python -m pytest signal_v2/tests -q 2>&1 | tail -3

Expected: 32 passed (30 + 2).

  • Step 5: Commit
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/pull_worker.py signal_v2/tests/test_pull_worker.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3a): pull_worker KIS minute cycle + WS callback

_run_kis_minute_cycle: portfolio + screener union 종목 분봉 fetch +
screener-only 종목 호가 REST fetch. WebSocket callback factory.

poll_loop / _run_polling_cycle 에 kis_client optional param 추가
(Phase 5 까지 None 도 정상 동작).

32 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"

Task 6: main.py KIS lifespan 통합 + 1 케이스

Files:

  • Modify: web-ai/signal_v2/main.py

  • Modify: web-ai/signal_v2/tests/test_main.py

  • Step 1: Write 1 failing test for KIS lifespan

Append to web-ai/signal_v2/tests/test_main.py:

def test_startup_warns_if_kis_app_key_missing(monkeypatch, caplog):
    """KIS_APP_KEY 미설정 시 startup WARNING (KIS 호출 disabled)."""
    monkeypatch.setenv("STOCK_API_URL", "https://test.stock.local")
    monkeypatch.setenv("WEBAI_API_KEY", "test-secret")
    monkeypatch.delenv("KIS_APP_KEY", raising=False)

    import importlib
    from signal_v2 import config as cfg
    importlib.reload(cfg)
    from signal_v2 import main as main_mod
    importlib.reload(main_mod)
    with caplog.at_level("WARNING", logger="signal_v2.main"):
        with TestClient(main_mod.app) as client:
            client.get("/health")
        assert any("KIS_APP_KEY" in rec.message for rec in caplog.records)
  • Step 2: Run test to verify FAIL
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_main.py::test_startup_warns_if_kis_app_key_missing -v 2>&1 | tail -10

Expected: FAIL (no such warning yet).

  • Step 3: Update main.py to integrate KIS clients

Edit web-ai/signal_v2/main.py — augment AppContext + lifespan:

from signal_v2.kis_client import KISClient
from signal_v2.kis_websocket import KISWebSocket
from signal_v2.pull_worker import make_asking_price_callback


class AppContext:
    client: StockClient | None = None
    dedup: SignalDedup | None = None
    shutdown: asyncio.Event | None = None
    poll_task: asyncio.Task | None = None
    kis_client: KISClient | None = None
    kis_ws: KISWebSocket | None = None


# (keep _ctx = AppContext() at module level)


@asynccontextmanager
async def lifespan(app: FastAPI):
    settings = get_settings()
    if not settings.webai_api_key:
        logger.warning("WEBAI_API_KEY not configured — stock API calls will fail with 401")
    if not settings.kis_app_key:
        logger.warning("KIS_APP_KEY not configured — KIS REST/WebSocket disabled")

    _ctx.client = StockClient(settings.stock_api_url, settings.webai_api_key)
    _ctx.dedup = SignalDedup(settings.db_path)
    _ctx.shutdown = asyncio.Event()

    # KIS only if app_key configured
    if settings.kis_app_key:
        _ctx.kis_client = KISClient(
            app_key=settings.kis_app_key,
            app_secret=settings.kis_app_secret,
            account=settings.kis_account,
            is_virtual=settings.kis_is_virtual,
            v1_token_path=settings.v1_token_path,
        )
        _ctx.kis_ws = KISWebSocket(
            app_key=settings.kis_app_key,
            app_secret=settings.kis_app_secret,
            is_virtual=settings.kis_is_virtual,
        )
        # Subscribe portfolio holdings (if any)
        try:
            portfolio = await _ctx.client.get_portfolio()
            tickers = [h["ticker"] for h in portfolio.get("holdings", []) if "ticker" in h]
            cb = make_asking_price_callback(state_mod.state)
            await _ctx.kis_ws.start(tickers, cb)
        except Exception:
            logger.exception("KIS WebSocket startup failed — continuing without realtime asking_price")

    _ctx.poll_task = asyncio.create_task(
        poll_loop(
            _ctx.client, state_mod.state, _ctx.shutdown,
            kis_client=_ctx.kis_client,
        )
    )

    yield

    # Shutdown
    if _ctx.shutdown is not None:
        _ctx.shutdown.set()
    if _ctx.poll_task is not None:
        try:
            await asyncio.wait_for(_ctx.poll_task, timeout=5.0)
        except asyncio.TimeoutError:
            _ctx.poll_task.cancel()
            try:
                await _ctx.poll_task
            except asyncio.CancelledError:
                pass
    if _ctx.kis_ws is not None:
        await _ctx.kis_ws.close()
    if _ctx.kis_client is not None:
        await _ctx.kis_client.close()
    if _ctx.client is not None:
        await _ctx.client.close()
  • Step 4: Run tests to verify PASS
cd /c/Users/jaeoh/Desktop/workspace/web-ai
python -m pytest signal_v2/tests/test_main.py -v 2>&1 | tail -15

Expected: 3 passed (2 기존 + 1 신규).

Full suite:

python -m pytest signal_v2/tests -q 2>&1 | tail -3

Expected: 33 passed (32 + 1).

  • Step 5: Commit
cd /c/Users/jaeoh/Desktop/workspace/web-ai
git add signal_v2/main.py signal_v2/tests/test_main.py
git commit -m "$(cat <<'EOF'
feat(signal_v2-phase3a): main.py lifespan integrates KIS client + WS

AppContext extended with kis_client + kis_ws. lifespan:
- If KIS_APP_KEY set: create KISClient + KISWebSocket, fetch portfolio,
  subscribe WebSocket H0STASP0 for holdings.
- If unset: WARNING log, signal_v2 still serves /health (no KIS data).
- Shutdown closes kis_ws → kis_client → stock client in order.

33 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"

Task 7: 사용자 수동 — .env 갱신 + manual smoke + push

This task requires user action.

  • Step 1: Update .env

User adds to C:\Users\jaeoh\Desktop\workspace\web-ai\.env:

# KIS API (Phase 3a, 2026-05-16)
KIS_APP_KEY=<from KIS portal>
KIS_APP_SECRET=<from KIS portal>
KIS_ACCOUNT=<8자리-2자리 형식, e.g. 50000000-01>
KIS_IS_VIRTUAL=true
# V1 token path (default: ../signal_v1/data/kis_token.json)
# V1_TOKEN_PATH=

User reference: web-ai/signal_v1/KIS_SETUP.md 의 정확한 키 발급 + 계좌 번호 형식.

  • Step 2: V1 봇 정상 가동 확인 (토큰 발급 책임)
cd C:\Users\jaeoh\Desktop\workspace\web-ai
.\start.bat

콘솔 로그에 KIS auth 성공 + signal_v1/data/kis_token.json 갱신 확인.

V1 봇은 계속 실행 또는 토큰 발급 후 종료. signal_v2 가 토큰 파일 read 만.

  • Step 3: signal_v2 시작
cd C:\Users\jaeoh\Desktop\workspace\web-ai\signal_v2
.\start.bat

기대 출력 (수십 줄):

  • Uvicorn running on http://0.0.0.0:8001
  • poll_loop started
  • KIS WebSocket connected (V1 토큰 로딩 성공 시)

만약 V1 token file missing 에러 → V1 토큰 파일 경로 확인 + V1 봇 가동 후 재시도.

  • Step 4: /health smoke
curl http://localhost:8001/health

기대: 200 JSON. 처음에는 last_poll 의 minute_bars / asking_price 없음 — 첫 cycle 까지 대기.

  • Step 5: 분봉 + 호가 검증 (장중 또는 NXT 시간대)

장중 / NXT 시간대 진입 후 60-300초 대기 → 다시 /health 호출:

curl http://localhost:8001/health

기대 응답의 last_poll:

  • minute_bars/005930 등 ticker별 timestamp 표시
  • asking_price/<screener-only-ticker> 도 표시

장외 시간대라면:

  • KIS REST 호출은 가능하나 분봉 데이터는 직전 거래일 마지막 분봉 반환

  • WebSocket 호가는 영업 시간 외 데이터 안 옴

  • Step 6: V1 무영향 검증

V1 봇 콘솔에서 정상 동작 + Telegram /status 응답 확인. V2 가 토큰 파일 read 만 했으므로 V1 무영향 검증.

  • Step 7: git push
cd C:\Users\jaeoh\Desktop\workspace\web-ai
git push

Gitea 자격증명 시 사용자 수동.

  • Step 8: 결과 보고

  • Step 3 (signal_v2 시작): PASS / FAIL — 에러 메시지 공유

  • Step 4 (/health): PASS / FAIL

  • Step 5 (분봉 + 호가): PASS (state 갱신 확인) / FAIL — KIS 응답 또는 parse 이슈 공유

  • Step 6 (V1 무영향): PASS / FAIL

  • Step 7 (push): PASS / FAIL

만약 Step 5 의 호가 parse 가 fail → _parse_asking_price 의 필드 인덱스 문제. KIS raw 메시지 sample 캡처 후 인덱스 조정 (별도 fix slice).

전체 PASS 시 Phase 3a 완료 → Phase 3b (Chronos-2 추론 + 분봉 모멘텀) brainstorming.


Self-Review

1. Spec coverage:

Spec § 요구사항 Plan task
§2 포함 ① KIS REST client Task 3
§2 포함 ② KIS WebSocket Task 4
§2 포함 ③ pull_worker 확장 Task 5
§2 포함 ④ PollState 확장 Task 1
§2 포함 ⑤ scheduler NXT Task 2
§2 포함 ⑥ 13 테스트 Task 2 (3) + Task 3 (4) + Task 4 (4) + Task 5 (2) = 13
§3 변경 매트릭스 14 파일 Task 1-7 합산
§4 KISClient interface Task 3 Step 3
§5 KISWebSocket interface Task 4 Step 3
§6 PollState 확장 + pull_worker Task 1 + Task 5
§7 scheduler NXT Task 2
§8 13 테스트 케이스 Task 2-5
§11 DoD 13 항목 Task 1-7

No gaps.

2. Placeholder scan: No "TBD". KIS WebSocket _parse_asking_price 의 필드 인덱스는 "운영 검증 후 조정" 으로 명시 — implementer 가 운영 sample 으로 확정. 이는 placeholder 가 아니라 운영 환경 의존 파라미터.

3. Type consistency:

  • KISClient.__init__(app_key, app_secret, account, is_virtual, v1_token_path, timeout=10.0) consistent
  • KISClient.get_minute_ohlcv(ticker) -> list[dict] / get_asking_price(ticker) -> dict / close() consistent
  • KISWebSocket.__init__(app_key, app_secret, is_virtual) consistent
  • KISWebSocket.start(tickers, on_asking_price) / subscribe(ticker) / unsubscribe(ticker) / close() consistent
  • PollState 필드 (minute_bars + asking_price) consistent across Task 1 / 5 / 6
  • _next_interval(now) -> float / _is_market_day(now) -> bool / _is_polling_window(now) -> bool consistent
  • env var names (KIS_APP_KEY / KIS_APP_SECRET / KIS_ACCOUNT / KIS_IS_VIRTUAL / V1_TOKEN_PATH) consistent

Plan passes self-review.