feat(agent-office): 노드 헬스 1분 cron + 텔레그램 경보(다운/복구/dead-letter)
This commit is contained in:
@@ -2,10 +2,13 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
import datetime as dt, json, logging
|
import datetime as dt, json, logging
|
||||||
import redis.asyncio as aioredis
|
import redis.asyncio as aioredis
|
||||||
from .config import REDIS_URL
|
from .config import REDIS_URL, NODE_ALERT_DEADLETTER_THRESHOLD
|
||||||
|
|
||||||
logger = logging.getLogger("agent-office.node_monitor")
|
logger = logging.getLogger("agent-office.node_monitor")
|
||||||
|
|
||||||
|
_node_state: dict[str, bool] = {} # name -> 직전 alive
|
||||||
|
_dl_notified: dict[str, int] = {} # name -> 직전 알린 dead_letter 수
|
||||||
|
|
||||||
WORKER_REGISTRY = [
|
WORKER_REGISTRY = [
|
||||||
{"name": "music-render", "kind": "render", "queue": "queue:music-render"},
|
{"name": "music-render", "kind": "render", "queue": "queue:music-render"},
|
||||||
{"name": "video-render", "kind": "render", "queue": "queue:video-render"},
|
{"name": "video-render", "kind": "render", "queue": "queue:video-render"},
|
||||||
@@ -94,3 +97,39 @@ async def collect_status(redis=None) -> dict:
|
|||||||
if out["paused"] and not out["paused_reason"]:
|
if out["paused"] and not out["paused_reason"]:
|
||||||
out["paused_reason"] = "trading"
|
out["paused_reason"] = "trading"
|
||||||
return out
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
async def check_and_alert(status=None) -> list[str]:
|
||||||
|
"""워커 상태를 점검해 다운/복구/dead-letter 전이를 텔레그램으로 경보한다.
|
||||||
|
|
||||||
|
첫 관측(prev=None)엔 경보 없음 — 부팅 시 false alarm 방지.
|
||||||
|
반환값: 실제로 전송된 경보 텍스트 목록 (테스트용).
|
||||||
|
"""
|
||||||
|
from .telegram.messaging import send_raw
|
||||||
|
from .db import add_log
|
||||||
|
st = status or await collect_status()
|
||||||
|
sent: list[str] = []
|
||||||
|
for w in st["workers"]:
|
||||||
|
name, alive = w["name"], w.get("alive", False)
|
||||||
|
prev = _node_state.get(name)
|
||||||
|
if prev is True and not alive:
|
||||||
|
text = f"🔴 [{name}] 워커 다운"
|
||||||
|
if (await send_raw(text=text)).get("ok"):
|
||||||
|
add_log("node_monitor", f"{name} 다운", "warning")
|
||||||
|
sent.append(text)
|
||||||
|
elif prev is False and alive:
|
||||||
|
text = f"🟢 [{name}] 워커 복구"
|
||||||
|
if (await send_raw(text=text)).get("ok"):
|
||||||
|
add_log("node_monitor", f"{name} 복구", "info")
|
||||||
|
sent.append(text)
|
||||||
|
_node_state[name] = alive
|
||||||
|
dl = w.get("dead_letter", 0)
|
||||||
|
if dl >= NODE_ALERT_DEADLETTER_THRESHOLD and dl != _dl_notified.get(name, 0):
|
||||||
|
text = f"❌ [{name}] 실패 누적 {dl}건 (dead-letter)"
|
||||||
|
if (await send_raw(text=text)).get("ok"):
|
||||||
|
add_log("node_monitor", f"{name} dead-letter {dl}", "warning")
|
||||||
|
sent.append(text)
|
||||||
|
_dl_notified[name] = dl
|
||||||
|
elif dl == 0:
|
||||||
|
_dl_notified.pop(name, None)
|
||||||
|
return sent
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|||||||
|
|
||||||
from .agents import AGENT_REGISTRY
|
from .agents import AGENT_REGISTRY
|
||||||
from .db import delete_old_logs
|
from .db import delete_old_logs
|
||||||
|
from . import node_monitor
|
||||||
|
|
||||||
scheduler = AsyncIOScheduler(timezone="Asia/Seoul")
|
scheduler = AsyncIOScheduler(timezone="Asia/Seoul")
|
||||||
|
|
||||||
@@ -98,6 +99,9 @@ async def _poll_pipelines():
|
|||||||
if agent:
|
if agent:
|
||||||
await agent.poll_state_changes()
|
await agent.poll_state_changes()
|
||||||
|
|
||||||
|
async def _run_node_health_check():
|
||||||
|
await node_monitor.check_and_alert()
|
||||||
|
|
||||||
def _cleanup_old_logs():
|
def _cleanup_old_logs():
|
||||||
n = delete_old_logs(days=90)
|
n = delete_old_logs(days=90)
|
||||||
if n:
|
if n:
|
||||||
@@ -142,5 +146,6 @@ def init_scheduler():
|
|||||||
scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=10, id="youtube_research")
|
scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=10, id="youtube_research")
|
||||||
scheduler.add_job(_send_youtube_weekly_report, "cron", day_of_week="mon", hour=8, minute=0, id="youtube_weekly_report")
|
scheduler.add_job(_send_youtube_weekly_report, "cron", day_of_week="mon", hour=8, minute=0, id="youtube_weekly_report")
|
||||||
scheduler.add_job(_poll_pipelines, "interval", seconds=30, id="pipeline_poll")
|
scheduler.add_job(_poll_pipelines, "interval", seconds=30, id="pipeline_poll")
|
||||||
|
scheduler.add_job(_run_node_health_check, "interval", seconds=60, id="node_health_check", replace_existing=True)
|
||||||
scheduler.add_job(_cleanup_old_logs, "cron", hour=3, minute=0, id="cleanup_old_logs", replace_existing=True)
|
scheduler.add_job(_cleanup_old_logs, "cron", hour=3, minute=0, id="cleanup_old_logs", replace_existing=True)
|
||||||
scheduler.start()
|
scheduler.start()
|
||||||
|
|||||||
@@ -96,3 +96,31 @@ async def test_llen_exception_returns_redis_ok_false():
|
|||||||
)
|
)
|
||||||
st = await node_monitor.collect_status(redis=r)
|
st = await node_monitor.collect_status(redis=r)
|
||||||
assert st["redis_ok"] is False
|
assert st["redis_ok"] is False
|
||||||
|
|
||||||
|
|
||||||
|
import app.node_monitor as nm
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_alert_on_alive_to_dead(monkeypatch):
|
||||||
|
sent = []
|
||||||
|
async def fake_send_raw(text, **kw): sent.append(text); return {"ok": True}
|
||||||
|
monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
|
||||||
|
monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
|
||||||
|
nm._node_state.clear(); nm._dl_notified.clear()
|
||||||
|
alive = {"workers": [{"name":"image-render","alive":True,"dead_letter":0}], "links": []}
|
||||||
|
dead = {"workers": [{"name":"image-render","alive":False,"dead_letter":0}], "links": []}
|
||||||
|
await nm.check_and_alert(status=alive) # 첫 관측 — 경보 없음
|
||||||
|
assert sent == []
|
||||||
|
await nm.check_and_alert(status=dead) # alive→dead 전이
|
||||||
|
assert any("다운" in t for t in sent)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_alert_on_dead_letter_growth(monkeypatch):
|
||||||
|
sent = []
|
||||||
|
async def fake_send_raw(text, **kw): sent.append(text); return {"ok": True}
|
||||||
|
monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
|
||||||
|
monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
|
||||||
|
nm._node_state.clear(); nm._dl_notified.clear()
|
||||||
|
s = {"workers": [{"name":"video-render","alive":True,"dead_letter":2}], "links": []}
|
||||||
|
await nm.check_and_alert(status=s)
|
||||||
|
assert any("dead-letter" in t for t in sent)
|
||||||
|
|||||||
Reference in New Issue
Block a user