fix(agent-office): node_monitor 루프 예외 방어 + 테스트 보강 (B2 리뷰)

- per-worker 루프 전체를 try/except로 감싸 Redis 예외 시 redis_ok=False+break (Blocker)
- heartbeat 파싱 except에 UnicodeDecodeError 추가 (Important)
- hb.get('ts') or '' 로 null ts 안전 처리 (Minor)
- 테스트 3개 추가: paused 폴백·processing 집계·llen 예외 회귀

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
This commit is contained in:
2026-06-29 17:56:18 +09:00
parent a3ae85cde1
commit ea1f0d103d
2 changed files with 62 additions and 23 deletions

View File

@@ -55,29 +55,34 @@ async def collect_status(redis=None) -> dict:
return out
for w in WORKER_REGISTRY:
info = {"name": w["name"], "kind": w["kind"], "alive": False, "state": None,
"last_beat_age_s": None, "queue_depth": 0, "dead_letter": 0,
"processing": 0, "jobs_done": 0, "jobs_failed": 0, "last_job_at": None}
raw = await r.get(f"worker:{w['name']}:heartbeat")
if raw:
try:
hb = json.loads(raw)
info.update(alive=True, state=hb.get("state"),
jobs_done=hb.get("jobs_done", 0), jobs_failed=hb.get("jobs_failed", 0),
last_job_at=hb.get("last_job_at"),
last_beat_age_s=_beat_age(hb.get("ts", ""), now))
if w["kind"] == "watcher" and hb.get("mode"):
out["paused_reason"] = hb["mode"]
except json.JSONDecodeError:
logger.warning("heartbeat JSON 파싱 실패 name=%s", w["name"])
if w["queue"]:
info["queue_depth"] = await r.llen(w["queue"])
info["dead_letter"] = await r.llen(f"dead_letter:{w['queue']}")
proc = 0
async for key in r.scan_iter(match=f"processing:{w['queue']}:*"):
proc += await r.llen(key)
info["processing"] = proc
out["workers"].append(info)
try:
info = {"name": w["name"], "kind": w["kind"], "alive": False, "state": None,
"last_beat_age_s": None, "queue_depth": 0, "dead_letter": 0,
"processing": 0, "jobs_done": 0, "jobs_failed": 0, "last_job_at": None}
raw = await r.get(f"worker:{w['name']}:heartbeat")
if raw:
try:
hb = json.loads(raw)
info.update(alive=True, state=hb.get("state"),
jobs_done=hb.get("jobs_done", 0), jobs_failed=hb.get("jobs_failed", 0),
last_job_at=hb.get("last_job_at"),
last_beat_age_s=_beat_age(hb.get("ts") or "", now))
if w["kind"] == "watcher" and hb.get("mode"):
out["paused_reason"] = hb["mode"]
except (json.JSONDecodeError, UnicodeDecodeError):
logger.warning("heartbeat JSON 파싱 실패 name=%s", w["name"])
if w["queue"]:
info["queue_depth"] = await r.llen(w["queue"])
info["dead_letter"] = await r.llen(f"dead_letter:{w['queue']}")
proc = 0
async for key in r.scan_iter(match=f"processing:{w['queue']}:*"):
proc += await r.llen(key)
info["processing"] = proc
out["workers"].append(info)
except Exception:
logger.exception("워커 상태 수집 실패 name=%s", w["name"])
out["redis_ok"] = False
break
for w in out["workers"]:
if w["kind"] == "trader":

View File

@@ -62,3 +62,37 @@ async def test_trader_http_pull_link():
st = await node_monitor.collect_status(redis=r)
link = next(l for l in st["links"] if l["from"] == "ai_trade")
assert link["type"] == "http-pull" and link["status"] == "healthy"
@pytest.mark.asyncio
async def test_paused_no_watcher_heartbeat_fallback_reason():
"""paused=True인데 watcher heartbeat 없으면 paused_reason == 'trading' 폴백."""
r = FakeRedis(kv={"queue:paused": b"1"}) # watcher heartbeat 없음
st = await node_monitor.collect_status(redis=r)
assert st["paused"] is True
assert st["paused_reason"] == "trading"
@pytest.mark.asyncio
async def test_processing_count_image_render():
"""processing:<queue>:<worker_id> 리스트가 있으면 processing 필드에 합산된다."""
worker_id = "abc123"
proc_key = f"processing:queue:image-render:{worker_id}"
r = FakeRedis(
kv={"worker:image-render:heartbeat": _hb("image-render", "render", "busy")},
lists={proc_key: 3},
)
st = await node_monitor.collect_status(redis=r)
img = next(w for w in st["workers"] if w["name"] == "image-render")
assert img["processing"] == 3
@pytest.mark.asyncio
async def test_llen_exception_returns_redis_ok_false():
"""워커 루프 중 llen 예외 발생 시 예외를 전파하지 않고 redis_ok=False 반환 (Blocker 회귀)."""
class BrokenLlenRedis(FakeRedis):
async def llen(self, key):
raise ConnectionError("Redis 연결 끊김")
r = BrokenLlenRedis(
kv={"worker:music-render:heartbeat": _hb("music-render", "render", "idle")}
)
st = await node_monitor.collect_status(redis=r)
assert st["redis_ok"] is False