Files
web-page-backend/agent-office/app/node_monitor.py
gahusb 94cddccaa7 fix(agent-office): alive를 heartbeat staleness로 판정 + 다운/복구 전이 발송실패 시 재시도 (최종 리뷰 I1·I2)
I1: collect_status - heartbeat 키 존재 여부가 아닌 ts age 기반으로 alive 판정.
    age > NODE_STALE_THRESHOLD_SEC(90s, env 주입 가능)이면 키 있어도 dead.
    config.py에 NODE_STALE_THRESHOLD_SEC=90 추가.
I2: check_and_alert - 다운/복구 전이 시 send_raw 실패하면 _node_state 갱신 보류.
    다음 사이클에서 동일 전이 재감지 → 재발송 시도 (다운 이벤트 유실 방지).
테스트: _hb 헬퍼 현재 시각 기본값으로 수정 + 신규 2개 (stale→dead, I2 재시도 회귀).
14 passed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-06-29 18:50:45 +09:00

148 lines
6.0 KiB
Python

"""분산 워커 상태 집계 (read-only). Global Constraints 계약 2 스키마 생성."""
from __future__ import annotations
import datetime as dt, json, logging
import redis.asyncio as aioredis
from .config import REDIS_URL, NODE_ALERT_DEADLETTER_THRESHOLD, NODE_STALE_THRESHOLD_SEC
logger = logging.getLogger("agent-office.node_monitor")
_node_state: dict[str, bool] = {} # name -> 직전 alive
_dl_notified: dict[str, int] = {} # name -> 직전 알린 dead_letter 수
WORKER_REGISTRY = [
{"name": "music-render", "kind": "render", "queue": "queue:music-render"},
{"name": "video-render", "kind": "render", "queue": "queue:video-render"},
{"name": "image-render", "kind": "render", "queue": "queue:image-render"},
{"name": "insta-render", "kind": "render", "queue": "queue:insta-render"},
{"name": "task-watcher", "kind": "watcher", "queue": None},
{"name": "ai_trade", "kind": "trader", "queue": None},
]
_redis = None
def _get_redis():
global _redis
if _redis is None:
_redis = aioredis.from_url(REDIS_URL, decode_responses=False)
return _redis
def _beat_age(ts_str, now):
try:
beat = dt.datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
return max(0, int((now - beat).total_seconds()))
except Exception:
return None
def _render_link_status(w):
if not w["alive"]:
return "down"
if w["state"] == "paused":
return "paused"
if w["dead_letter"] > 0:
return "degraded"
return "healthy"
async def collect_status(redis=None) -> dict:
r = redis or _get_redis()
now = dt.datetime.now(dt.timezone.utc)
out = {"redis_ok": True, "paused": False, "paused_reason": None,
"generated_at": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
"workers": [], "links": []}
try:
out["paused"] = (await r.get("queue:paused")) == b"1"
except Exception:
logger.exception("redis 접근 실패")
out["redis_ok"] = False
return out
for w in WORKER_REGISTRY:
try:
info = {"name": w["name"], "kind": w["kind"], "alive": False, "state": None,
"last_beat_age_s": None, "queue_depth": 0, "dead_letter": 0,
"processing": 0, "jobs_done": 0, "jobs_failed": 0, "last_job_at": None}
raw = await r.get(f"worker:{w['name']}:heartbeat")
if raw:
try:
hb = json.loads(raw)
age = _beat_age(hb.get("ts") or "", now)
info["last_beat_age_s"] = age
info["alive"] = age is not None and age <= NODE_STALE_THRESHOLD_SEC
info["state"] = hb.get("state")
info["jobs_done"] = hb.get("jobs_done", 0)
info["jobs_failed"] = hb.get("jobs_failed", 0)
info["last_job_at"] = hb.get("last_job_at")
if w["kind"] == "watcher" and hb.get("mode"):
out["paused_reason"] = hb["mode"]
except (json.JSONDecodeError, UnicodeDecodeError):
logger.warning("heartbeat JSON 파싱 실패 name=%s", w["name"])
if w["queue"]:
info["queue_depth"] = await r.llen(w["queue"])
info["dead_letter"] = await r.llen(f"dead_letter:{w['queue']}")
proc = 0
async for key in r.scan_iter(match=f"processing:{w['queue']}:*"):
proc += await r.llen(key)
info["processing"] = proc
out["workers"].append(info)
except Exception:
logger.exception("워커 상태 수집 실패 name=%s", w["name"])
out["redis_ok"] = False
break
for w in out["workers"]:
if w["kind"] == "trader":
out["links"].append({"from": "ai_trade", "to": "nas-stock", "type": "http-pull",
"status": "healthy" if w["alive"] else "down"})
elif w["kind"] == "render":
out["links"].append({"from": "nas", "to": w["name"], "type": "redis-queue",
"status": _render_link_status(w)})
if out["paused"] and not out["paused_reason"]:
out["paused_reason"] = "trading"
return out
async def check_and_alert(status=None) -> list[str]:
"""워커 상태를 점검해 다운/복구/dead-letter 전이를 텔레그램으로 경보한다.
첫 관측(prev=None)엔 경보 없음 — 부팅 시 false alarm 방지.
반환값: 실제로 전송된 경보 텍스트 목록 (테스트용).
"""
from .telegram.messaging import send_raw
from .db import add_log
try:
st = status or await collect_status()
except Exception:
logger.exception("collect_status 예외")
return []
sent: list[str] = []
for w in st["workers"]:
name = w["name"]
alive = w.get("alive", False)
prev = _node_state.get(name)
transition_send_failed = False
if prev is True and not alive:
text = f"🔴 [{name}] 워커 다운"
if (await send_raw(text=text)).get("ok"):
add_log("node_monitor", f"{name} 다운", "warning"); sent.append(text)
else:
transition_send_failed = True
elif prev is False and alive:
text = f"🟢 [{name}] 워커 복구"
if (await send_raw(text=text)).get("ok"):
add_log("node_monitor", f"{name} 복구", "info"); sent.append(text)
else:
transition_send_failed = True
if not transition_send_failed:
_node_state[name] = alive
dl = w.get("dead_letter", 0)
if dl >= NODE_ALERT_DEADLETTER_THRESHOLD and dl != _dl_notified.get(name, 0):
text = f"❌ [{name}] 실패 누적 {dl}건 (dead-letter)"
if (await send_raw(text=text)).get("ok"):
add_log("node_monitor", f"{name} dead-letter {dl}", "warning")
sent.append(text)
_dl_notified[name] = dl
elif dl == 0:
_dl_notified.pop(name, None)
return sent