diff --git a/agent-office/app/config.py b/agent-office/app/config.py index eee58d7..9ab6199 100644 --- a/agent-office/app/config.py +++ b/agent-office/app/config.py @@ -55,3 +55,5 @@ AGENT_CONTAINER_MAP: dict[str, tuple[str, int, _re.Pattern]] = { # Redis (node monitor) REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379") NODE_ALERT_DEADLETTER_THRESHOLD = int(os.getenv("NODE_ALERT_DEADLETTER_THRESHOLD", "1")) +# heartbeat TTL(45s)의 2배 — 키가 남아있어도 age>90s면 dead 판정 +NODE_STALE_THRESHOLD_SEC = int(os.getenv("NODE_STALE_THRESHOLD_SEC", "90")) diff --git a/agent-office/app/node_monitor.py b/agent-office/app/node_monitor.py index 954cd6e..e88a2c6 100644 --- a/agent-office/app/node_monitor.py +++ b/agent-office/app/node_monitor.py @@ -2,7 +2,7 @@ from __future__ import annotations import datetime as dt, json, logging import redis.asyncio as aioredis -from .config import REDIS_URL, NODE_ALERT_DEADLETTER_THRESHOLD +from .config import REDIS_URL, NODE_ALERT_DEADLETTER_THRESHOLD, NODE_STALE_THRESHOLD_SEC logger = logging.getLogger("agent-office.node_monitor") @@ -66,10 +66,13 @@ async def collect_status(redis=None) -> dict: if raw: try: hb = json.loads(raw) - info.update(alive=True, state=hb.get("state"), - jobs_done=hb.get("jobs_done", 0), jobs_failed=hb.get("jobs_failed", 0), - last_job_at=hb.get("last_job_at"), - last_beat_age_s=_beat_age(hb.get("ts") or "", now)) + age = _beat_age(hb.get("ts") or "", now) + info["last_beat_age_s"] = age + info["alive"] = age is not None and age <= NODE_STALE_THRESHOLD_SEC + info["state"] = hb.get("state") + info["jobs_done"] = hb.get("jobs_done", 0) + info["jobs_failed"] = hb.get("jobs_failed", 0) + info["last_job_at"] = hb.get("last_job_at") if w["kind"] == "watcher" and hb.get("mode"): out["paused_reason"] = hb["mode"] except (json.JSONDecodeError, UnicodeDecodeError): @@ -114,19 +117,24 @@ async def check_and_alert(status=None) -> list[str]: return [] sent: list[str] = [] for w in st["workers"]: - name, alive = w["name"], w.get("alive", False) + name = w["name"] + alive = w.get("alive", False) prev = _node_state.get(name) + transition_send_failed = False if prev is True and not alive: text = f"🔴 [{name}] 워커 다운" if (await send_raw(text=text)).get("ok"): - add_log("node_monitor", f"{name} 다운", "warning") - sent.append(text) + add_log("node_monitor", f"{name} 다운", "warning"); sent.append(text) + else: + transition_send_failed = True elif prev is False and alive: text = f"🟢 [{name}] 워커 복구" if (await send_raw(text=text)).get("ok"): - add_log("node_monitor", f"{name} 복구", "info") - sent.append(text) - _node_state[name] = alive + add_log("node_monitor", f"{name} 복구", "info"); sent.append(text) + else: + transition_send_failed = True + if not transition_send_failed: + _node_state[name] = alive dl = w.get("dead_letter", 0) if dl >= NODE_ALERT_DEADLETTER_THRESHOLD and dl != _dl_notified.get(name, 0): text = f"❌ [{name}] 실패 누적 {dl}건 (dead-letter)" diff --git a/agent-office/tests/test_node_monitor.py b/agent-office/tests/test_node_monitor.py index c2f0918..5df84af 100644 --- a/agent-office/tests/test_node_monitor.py +++ b/agent-office/tests/test_node_monitor.py @@ -1,4 +1,5 @@ # agent-office/tests/test_node_monitor.py +import datetime as dt import json, pytest from app import node_monitor import app.node_monitor as nm @@ -18,8 +19,11 @@ class FakeRedis: if k.startswith(prefix): yield k -def _hb(name, kind, state, **extra): - return json.dumps({"name": name, "kind": kind, "state": state, "ts": "2026-06-29T00:00:00Z", +def _hb(name, kind, state, ts=None, **extra): + """heartbeat 페이로드 생성. ts 기본값은 현재 시각(신선한 heartbeat).""" + if ts is None: + ts = dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + return json.dumps({"name": name, "kind": kind, "state": state, "ts": ts, "last_job_at": None, "jobs_done": 0, "jobs_failed": 0, **extra}).encode() @pytest.mark.asyncio @@ -145,3 +149,48 @@ async def test_dl_notified_not_updated_on_telegram_failure(monkeypatch): result2 = await nm.check_and_alert(status=s) assert any("dead-letter" in t for t in result2) assert nm._dl_notified.get("video-render") == 2 + + +# ── I1: staleness 판정 신규 테스트 ───────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_stale_heartbeat_is_dead(): + """heartbeat 키가 존재해도 ts가 90s 초과면 alive=False (staleness 판정).""" + stale_ts = (dt.datetime.now(dt.timezone.utc) - dt.timedelta(seconds=300)).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + r = FakeRedis(kv={"worker:image-render:heartbeat": _hb("image-render", "render", "idle", ts=stale_ts)}) + st = await node_monitor.collect_status(redis=r) + img = next(w for w in st["workers"] if w["name"] == "image-render") + assert img["alive"] is False + link = next(l for l in st["links"] if l["to"] == "image-render") + assert link["status"] == "down" + + +# ── I2: 전이 발송 실패 시 재시도 회귀 테스트 ────────────────────────────────── + +@pytest.mark.asyncio +async def test_transition_send_failure_retries_next_cycle(monkeypatch): + """alive→dead 전이 시 send_raw 실패하면 _node_state 갱신 안 됨 → 다음 사이클 재시도.""" + calls = [] + async def fake_send_raw(text, **kw): + calls.append(text) + if len(calls) == 1: + return {"ok": False} # 첫 호출: 텔레그램 다운 + return {"ok": True} # 두 번째 호출: 성공 + monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw) + monkeypatch.setattr("app.db.add_log", lambda *a, **k: None) + nm._node_state.clear(); nm._dl_notified.clear() + alive = {"workers": [{"name": "music-render", "alive": True, "dead_letter": 0}], "links": []} + dead = {"workers": [{"name": "music-render", "alive": False, "dead_letter": 0}], "links": []} + # 첫 관측: baseline 설정(전이 없음) + await nm.check_and_alert(status=alive) + assert nm._node_state.get("music-render") is True + # alive→dead 전이, send_raw 실패 → _node_state 갱신 안 됨 + result1 = await nm.check_and_alert(status=dead) + assert result1 == [] # 경보 미발송 + assert nm._node_state.get("music-render") is True # 여전히 True + # 두 번째 사이클: 동일 dead, send_raw 성공 → 경보 발송 + result2 = await nm.check_and_alert(status=dead) + assert any("다운" in t for t in result2) + assert nm._node_state.get("music-render") is False # 이제 갱신