"""ai_trade heartbeat — NAS Redis로 worker:ai_trade:heartbeat SET. Global Constraints 계약 1: kind=trader, state=market_open|market_closed. ai_trade는 Windows 호스트 실행이라 _shared import 경로가 달라 자체 미니 헬퍼로 둔다. """ from __future__ import annotations import asyncio import datetime as dt import json import logging import os import redis.asyncio as aioredis logger = logging.getLogger(__name__) REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379") KEY = "worker:ai_trade:heartbeat" INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "15")) TTL = int(os.getenv("HEARTBEAT_TTL", "45")) def build_trader_payload(state: str, signals: int = 0) -> str: """JSON 문자열 반환. state: 'market_open' | 'market_closed'.""" return json.dumps({ "name": "ai_trade", "kind": "trader", "state": state, "ts": dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "last_job_at": None, "jobs_done": signals, "jobs_failed": 0, }) async def heartbeat_loop(state_fn) -> None: """Redis에 HEARTBEAT_INTERVAL마다 SET EX TTL. Args: state_fn: () -> (state: str, signals: int). 호출자가 폴링 윈도우 판정 주입. """ redis = aioredis.from_url(REDIS_URL, decode_responses=False) try: while True: try: state, signals = state_fn() payload = build_trader_payload(state, signals) await redis.set(KEY, payload, ex=TTL) logger.debug("ai_trade heartbeat sent: state=%s signals=%d", state, signals) except asyncio.CancelledError: raise except Exception: logger.exception("ai_trade heartbeat 실패 — 다음 주기에 재시도") await asyncio.sleep(INTERVAL) finally: await redis.aclose()