web-page-backend/docs/superpowers/plans/2026-06-29-worker-observability.md
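gahusb c69b18243b docs: add the distributed worker observability implementation plan (3-repo TDD plan)
Part A (web-ai heartbeat) / Part B (agent-office aggregation + alerts) / Part C (web-ui
Three.js dashboard). Each Part can be run and tested independently; the 2 contracts are
locked under Global Constraints.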

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-06-29 17:33:16 +09:00


Distributed Worker Observability System Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

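Goal: Observe the liveness, queue and failure state of the 6 distributed NAS↔Windows workers through heartbeats, have agent-office aggregate them and send Telegram alerts, and visualize them as a Three.js pipeline on web-ui /infra.

Architecture: workers push worker:<name>:heartbeat (TTL 45s) to NAS Redis → agent-office node_monitor reads the heartbeats plus ReliableQueue's processing:*/dead_letter:* and queue:paused, and exposes the result via GET /api/agent-office/nodes → a 1-minute cron sends a Telegram alert on state transitions → web-ui polls every 3 seconds and visualizes the topology.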
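How quickly a silent worker death turns into a Telegram alert follows from two timers in this chain: the heartbeat TTL and the cron interval. A back-of-the-envelope sketch, assuming the key disappears exactly TTL seconds after the worker's last successful SET and the cron ticks at a fixed interval with arbitrary phase (`dead_worker_alert_window` is a hypothetical helper, not part of the plan):

```python
from __future__ import annotations


def dead_worker_alert_window(ttl_s: int, cron_interval_s: int) -> tuple[int, int]:
    """(earliest, latest) seconds after a worker's last successful heartbeat SET at
    which the periodic health check can first see the key missing.

    Assumes the key vanishes exactly ttl_s after the last SET and the check runs
    every cron_interval_s seconds with an arbitrary phase."""
    earliest = ttl_s                    # a check happens to run right at expiry
    latest = ttl_s + cron_interval_s    # expiry just missed a tick: wait one full interval
    return earliest, latest


print(dead_worker_alert_window(45, 60))  # (45, 105)
```

With the plan's values, a worker the cron has already seen alive is alerted roughly 45 to 105 s after its last beat. The 3-second web-ui poll reads /nodes live, so the dashboard can show the worker as dead within a few seconds of the 45 s expiry.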

Tech Stack: Python 3.12 / FastAPI / redis.asyncio / APScheduler (web-backend·web-ai), React + Vite / three + @react-three/fiber + drei (web-ui).

Global Constraints

These 2 contracts are the locking convention that lets the 3 Parts be developed independently and in parallel. Every Task uses these values exactly as written.

[Contract 1] Heartbeat key schema

  • Key: worker:<name>:heartbeat (name ∈ music-render, video-render, image-render, insta-render, task-watcher, ai_trade)
  • Command: SET worker:<name>:heartbeat <json> EX 45
  • Value (JSON): {"name": str, "kind": "render"|"watcher"|"trader", "state": str, "ts": "<UTC ISO8601 ...Z>", "last_job_at": str|null, "jobs_done": int, "jobs_failed": int, "mode": str (optional, task-watcher only)}
  • state by kind: render=idle|busy|paused / watcher=trading|free / trader=market_open|market_closed
  • Send interval 15 s, TTL 45 s (3×). The redis client uses decode_responses=False (bytes), the same as the existing workers.
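One way to keep all six producers honest about contract 1 is a validator that tests on both sides can share. The sketch below is hypothetical: `contract1_errors` and its rules are derived from the bullets above, not from existing code, and it assumes `ts` may optionally carry fractional seconds.

```python
from __future__ import annotations

import json
import re

# Allowed values, copied from contract 1.
WORKER_NAMES = {"music-render", "video-render", "image-render",
                "insta-render", "task-watcher", "ai_trade"}
STATES_BY_KIND = {
    "render": {"idle", "busy", "paused"},
    "watcher": {"trading", "free"},
    "trader": {"market_open", "market_closed"},
}
# UTC ISO8601 with a trailing Z; fractional seconds optional (assumption).
TS_RE = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?Z$")


def contract1_errors(raw: bytes | str) -> list[str]:
    """Return contract-1 violations for one heartbeat value; an empty list means valid."""
    try:
        hb = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"not JSON: {exc}"]
    if not isinstance(hb, dict):
        return ["payload is not a JSON object"]

    errors = []
    if hb.get("name") not in WORKER_NAMES:
        errors.append(f"unknown name {hb.get('name')!r}")
    kind = hb.get("kind")
    if kind not in STATES_BY_KIND:
        errors.append(f"unknown kind {kind!r}")
    elif hb.get("state") not in STATES_BY_KIND[kind]:
        errors.append(f"state {hb.get('state')!r} not allowed for kind {kind!r}")
    if not TS_RE.match(str(hb.get("ts", ""))):
        errors.append("ts is not UTC ISO8601 ending in Z")
    for field in ("jobs_done", "jobs_failed"):
        if not isinstance(hb.get(field), int):
            errors.append(f"{field} must be an int")
    if "last_job_at" not in hb:
        errors.append("last_job_at missing (null is allowed)")
    if "mode" in hb and hb.get("name") != "task-watcher":
        errors.append("mode is only allowed for task-watcher")
    return errors


good = json.dumps({"name": "image-render", "kind": "render", "state": "idle",
                   "ts": "2026-06-29T00:00:00Z", "last_job_at": None,
                   "jobs_done": 3, "jobs_failed": 0})
print(contract1_errors(good))  # []
```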

[Contract 2] GET /api/agent-office/nodes response schema

{ "redis_ok": true, "paused": false, "paused_reason": "trading", "generated_at": "...Z",
  "workers": [{"name":"image-render","kind":"render","alive":true,"state":"idle",
    "last_beat_age_s":3,"queue_depth":0,"dead_letter":0,"processing":0,
    "jobs_done":42,"jobs_failed":1,"last_job_at":"...Z"}],
  "links": [{"from":"nas","to":"image-render","type":"redis-queue","status":"healthy"}] }
  • links[].status: healthy|paused|down|degraded; links[].type: redis-queue|http-pull
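The consumer side can check a /nodes body against contract 2 in the same spirit. A hypothetical sketch; the required worker fields are taken from the example object above, and `from`/`to` values are not validated:

```python
from __future__ import annotations

# Allowed values and required keys, copied from contract 2.
LINK_STATUSES = {"healthy", "paused", "down", "degraded"}
LINK_TYPES = {"redis-queue", "http-pull"}
TOP_LEVEL_KEYS = {"redis_ok", "paused", "paused_reason", "generated_at", "workers", "links"}
WORKER_KEYS = {"name", "kind", "alive", "state", "last_beat_age_s", "queue_depth",
               "dead_letter", "processing", "jobs_done", "jobs_failed", "last_job_at"}


def contract2_errors(body: dict) -> list[str]:
    """Return contract-2 violations for one /api/agent-office/nodes body (empty = valid)."""
    errors = [f"missing top-level key {k!r}" for k in sorted(TOP_LEVEL_KEYS - set(body))]
    for i, worker in enumerate(body.get("workers", [])):
        missing = sorted(WORKER_KEYS - set(worker))
        if missing:
            errors.append(f"workers[{i}] missing {missing}")
    for i, link in enumerate(body.get("links", [])):
        if link.get("status") not in LINK_STATUSES:
            errors.append(f"links[{i}].status {link.get('status')!r} not allowed")
        if link.get("type") not in LINK_TYPES:
            errors.append(f"links[{i}].type {link.get('type')!r} not allowed")
    return errors
```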

Common rules: never commit .env. Docker runs only on the NAS (no local docker commands). Each repo commits only within its own path. NAS Redis address: redis://192.168.45.54:6379 (worker side); redis://redis:6379 inside the containers (NAS side).


Part A: web-ai worker heartbeat (owned by the AI session)

repo: web-ai. Tests: system Python (C:\Users\jaeoh\AppData\Local\Programs\Python\Python312\python.exe -m pytest), because the .venv breaks on the Korean-character path (see web-ai CLAUDE.md).

Task A1: Shared heartbeat module

Files:

  • Create: services/_shared/heartbeat.py
  • Test: services/_shared/tests/test_heartbeat.py

Interfaces:

  • Produces: class WorkerStats(busy:bool, jobs_done:int, jobs_failed:int, last_job_at:str|None), def utc_now_iso()->str, def build_payload(name,kind,state,stats,extra=None)->str, async def render_state(redis,stats,paused_key="queue:paused")->str, async def heartbeat_loop(redis,name,kind,stats,*,interval=15,ttl=45,paused_key="queue:paused",state_fn=None)

  • Step 1: Write the failing test

# services/_shared/tests/test_heartbeat.py
import json
import pytest
from _shared.heartbeat import WorkerStats, build_payload, render_state

def test_build_payload_has_contract_fields():
    s = WorkerStats(); s.jobs_done = 3; s.last_job_at = "2026-06-29T00:00:00Z"
    payload = json.loads(build_payload("image-render", "render", "idle", s))
    assert payload["name"] == "image-render"
    assert payload["kind"] == "render"
    assert payload["state"] == "idle"
    assert payload["jobs_done"] == 3
    assert payload["last_job_at"] == "2026-06-29T00:00:00Z"
    assert payload["ts"].endswith("Z")

def test_build_payload_merges_extra():
    payload = json.loads(build_payload("task-watcher", "watcher", "free", WorkerStats(), extra={"mode": "free"}))
    assert payload["mode"] == "free"

class _FakeRedis:
    def __init__(self, paused): self._paused = paused
    async def get(self, key): return b"1" if self._paused else None

@pytest.mark.asyncio
async def test_render_state_paused_overrides_busy():
    s = WorkerStats(); s.busy = True
    assert await render_state(_FakeRedis(paused=True), s) == "paused"

@pytest.mark.asyncio
async def test_render_state_busy_then_idle():
    s = WorkerStats(); s.busy = True
    assert await render_state(_FakeRedis(paused=False), s) == "busy"
    s.busy = False
    assert await render_state(_FakeRedis(paused=False), s) == "idle"
  • Step 2: Run test to verify it fails

Run: python -m pytest services/_shared/tests/test_heartbeat.py -v Expected: FAIL with ModuleNotFoundError: No module named '_shared.heartbeat'

  • Step 3: Write minimal implementation
# services/_shared/heartbeat.py
"""분산 워커 heartbeat — worker:<name>:heartbeat SET (TTL). Global Constraints 계약 1."""
from __future__ import annotations
import asyncio, datetime as dt, json, logging, os

logger = logging.getLogger(__name__)
DEFAULT_INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "15"))
DEFAULT_TTL = int(os.getenv("HEARTBEAT_TTL", "45"))


class WorkerStats:
    """worker_loop가 갱신, heartbeat_loop가 읽는 가변 카운터."""
    def __init__(self):
        self.busy = False
        self.jobs_done = 0
        self.jobs_failed = 0
        self.last_job_at = None  # ISO str | None


def utc_now_iso() -> str:
    return dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def build_payload(name: str, kind: str, state: str, stats: WorkerStats, extra: dict | None = None) -> str:
    payload = {
        "name": name, "kind": kind, "state": state, "ts": utc_now_iso(),
        "last_job_at": stats.last_job_at,
        "jobs_done": stats.jobs_done, "jobs_failed": stats.jobs_failed,
    }
    if extra:
        payload.update(extra)
    return json.dumps(payload)


async def render_state(redis, stats: WorkerStats, paused_key: str = "queue:paused") -> str:
    if await redis.get(paused_key) == b"1":
        return "paused"
    return "busy" if stats.busy else "idle"


async def heartbeat_loop(redis, name, kind, stats, *, interval=DEFAULT_INTERVAL,
                         ttl=DEFAULT_TTL, paused_key="queue:paused", state_fn=None):
    key = f"worker:{name}:heartbeat"
    logger.info("heartbeat 시작 name=%s ttl=%ds", name, ttl)
    while True:
        try:
            if state_fn is not None:
                state, extra = await state_fn(redis, stats)
            else:
                state, extra = await render_state(redis, stats, paused_key), None
            await redis.set(key, build_payload(name, kind, state, stats, extra), ex=ttl)
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.exception("heartbeat 발신 실패 name=%s", name)
        await asyncio.sleep(interval)
  • Step 4: Run test to verify it passes

Run: python -m pytest services/_shared/tests/test_heartbeat.py -v Expected: PASS (4 passed)

  • Step 5: Commit
git add services/_shared/heartbeat.py services/_shared/tests/test_heartbeat.py
git commit -m "feat(_shared): 워커 heartbeat 모듈 (worker:<name>:heartbeat TTL SET)"

Task A2: Wire heartbeat into the image-render worker (+ apply the same pattern to the other 3 render workers)

Files:

  • Modify: services/image-render/worker.py (add stats + poll_once counters)
  • Modify: services/image-render/main.py (heartbeat_loop task in lifespan)
  • Test: services/image-render/tests/test_worker.py (add a poll_once counter test)

Interfaces:

  • Consumes: _shared.heartbeat.WorkerStats, heartbeat_loop, utc_now_iso

  • Produces: worker.stats (module-level WorkerStats), which main.py passes to heartbeat_loop

  • Step 1: Write the failing test (poll_once increments jobs_done on success and resets busy)

# append to services/image-render/tests/test_worker.py
import asyncio, pytest
import worker

class _OneJobQueue:
    def __init__(self): self.acked = False
    async def dequeue(self, timeout=5):
        if self.acked: return None
        return ({"job_type": "flux_generation", "task_id": "t1", "params": {}}, b"raw")
    async def ack(self, raw): self.acked = True
    async def fail(self, raw, payload): pass

@pytest.mark.asyncio
async def test_poll_once_increments_jobs_done(monkeypatch):
    worker.stats.jobs_done = 0
    monkeypatch.setattr(worker, "run_flux_generation", lambda task_id, params: None)
    handled = await worker.poll_once(_OneJobQueue())
    assert handled is True
    assert worker.stats.jobs_done == 1
    assert worker.stats.busy is False
    assert worker.stats.last_job_at is not None
  • Step 2: Run test to verify it fails

Run: python -m pytest services/image-render/tests/test_worker.py::test_poll_once_increments_jobs_done -v Expected: FAIL with AttributeError: module 'worker' has no attribute 'stats'

  • Step 3: Write minimal implementation

worker.py: add the import and a module-level stats, then replace poll_once:

from _shared.heartbeat import WorkerStats, utc_now_iso  # add to the top import block

stats = WorkerStats()  # module level (near the REDIS_URL constant)

async def poll_once(queue: ReliableQueue) -> bool:
    result = await queue.dequeue(timeout=5)
    if result is None:
        return False
    payload, raw = result
    stats.busy = True
    try:
        await asyncio.to_thread(_dispatch, payload)
    except Exception:
        logger.exception("dispatch unhandled exception task_id=%s", payload.get("task_id"))
        await queue.fail(raw, payload)
        stats.jobs_failed += 1
        stats.last_job_at = utc_now_iso()
        stats.busy = False
        return True
    await queue.ack(raw)
    stats.jobs_done += 1
    stats.last_job_at = utc_now_iso()
    stats.busy = False
    return True

main.py: add the heartbeat task to lifespan:

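# asyncio, asynccontextmanager, FastAPI, worker and logger presumably already exist in main.py;
# they are listed here so the snippet's dependencies are explicit.
import asyncio
import os
from contextlib import asynccontextmanager

import redis.asyncio as aioredis
from fastapi import FastAPI

import worker
from _shared.heartbeat import heartbeat_loop

@asynccontextmanager
async def lifespan(app: FastAPI):
    worker_task = asyncio.create_task(worker.worker_loop())
    # dedicated client for the heartbeat; bytes mode matches contract 1
    hb_redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://192.168.45.54:6379"), decode_responses=False)
    hb_task = asyncio.create_task(heartbeat_loop(hb_redis, "image-render", "render", worker.stats))
    logger.info("image-render lifespan started")
    try:
        yield
    finally:
        # cancel both background tasks and wait for each to finish unwinding
        for t in (worker_task, hb_task):
            t.cancel()
            try:
                await t
            except asyncio.CancelledError:
                pass
        await hb_redis.aclose()
        logger.info("image-render lifespan stopped")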
  • Step 4: Run test to verify it passes

Run: python -m pytest services/image-render/tests/ -v Expected: PASS (existing tests + 1 new)

  • Step 5: Apply the same pattern to the other 3 render workers

Apply the Step 3 pattern to each of video-render, music-render and insta-render. The only difference is the worker-name string in main.py:

  • services/video-render/main.py: heartbeat_loop(hb_redis, "video-render", "render", worker.stats)

  • services/music-render/main.py: heartbeat_loop(hb_redis, "music-render", "render", worker.stats)

  • services/insta-render/main.py: heartbeat_loop(hb_redis, "insta-render", "render", worker.stats)

  • worker.py: from _shared.heartbeat import WorkerStats, utc_now_iso + stats = WorkerStats() + poll_once counters (ack → jobs_done, fail → jobs_failed; both update last_job_at and busy). music-render's worker.py is more complex (12 job_types), but the hook points are the same (busy=True before dispatch; counter + busy=False after ack/fail).

  • Add the Step 1 test to each worker's tests/test_worker.py, adapted to that worker's job_type and to the handler that job_type dispatches to (the function the test monkeypatches in place of run_flux_generation).

  • Step 6: Commit

git add services/image-render services/video-render services/music-render services/insta-render
git commit -m "feat(render-workers): 4 render 워커 heartbeat 배선 + poll_once 카운터"

Task A3: task-watcher heartbeat (including mode)

Files:

  • Modify: services/task-watcher/watcher.py
  • Test: services/task-watcher/tests/test_watcher.py (new)

Interfaces:

  • Consumes: _shared.heartbeat.build_payload, WorkerStats

  • Produces: worker:task-watcher:heartbeat value with state=mode and mode=mode

  • Step 1: Write the failing test

# services/task-watcher/tests/test_watcher.py
import json
from _shared.heartbeat import build_payload, WorkerStats

def test_watcher_heartbeat_payload_carries_mode():
    payload = json.loads(build_payload("task-watcher", "watcher", "trading", WorkerStats(), extra={"mode": "trading"}))
    assert payload["kind"] == "watcher"
    assert payload["state"] == "trading"
    assert payload["mode"] == "trading"
  • Step 2: Run test to verify it fails

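Run: python -m pytest services/task-watcher/tests/test_watcher.py -v Expected: FAIL (ModuleNotFoundError or another import-path error: task-watcher's PYTHONPATH must include _shared; check compose PYTHONPATH=/app:/shared). Note that this test exercises only _shared.heartbeat.build_payload, so once the path resolves it passes without any watcher.py change; it pins the payload shape, not the watcher_loop wiring.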

  • Step 3: Write minimal implementation (add the heartbeat SET to watcher_loop)
# watcher.py: top-level imports
from _shared.heartbeat import build_payload, WorkerStats
_HB_STATS = WorkerStats()
HEARTBEAT_KEY = "worker:task-watcher:heartbeat"
HEARTBEAT_TTL = 45

# in the watcher_loop while body, right after the mode is determined:
            mode = current_mode(now, holidays)
            if mode == "trading":
                await redis.set(PAUSED_KEY, b"1", ex=PAUSED_TTL)
            else:
                await redis.delete(PAUSED_KEY)
            # heartbeat (LOOP_INTERVAL=30s < TTL 45s)
            await redis.set(HEARTBEAT_KEY,
                            build_payload("task-watcher", "watcher", mode, _HB_STATS, extra={"mode": mode}),
                            ex=HEARTBEAT_TTL)

Note: the watcher loop period (30 s) is shorter than the TTL (45 s), so the key is renewed before it expires.
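That guarantee holds only while every iteration runs on time. Counting how many consecutive beats can be lost before the key expires makes the margin explicit; a sketch assuming a perfectly regular loop with no drift (`tolerated_missed_beats` is illustrative, not part of the plan):

```python
import math


def tolerated_missed_beats(interval_s: float, ttl_s: float) -> int:
    """Largest k such that, after k consecutive missed beats, the next on-schedule beat
    ((k + 1) * interval_s after the last good SET) still lands strictly before the key
    expires at ttl_s. -1 means even an on-time beat arrives at or after expiry."""
    return math.ceil(ttl_s / interval_s) - 2


print(tolerated_missed_beats(15, 45))  # 1  (render workers, _shared.heartbeat defaults)
print(tolerated_missed_beats(30, 45))  # 0  (task-watcher, LOOP_INTERVAL=30s)
```

So contract 1's 3× ratio gives the render workers one safely missed beat (a second miss lands exactly on the expiry boundary), while the watcher at 30 s / 45 s has none: a single skipped or slow iteration lets worker:task-watcher:heartbeat expire, and node_monitor reports task-watcher as not alive until the next SET.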

  • Step 4: Run test to verify it passes

Run: python -m pytest services/task-watcher/tests/test_watcher.py -v Expected: PASS

  • Step 5: Commit
git add services/task-watcher
git commit -m "feat(task-watcher): heartbeat 발신 (state=mode, paused 이유 노출)"

Task A4: ai_trade heartbeat (different runtime)

Files:

  • Modify: ai_trade/requirements.txt or the root requirements.txt (add redis if not already present)
  • Create: ai_trade/heartbeat.py (a self-contained mini version: it runs on the host, so the _shared import path differs)
  • Modify: ai_trade/main.py (heartbeat task in lifespan)
  • Test: ai_trade/tests/test_heartbeat.py

Interfaces:

  • Produces: worker:ai_trade:heartbeat (kind=trader, state=market_open|market_closed)

  • Consumes: the polling-window check in ai_trade/scheduler.py (the AI session confirms the exact function name; per web-ai CLAUDE.md, scheduler.py owns the polling-window check)

  • Step 1: Write the failing test

# ai_trade/tests/test_heartbeat.py
import json
from heartbeat import build_trader_payload

def test_trader_payload_market_open():
    p = json.loads(build_trader_payload("market_open", signals=2))
    assert p["name"] == "ai_trade"
    assert p["kind"] == "trader"
    assert p["state"] == "market_open"
    assert p["ts"].endswith("Z")
  • Step 2: Run test to verify it fails

Run: python -m pytest ai_trade/tests/test_heartbeat.py -v Expected: FAIL (ModuleNotFoundError: heartbeat)

  • Step 3: Write minimal implementation
# ai_trade/heartbeat.py
"""ai_trade heartbeat — NAS Redis로 worker:ai_trade:heartbeat SET. Global Constraints 계약 1."""
from __future__ import annotations
import asyncio, datetime as dt, json, logging, os
import redis.asyncio as aioredis

logger = logging.getLogger(__name__)
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
KEY = "worker:ai_trade:heartbeat"
INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "15"))
TTL = int(os.getenv("HEARTBEAT_TTL", "45"))


def build_trader_payload(state: str, signals: int = 0) -> str:
    return json.dumps({
        "name": "ai_trade", "kind": "trader", "state": state,
        "ts": dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "last_job_at": None, "jobs_done": signals, "jobs_failed": 0,
    })


async def heartbeat_loop(state_fn):
    """state_fn() -> (state:str, signals:int). poll window는 호출자가 주입."""
    redis = aioredis.from_url(REDIS_URL, decode_responses=False)
    try:
        while True:
            try:
                state, signals = state_fn()
                await redis.set(KEY, build_trader_payload(state, signals), ex=TTL)
            except asyncio.CancelledError:
                raise
            except Exception:
                logger.exception("ai_trade heartbeat 실패")
            await asyncio.sleep(INTERVAL)
    finally:
        await redis.aclose()

Add to the ai_trade/main.py lifespan (the AI session builds state_fn from the actual polling-window check):

import heartbeat as _hb
# inside lifespan, near where the poll_loop task is created:
def _trader_state():
    # decide market_open/market_closed from scheduler's polling-window check
    # (AI session: replace with scheduler's actual window function)
    state = "market_open" if scheduler.is_polling_window() else "market_closed"
    return state, len(state_singleton.signals)
hb_task = asyncio.create_task(_hb.heartbeat_loop(_trader_state))
# on shutdown: hb_task.cancel(), then await it and ignore CancelledError

requirements.txt: add redis>=5.0 (if not already present).

  • Step 4: Run test to verify it passes

Run: python -m pytest ai_trade/tests/test_heartbeat.py -v Expected: PASS

  • Step 5: Commit
git add ai_trade/heartbeat.py ai_trade/main.py ai_trade/tests/test_heartbeat.py requirements.txt
git commit -m "feat(ai_trade): NAS Redis heartbeat (trader market_open/closed)"

Part B: web-backend agent-office aggregation + alerts (owned by this BE session)

repo: web-backend. Tests: agent-office uses pytest + pytest-asyncio. pytest runs locally (no docker needed).

Task B1: Add the redis dependency to agent-office

Files:

  • Modify: agent-office/requirements.txt
  • Modify: agent-office/app/config.py
  • Modify: docker-compose.yml (agent-office block; requires the compose lock)

Interfaces:

  • Produces: config.REDIS_URL, agent-office 컨테이너에 REDIS_URL env + depends_on: redis

  • Step 1: Add redis to requirements.txt

redis>=5.0

(append this one line to the end of the file)

  • Step 2: Add REDIS_URL and the dead-letter alert threshold to config.py
# append to the end of config.py
# Redis (node monitor)
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379")
NODE_ALERT_DEADLETTER_THRESHOLD = int(os.getenv("NODE_ALERT_DEADLETTER_THRESHOLD", "1"))
  • Step 3: Modify the agent-office block in docker-compose.yml (after acquire_lock("compose","BE"))

Add to the environment: section of the agent-office service block:

      - REDIS_URL=${REDIS_URL:-redis://redis:6379}

Add redis to depends_on (create a depends_on key if the block has none):

    depends_on:
      - redis
  • Step 4: Commit
git add agent-office/requirements.txt agent-office/app/config.py docker-compose.yml
git commit -m "chore(agent-office): redis 의존성 + REDIS_URL/dead-letter 임계 설정"

Task B2: node_monitor.collect_status

Files:

  • Create: agent-office/app/node_monitor.py
  • Test: agent-office/tests/test_node_monitor.py

Interfaces:

  • Produces: WORKER_REGISTRY (list of dict), async def collect_status(redis=None) -> dict (contract 2 schema)

  • Consumes: config.REDIS_URL

  • Step 1: Write the failing test

# agent-office/tests/test_node_monitor.py
import json, pytest
from app import node_monitor

class FakeRedis:
    """worker heartbeat + queue llen + scan_iter 흉내."""
    def __init__(self, kv=None, lists=None):
        self._kv = kv or {}           # key(str) -> bytes
        self._lists = lists or {}     # key(str) -> length(int)
    async def get(self, key):
        return self._kv.get(key)
    async def llen(self, key):
        return self._lists.get(key, 0)
    async def scan_iter(self, match=None):
        prefix = match.rstrip("*")
        for k in list(self._lists):
            if k.startswith(prefix):
                yield k

def _hb(name, kind, state, **extra):
    return json.dumps({"name": name, "kind": kind, "state": state, "ts": "2026-06-29T00:00:00Z",
                       "last_job_at": None, "jobs_done": 0, "jobs_failed": 0, **extra}).encode()

@pytest.mark.asyncio
async def test_alive_worker_healthy_link():
    r = FakeRedis(kv={"worker:image-render:heartbeat": _hb("image-render","render","idle")})
    st = await node_monitor.collect_status(redis=r)
    img = next(w for w in st["workers"] if w["name"] == "image-render")
    assert img["alive"] is True and img["state"] == "idle"
    link = next(l for l in st["links"] if l["to"] == "image-render")
    assert link["status"] == "healthy" and link["type"] == "redis-queue"

@pytest.mark.asyncio
async def test_missing_heartbeat_is_dead_and_down():
    r = FakeRedis()  # no heartbeat
    st = await node_monitor.collect_status(redis=r)
    img = next(w for w in st["workers"] if w["name"] == "image-render")
    assert img["alive"] is False
    link = next(l for l in st["links"] if l["to"] == "image-render")
    assert link["status"] == "down"

@pytest.mark.asyncio
async def test_dead_letter_makes_degraded():
    r = FakeRedis(kv={"worker:video-render:heartbeat": _hb("video-render","render","idle")},
                  lists={"dead_letter:queue:video-render": 2})
    st = await node_monitor.collect_status(redis=r)
    vid = next(w for w in st["workers"] if w["name"] == "video-render")
    assert vid["dead_letter"] == 2
    link = next(l for l in st["links"] if l["to"] == "video-render")
    assert link["status"] == "degraded"

@pytest.mark.asyncio
async def test_paused_reason_from_watcher():
    r = FakeRedis(kv={"queue:paused": b"1",
                      "worker:task-watcher:heartbeat": _hb("task-watcher","watcher","trading",mode="trading")})
    st = await node_monitor.collect_status(redis=r)
    assert st["paused"] is True and st["paused_reason"] == "trading"

@pytest.mark.asyncio
async def test_trader_http_pull_link():
    r = FakeRedis(kv={"worker:ai_trade:heartbeat": _hb("ai_trade","trader","market_open")})
    st = await node_monitor.collect_status(redis=r)
    link = next(l for l in st["links"] if l["from"] == "ai_trade")
    assert link["type"] == "http-pull" and link["status"] == "healthy"
  • Step 2: Run test to verify it fails

Run: python -m pytest agent-office/tests/test_node_monitor.py -v Expected: FAIL with ModuleNotFoundError: No module named 'app.node_monitor'

  • Step 3: Write minimal implementation
# agent-office/app/node_monitor.py
"""분산 워커 상태 집계 (read-only). Global Constraints 계약 2 스키마 생성."""
from __future__ import annotations
import datetime as dt, json, logging
import redis.asyncio as aioredis
from .config import REDIS_URL

logger = logging.getLogger("agent-office.node_monitor")

WORKER_REGISTRY = [
    {"name": "music-render", "kind": "render", "queue": "queue:music-render"},
    {"name": "video-render", "kind": "render", "queue": "queue:video-render"},
    {"name": "image-render", "kind": "render", "queue": "queue:image-render"},
    {"name": "insta-render", "kind": "render", "queue": "queue:insta-render"},
    {"name": "task-watcher", "kind": "watcher", "queue": None},
    {"name": "ai_trade",     "kind": "trader",  "queue": None},
]

_redis = None
def _get_redis():
    global _redis
    if _redis is None:
        _redis = aioredis.from_url(REDIS_URL, decode_responses=False)
    return _redis


def _beat_age(ts_str, now):
    try:
        beat = dt.datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
        return max(0, int((now - beat).total_seconds()))
    except Exception:
        return None


def _render_link_status(w):
    if not w["alive"]:
        return "down"
    if w["state"] == "paused":
        return "paused"
    if w["dead_letter"] > 0:
        return "degraded"
    return "healthy"


async def collect_status(redis=None) -> dict:
    r = redis or _get_redis()
    now = dt.datetime.now(dt.timezone.utc)
    out = {"redis_ok": True, "paused": False, "paused_reason": None,
           "generated_at": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
           "workers": [], "links": []}
    try:
        out["paused"] = (await r.get("queue:paused")) == b"1"
    except Exception:
        logger.exception("redis 접근 실패")
        out["redis_ok"] = False
        return out

    for w in WORKER_REGISTRY:
        info = {"name": w["name"], "kind": w["kind"], "alive": False, "state": None,
                "last_beat_age_s": None, "queue_depth": 0, "dead_letter": 0,
                "processing": 0, "jobs_done": 0, "jobs_failed": 0, "last_job_at": None}
        raw = await r.get(f"worker:{w['name']}:heartbeat")
        if raw:
            try:
                hb = json.loads(raw)
                info.update(alive=True, state=hb.get("state"),
                            jobs_done=hb.get("jobs_done", 0), jobs_failed=hb.get("jobs_failed", 0),
                            last_job_at=hb.get("last_job_at"),
                            last_beat_age_s=_beat_age(hb.get("ts", ""), now))
                if w["kind"] == "watcher" and hb.get("mode"):
                    out["paused_reason"] = hb["mode"]
            except json.JSONDecodeError:
                logger.warning("heartbeat JSON 파싱 실패 name=%s", w["name"])
        if w["queue"]:
            info["queue_depth"] = await r.llen(w["queue"])
            info["dead_letter"] = await r.llen(f"dead_letter:{w['queue']}")
            proc = 0
            async for key in r.scan_iter(match=f"processing:{w['queue']}:*"):
                proc += await r.llen(key)
            info["processing"] = proc
        out["workers"].append(info)

    for w in out["workers"]:
        if w["kind"] == "trader":
            out["links"].append({"from": "ai_trade", "to": "nas-stock", "type": "http-pull",
                                 "status": "healthy" if w["alive"] else "down"})
        elif w["kind"] == "render":
            out["links"].append({"from": "nas", "to": w["name"], "type": "redis-queue",
                                 "status": _render_link_status(w)})
    if out["paused"] and not out["paused_reason"]:
        out["paused_reason"] = "trading"
    return out
  • Step 4: Run test to verify it passes

Run: python -m pytest agent-office/tests/test_node_monitor.py -v Expected: PASS (5 passed)

  • Step 5: Commit
git add agent-office/app/node_monitor.py agent-office/tests/test_node_monitor.py
git commit -m "feat(agent-office): node_monitor.collect_status (heartbeat+큐+dead-letter 집계)"

Task B3: GET /api/agent-office/nodes 엔드포인트

Files:

  • Modify: agent-office/app/main.py
  • Test: agent-office/tests/test_nodes_endpoint.py

Interfaces:

  • Consumes: node_monitor.collect_status

  • Produces: GET /api/agent-office/nodes → 계약 2 JSON

  • Step 1: Write the failing test

# agent-office/tests/test_nodes_endpoint.py
import pytest
from fastapi.testclient import TestClient

@pytest.fixture
def client(monkeypatch):
    from app import main
    async def fake_collect(redis=None):
        return {"redis_ok": True, "paused": False, "paused_reason": None,
                "generated_at": "2026-06-29T00:00:00Z", "workers": [], "links": []}
    monkeypatch.setattr("app.node_monitor.collect_status", fake_collect)
    return TestClient(main.app)

def test_nodes_endpoint_returns_contract(client):
    resp = client.get("/api/agent-office/nodes")
    assert resp.status_code == 200
    body = resp.json()
    assert set(["redis_ok","paused","workers","links"]).issubset(body)
  • Step 2: Run test to verify it fails

Run: python -m pytest agent-office/tests/test_nodes_endpoint.py -v Expected: FAIL with 404 (the route does not exist yet)

  • Step 3: Write minimal implementation (add the endpoint to main.py, near /states)
@app.get("/api/agent-office/nodes")
async def nodes_status():
    from .node_monitor import collect_status
    return await collect_status()
  • Step 4: Run test to verify it passes

Run: python -m pytest agent-office/tests/test_nodes_endpoint.py -v Expected: PASS

  • Step 5: Commit
git add agent-office/app/main.py agent-office/tests/test_nodes_endpoint.py
git commit -m "feat(agent-office): GET /api/agent-office/nodes 엔드포인트"

Task B4: Node health alert cron

Files:

  • Modify: agent-office/app/node_monitor.py (add check_and_alert)
  • Modify: agent-office/app/scheduler.py (register an interval job)
  • Test: agent-office/tests/test_node_monitor.py (add alert-transition tests)

Interfaces:

  • Consumes: collect_status, telegram.messaging.send_raw, db.add_log

  • Produces: async def check_and_alert(status=None) -> list[str] (the alert texts actually sent, returned for tests)

  • Step 1: Write the failing test

# append to test_node_monitor.py
import app.node_monitor as nm

@pytest.mark.asyncio
async def test_alert_on_alive_to_dead(monkeypatch):
    sent = []
    async def fake_send_raw(text, **kw): sent.append(text); return {"ok": True}
    monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
    monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
    nm._node_state.clear(); nm._dl_notified.clear()
    alive = {"workers": [{"name":"image-render","alive":True,"dead_letter":0}], "links": []}
    dead =  {"workers": [{"name":"image-render","alive":False,"dead_letter":0}], "links": []}
    await nm.check_and_alert(status=alive)   # first observation: no alert
    assert sent == []
    await nm.check_and_alert(status=dead)    # alive→dead transition
    assert any("다운" in t for t in sent)

@pytest.mark.asyncio
async def test_alert_on_dead_letter_growth(monkeypatch):
    sent = []
    async def fake_send_raw(text, **kw): sent.append(text); return {"ok": True}
    monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
    monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
    nm._node_state.clear(); nm._dl_notified.clear()
    s = {"workers": [{"name":"video-render","alive":True,"dead_letter":2}], "links": []}
    await nm.check_and_alert(status=s)
    assert any("dead-letter" in t for t in sent)
  • Step 2: Run test to verify it fails

Run: python -m pytest agent-office/tests/test_node_monitor.py -v Expected: FAIL (AttributeError: module has no attribute '_node_state'/check_and_alert)

  • Step 3: Write minimal implementation (add to node_monitor.py)
from .config import NODE_ALERT_DEADLETTER_THRESHOLD

_node_state: dict[str, bool] = {}   # name -> previous alive value
_dl_notified: dict[str, int] = {}   # name -> dead_letter count last alerted on


async def check_and_alert(status=None) -> list[str]:
    from .telegram.messaging import send_raw
    from .db import add_log
    st = status or await collect_status()
    sent: list[str] = []
    for w in st["workers"]:
        name, alive = w["name"], w.get("alive", False)
        prev = _node_state.get(name)
        if prev is True and not alive:
            text = f"🔴 [{name}] 워커 다운"
            if (await send_raw(text=text)).get("ok"):
                add_log("node_monitor", f"{name} 다운", "warning"); sent.append(text)
        elif prev is False and alive:
            text = f"🟢 [{name}] 워커 복구"
            if (await send_raw(text=text)).get("ok"):
                add_log("node_monitor", f"{name} 복구", "info"); sent.append(text)
        _node_state[name] = alive
        dl = w.get("dead_letter", 0)
        if dl >= NODE_ALERT_DEADLETTER_THRESHOLD and dl != _dl_notified.get(name, 0):
            text = f"❌ [{name}] 실패 누적 {dl}건 (dead-letter)"
            if (await send_raw(text=text)).get("ok"):
                add_log("node_monitor", f"{name} dead-letter {dl}", "warning"); sent.append(text)
            _dl_notified[name] = dl
        elif dl == 0:
            _dl_notified.pop(name, None)
    return sent

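Pitfall: there is no alert on the first observation (prev=None), which prevents false alarms at boot. Because only alive→dead transitions are caught, the "silent death after two weeks" scenario (alive, then dies) is detected, but a worker that is already dead at boot is not alerted (to be addressed in Phase 2).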
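The alive/dead half of `check_and_alert` reduces to a three-state transition rule. Restated as a pure function with its full truth table (a hypothetical refactoring for readability; the plan's code inlines this logic and sends the Telegram text directly):

```python
from typing import Optional


def liveness_alert(prev_alive: Optional[bool], alive: bool) -> Optional[str]:
    """Mirror of the alive/dead branch in check_and_alert.

    prev_alive is None on the first observation, which never alerts (boot false-alarm guard)."""
    if prev_alive is True and not alive:
        return "down"
    if prev_alive is False and alive:
        return "recovered"
    return None


# Truth table for every (previous, current) pair the cron can observe.
for prev in (None, True, False):
    for cur in (True, False):
        print(f"prev={prev!s:<5} now={cur!s:<5} -> {liveness_alert(prev, cur)}")
```

The `(None, False)` and `(False, False)` rows both return `None`, which is exactly the boot-time gap described in the pitfall: a worker that is already dead when agent-office starts never produces an alert.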

scheduler.py: import + job registration:

from . import node_monitor   # top of file

async def _run_node_health_check():
    await node_monitor.check_and_alert()

# inside init_scheduler(), near _poll_pipelines:
    scheduler.add_job(_run_node_health_check, "interval", seconds=60, id="node_health_check", replace_existing=True)
  • Step 4: Run test to verify it passes

Run: python -m pytest agent-office/tests/ -v Expected: PASS (entire suite)

  • Step 5: Commit
git add agent-office/app/node_monitor.py agent-office/app/scheduler.py agent-office/tests/test_node_monitor.py
git commit -m "feat(agent-office): 노드 헬스 1분 cron + 텔레그램 경보(다운/복구/dead-letter)"

Task B5: Deploy + production verification

Files: (no code changes; deploy and verify only)

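  • Step 1: Acquire the nas-deploy lock + push to deploy
# after acquire_lock("nas-deploy","BE")
git push   # Gitea webhook → deployer auto-deploys (agent-office is rebuilt to pick up the redis dependency)
  • Step 2: Production verification (after the deploy finishes)
# check the /nodes response (if the workers are not deployed yet, expect alive=false for all)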
curl -s https://gahusb.synology.me/api/agent-office/nodes | python -m json.tool

Expected: redis_ok:true and all 6 workers listed. Before Part A is deployed, alive is false for every worker (correct: no heartbeats exist yet).
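Reading the pretty-printed JSON by eye gets tedious once six workers report. A small hypothetical filter (not part of agent-office) that lists only the workers needing attention; the `curl` output could be fed to it via `json.load(sys.stdin)`:

```python
from __future__ import annotations


def summarize_nodes(body: dict) -> list[str]:
    """List the workers in a /api/agent-office/nodes body that need attention."""
    if not body.get("redis_ok", False):
        # collect_status returns an empty worker list when Redis is unreachable
        return ["redis unreachable: the worker list is empty and says nothing about the workers"]
    problems = []
    for w in body.get("workers", []):
        if not w.get("alive"):
            problems.append(f"{w['name']}: no heartbeat")
        elif w.get("dead_letter", 0) > 0:
            problems.append(f"{w['name']}: {w['dead_letter']} job(s) in dead-letter")
    return problems


sample = {"redis_ok": True, "workers": [
    {"name": "image-render", "alive": True, "dead_letter": 0},
    {"name": "video-render", "alive": True, "dead_letter": 2},
    {"name": "ai_trade", "alive": False, "dead_letter": 0},
]}
print(summarize_nodes(sample))
# ['video-render: 2 job(s) in dead-letter', 'ai_trade: no heartbeat']
```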

  • Step 3: release_lock("nas-deploy")

Part C — web-ui Three.js 대시보드 (FE 세션 소유)

repo: web-ui. React + Vite. 빌드는 로컬에서만(NAS Celeron 빌드 금지). 배포는 npm run release:nas. Three.js 시각화(C4)는 designer 스킬을 활성화해 구현 — 창의적 비주얼이라 line-by-line 사전 코드 대신 데이터 계약 + 상태→비주얼 매핑 + 수용 기준으로 명세한다.

Task C1: Dependencies + route + nav registration

Files:

  • Modify: package.json (three, @react-three/fiber, @react-three/drei)

  • Modify: src/routes.jsx (lazy route + navLinks entry)

  • Step 1: Install dependencies

npm install three @react-three/fiber @react-three/drei
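  • Step 2: Add the route to routes.jsx (in the appRoutes array; lazy-loaded, same pattern as agent-office)
    {
        path: 'infra',
        // Assumes the same data-router setup as agent-office: route.lazy resolves
        // route properties (e.g. Component) from the module's named exports.
        lazy: () => import('./pages/infra/InfraMonitor'),
    },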
  • Step 3: Add an entry to navLinks
    {
        id: 'infra',
        label: 'Infra',
        path: '/infra',
        subtitle: 'NODE PIPELINE',
        description: 'NAS↔Windows 워커 파이프라인 실시간 상태',
        icon: <span style={{fontSize:'1.2em'}}>🛰️</span>,
        accent: '#22d3ee',
    },
  • Step 4: Commit
git add package.json package-lock.json src/routes.jsx
git commit -m "feat(infra): three.js 의존성 + /infra 라우트·네비 등록"

Task C2: api helper + useNodeStatus polling hook

Files:

  • Modify: src/api.js (add getNodeStatus)
  • Create: src/pages/infra/useNodeStatus.js
  • Test: src/pages/infra/useNodeStatus.test.js (Vitest; confirm it is the project's test runner)

Interfaces:

  • Produces: getNodeStatus() → contract 2 object, useNodeStatus(intervalMs=3000) → {data, error, loading}

  • Step 1: Write the failing test

// src/pages/infra/useNodeStatus.test.js
import { renderHook, waitFor } from '@testing-library/react';
import { describe, it, expect, vi } from 'vitest';
import { useNodeStatus } from './useNodeStatus';
import * as api from '../../api';

describe('useNodeStatus', () => {
  it('fetches node status', async () => {
    vi.spyOn(api, 'getNodeStatus').mockResolvedValue({ redis_ok: true, workers: [], links: [] });
    const { result } = renderHook(() => useNodeStatus(99999));
    await waitFor(() => expect(result.current.data).toBeTruthy());
    expect(result.current.data.redis_ok).toBe(true);
  });
});
  • Step 2: Run test to verify it fails

Run: npm test -- useNodeStatus Expected: FAIL (useNodeStatus/getNodeStatus not defined yet)

  • Step 3: Write minimal implementation

Add to src/api.js (following the existing fetch-helper pattern, with a relative path):

export async function getNodeStatus() {
  const res = await fetch('/api/agent-office/nodes');
  if (!res.ok) throw new Error(`nodes ${res.status}`);
  return res.json();
}

src/pages/infra/useNodeStatus.js:

import { useEffect, useState, useRef } from 'react';
import { getNodeStatus } from '../../api';

export function useNodeStatus(intervalMs = 3000) {
  const [data, setData] = useState(null);
  const [error, setError] = useState(null);
  const [loading, setLoading] = useState(true);
  const timer = useRef(null);

  useEffect(() => {
    let alive = true;
    async function tick() {
      try {
        const d = await getNodeStatus();
        if (alive) { setData(d); setError(null); }
      } catch (e) {
        if (alive) setError(e);
      } finally {
        if (alive) setLoading(false);
      }
    }
    tick();
    timer.current = setInterval(tick, intervalMs);
    return () => { alive = false; clearInterval(timer.current); };
  }, [intervalMs]);

  return { data, error, loading };
}
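setInterval fires every intervalMs whether or not the previous request has finished. A slow NAS response can therefore stack requests, and an older response can overwrite a newer one. One alternative schedules the next tick only after the current request settles. It is sketched here as a framework-free helper that the hook's effect could call. The name `createPoller` and its options are hypothetical.

```javascript
// Hypothetical polling helper: the next request is scheduled only after the
// previous one settles, so at most one request is in flight at a time.
function createPoller({ fetchFn, intervalMs, onData, onError }) {
  let stopped = false;
  let timerId = null;

  async function tick() {
    try {
      const data = await fetchFn();
      if (!stopped) onData(data);
    } catch (err) {
      if (!stopped) onError(err);
    }
    // Re-arm only after this request finished (success or failure).
    if (!stopped) timerId = setTimeout(tick, intervalMs);
  }

  tick(); // first request immediately, like the hook's initial tick()

  // Stop function: prevents further callbacks and cancels a pending timer.
  return function stop() {
    stopped = true;
    clearTimeout(timerId);
  };
}
```

Because the helper returns its stop function, `useEffect(() => createPoller({ fetchFn: getNodeStatus, intervalMs, onData, onError }), [intervalMs])` would use it directly as the effect cleanup. In that case onData would also clear the loading flag.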
  • Step 4: Run test to verify it passes

Run: npm test -- useNodeStatus Expected: PASS

  • Step 5: Commit
git add src/api.js src/pages/infra/useNodeStatus.js src/pages/infra/useNodeStatus.test.js
git commit -m "feat(infra): getNodeStatus + useNodeStatus 3초 폴링 훅"

Task C3: 2D fallback panel (the first working deliverable)

Files:

  • Create: src/pages/infra/InfraMonitor.jsx (2D card view first)
  • Create: src/pages/infra/WorkerCard.jsx
  • Create: src/pages/infra/statusVisual.js (state→color/label mapping, shared with Three.js)
  • Test: src/pages/infra/statusVisual.test.js

Interfaces:

  • Produces: linkColor(status), workerStateLabel(worker); both are reused by the C4 Three.js scene

  • Step 1: Write the failing test

// src/pages/infra/statusVisual.test.js
import { describe, it, expect } from 'vitest';
import { linkColor, workerStateLabel } from './statusVisual';

describe('statusVisual', () => {
  it('maps link status to color', () => {
    expect(linkColor('healthy')).toBe('#22c55e');
    expect(linkColor('down')).toBe('#ef4444');
    expect(linkColor('paused')).toBe('#f59e0b');
    expect(linkColor('degraded')).toBe('#eab308');
  });
  it('labels dead worker', () => {
    expect(workerStateLabel({ alive: false })).toMatch(/다운|down/i);
  });
});
  • Step 2: Run test to verify it fails

Run: npm test -- statusVisual Expected: FAIL (module not found)

  • Step 3: Write minimal implementation

src/pages/infra/statusVisual.js:

export const LINK_COLORS = {
  healthy: '#22c55e', paused: '#f59e0b', degraded: '#eab308', down: '#ef4444',
};
export function linkColor(status) { return LINK_COLORS[status] || '#64748b'; }

export function workerStateLabel(w) {
  if (!w.alive) return '🔴 다운';
  if (w.state === 'paused') return '⏸ 작업중(트레이딩)';
  if (w.state === 'busy') return '⚙️ 처리 중';
  if (w.state === 'market_open') return '📈 장중';
  if (w.state === 'market_closed') return '🌙 휴장';
  return '🟢 대기';
}

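WorkerCard.jsx + InfraMonitor.jsx: render data.workers from useNodeStatus() as a card grid (name, state label, queue_depth, dead_letter, last_beat_age_s). data is null until the first response arrives, so guard against it. If data.redis_ok===false, show a "집계 서버/Redis 연결 끊김" (aggregator/Redis disconnected) banner. (Standard React cards; the designer skill is not needed.)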
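Both the card grid and the C4 HUD show how long ago a worker last sent a heartbeat, so a shared formatting helper keeps that text consistent. `formatBeatAge` and the dead-first ordering below are illustrative choices and are not part of the plan. The sketch assumes last_beat_age_s is a number of seconds, or null when no heartbeat has ever been seen.

```javascript
// Hypothetical helper: turns last_beat_age_s into short card text.
// Assumption: null/undefined means no heartbeat has ever been seen.
function formatBeatAge(ageSeconds) {
  if (ageSeconds === null || ageSeconds === undefined) return 'no heartbeat';
  const s = Math.max(0, Math.round(ageSeconds));
  if (s < 60) return `${s}s ago`;
  if (s < 3600) return `${Math.floor(s / 60)}m ago`;
  return `${Math.floor(s / 3600)}h ago`;
}

// Illustrative grid ordering: dead workers first, then alphabetical by name.
function sortWorkersForGrid(workers) {
  return [...workers].sort((a, b) => {
    const aliveA = Boolean(a.alive);
    const aliveB = Boolean(b.alive);
    if (aliveA !== aliveB) return aliveA ? 1 : -1;
    return a.name.localeCompare(b.name);
  });
}
```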

  • Step 4: Run test to verify it passes

Run: npm test -- statusVisual Expected: PASS

  • Step 5: Commit
git add src/pages/infra/
git commit -m "feat(infra): 2D 워커 상태 패널 + statusVisual 매핑"

Task C4: Three.js pipeline visualization (designer skill)

Files:

  • Create: src/pages/infra/PipelineScene.jsx (r3f Canvas topology)
  • Modify: src/pages/infra/InfraMonitor.jsx (toggle between the 3D scene and the 2D panel)

Approach: this Task is implemented with the designer skill activated. The exact data contract, state mapping, and acceptance criteria that the designer must follow are listed below.

  • Step 1: Activate the designer skill: call Skill(designer), then proceed with the spec below.

  • Step 2: Topology + data contract

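    • Input: data from useNodeStatus() (contract 2). Pipelines come from data.links; node state comes from data.workers.
    • Node layout: on the left, the NAS server (box) plus a gateway label; in the center, the Redis queue bus (glowing core); on the right, the Windows node (box) plus 6 worker sub-nodes (music/video/image/insta-render, task-watcher, ai_trade).
    • Pipelines (tubes/lines): the 4 render workers follow the NAS→Redis→worker path (type:redis-queue). ai_trade connects directly as NAS stock⇄ai_trade (type:http-pull), bypassing the Redis bus.
    • Colors reuse statusVisual.linkColor(link.status).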
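The left/center/right arrangement can be pinned down as plain data before any meshes are written, which also makes it testable without WebGL. This is a sketch under assumed coordinates. The x positions, the spacing, and the names `layoutTopology` and `pipelinePath` are illustrative and are not part of the spec. Each position is an [x, y, z] array, the form r3f accepts for a mesh's `position` prop. The path is chosen from the link `type` values named above, not from worker names.

```javascript
// Hypothetical layout: NAS on the left, Redis bus in the center, Windows on
// the right, with the six worker sub-nodes stacked in a column beside Windows.
const WORKER_NAMES = [
  'music-render', 'video-render', 'image-render',
  'insta-render', 'task-watcher', 'ai_trade',
];

function layoutTopology({ spacing = 1.2 } = {}) {
  const positions = {
    nas: [-6, 0, 0],
    redis: [0, 0, 0],
    windows: [6, 0, 0],
  };
  // Center the worker column vertically around y = 0.
  const offset = ((WORKER_NAMES.length - 1) * spacing) / 2;
  WORKER_NAMES.forEach((name, i) => {
    positions[name] = [9, offset - i * spacing, 0];
  });
  return positions;
}

// 'http-pull' links bypass the Redis bus; 'redis-queue' links pass through it.
function pipelinePath(linkType, workerName) {
  return linkType === 'http-pull'
    ? ['nas', workerName]
    : ['nas', 'redis', workerName];
}
```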
  • Step 3: Per-state animation (acceptance criteria)

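    • healthy: cyan/green tube with particles flowing along it (visibly flowing communication is the user's core requirement). If state==='busy', the particles speed up.
    • paused: amber; particles stop or slow down, plus a "⏸ 작업중(트레이딩)" (paused for trading) label.
    • down: red; the flow stops, with a spark/break visual at the point of failure, a ⚠ icon, and "last beat Xs ago".
    • degraded: a dead-letter count badge on the tube.
    • redis_ok===false: the whole central bus turns red, with a "집계 서버 연결 끊김" (aggregator disconnected) overlay.
    • HUD on each worker node: name, state label (workerStateLabel), queue_depth, dead_letter.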
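These acceptance criteria amount to a lookup from link status (plus worker state) to animation parameters. One way to keep PipelineScene declarative is to compute those parameters in a pure function and have the per-frame animation code only read them. The speed values and field names below are placeholders for the designer to tune, and `flowParams` is hypothetical. The colors are copied from LINK_COLORS in statusVisual.js so that the sketch runs on its own.

```javascript
// Copied from statusVisual.js so this sketch is self-contained.
const LINK_COLORS = {
  healthy: '#22c55e', paused: '#f59e0b', degraded: '#eab308', down: '#ef4444',
};

// Hypothetical mapping from link status + worker state to animation knobs.
// particleSpeed is a relative multiplier (0 = frozen); values are placeholders.
function flowParams(linkStatus, worker = {}, redisOk = true) {
  if (!redisOk) {
    // Aggregator unreachable: everything red, no flow, show the overlay.
    return { color: LINK_COLORS.down, particleSpeed: 0, spark: false, badge: null, overlay: true };
  }
  const base = {
    color: LINK_COLORS[linkStatus] || '#64748b',
    particleSpeed: 1, spark: false, badge: null, overlay: false,
  };
  switch (linkStatus) {
    case 'healthy':
      return { ...base, particleSpeed: worker.state === 'busy' ? 2 : 1 };
    case 'paused':
      return { ...base, particleSpeed: 0.2 }; // slowed down rather than fully stopped
    case 'down':
      return { ...base, particleSpeed: 0, spark: true };
    case 'degraded':
      return { ...base, badge: worker.dead_letter ?? 0 };
    default:
      return { ...base, particleSpeed: 0 }; // unknown status: grey and frozen
  }
}
```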
  • Step 4: Fallback + verification

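    • No WebGL support / mobile: automatically fall back to the C3 2D panel (with a toggle button).
    • Verification: with npm run dev, force-inject the 4 states through mock data (stub useNodeStatus) and visually confirm that healthy/paused/down/degraded are clearly distinguishable. In an RC environment, have the user confirm through screenshots or recordings.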
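For the automatic fallback, a common approach is to probe for a WebGL context on a throwaway canvas before mounting the r3f Canvas. This is a sketch, and the name `canRenderWebGL` is hypothetical. The document object is passed in as a parameter so the check can also run outside a browser. Deciding what counts as "mobile" is left to the component.

```javascript
// Hypothetical WebGL probe: true if a WebGL2 or WebGL1 context can be created.
// `doc` defaults to the browser's document when one exists.
function canRenderWebGL(doc = typeof document !== 'undefined' ? document : undefined) {
  if (!doc || typeof doc.createElement !== 'function') return false;
  try {
    const canvas = doc.createElement('canvas');
    return Boolean(
      canvas.getContext &&
        (canvas.getContext('webgl2') || canvas.getContext('webgl'))
    );
  } catch {
    return false; // some browsers throw when WebGL is disabled
  }
}
```

InfraMonitor could seed its toggle from this check, for example `useState(() => (canRenderWebGL() ? '3d' : '2d'))`, and still let the user switch manually.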
  • Step 5: Commit

git add src/pages/infra/PipelineScene.jsx src/pages/infra/InfraMonitor.jsx
git commit -m "feat(infra): three.js NAS↔Windows 파이프라인 시각화 (상태별 흐름/장애)"
  • Step 6: Deploy
npm run build && npm run release:nas

Execution order & session collaboration (co-gahusb)

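  1. Prerequisite gate: share this plan's Global Constraints (the two contracts) with the FE/AI sessions via co-gahusb post_message → the 3 Parts can then start in parallel.
  2. Part B (BE, this session) can go first: its FakeRedis tests run to completion without any workers. After the B5 deployment, /nodes responds, even if only with alive:false.
  3. Once Part A (AI session) is done and the workers are redeployed, /nodes reports alive:true.
  4. Part C (FE session) can be developed against mocks using only the /nodes contract → then wired to live data.
  5. Shared resources: the docker-compose change in B1 requires the compose lock; the B5 deployment requires the nas-deploy lock.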

Self-Review (author's check)

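  • Spec coverage: G1 (heartbeat for all 6 workers) = A1–A4 / G2 (queue, dead-letter, processing, paused aggregation) = B2 / G3 (Telegram alerts) = B4 / G4 (Three.js /infra) = C1–C4. Contract 1 = A1 and Global; contract 2 = B2 and Global. ✓ Nothing missing.
  • Placeholder scan: C4 is delegated to the designer skill, but its data contract, state mapping, and acceptance criteria are spelled out concretely (not a placeholder). Every other Task includes real code. ✓
  • Type consistency: WorkerStats (A1) ↔ worker.stats (A2); the collect_status return value (B2) ↔ the check_and_alert input (B4), /nodes (B3), useNodeStatus (C2), and linkColor/workerStateLabel (C3, reused by C4) all match. The heartbeat key/value (contract 1) ↔ node_monitor parsing (B2) match. ✓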