- ai_trade/heartbeat.py: build_trader_payload() + heartbeat_loop() 자체 미니 헬퍼 (Windows 호스트 실행이라 _shared import 경로 달라 독립 구현, 계약은 동일) - ai_trade/main.py: lifespan에 hb_task spawn + shutdown 시 cancel state_fn = scheduler._is_market_day & _is_polling_window(KST now) 조합 signals = len(state.signals) 실시간 주입 - requirements.txt: redis>=5.0 추가 - ai_trade/tests/test_heartbeat.py: build_trader_payload 3케이스 TDD 검증 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
39 lines
1.1 KiB
Python
39 lines
1.1 KiB
Python
"""Tests for ai_trade heartbeat payload builder."""
|
|
import json
|
|
|
|
import pytest
|
|
|
|
|
|
def test_trader_payload_market_open():
|
|
from ai_trade.heartbeat import build_trader_payload
|
|
|
|
p = json.loads(build_trader_payload("market_open", signals=2))
|
|
assert p["name"] == "ai_trade"
|
|
assert p["kind"] == "trader"
|
|
assert p["state"] == "market_open"
|
|
assert p["ts"].endswith("Z")
|
|
assert p["jobs_done"] == 2
|
|
|
|
|
|
def test_trader_payload_market_closed():
|
|
from ai_trade.heartbeat import build_trader_payload
|
|
|
|
p = json.loads(build_trader_payload("market_closed"))
|
|
assert p["name"] == "ai_trade"
|
|
assert p["kind"] == "trader"
|
|
assert p["state"] == "market_closed"
|
|
assert p["jobs_done"] == 0
|
|
assert p["jobs_failed"] == 0
|
|
assert p["last_job_at"] is None
|
|
|
|
|
|
def test_trader_payload_ts_format():
|
|
"""ts 필드가 ISO 8601 UTC 형식 (YYYY-MM-DDTHH:MM:SSZ)인지 확인."""
|
|
from ai_trade.heartbeat import build_trader_payload
|
|
import re
|
|
|
|
p = json.loads(build_trader_payload("market_open"))
|
|
assert re.match(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", p["ts"]), (
|
|
f"ts={p['ts']!r} does not match expected UTC format"
|
|
)
|