Files
web-page-backend/agent-office/tests/test_node_monitor.py
gahusb 246c8d5328 feat(agent-office): node_monitor에 trade-monitor 워커 등재 + trader 링크 from을 워커명으로 수정
WSL 워커 관측 규칙 — 모든 WSL docker 워커는 /infra에서 모니터링 가능해야 함.
trade-monitor(kind=trader) 등재 → /nodes·/infra 노출. 링크 from 하드코딩('ai_trade')을
w[name]으로 고쳐 다중 trader가 각자 링크를 갖도록 함. 미배포 워커는 prev=None이라 다운 경보 없음.
2026-07-03 10:45:45 +09:00

209 lines
10 KiB
Python

# agent-office/tests/test_node_monitor.py
import datetime as dt
import fnmatch
import json

import pytest

from app import node_monitor
import app.node_monitor as nm
class FakeRedis:
    """In-memory stand-in for the async Redis client passed to node_monitor.

    Emulates only what these tests need: heartbeat lookups (``get``),
    queue lengths (``llen``) and key discovery (``scan_iter``).
    """

    def __init__(self, kv=None, lists=None):
        # key (str) -> raw value (bytes), e.g. encoded heartbeat payloads.
        self._kv = kv or {}
        # key (str) -> list length (int); only the length is modelled.
        self._lists = lists or {}

    async def get(self, key):
        """Return the stored bytes for *key*, or ``None`` when the key is absent."""
        return self._kv.get(key)

    async def llen(self, key):
        """Return the modelled length of list *key*, 0 when the key is absent."""
        return self._lists.get(key, 0)

    async def scan_iter(self, match=None):
        """Yield the list keys that match the Redis-style glob *match*.

        Only keys registered via ``lists`` are scanned. ``match=None`` yields
        every list key, like SCAN without a MATCH clause. Previously ``None``
        crashed on ``.rstrip``, and only trailing-``*`` prefixes were honoured.
        """
        # Snapshot the keys so callers may mutate the fake while iterating.
        for key in list(self._lists):
            if match is None or fnmatch.fnmatchcase(key, match):
                yield key
def _hb(name, kind, state, ts=None, **extra):
    """Build an encoded worker-heartbeat payload for FakeRedis.

    ``ts`` defaults to the current UTC time (a fresh heartbeat). Pass an older
    timestamp to simulate a stale worker. Extra keyword arguments are merged
    into the payload last, so they may override the default fields.
    """
    stamp = ts
    if stamp is None:
        stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    payload = {
        "name": name,
        "kind": kind,
        "state": state,
        "ts": stamp,
        "last_job_at": None,
        "jobs_done": 0,
        "jobs_failed": 0,
    }
    payload.update(extra)
    return json.dumps(payload).encode()
@pytest.mark.asyncio
async def test_alive_worker_healthy_link():
    """A fresh heartbeat marks the worker alive and its redis-queue link healthy."""
    redis = FakeRedis(kv={
        "worker:image-render:heartbeat": _hb("image-render", "render", "idle"),
    })
    status = await node_monitor.collect_status(redis=redis)

    worker = next(w for w in status["workers"] if w["name"] == "image-render")
    assert worker["alive"] is True
    assert worker["state"] == "idle"

    link = next(lnk for lnk in status["links"] if lnk["to"] == "image-render")
    assert link["status"] == "healthy"
    assert link["type"] == "redis-queue"
@pytest.mark.asyncio
async def test_missing_heartbeat_is_dead_and_down():
    """With no heartbeat key at all, the worker is dead and its link is down."""
    empty_redis = FakeRedis()
    status = await node_monitor.collect_status(redis=empty_redis)

    worker = next(w for w in status["workers"] if w["name"] == "image-render")
    assert worker["alive"] is False

    link = next(lnk for lnk in status["links"] if lnk["to"] == "image-render")
    assert link["status"] == "down"
@pytest.mark.asyncio
async def test_dead_letter_makes_degraded():
    """Entries in a worker's dead-letter queue are reported and degrade its link."""
    heartbeats = {"worker:video-render:heartbeat": _hb("video-render", "render", "idle")}
    queues = {"dead_letter:queue:video-render": 2}
    status = await node_monitor.collect_status(redis=FakeRedis(kv=heartbeats, lists=queues))

    worker = next(w for w in status["workers"] if w["name"] == "video-render")
    assert worker["dead_letter"] == 2

    link = next(lnk for lnk in status["links"] if lnk["to"] == "video-render")
    assert link["status"] == "degraded"
