"""분산 워커 상태 집계 (read-only). Global Constraints 계약 2 스키마 생성.""" from __future__ import annotations import datetime as dt, json, logging import redis.asyncio as aioredis from .config import REDIS_URL, NODE_ALERT_DEADLETTER_THRESHOLD logger = logging.getLogger("agent-office.node_monitor") _node_state: dict[str, bool] = {} # name -> 직전 alive _dl_notified: dict[str, int] = {} # name -> 직전 알린 dead_letter 수 WORKER_REGISTRY = [ {"name": "music-render", "kind": "render", "queue": "queue:music-render"}, {"name": "video-render", "kind": "render", "queue": "queue:video-render"}, {"name": "image-render", "kind": "render", "queue": "queue:image-render"}, {"name": "insta-render", "kind": "render", "queue": "queue:insta-render"}, {"name": "task-watcher", "kind": "watcher", "queue": None}, {"name": "ai_trade", "kind": "trader", "queue": None}, ] _redis = None def _get_redis(): global _redis if _redis is None: _redis = aioredis.from_url(REDIS_URL, decode_responses=False) return _redis def _beat_age(ts_str, now): try: beat = dt.datetime.fromisoformat(ts_str.replace("Z", "+00:00")) return max(0, int((now - beat).total_seconds())) except Exception: return None def _render_link_status(w): if not w["alive"]: return "down" if w["state"] == "paused": return "paused" if w["dead_letter"] > 0: return "degraded" return "healthy" async def collect_status(redis=None) -> dict: r = redis or _get_redis() now = dt.datetime.now(dt.timezone.utc) out = {"redis_ok": True, "paused": False, "paused_reason": None, "generated_at": now.strftime("%Y-%m-%dT%H:%M:%SZ"), "workers": [], "links": []} try: out["paused"] = (await r.get("queue:paused")) == b"1" except Exception: logger.exception("redis 접근 실패") out["redis_ok"] = False return out for w in WORKER_REGISTRY: try: info = {"name": w["name"], "kind": w["kind"], "alive": False, "state": None, "last_beat_age_s": None, "queue_depth": 0, "dead_letter": 0, "processing": 0, "jobs_done": 0, "jobs_failed": 0, "last_job_at": None} raw = await r.get(f"worker:{w['name']}:heartbeat") if raw: try: hb = json.loads(raw) info.update(alive=True, state=hb.get("state"), jobs_done=hb.get("jobs_done", 0), jobs_failed=hb.get("jobs_failed", 0), last_job_at=hb.get("last_job_at"), last_beat_age_s=_beat_age(hb.get("ts") or "", now)) if w["kind"] == "watcher" and hb.get("mode"): out["paused_reason"] = hb["mode"] except (json.JSONDecodeError, UnicodeDecodeError): logger.warning("heartbeat JSON 파싱 실패 name=%s", w["name"]) if w["queue"]: info["queue_depth"] = await r.llen(w["queue"]) info["dead_letter"] = await r.llen(f"dead_letter:{w['queue']}") proc = 0 async for key in r.scan_iter(match=f"processing:{w['queue']}:*"): proc += await r.llen(key) info["processing"] = proc out["workers"].append(info) except Exception: logger.exception("워커 상태 수집 실패 name=%s", w["name"]) out["redis_ok"] = False break for w in out["workers"]: if w["kind"] == "trader": out["links"].append({"from": "ai_trade", "to": "nas-stock", "type": "http-pull", "status": "healthy" if w["alive"] else "down"}) elif w["kind"] == "render": out["links"].append({"from": "nas", "to": w["name"], "type": "redis-queue", "status": _render_link_status(w)}) if out["paused"] and not out["paused_reason"]: out["paused_reason"] = "trading" return out async def check_and_alert(status=None) -> list[str]: """워커 상태를 점검해 다운/복구/dead-letter 전이를 텔레그램으로 경보한다. 첫 관측(prev=None)엔 경보 없음 — 부팅 시 false alarm 방지. 반환값: 실제로 전송된 경보 텍스트 목록 (테스트용). """ from .telegram.messaging import send_raw from .db import add_log try: st = status or await collect_status() except Exception: logger.exception("collect_status 예외") return [] sent: list[str] = [] for w in st["workers"]: name, alive = w["name"], w.get("alive", False) prev = _node_state.get(name) if prev is True and not alive: text = f"🔴 [{name}] 워커 다운" if (await send_raw(text=text)).get("ok"): add_log("node_monitor", f"{name} 다운", "warning") sent.append(text) elif prev is False and alive: text = f"🟢 [{name}] 워커 복구" if (await send_raw(text=text)).get("ok"): add_log("node_monitor", f"{name} 복구", "info") sent.append(text) _node_state[name] = alive dl = w.get("dead_letter", 0) if dl >= NODE_ALERT_DEADLETTER_THRESHOLD and dl != _dl_notified.get(name, 0): text = f"❌ [{name}] 실패 누적 {dl}건 (dead-letter)" if (await send_raw(text=text)).get("ok"): add_log("node_monitor", f"{name} dead-letter {dl}", "warning") sent.append(text) _dl_notified[name] = dl elif dl == 0: _dl_notified.pop(name, None) return sent