Files
ai-trade/signal_v2/tests/test_kis_websocket.py
gahusb 163c9fb690 feat(signal_v2-phase3a): kis_websocket + 4 integration tests
KISWebSocket: approval_key (POST /oauth2/Approval) + H0STASP0 호가
실시간 subscribe + receive loop + exponential reconnect (1s → 2s → 4s
→ max 30s). _parse_asking_price 필드 인덱스 운영 검증 필요
(현재 가정: 마지막 2 필드가 ask_total/bid_total).

4 tests pass, 29 total.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 05:16:20 +09:00

95 lines
3.5 KiB
Python

"""Tests for KISWebSocket."""
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock
import httpx
import pytest
import respx
from signal_v2.kis_websocket import KISWebSocket
# REST base URL that the mocked POST /oauth2/Approval route in this module targets.
# NOTE(review): presumably the KIS virtual (paper-trading) host, matching the
# is_virtual=True clients built in these tests — confirm against KISWebSocket.
BASE_REST = "https://openapivts.koreainvestment.com:29443"
@respx.mock
async def test_fetch_approval_key_via_oauth_endpoint():
    """Check that ``_fetch_approval_key`` extracts the key from POST /oauth2/Approval.

    The endpoint is mocked to answer 200 with an ``approval_key`` field. The
    method must return that value and also store it on ``_approval_key``.
    """
    expected_key = "test-approval-key-xyz"
    approval_response = httpx.Response(200, json={"approval_key": expected_key})
    respx.post(f"{BASE_REST}/oauth2/Approval").mock(return_value=approval_response)

    client = KISWebSocket(app_key="k", app_secret="s", is_virtual=True)
    returned_key = await client._fetch_approval_key()

    assert returned_key == expected_key
    assert client._approval_key == expected_key
async def test_subscribe_sends_h0stasp0_message():
    """Check that ``subscribe()`` sends one H0STASP0 subscribe message over the WebSocket."""
    captured = []
    fake_socket = AsyncMock()
    # Record every payload handed to send(). list.append returns None, so the
    # awaited send() still resolves to None.
    fake_socket.send = AsyncMock(side_effect=captured.append)

    client = KISWebSocket(app_key="k", app_secret="s", is_virtual=True)
    client._approval_key = "test-key"
    client._ws = fake_socket

    await client.subscribe("005930")

    assert client._subscriptions == {"005930"}
    assert len(captured) == 1

    payload = json.loads(captured[0])
    assert payload["header"]["tr_type"] == "1"  # subscribe
    request = payload["body"]["input"]
    assert request["tr_id"] == "H0STASP0"
    assert request["tr_key"] == "005930"
def test_parse_asking_price_extracts_bid_ask_totals():
    """Check that a raw '0|H0STASP0|001|...' message parses into ``(ticker, dict)``.

    NOTE: the exact field indices of the KIS asking-price message still need
    to be verified against the official KIS spec in production. This sample is
    laid out to match the indices the ``_parse_asking_price`` implementation
    is assumed to read: ask_total and bid_total as the last two fields, in
    that order.
    """
    client = KISWebSocket(app_key="k", app_secret="s", is_virtual=True)

    leading = ["005930", "091500", "78500"]  # ticker, time, current_price
    filler = ["0"] * 40  # padding (the real KIS message has roughly 50 fields)
    totals = ["400", "600"]  # ask_total, then bid_total
    payload = "^".join(leading + filler + totals)
    raw = f"0|H0STASP0|001|{payload}"

    parsed = client._parse_asking_price(raw)
    assert parsed is not None, "parse_asking_price returned None"

    ticker, data = parsed
    assert ticker == "005930"
    for key in ("bid_total", "ask_total", "bid_ratio", "current_price"):
        assert key in data

    # bid_total=600 and ask_total=400, so the expected bid_ratio is 0.6.
    assert data["bid_total"] == 600
    assert data["ask_total"] == 400
    assert abs(data["bid_ratio"] - 0.6) < 1e-9
async def test_reconnect_on_disconnect_with_backoff(monkeypatch):
    """Check that ``_connect_with_backoff()`` retries failed connects with exponential backoff.

    ``_connect`` is replaced with a stub that raises ``ConnectionError`` on its
    first two calls and succeeds on the third. ``asyncio.sleep`` is replaced
    with a recorder so the test never actually waits.

    Args:
        monkeypatch: pytest fixture used to swap ``asyncio.sleep`` and the
            instance's ``_connect`` for the duration of the test.
    """
    sleep_calls = []

    async def fake_sleep(delay):
        # Record the requested delay instead of sleeping.
        sleep_calls.append(delay)

    monkeypatch.setattr("asyncio.sleep", fake_sleep)

    ws = KISWebSocket(app_key="k", app_secret="s", is_virtual=True)

    attempts = 0

    async def fake_connect():
        # Fail twice, then hand back a mock connection on the third attempt.
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            raise ConnectionError("fake disconnect")
        return AsyncMock()

    monkeypatch.setattr(ws, "_connect", fake_connect)

    # The return value is not part of what this test checks, so it is not bound.
    await ws._connect_with_backoff()

    assert attempts == 3  # 2 failures + 1 success
    # The first two waits follow the exponential schedule: 1s, then 2s.
    assert sleep_calls[:2] == [1, 2]