- ai_trade/heartbeat.py: build_trader_payload() + heartbeat_loop() 자체 미니 헬퍼 (Windows 호스트 실행이라 _shared import 경로 달라 독립 구현, 계약은 동일) - ai_trade/main.py: lifespan에 hb_task spawn + shutdown 시 cancel state_fn = scheduler._is_market_day & _is_polling_window(KST now) 조합 signals = len(state.signals) 실시간 주입 - requirements.txt: redis>=5.0 추가 - ai_trade/tests/test_heartbeat.py: build_trader_payload 3케이스 TDD 검증 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
58 lines
1.9 KiB
Python
58 lines
1.9 KiB
Python
"""ai_trade heartbeat — SET worker:ai_trade:heartbeat on the NAS Redis.

Global Constraints contract 1: kind=trader, state=market_open|market_closed.
ai_trade runs on the Windows host, so its import path to _shared differs;
this module therefore keeps its own mini helper.
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import datetime as dt
|
|
import json
|
|
import logging
|
|
import os
|
|
|
|
import redis.asyncio as aioredis
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Connection URL of the Redis server; override with the REDIS_URL env var.
REDIS_URL = os.environ.get("REDIS_URL", "redis://192.168.45.54:6379")
# Redis key that holds the latest ai_trade heartbeat payload.
KEY = "worker:ai_trade:heartbeat"
# Seconds slept between heartbeat writes (HEARTBEAT_INTERVAL env var).
INTERVAL = int(os.environ.get("HEARTBEAT_INTERVAL", "15"))
# Expiry in seconds applied to the heartbeat key (HEARTBEAT_TTL env var).
TTL = int(os.environ.get("HEARTBEAT_TTL", "45"))
|
|
|
|
|
|
def build_trader_payload(state: str, signals: int = 0) -> str:
    """Serialize one trader heartbeat record as a JSON string.

    Args:
        state: Market state to report; the module contract uses
            'market_open' or 'market_closed'.
        signals: Count reported under the ``jobs_done`` key.

    Returns:
        JSON text with the keys name, kind, state, ts, last_job_at,
        jobs_done and jobs_failed. ``ts`` is the current UTC time at
        one-second resolution with a trailing ``Z``.
    """
    now_utc = dt.datetime.now(dt.timezone.utc)
    record = {
        "name": "ai_trade",
        "kind": "trader",
        "state": state,
        "ts": now_utc.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "last_job_at": None,
        "jobs_done": signals,
        "jobs_failed": 0,
    }
    return json.dumps(record)
|
|
|
|
|
|
async def heartbeat_loop(state_fn) -> None:
    """Publish the ai_trade heartbeat to Redis until the task is cancelled.

    Each cycle calls ``state_fn``, serializes the result with
    ``build_trader_payload`` and writes it to ``KEY`` with an expiry of
    ``TTL`` seconds, then sleeps ``INTERVAL`` seconds.

    Args:
        state_fn: Zero-argument callable returning ``(state, signals)``;
            the caller injects the polling-window decision through it.

    Raises:
        asyncio.CancelledError: Re-raised so the owning task can stop;
            the Redis client is closed on the way out.
    """
    client = aioredis.from_url(REDIS_URL, decode_responses=False)
    try:
        while True:
            try:
                current_state, signal_count = state_fn()
                await client.set(
                    KEY,
                    build_trader_payload(current_state, signal_count),
                    ex=TTL,
                )
                logger.debug(
                    "ai_trade heartbeat sent: state=%s signals=%d",
                    current_state,
                    signal_count,
                )
            except asyncio.CancelledError:
                # Cancellation must never be absorbed by the handler below.
                raise
            except Exception:
                # Best effort: a failed beat is logged and retried next cycle.
                logger.exception("ai_trade heartbeat 실패 — 다음 주기에 재시도")
            # Runs after both a successful and a failed attempt.
            await asyncio.sleep(INTERVAL)
    finally:
        # Release the connection whether the loop ends by cancel or error.
        await client.aclose()
|