@pytest.mark.asyncio
async def test_paused_reason_from_watcher():
    """When the queue is paused, the reason is taken from the watcher heartbeat's mode."""
    watcher_hb = _hb("task-watcher", "watcher", "trading", mode="trading")
    redis = FakeRedis(kv={
        "queue:paused": b"1",
        "worker:task-watcher:heartbeat": watcher_hb,
    })
    status = await node_monitor.collect_status(redis=redis)
    assert status["paused"] is True
    assert status["paused_reason"] == "trading"
@pytest.mark.asyncio
async def test_trader_http_pull_link():
    """A live trader worker (ai_trade) owns a healthy http-pull link."""
    redis = FakeRedis(kv={
        "worker:ai_trade:heartbeat": _hb("ai_trade", "trader", "market_open"),
    })
    status = await node_monitor.collect_status(redis=redis)

    link = next(lnk for lnk in status["links"] if lnk["from"] == "ai_trade")
    assert link["type"] == "http-pull"
    assert link["status"] == "healthy"
@pytest.mark.asyncio
async def test_trade_monitor_registered_and_own_link():
    """The WSL worker trade-monitor is registered, so it shows up in /nodes.

    Its link's ``from`` must be its own name (trade-monitor) rather than a
    hard-coded 'ai_trade', so that multiple traders can be told apart.
    """
    heartbeat = _hb("trade-monitor", "trader", "market_open")
    status = await node_monitor.collect_status(
        redis=FakeRedis(kv={"worker:trade-monitor:heartbeat": heartbeat})
    )

    monitor = next(w for w in status["workers"] if w["name"] == "trade-monitor")
    assert monitor["alive"] is True
    assert monitor["kind"] == "trader"

    link = next(lnk for lnk in status["links"] if lnk["from"] == "trade-monitor")
    assert link["type"] == "http-pull"
    assert link["to"] == "nas-stock"
    assert link["status"] == "healthy"
@pytest.mark.asyncio
async def test_paused_no_watcher_heartbeat_fallback_reason():
    """Paused queue with no watcher heartbeat falls back to paused_reason 'trading'."""
    only_pause_flag = FakeRedis(kv={"queue:paused": b"1"})
    status = await node_monitor.collect_status(redis=only_pause_flag)
    assert status["paused"] is True
    assert status["paused_reason"] == "trading"
@pytest.mark.asyncio
async def test_processing_count_image_render():
    """Lengths of processing:<queue>:<worker_id> lists are summed into ``processing``."""
    wid = "abc123"
    in_flight_key = f"processing:queue:image-render:{wid}"
    redis = FakeRedis(
        kv={"worker:image-render:heartbeat": _hb("image-render", "render", "busy")},
        lists={in_flight_key: 3},
    )
    status = await node_monitor.collect_status(redis=redis)

    worker = next(w for w in status["workers"] if w["name"] == "image-render")
    assert worker["processing"] == 3
@pytest.mark.asyncio
async def test_llen_exception_returns_redis_ok_false():
    """An llen failure inside the worker loop must not propagate.

    collect_status should instead report redis_ok=False (Blocker regression).
    """

    class LlenAlwaysFails(FakeRedis):
        async def llen(self, key):
            raise ConnectionError("Redis 연결 끊김")

    redis = LlenAlwaysFails(
        kv={"worker:music-render:heartbeat": _hb("music-render", "render", "idle")}
    )
    status = await node_monitor.collect_status(redis=redis)
    assert status["redis_ok"] is False
@pytest.mark.asyncio
async def test_alert_on_alive_to_dead(monkeypatch):
    """An alive->dead transition sends a 'down' alert; the first sighting sends nothing."""
    outbox = []

    async def fake_send_raw(text, **kw):
        outbox.append(text)
        return {"ok": True}

    monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
    monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
    # Reset module-level alert bookkeeping left over from other tests.
    nm._node_state.clear()
    nm._dl_notified.clear()

    def snapshot(is_alive):
        return {"workers": [{"name": "image-render", "alive": is_alive, "dead_letter": 0}], "links": []}

    await nm.check_and_alert(status=snapshot(True))   # first sighting: baseline only
    assert outbox == []
    await nm.check_and_alert(status=snapshot(False))  # alive -> dead transition
    assert any("다운" in msg for msg in outbox)        # "다운" = "down"
