"""분산 워커 heartbeat — worker::heartbeat SET (TTL). Global Constraints 계약 1.""" from __future__ import annotations import asyncio, datetime as dt, json, logging, os logger = logging.getLogger(__name__) DEFAULT_INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "15")) DEFAULT_TTL = int(os.getenv("HEARTBEAT_TTL", "45")) class WorkerStats: """worker_loop가 갱신, heartbeat_loop가 읽는 가변 카운터.""" def __init__(self): self.busy = False self.jobs_done = 0 self.jobs_failed = 0 self.last_job_at = None # ISO str | None def utc_now_iso() -> str: return dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") def build_payload(name: str, kind: str, state: str, stats: WorkerStats, extra: dict | None = None) -> str: payload = { "name": name, "kind": kind, "state": state, "ts": utc_now_iso(), "last_job_at": stats.last_job_at, "jobs_done": stats.jobs_done, "jobs_failed": stats.jobs_failed, } if extra: payload.update(extra) return json.dumps(payload) async def render_state(redis, stats: WorkerStats, paused_key: str = "queue:paused") -> str: if await redis.get(paused_key) == b"1": return "paused" return "busy" if stats.busy else "idle" async def heartbeat_loop(redis, name, kind, stats, *, interval=DEFAULT_INTERVAL, ttl=DEFAULT_TTL, paused_key="queue:paused", state_fn=None): key = f"worker:{name}:heartbeat" logger.info("heartbeat 시작 name=%s ttl=%ds", name, ttl) while True: try: if state_fn is not None: state, extra = await state_fn(redis, stats) else: state, extra = await render_state(redis, stats, paused_key), None await redis.set(key, build_payload(name, kind, state, stats, extra), ex=ttl) except asyncio.CancelledError: raise except Exception: logger.exception("heartbeat 발신 실패 name=%s", name) await asyncio.sleep(interval)