From d1b9ff570df0b43b887e34d1810100414bcd6b47 Mon Sep 17 00:00:00 2001 From: gahusb Date: Wed, 1 Jul 2026 00:43:01 +0900 Subject: [PATCH] =?UTF-8?q?feat(=5Fshared):=20=EC=9B=8C=EC=BB=A4=20heartbe?= =?UTF-8?q?at=20=EB=AA=A8=EB=93=88=20(worker::heartbeat=20TTL=20SET)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Sonnet 4.6 --- services/_shared/heartbeat.py | 55 ++++++++++++++++++++++++ services/_shared/tests/test_heartbeat.py | 46 ++++++++++++++++++++ 2 files changed, 101 insertions(+) create mode 100644 services/_shared/heartbeat.py create mode 100644 services/_shared/tests/test_heartbeat.py diff --git a/services/_shared/heartbeat.py b/services/_shared/heartbeat.py new file mode 100644 index 0000000..f30a58e --- /dev/null +++ b/services/_shared/heartbeat.py @@ -0,0 +1,55 @@ +"""분산 워커 heartbeat — worker::heartbeat SET (TTL). Global Constraints 계약 1.""" +from __future__ import annotations +import asyncio, datetime as dt, json, logging, os + +logger = logging.getLogger(__name__) +DEFAULT_INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "15")) +DEFAULT_TTL = int(os.getenv("HEARTBEAT_TTL", "45")) + + +class WorkerStats: + """worker_loop가 갱신, heartbeat_loop가 읽는 가변 카운터.""" + def __init__(self): + self.busy = False + self.jobs_done = 0 + self.jobs_failed = 0 + self.last_job_at = None # ISO str | None + + +def utc_now_iso() -> str: + return dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def build_payload(name: str, kind: str, state: str, stats: WorkerStats, extra: dict | None = None) -> str: + payload = { + "name": name, "kind": kind, "state": state, "ts": utc_now_iso(), + "last_job_at": stats.last_job_at, + "jobs_done": stats.jobs_done, "jobs_failed": stats.jobs_failed, + } + if extra: + payload.update(extra) + return json.dumps(payload) + + +async def render_state(redis, stats: WorkerStats, paused_key: str = "queue:paused") -> str: + if await redis.get(paused_key) == b"1": + return "paused" + return "busy" if stats.busy else "idle" + + +async def heartbeat_loop(redis, name, kind, stats, *, interval=DEFAULT_INTERVAL, + ttl=DEFAULT_TTL, paused_key="queue:paused", state_fn=None): + key = f"worker:{name}:heartbeat" + logger.info("heartbeat 시작 name=%s ttl=%ds", name, ttl) + while True: + try: + if state_fn is not None: + state, extra = await state_fn(redis, stats) + else: + state, extra = await render_state(redis, stats, paused_key), None + await redis.set(key, build_payload(name, kind, state, stats, extra), ex=ttl) + except asyncio.CancelledError: + raise + except Exception: + logger.exception("heartbeat 발신 실패 name=%s", name) + await asyncio.sleep(interval) diff --git a/services/_shared/tests/test_heartbeat.py b/services/_shared/tests/test_heartbeat.py new file mode 100644 index 0000000..687a926 --- /dev/null +++ b/services/_shared/tests/test_heartbeat.py @@ -0,0 +1,46 @@ +"""Tests for _shared.heartbeat — Task A1.""" +import json +import sys +from pathlib import Path + +import pytest + +# Make `_shared` importable (same pattern as test_reliable_queue.py) +sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent)) + +from _shared.heartbeat import WorkerStats, build_payload, render_state + + +def test_build_payload_has_contract_fields(): + s = WorkerStats(); s.jobs_done = 3; s.last_job_at = "2026-06-29T00:00:00Z" + payload = json.loads(build_payload("image-render", "render", "idle", s)) + assert payload["name"] == "image-render" + assert payload["kind"] == "render" + assert payload["state"] == "idle" + assert payload["jobs_done"] == 3 + assert payload["last_job_at"] == "2026-06-29T00:00:00Z" + assert payload["ts"].endswith("Z") + + +def test_build_payload_merges_extra(): + payload = json.loads(build_payload("task-watcher", "watcher", "free", WorkerStats(), extra={"mode": "free"})) + assert payload["mode"] == "free" + + +class _FakeRedis: + def __init__(self, paused): self._paused = paused + async def get(self, key): return b"1" if self._paused else None + + +@pytest.mark.asyncio +async def test_render_state_paused_overrides_busy(): + s = WorkerStats(); s.busy = True + assert await render_state(_FakeRedis(paused=True), s) == "paused" + + +@pytest.mark.asyncio +async def test_render_state_busy_then_idle(): + s = WorkerStats(); s.busy = True + assert await render_state(_FakeRedis(paused=False), s) == "busy" + s.busy = False + assert await render_state(_FakeRedis(paused=False), s) == "idle"