@pytest.mark.asyncio
async def test_alert_on_dead_letter_growth(monkeypatch):
    """A non-zero dead-letter count on a live worker sends a dead-letter alert."""
    outbox = []

    async def fake_send_raw(text, **kw):
        outbox.append(text)
        return {"ok": True}

    monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
    monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
    nm._node_state.clear()
    nm._dl_notified.clear()

    snapshot = {
        "workers": [{"name": "video-render", "alive": True, "dead_letter": 2}],
        "links": [],
    }
    await nm.check_and_alert(status=snapshot)
    assert any("dead-letter" in msg for msg in outbox)
@pytest.mark.asyncio
async def test_dl_notified_not_updated_on_telegram_failure(monkeypatch):
    """A failed Telegram send (ok=False) leaves _dl_notified untouched.

    Because the bookkeeping is not updated, the next cycle retries the alert.
    """
    attempts = []

    async def fake_send_raw(text, **kw):
        attempts.append(text)
        # Only the very first delivery fails (Telegram down); later ones succeed.
        return {"ok": len(attempts) > 1}

    monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
    monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
    nm._node_state.clear()
    nm._dl_notified.clear()

    snapshot = {"workers": [{"name": "video-render", "alive": True, "dead_letter": 2}], "links": []}

    # Cycle 1: send fails -> nothing reported, no bookkeeping update.
    first = await nm.check_and_alert(status=snapshot)
    assert first == []
    assert nm._dl_notified.get("video-render", 0) == 0

    # Cycle 2: same dl=2 still exceeds the (unchanged) watermark -> retried and sent.
    second = await nm.check_and_alert(status=snapshot)
    assert any("dead-letter" in msg for msg in second)
    assert nm._dl_notified.get("video-render") == 2
# ── I1: heartbeat staleness tests ──────────────────────────────────────────────
@pytest.mark.asyncio
async def test_stale_heartbeat_is_dead():
    """An existing heartbeat whose ts is older than 90 s still yields alive=False."""
    five_minutes_ago = dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=5)
    stale_ts = five_minutes_ago.strftime("%Y-%m-%dT%H:%M:%SZ")
    redis = FakeRedis(kv={
        "worker:image-render:heartbeat": _hb("image-render", "render", "idle", ts=stale_ts),
    })
    status = await node_monitor.collect_status(redis=redis)

    worker = next(w for w in status["workers"] if w["name"] == "image-render")
    assert worker["alive"] is False

    link = next(lnk for lnk in status["links"] if lnk["to"] == "image-render")
    assert link["status"] == "down"
# ── I2: regression tests — retry when a transition alert fails to send ───────
@pytest.mark.asyncio
async def test_transition_send_failure_retries_next_cycle(monkeypatch):
    """If send_raw fails on an alive->dead transition, _node_state is not updated.

    The transition is therefore detected again, and alerted, on the next cycle.
    """
    attempts = []

    async def fake_send_raw(text, **kw):
        attempts.append(text)
        # Only the very first delivery fails (Telegram down); later ones succeed.
        return {"ok": len(attempts) > 1}

    monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
    monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
    nm._node_state.clear()
    nm._dl_notified.clear()

    def snapshot(is_alive):
        return {"workers": [{"name": "music-render", "alive": is_alive, "dead_letter": 0}], "links": []}

    # First sighting establishes the baseline; no transition yet.
    await nm.check_and_alert(status=snapshot(True))
    assert nm._node_state.get("music-render") is True

    # alive -> dead, but the send fails: nothing reported, state kept at True.
    failed_cycle = await nm.check_and_alert(status=snapshot(False))
    assert failed_cycle == []
    assert nm._node_state.get("music-render") is True

    # Next cycle, still dead, send succeeds: "다운" ("down") alert goes out, state flips.
    retry_cycle = await nm.check_and_alert(status=snapshot(False))
    assert any("다운" in msg for msg in retry_cycle)
    assert nm._node_state.get("music-render") is False