From ea1f0d103d28bf8938f940dc98032b8bd4a9c927 Mon Sep 17 00:00:00 2001 From: gahusb Date: Mon, 29 Jun 2026 17:56:18 +0900 Subject: [PATCH] =?UTF-8?q?fix(agent-office):=20node=5Fmonitor=20=EB=A3=A8?= =?UTF-8?q?=ED=94=84=20=EC=98=88=EC=99=B8=20=EB=B0=A9=EC=96=B4=20+=20?= =?UTF-8?q?=ED=85=8C=EC=8A=A4=ED=8A=B8=20=EB=B3=B4=EA=B0=95=20(B2=20?= =?UTF-8?q?=EB=A6=AC=EB=B7=B0)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - per-worker 루프 전체를 try/except로 감싸 Redis 예외 시 redis_ok=False+break (Blocker) - heartbeat 파싱 except에 UnicodeDecodeError 추가 (Important) - hb.get('ts') or '' 로 null ts 안전 처리 (Minor) - 테스트 3개 추가: paused 폴백·processing 집계·llen 예외 회귀 Co-Authored-By: Claude Sonnet 4.6 Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq --- agent-office/app/node_monitor.py | 51 ++++++++++++++----------- agent-office/tests/test_node_monitor.py | 34 +++++++++++++++++ 2 files changed, 62 insertions(+), 23 deletions(-) diff --git a/agent-office/app/node_monitor.py b/agent-office/app/node_monitor.py index b623a09..87adbf0 100644 --- a/agent-office/app/node_monitor.py +++ b/agent-office/app/node_monitor.py @@ -55,29 +55,34 @@ async def collect_status(redis=None) -> dict: return out for w in WORKER_REGISTRY: - info = {"name": w["name"], "kind": w["kind"], "alive": False, "state": None, - "last_beat_age_s": None, "queue_depth": 0, "dead_letter": 0, - "processing": 0, "jobs_done": 0, "jobs_failed": 0, "last_job_at": None} - raw = await r.get(f"worker:{w['name']}:heartbeat") - if raw: - try: - hb = json.loads(raw) - info.update(alive=True, state=hb.get("state"), - jobs_done=hb.get("jobs_done", 0), jobs_failed=hb.get("jobs_failed", 0), - last_job_at=hb.get("last_job_at"), - last_beat_age_s=_beat_age(hb.get("ts", ""), now)) - if w["kind"] == "watcher" and hb.get("mode"): - out["paused_reason"] = hb["mode"] - except json.JSONDecodeError: - logger.warning("heartbeat JSON 파싱 실패 name=%s", w["name"]) - if w["queue"]: - info["queue_depth"] = await r.llen(w["queue"]) - info["dead_letter"] = await r.llen(f"dead_letter:{w['queue']}") - proc = 0 - async for key in r.scan_iter(match=f"processing:{w['queue']}:*"): - proc += await r.llen(key) - info["processing"] = proc - out["workers"].append(info) + try: + info = {"name": w["name"], "kind": w["kind"], "alive": False, "state": None, + "last_beat_age_s": None, "queue_depth": 0, "dead_letter": 0, + "processing": 0, "jobs_done": 0, "jobs_failed": 0, "last_job_at": None} + raw = await r.get(f"worker:{w['name']}:heartbeat") + if raw: + try: + hb = json.loads(raw) + info.update(alive=True, state=hb.get("state"), + jobs_done=hb.get("jobs_done", 0), jobs_failed=hb.get("jobs_failed", 0), + last_job_at=hb.get("last_job_at"), + last_beat_age_s=_beat_age(hb.get("ts") or "", now)) + if w["kind"] == "watcher" and hb.get("mode"): + out["paused_reason"] = hb["mode"] + except (json.JSONDecodeError, UnicodeDecodeError): + logger.warning("heartbeat JSON 파싱 실패 name=%s", w["name"]) + if w["queue"]: + info["queue_depth"] = await r.llen(w["queue"]) + info["dead_letter"] = await r.llen(f"dead_letter:{w['queue']}") + proc = 0 + async for key in r.scan_iter(match=f"processing:{w['queue']}:*"): + proc += await r.llen(key) + info["processing"] = proc + out["workers"].append(info) + except Exception: + logger.exception("워커 상태 수집 실패 name=%s", w["name"]) + out["redis_ok"] = False + break for w in out["workers"]: if w["kind"] == "trader": diff --git a/agent-office/tests/test_node_monitor.py b/agent-office/tests/test_node_monitor.py index a0ab2ac..d1864e7 100644 --- a/agent-office/tests/test_node_monitor.py +++ b/agent-office/tests/test_node_monitor.py @@ -62,3 +62,37 @@ async def test_trader_http_pull_link(): st = await node_monitor.collect_status(redis=r) link = next(l for l in st["links"] if l["from"] == "ai_trade") assert link["type"] == "http-pull" and link["status"] == "healthy" + +@pytest.mark.asyncio +async def test_paused_no_watcher_heartbeat_fallback_reason(): + """paused=True인데 watcher heartbeat 없으면 paused_reason == 'trading' 폴백.""" + r = FakeRedis(kv={"queue:paused": b"1"}) # watcher heartbeat 없음 + st = await node_monitor.collect_status(redis=r) + assert st["paused"] is True + assert st["paused_reason"] == "trading" + +@pytest.mark.asyncio +async def test_processing_count_image_render(): + """processing:: 리스트가 있으면 processing 필드에 합산된다.""" + worker_id = "abc123" + proc_key = f"processing:queue:image-render:{worker_id}" + r = FakeRedis( + kv={"worker:image-render:heartbeat": _hb("image-render", "render", "busy")}, + lists={proc_key: 3}, + ) + st = await node_monitor.collect_status(redis=r) + img = next(w for w in st["workers"] if w["name"] == "image-render") + assert img["processing"] == 3 + +@pytest.mark.asyncio +async def test_llen_exception_returns_redis_ok_false(): + """워커 루프 중 llen 예외 발생 시 예외를 전파하지 않고 redis_ok=False 반환 (Blocker 회귀).""" + class BrokenLlenRedis(FakeRedis): + async def llen(self, key): + raise ConnectionError("Redis 연결 끊김") + + r = BrokenLlenRedis( + kv={"worker:music-render:heartbeat": _hb("music-render", "render", "idle")} + ) + st = await node_monitor.collect_status(redis=r) + assert st["redis_ok"] is False