fix(agent-office): alive를 heartbeat staleness로 판정 + 다운/복구 전이 발송실패 시 재시도 (최종 리뷰 I1·I2)
I1: collect_status - heartbeat 키 존재 여부가 아닌 ts age 기반으로 alive 판정.
age > NODE_STALE_THRESHOLD_SEC(90s, env 주입 가능)이면 키 있어도 dead.
config.py에 NODE_STALE_THRESHOLD_SEC=90 추가.
I2: check_and_alert - 다운/복구 전이 시 send_raw 실패하면 _node_state 갱신 보류.
다음 사이클에서 동일 전이 재감지 → 재발송 시도 (다운 이벤트 유실 방지).
테스트: _hb 헬퍼 현재 시각 기본값으로 수정 + 신규 2개 (stale→dead, I2 재시도 회귀).
14 passed.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
This commit is contained in:
@@ -55,3 +55,5 @@ AGENT_CONTAINER_MAP: dict[str, tuple[str, int, _re.Pattern]] = {
|
||||
# Redis (node monitor)
|
||||
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379")
|
||||
NODE_ALERT_DEADLETTER_THRESHOLD = int(os.getenv("NODE_ALERT_DEADLETTER_THRESHOLD", "1"))
|
||||
# heartbeat TTL(45s)의 2배 — 키가 남아있어도 age>90s면 dead 판정
|
||||
NODE_STALE_THRESHOLD_SEC = int(os.getenv("NODE_STALE_THRESHOLD_SEC", "90"))
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
from __future__ import annotations
|
||||
import datetime as dt, json, logging
|
||||
import redis.asyncio as aioredis
|
||||
from .config import REDIS_URL, NODE_ALERT_DEADLETTER_THRESHOLD
|
||||
from .config import REDIS_URL, NODE_ALERT_DEADLETTER_THRESHOLD, NODE_STALE_THRESHOLD_SEC
|
||||
|
||||
logger = logging.getLogger("agent-office.node_monitor")
|
||||
|
||||
@@ -66,10 +66,13 @@ async def collect_status(redis=None) -> dict:
|
||||
if raw:
|
||||
try:
|
||||
hb = json.loads(raw)
|
||||
info.update(alive=True, state=hb.get("state"),
|
||||
jobs_done=hb.get("jobs_done", 0), jobs_failed=hb.get("jobs_failed", 0),
|
||||
last_job_at=hb.get("last_job_at"),
|
||||
last_beat_age_s=_beat_age(hb.get("ts") or "", now))
|
||||
age = _beat_age(hb.get("ts") or "", now)
|
||||
info["last_beat_age_s"] = age
|
||||
info["alive"] = age is not None and age <= NODE_STALE_THRESHOLD_SEC
|
||||
info["state"] = hb.get("state")
|
||||
info["jobs_done"] = hb.get("jobs_done", 0)
|
||||
info["jobs_failed"] = hb.get("jobs_failed", 0)
|
||||
info["last_job_at"] = hb.get("last_job_at")
|
||||
if w["kind"] == "watcher" and hb.get("mode"):
|
||||
out["paused_reason"] = hb["mode"]
|
||||
except (json.JSONDecodeError, UnicodeDecodeError):
|
||||
@@ -114,19 +117,24 @@ async def check_and_alert(status=None) -> list[str]:
|
||||
return []
|
||||
sent: list[str] = []
|
||||
for w in st["workers"]:
|
||||
name, alive = w["name"], w.get("alive", False)
|
||||
name = w["name"]
|
||||
alive = w.get("alive", False)
|
||||
prev = _node_state.get(name)
|
||||
transition_send_failed = False
|
||||
if prev is True and not alive:
|
||||
text = f"🔴 [{name}] 워커 다운"
|
||||
if (await send_raw(text=text)).get("ok"):
|
||||
add_log("node_monitor", f"{name} 다운", "warning")
|
||||
sent.append(text)
|
||||
add_log("node_monitor", f"{name} 다운", "warning"); sent.append(text)
|
||||
else:
|
||||
transition_send_failed = True
|
||||
elif prev is False and alive:
|
||||
text = f"🟢 [{name}] 워커 복구"
|
||||
if (await send_raw(text=text)).get("ok"):
|
||||
add_log("node_monitor", f"{name} 복구", "info")
|
||||
sent.append(text)
|
||||
_node_state[name] = alive
|
||||
add_log("node_monitor", f"{name} 복구", "info"); sent.append(text)
|
||||
else:
|
||||
transition_send_failed = True
|
||||
if not transition_send_failed:
|
||||
_node_state[name] = alive
|
||||
dl = w.get("dead_letter", 0)
|
||||
if dl >= NODE_ALERT_DEADLETTER_THRESHOLD and dl != _dl_notified.get(name, 0):
|
||||
text = f"❌ [{name}] 실패 누적 {dl}건 (dead-letter)"
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
# agent-office/tests/test_node_monitor.py
|
||||
import datetime as dt
|
||||
import json, pytest
|
||||
from app import node_monitor
|
||||
import app.node_monitor as nm
|
||||
@@ -18,8 +19,11 @@ class FakeRedis:
|
||||
if k.startswith(prefix):
|
||||
yield k
|
||||
|
||||
def _hb(name, kind, state, **extra):
|
||||
return json.dumps({"name": name, "kind": kind, "state": state, "ts": "2026-06-29T00:00:00Z",
|
||||
def _hb(name, kind, state, ts=None, **extra):
|
||||
"""heartbeat 페이로드 생성. ts 기본값은 현재 시각(신선한 heartbeat)."""
|
||||
if ts is None:
|
||||
ts = dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
return json.dumps({"name": name, "kind": kind, "state": state, "ts": ts,
|
||||
"last_job_at": None, "jobs_done": 0, "jobs_failed": 0, **extra}).encode()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -145,3 +149,48 @@ async def test_dl_notified_not_updated_on_telegram_failure(monkeypatch):
|
||||
result2 = await nm.check_and_alert(status=s)
|
||||
assert any("dead-letter" in t for t in result2)
|
||||
assert nm._dl_notified.get("video-render") == 2
|
||||
|
||||
|
||||
# ── I1: staleness 판정 신규 테스트 ─────────────────────────────────────────────
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stale_heartbeat_is_dead():
|
||||
"""heartbeat 키가 존재해도 ts가 90s 초과면 alive=False (staleness 판정)."""
|
||||
stale_ts = (dt.datetime.now(dt.timezone.utc) - dt.timedelta(seconds=300)).strftime(
|
||||
"%Y-%m-%dT%H:%M:%SZ"
|
||||
)
|
||||
r = FakeRedis(kv={"worker:image-render:heartbeat": _hb("image-render", "render", "idle", ts=stale_ts)})
|
||||
st = await node_monitor.collect_status(redis=r)
|
||||
img = next(w for w in st["workers"] if w["name"] == "image-render")
|
||||
assert img["alive"] is False
|
||||
link = next(l for l in st["links"] if l["to"] == "image-render")
|
||||
assert link["status"] == "down"
|
||||
|
||||
|
||||
# ── I2: 전이 발송 실패 시 재시도 회귀 테스트 ──────────────────────────────────
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_transition_send_failure_retries_next_cycle(monkeypatch):
|
||||
"""alive→dead 전이 시 send_raw 실패하면 _node_state 갱신 안 됨 → 다음 사이클 재시도."""
|
||||
calls = []
|
||||
async def fake_send_raw(text, **kw):
|
||||
calls.append(text)
|
||||
if len(calls) == 1:
|
||||
return {"ok": False} # 첫 호출: 텔레그램 다운
|
||||
return {"ok": True} # 두 번째 호출: 성공
|
||||
monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
|
||||
monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
|
||||
nm._node_state.clear(); nm._dl_notified.clear()
|
||||
alive = {"workers": [{"name": "music-render", "alive": True, "dead_letter": 0}], "links": []}
|
||||
dead = {"workers": [{"name": "music-render", "alive": False, "dead_letter": 0}], "links": []}
|
||||
# 첫 관측: baseline 설정(전이 없음)
|
||||
await nm.check_and_alert(status=alive)
|
||||
assert nm._node_state.get("music-render") is True
|
||||
# alive→dead 전이, send_raw 실패 → _node_state 갱신 안 됨
|
||||
result1 = await nm.check_and_alert(status=dead)
|
||||
assert result1 == [] # 경보 미발송
|
||||
assert nm._node_state.get("music-render") is True # 여전히 True
|
||||
# 두 번째 사이클: 동일 dead, send_raw 성공 → 경보 발송
|
||||
result2 = await nm.check_and_alert(status=dead)
|
||||
assert any("다운" in t for t in result2)
|
||||
assert nm._node_state.get("music-render") is False # 이제 갱신
|
||||
|
||||
Reference in New Issue
Block a user