async def check_and_alert(status=None) -> list[str]:
    """Send Telegram alerts for worker down / recovery / dead-letter changes.

    For every worker entry the current ``alive`` flag is compared with the
    value remembered from the previous call in the module-level
    ``_node_state``. The first observation of a worker (nothing remembered
    yet) never alerts, which avoids false alarms right after boot.

    A dead-letter count at or above ``NODE_ALERT_DEADLETTER_THRESHOLD`` is
    reported whenever it differs from the count last reported for that
    worker (``_dl_notified``). Once the count falls below the threshold the
    remembered count is dropped, so crossing the threshold again alerts again.

    An alert whose delivery fails (``send_raw`` does not report ``ok``) is not
    recorded as handled: the remembered state is left untouched so the same
    transition is retried on the next call.

    Args:
        status: Status dict with a ``"workers"`` list. Each entry needs
            ``"name"`` and may carry ``"alive"`` and ``"dead_letter"``.
            When falsy, ``collect_status()`` is awaited instead.

    Returns:
        Texts of the alerts that were actually delivered, in send order
        (mainly useful for tests).
    """
    # Imported at call time, not at module load: tests that patch
    # ``app.telegram.messaging.send_raw`` / ``app.db.add_log`` depend on the
    # names being looked up here.
    from .telegram.messaging import send_raw
    from .db import add_log

    st = status or await collect_status()
    sent: list[str] = []

    async def _deliver(text: str, log_message: str, level: str) -> bool:
        """Send one alert; log it and record it in ``sent`` only on success."""
        if not (await send_raw(text=text)).get("ok"):
            return False
        add_log("node_monitor", log_message, level)
        sent.append(text)
        return True

    for w in st["workers"]:
        name = w["name"]
        # ``prev is True`` / ``prev is False`` below are identity checks, so
        # always remember a real bool.
        alive = bool(w.get("alive", False))
        prev = _node_state.get(name)

        if prev is True and not alive:
            delivered = await _deliver(f"🔴 [{name}] 워커 다운", f"{name} 다운", "warning")
        elif prev is False and alive:
            delivered = await _deliver(f"🟢 [{name}] 워커 복구", f"{name} 복구", "info")
        else:
            delivered = True  # first observation or no change: nothing to send
        if delivered:
            # Commit only after a successful send; otherwise the transition
            # would be silently lost instead of retried next cycle.
            _node_state[name] = alive

        dl = w.get("dead_letter") or 0  # tolerate a missing or None count
        if dl >= NODE_ALERT_DEADLETTER_THRESHOLD:
            if dl != _dl_notified.get(name, 0):
                text = f"❌ [{name}] 실패 누적 {dl}건 (dead-letter)"
                if await _deliver(text, f"{name} dead-letter {dl}", "warning"):
                    _dl_notified[name] = dl
        else:
            # Below the threshold: forget the last reported count so that a
            # later climb back to the same number is reported again.
            _dl_notified.pop(name, None)
    return sent
@pytest.mark.asyncio
async def test_alert_on_alive_to_dead(monkeypatch):
    """An alive -> dead transition sends exactly one "down" alert.

    The first observation must stay silent, and a worker that simply stays
    dead must not be reported a second time.
    """
    sent = []

    async def fake_send_raw(text, **kw):
        sent.append(text)
        return {"ok": True}

    monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
    monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
    # Swap in fresh state dicts instead of clearing the shared ones in place;
    # monkeypatch restores the originals when the test ends.
    monkeypatch.setattr(nm, "_node_state", {})
    monkeypatch.setattr(nm, "_dl_notified", {})

    def snapshot(is_alive):
        worker = {"name": "image-render", "alive": is_alive, "dead_letter": 0}
        return {"workers": [worker], "links": []}

    down_text = "🔴 [image-render] 워커 다운"

    # First observation: no alert.
    assert await nm.check_and_alert(status=snapshot(True)) == []
    assert sent == []

    # alive -> dead transition: exactly the "down" alert and nothing else.
    assert await nm.check_and_alert(status=snapshot(False)) == [down_text]
    assert sent == [down_text]

    # Still dead on the next check: no duplicate alert.
    assert await nm.check_and_alert(status=snapshot(False)) == []
    assert sent == [down_text]
@pytest.mark.asyncio
async def test_alert_on_dead_letter_growth(monkeypatch):
    """Dead-letter counts alert at the threshold and again only when they change."""
    sent = []

    async def fake_send_raw(text, **kw):
        sent.append(text)
        return {"ok": True}

    monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
    monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
    # Pin the threshold so the test does not depend on the configured value.
    monkeypatch.setattr(nm, "NODE_ALERT_DEADLETTER_THRESHOLD", 2)
    # Swap in fresh state dicts instead of clearing the shared ones in place;
    # monkeypatch restores the originals when the test ends.
    monkeypatch.setattr(nm, "_node_state", {})
    monkeypatch.setattr(nm, "_dl_notified", {})

    def snapshot(dead_letter):
        worker = {"name": "video-render", "alive": True, "dead_letter": dead_letter}
        return {"workers": [worker], "links": []}

    at_threshold = "❌ [video-render] 실패 누적 2건 (dead-letter)"
    grown = "❌ [video-render] 실패 누적 3건 (dead-letter)"

    # Reaching the threshold alerts once.
    assert await nm.check_and_alert(status=snapshot(2)) == [at_threshold]
    # The same count on the next check is not reported again.
    assert await nm.check_and_alert(status=snapshot(2)) == []
    # A larger count is reported as a new alert.
    assert await nm.check_and_alert(status=snapshot(3)) == [grown]
    assert sent == [at_threshold, grown]