56 lines
2.0 KiB
Python
56 lines
2.0 KiB
Python
"""분산 워커 heartbeat — worker:<name>:heartbeat SET (TTL). Global Constraints 계약 1."""
|
|
from __future__ import annotations
|
|
import asyncio, datetime as dt, json, logging, os
|
|
|
|
# Module-level logger shared by the heartbeat helpers in this file.
logger = logging.getLogger(__name__)

# Default pause between heartbeat writes, in seconds (passed to asyncio.sleep);
# overridable through the HEARTBEAT_INTERVAL environment variable.
DEFAULT_INTERVAL = int(os.environ.get("HEARTBEAT_INTERVAL", "15"))

# Default expiry applied to the heartbeat key (passed as ``ex=`` to redis.set);
# overridable through the HEARTBEAT_TTL environment variable.
DEFAULT_TTL = int(os.environ.get("HEARTBEAT_TTL", "45"))
|
|
|
|
|
|
class WorkerStats:
    """Mutable counters: updated by worker_loop, read by heartbeat_loop.

    Attributes:
        busy: True while the worker is processing a job.
        jobs_done: Number of jobs completed successfully.
        jobs_failed: Number of jobs that failed.
        last_job_at: ISO timestamp string of the most recent job, or None
            if no job has been recorded yet.
    """

    def __init__(self) -> None:
        self.busy: bool = False
        self.jobs_done: int = 0
        self.jobs_failed: int = 0
        self.last_job_at: str | None = None

    def __repr__(self) -> str:
        # Readable representation so logged/debugged instances show their
        # counters instead of a bare object address.
        return (
            f"{type(self).__name__}(busy={self.busy!r}, "
            f"jobs_done={self.jobs_done!r}, "
            f"jobs_failed={self.jobs_failed!r}, "
            f"last_job_at={self.last_job_at!r})"
        )
|
|
|
|
|
|
def utc_now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    current = dt.datetime.now(tz=dt.timezone.utc)
    # format() on a datetime delegates to strftime with the given spec.
    return format(current, "%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
|
|
def build_payload(name: str, kind: str, state: str, stats: WorkerStats, extra: dict | None = None) -> str:
    """Serialize one heartbeat record as a JSON string.

    Args:
        name: Worker name, stored under "name".
        kind: Worker kind, stored under "kind".
        state: Current state label, stored under "state".
        stats: Counters object; ``last_job_at``, ``jobs_done`` and
            ``jobs_failed`` are read from it.
        extra: Optional additional fields merged on top of the base record;
            a key that matches a base field replaces it.

    Returns:
        The JSON-encoded payload, including a "ts" field with the current
        UTC timestamp.
    """
    record = dict(
        name=name,
        kind=kind,
        state=state,
        ts=utc_now_iso(),
        last_job_at=stats.last_job_at,
        jobs_done=stats.jobs_done,
        jobs_failed=stats.jobs_failed,
    )
    # A falsy ``extra`` (None or empty) leaves the base record untouched.
    record.update(extra or {})
    return json.dumps(record)
|
|
|
|
|
|
async def render_state(redis, stats: WorkerStats, paused_key: str = "queue:paused") -> str:
    """Return the worker's current state label.

    The pause flag stored in Redis takes precedence over the worker's own
    busy flag.

    Args:
        redis: Async Redis client; only ``await redis.get(key)`` is used.
        stats: Counters object whose ``busy`` attribute is read.
        paused_key: Redis key holding the pause flag.

    Returns:
        "paused" when the flag value is ``1``; otherwise "busy" if
        ``stats.busy`` is truthy, else "idle".
    """
    flag = await redis.get(paused_key)
    # A client created with decode_responses=True returns str instead of
    # bytes; compare against the matching literal so the pause flag is not
    # silently ignored in that configuration.
    paused_marker = "1" if isinstance(flag, str) else b"1"
    if flag == paused_marker:
        return "paused"
    return "busy" if stats.busy else "idle"
|
|
|
|
|
|
async def heartbeat_loop(redis, name, kind, stats, *, interval=DEFAULT_INTERVAL,
                         ttl=DEFAULT_TTL, paused_key="queue:paused", state_fn=None):
    """Write this worker's heartbeat to Redis repeatedly until cancelled.

    Each cycle stores a JSON payload under ``worker:<name>:heartbeat`` with
    an expiry of ``ttl`` and then sleeps for ``interval``.

    Args:
        redis: Async Redis client; ``set`` (and ``get`` via render_state) are used.
        name: Worker name, used in the key and the payload.
        kind: Worker kind, stored in the payload.
        stats: Counters object passed to the state and payload helpers.
        interval: Pause between heartbeat writes.
        ttl: Expiry applied to the heartbeat key on every write.
        paused_key: Redis key consulted for the pause flag when ``state_fn``
            is not supplied.
        state_fn: Optional coroutine function called as
            ``await state_fn(redis, stats)`` that must return a
            ``(state, extra)`` pair; when omitted, the state comes from
            render_state and no extra fields are added.

    Raises:
        asyncio.CancelledError: Propagated so the task can be cancelled.
            Any other exception raised while producing or writing a
            heartbeat is logged and the loop continues.
    """
    key = f"worker:{name}:heartbeat"
    # If the key expires no later than the next write, it vanishes between
    # beats; surface the misconfiguration instead of failing silently.
    if ttl is not None and ttl <= interval:
        logger.warning(
            "heartbeat ttl=%s <= interval=%s; key will expire between beats name=%s",
            ttl, interval, name,
        )
    logger.info("heartbeat 시작 name=%s ttl=%ds", name, ttl)
    while True:
        try:
            if state_fn is not None:
                state, extra = await state_fn(redis, stats)
            else:
                state, extra = await render_state(redis, stats, paused_key), None
            await redis.set(key, build_payload(name, kind, state, stats, extra), ex=ttl)
        except asyncio.CancelledError:
            # Never swallow cancellation; let the task shut down.
            raise
        except Exception:
            # Best effort: a failed beat is logged and retried next cycle.
            logger.exception("heartbeat 발신 실패 name=%s", name)
        # Outside the try so a failed beat still waits before retrying.
        await asyncio.sleep(interval)
|