diff --git a/agent-office/app/node_monitor.py b/agent-office/app/node_monitor.py new file mode 100644 index 0000000..b623a09 --- /dev/null +++ b/agent-office/app/node_monitor.py @@ -0,0 +1,91 @@ +"""분산 워커 상태 집계 (read-only). Global Constraints 계약 2 스키마 생성.""" +from __future__ import annotations +import datetime as dt, json, logging +import redis.asyncio as aioredis +from .config import REDIS_URL + +logger = logging.getLogger("agent-office.node_monitor") + +WORKER_REGISTRY = [ + {"name": "music-render", "kind": "render", "queue": "queue:music-render"}, + {"name": "video-render", "kind": "render", "queue": "queue:video-render"}, + {"name": "image-render", "kind": "render", "queue": "queue:image-render"}, + {"name": "insta-render", "kind": "render", "queue": "queue:insta-render"}, + {"name": "task-watcher", "kind": "watcher", "queue": None}, + {"name": "ai_trade", "kind": "trader", "queue": None}, +] + +_redis = None +def _get_redis(): + global _redis + if _redis is None: + _redis = aioredis.from_url(REDIS_URL, decode_responses=False) + return _redis + + +def _beat_age(ts_str, now): + try: + beat = dt.datetime.fromisoformat(ts_str.replace("Z", "+00:00")) + return max(0, int((now - beat).total_seconds())) + except Exception: + return None + + +def _render_link_status(w): + if not w["alive"]: + return "down" + if w["state"] == "paused": + return "paused" + if w["dead_letter"] > 0: + return "degraded" + return "healthy" + + +async def collect_status(redis=None) -> dict: + r = redis or _get_redis() + now = dt.datetime.now(dt.timezone.utc) + out = {"redis_ok": True, "paused": False, "paused_reason": None, + "generated_at": now.strftime("%Y-%m-%dT%H:%M:%SZ"), + "workers": [], "links": []} + try: + out["paused"] = (await r.get("queue:paused")) == b"1" + except Exception: + logger.exception("redis 접근 실패") + out["redis_ok"] = False + return out + + for w in WORKER_REGISTRY: + info = {"name": w["name"], "kind": w["kind"], "alive": False, "state": None, + "last_beat_age_s": None, "queue_depth": 0, "dead_letter": 0, + "processing": 0, "jobs_done": 0, "jobs_failed": 0, "last_job_at": None} + raw = await r.get(f"worker:{w['name']}:heartbeat") + if raw: + try: + hb = json.loads(raw) + info.update(alive=True, state=hb.get("state"), + jobs_done=hb.get("jobs_done", 0), jobs_failed=hb.get("jobs_failed", 0), + last_job_at=hb.get("last_job_at"), + last_beat_age_s=_beat_age(hb.get("ts", ""), now)) + if w["kind"] == "watcher" and hb.get("mode"): + out["paused_reason"] = hb["mode"] + except json.JSONDecodeError: + logger.warning("heartbeat JSON 파싱 실패 name=%s", w["name"]) + if w["queue"]: + info["queue_depth"] = await r.llen(w["queue"]) + info["dead_letter"] = await r.llen(f"dead_letter:{w['queue']}") + proc = 0 + async for key in r.scan_iter(match=f"processing:{w['queue']}:*"): + proc += await r.llen(key) + info["processing"] = proc + out["workers"].append(info) + + for w in out["workers"]: + if w["kind"] == "trader": + out["links"].append({"from": "ai_trade", "to": "nas-stock", "type": "http-pull", + "status": "healthy" if w["alive"] else "down"}) + elif w["kind"] == "render": + out["links"].append({"from": "nas", "to": w["name"], "type": "redis-queue", + "status": _render_link_status(w)}) + if out["paused"] and not out["paused_reason"]: + out["paused_reason"] = "trading" + return out diff --git a/agent-office/tests/test_node_monitor.py b/agent-office/tests/test_node_monitor.py new file mode 100644 index 0000000..a0ab2ac --- /dev/null +++ b/agent-office/tests/test_node_monitor.py @@ -0,0 +1,64 @@ +# agent-office/tests/test_node_monitor.py +import json, pytest +from app import node_monitor + +class FakeRedis: + """worker heartbeat + queue llen + scan_iter 흉내.""" + def __init__(self, kv=None, lists=None): + self._kv = kv or {} # key(str) -> bytes + self._lists = lists or {} # key(str) -> length(int) + async def get(self, key): + return self._kv.get(key) + async def llen(self, key): + return self._lists.get(key, 0) + async def scan_iter(self, match=None): + prefix = match.rstrip("*") + for k in list(self._lists): + if k.startswith(prefix): + yield k + +def _hb(name, kind, state, **extra): + return json.dumps({"name": name, "kind": kind, "state": state, "ts": "2026-06-29T00:00:00Z", + "last_job_at": None, "jobs_done": 0, "jobs_failed": 0, **extra}).encode() + +@pytest.mark.asyncio +async def test_alive_worker_healthy_link(): + r = FakeRedis(kv={"worker:image-render:heartbeat": _hb("image-render","render","idle")}) + st = await node_monitor.collect_status(redis=r) + img = next(w for w in st["workers"] if w["name"] == "image-render") + assert img["alive"] is True and img["state"] == "idle" + link = next(l for l in st["links"] if l["to"] == "image-render") + assert link["status"] == "healthy" and link["type"] == "redis-queue" + +@pytest.mark.asyncio +async def test_missing_heartbeat_is_dead_and_down(): + r = FakeRedis() # heartbeat 없음 + st = await node_monitor.collect_status(redis=r) + img = next(w for w in st["workers"] if w["name"] == "image-render") + assert img["alive"] is False + link = next(l for l in st["links"] if l["to"] == "image-render") + assert link["status"] == "down" + +@pytest.mark.asyncio +async def test_dead_letter_makes_degraded(): + r = FakeRedis(kv={"worker:video-render:heartbeat": _hb("video-render","render","idle")}, + lists={"dead_letter:queue:video-render": 2}) + st = await node_monitor.collect_status(redis=r) + vid = next(w for w in st["workers"] if w["name"] == "video-render") + assert vid["dead_letter"] == 2 + link = next(l for l in st["links"] if l["to"] == "video-render") + assert link["status"] == "degraded" + +@pytest.mark.asyncio +async def test_paused_reason_from_watcher(): + r = FakeRedis(kv={"queue:paused": b"1", + "worker:task-watcher:heartbeat": _hb("task-watcher","watcher","trading",mode="trading")}) + st = await node_monitor.collect_status(redis=r) + assert st["paused"] is True and st["paused_reason"] == "trading" + +@pytest.mark.asyncio +async def test_trader_http_pull_link(): + r = FakeRedis(kv={"worker:ai_trade:heartbeat": _hb("ai_trade","trader","market_open")}) + st = await node_monitor.collect_status(redis=r) + link = next(l for l in st["links"] if l["from"] == "ai_trade") + assert link["type"] == "http-pull" and link["status"] == "healthy"