From c69b18243be945874cdb3bd9976069263fd8b37e Mon Sep 17 00:00:00 2001 From: gahusb Date: Mon, 29 Jun 2026 17:33:16 +0900 Subject: [PATCH] =?UTF-8?q?docs:=20=EB=B6=84=EC=82=B0=20=EC=9B=8C=EC=BB=A4?= =?UTF-8?q?=20=EA=B4=80=EC=B8=A1=20=EC=8B=9C=EC=8A=A4=ED=85=9C=20=EA=B5=AC?= =?UTF-8?q?=ED=98=84=20=EA=B3=84=ED=9A=8D(3-repo=20TDD=20plan)=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Part A(web-ai heartbeat) / Part B(agent-office 집계+경보) / Part C(web-ui Three.js 대시보드). 각 Part 독립 실행·테스트 가능, 계약 2개를 Global Constraints로 잠금. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq --- .../plans/2026-06-29-worker-observability.md | 1105 +++++++++++++++++ 1 file changed, 1105 insertions(+) create mode 100644 docs/superpowers/plans/2026-06-29-worker-observability.md diff --git a/docs/superpowers/plans/2026-06-29-worker-observability.md b/docs/superpowers/plans/2026-06-29-worker-observability.md new file mode 100644 index 0000000..d82c7e9 --- /dev/null +++ b/docs/superpowers/plans/2026-06-29-worker-observability.md @@ -0,0 +1,1105 @@ +# 분산 워커 관측 시스템 Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** NAS↔Windows 분산 워커 6종의 생사·큐·실패 상태를 heartbeat로 관측하고, agent-office가 집계·텔레그램 경보하며, web-ui `/infra`에서 Three.js 파이프라인으로 시각화한다. + +**Architecture:** 워커가 NAS Redis에 `worker::heartbeat`(TTL 45s) push → agent-office `node_monitor`가 heartbeat + ReliableQueue의 `processing:*`/`dead_letter:*` + `queue:paused`를 읽어 `GET /api/agent-office/nodes`로 노출 → 1분 cron이 상태 전이 시 텔레그램 경보 → web-ui가 3초 폴링해 토폴로지 시각화. + +**Tech Stack:** Python 3.12 / FastAPI / redis.asyncio / APScheduler (web-backend·web-ai), React + Vite / three + @react-three/fiber + drei (web-ui). + +## Global Constraints + +이 계약 2개는 3 Part가 독립 병렬 작업하기 위한 잠금 규약. 모든 Task는 이 값을 그대로 사용한다. + +**[계약 1] Heartbeat 키 스키마** +- 키: `worker::heartbeat` (name ∈ `music-render`, `video-render`, `image-render`, `insta-render`, `task-watcher`, `ai_trade`) +- 명령: `SET worker::heartbeat EX 45` +- 값(JSON): `{"name": str, "kind": "render"|"watcher"|"trader", "state": str, "ts": "", "last_job_at": str|null, "jobs_done": int, "jobs_failed": int, "mode": str(optional, task-watcher 전용)}` +- `state` kind별: render=`idle`|`busy`|`paused` / watcher=`trading`|`free` / trader=`market_open`|`market_closed` +- 발신 주기 15초, TTL 45초(3배). redis 클라이언트는 `decode_responses=False`(바이트) 기준 — 기존 워커와 동일. + +**[계약 2] `GET /api/agent-office/nodes` 응답 스키마** +```json +{ "redis_ok": true, "paused": false, "paused_reason": "trading", "generated_at": "...Z", + "workers": [{"name":"image-render","kind":"render","alive":true,"state":"idle", + "last_beat_age_s":3,"queue_depth":0,"dead_letter":0,"processing":0, + "jobs_done":42,"jobs_failed":1,"last_job_at":"...Z"}], + "links": [{"from":"nas","to":"image-render","type":"redis-queue","status":"healthy"}] } +``` +- `links[].status` ∈ `healthy`|`paused`|`down`|`degraded`, `links[].type` ∈ `redis-queue`|`http-pull` + +**공통 규칙**: `.env` 커밋 금지. Docker는 NAS에서만 구동(로컬 docker 명령 금지). 각 repo는 자기 경로에서만 커밋. NAS Redis 주소 `redis://192.168.45.54:6379`(워커 측), 컨테이너 내부 `redis://redis:6379`(NAS 측). + +--- + +# Part A — web-ai 워커 heartbeat (AI 세션 소유) + +> repo: `web-ai`. 테스트: 시스템 Python(`C:\Users\jaeoh\AppData\Local\Programs\Python\Python312\python.exe -m pytest`) — `.venv` 한글경로 깨짐(web-ai CLAUDE.md). + +## Task A1: 공용 heartbeat 모듈 + +**Files:** +- Create: `services/_shared/heartbeat.py` +- Test: `services/_shared/tests/test_heartbeat.py` + +**Interfaces:** +- Produces: `class WorkerStats`(busy:bool, jobs_done:int, jobs_failed:int, last_job_at:str|None), `def utc_now_iso()->str`, `def build_payload(name,kind,state,stats,extra=None)->str`, `async def render_state(redis,stats,paused_key="queue:paused")->str`, `async def heartbeat_loop(redis,name,kind,stats,*,interval=15,ttl=45,paused_key="queue:paused",state_fn=None)` + +- [ ] **Step 1: Write the failing test** +```python +# services/_shared/tests/test_heartbeat.py +import json +import pytest +from _shared.heartbeat import WorkerStats, build_payload, render_state + +def test_build_payload_has_contract_fields(): + s = WorkerStats(); s.jobs_done = 3; s.last_job_at = "2026-06-29T00:00:00Z" + payload = json.loads(build_payload("image-render", "render", "idle", s)) + assert payload["name"] == "image-render" + assert payload["kind"] == "render" + assert payload["state"] == "idle" + assert payload["jobs_done"] == 3 + assert payload["last_job_at"] == "2026-06-29T00:00:00Z" + assert payload["ts"].endswith("Z") + +def test_build_payload_merges_extra(): + payload = json.loads(build_payload("task-watcher", "watcher", "free", WorkerStats(), extra={"mode": "free"})) + assert payload["mode"] == "free" + +class _FakeRedis: + def __init__(self, paused): self._paused = paused + async def get(self, key): return b"1" if self._paused else None + +@pytest.mark.asyncio +async def test_render_state_paused_overrides_busy(): + s = WorkerStats(); s.busy = True + assert await render_state(_FakeRedis(paused=True), s) == "paused" + +@pytest.mark.asyncio +async def test_render_state_busy_then_idle(): + s = WorkerStats(); s.busy = True + assert await render_state(_FakeRedis(paused=False), s) == "busy" + s.busy = False + assert await render_state(_FakeRedis(paused=False), s) == "idle" +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest services/_shared/tests/test_heartbeat.py -v` +Expected: FAIL with `ModuleNotFoundError: No module named '_shared.heartbeat'` + +- [ ] **Step 3: Write minimal implementation** +```python +# services/_shared/heartbeat.py +"""분산 워커 heartbeat — worker::heartbeat SET (TTL). Global Constraints 계약 1.""" +from __future__ import annotations +import asyncio, datetime as dt, json, logging, os + +logger = logging.getLogger(__name__) +DEFAULT_INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "15")) +DEFAULT_TTL = int(os.getenv("HEARTBEAT_TTL", "45")) + + +class WorkerStats: + """worker_loop가 갱신, heartbeat_loop가 읽는 가변 카운터.""" + def __init__(self): + self.busy = False + self.jobs_done = 0 + self.jobs_failed = 0 + self.last_job_at = None # ISO str | None + + +def utc_now_iso() -> str: + return dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def build_payload(name: str, kind: str, state: str, stats: WorkerStats, extra: dict | None = None) -> str: + payload = { + "name": name, "kind": kind, "state": state, "ts": utc_now_iso(), + "last_job_at": stats.last_job_at, + "jobs_done": stats.jobs_done, "jobs_failed": stats.jobs_failed, + } + if extra: + payload.update(extra) + return json.dumps(payload) + + +async def render_state(redis, stats: WorkerStats, paused_key: str = "queue:paused") -> str: + if await redis.get(paused_key) == b"1": + return "paused" + return "busy" if stats.busy else "idle" + + +async def heartbeat_loop(redis, name, kind, stats, *, interval=DEFAULT_INTERVAL, + ttl=DEFAULT_TTL, paused_key="queue:paused", state_fn=None): + key = f"worker:{name}:heartbeat" + logger.info("heartbeat 시작 name=%s ttl=%ds", name, ttl) + while True: + try: + if state_fn is not None: + state, extra = await state_fn(redis, stats) + else: + state, extra = await render_state(redis, stats, paused_key), None + await redis.set(key, build_payload(name, kind, state, stats, extra), ex=ttl) + except asyncio.CancelledError: + raise + except Exception: + logger.exception("heartbeat 발신 실패 name=%s", name) + await asyncio.sleep(interval) +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest services/_shared/tests/test_heartbeat.py -v` +Expected: PASS (4 passed) + +- [ ] **Step 5: Commit** +```bash +git add services/_shared/heartbeat.py services/_shared/tests/test_heartbeat.py +git commit -m "feat(_shared): 워커 heartbeat 모듈 (worker::heartbeat TTL SET)" +``` + +## Task A2: image-render 워커에 heartbeat 배선 (+ 나머지 3 render 워커 동형 적용) + +**Files:** +- Modify: `services/image-render/worker.py` (stats 추가 + poll_once 카운터) +- Modify: `services/image-render/main.py` (lifespan에 heartbeat_loop 태스크) +- Test: `services/image-render/tests/test_worker.py` (poll_once 카운터 검증 추가) + +**Interfaces:** +- Consumes: `_shared.heartbeat.WorkerStats`, `heartbeat_loop`, `utc_now_iso` +- Produces: `worker.stats`(모듈 레벨 WorkerStats) — main.py가 heartbeat에 전달 + +- [ ] **Step 1: Write the failing test** (poll_once가 성공 시 jobs_done 증가·busy 복귀) +```python +# services/image-render/tests/test_worker.py 에 추가 +import asyncio, pytest +import worker + +class _OneJobQueue: + def __init__(self): self.acked = False + async def dequeue(self, timeout=5): + if self.acked: return None + return ({"job_type": "flux_generation", "task_id": "t1", "params": {}}, b"raw") + async def ack(self, raw): self.acked = True + async def fail(self, raw, payload): pass + +@pytest.mark.asyncio +async def test_poll_once_increments_jobs_done(monkeypatch): + worker.stats.jobs_done = 0 + monkeypatch.setattr(worker, "run_flux_generation", lambda task_id, params: None) + handled = await worker.poll_once(_OneJobQueue()) + assert handled is True + assert worker.stats.jobs_done == 1 + assert worker.stats.busy is False + assert worker.stats.last_job_at is not None +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest services/image-render/tests/test_worker.py::test_poll_once_increments_jobs_done -v` +Expected: FAIL with `AttributeError: module 'worker' has no attribute 'stats'` + +- [ ] **Step 3: Write minimal implementation** + +`worker.py` — import + 모듈 레벨 stats 추가, `poll_once` 교체: +```python +from _shared.heartbeat import WorkerStats, utc_now_iso # 상단 import 블록에 추가 + +stats = WorkerStats() # 모듈 레벨 (REDIS_URL 상수 근처) + +async def poll_once(queue: ReliableQueue) -> bool: + result = await queue.dequeue(timeout=5) + if result is None: + return False + payload, raw = result + stats.busy = True + try: + await asyncio.to_thread(_dispatch, payload) + except Exception: + logger.exception("dispatch unhandled exception task_id=%s", payload.get("task_id")) + await queue.fail(raw, payload) + stats.jobs_failed += 1 + stats.last_job_at = utc_now_iso() + stats.busy = False + return True + await queue.ack(raw) + stats.jobs_done += 1 + stats.last_job_at = utc_now_iso() + stats.busy = False + return True +``` + +`main.py` — lifespan에 heartbeat 태스크 추가: +```python +import os +import redis.asyncio as aioredis +from _shared.heartbeat import heartbeat_loop + +@asynccontextmanager +async def lifespan(app: FastAPI): + worker_task = asyncio.create_task(worker.worker_loop()) + hb_redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://192.168.45.54:6379"), decode_responses=False) + hb_task = asyncio.create_task(heartbeat_loop(hb_redis, "image-render", "render", worker.stats)) + logger.info("image-render lifespan 시작") + try: + yield + finally: + for t in (worker_task, hb_task): + t.cancel() + try: + await t + except asyncio.CancelledError: + pass + await hb_redis.aclose() + logger.info("image-render lifespan 종료") +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest services/image-render/tests/ -v` +Expected: PASS (기존 + 신규 1) + +- [ ] **Step 5: 나머지 3 render 워커 동형 적용** + +`video-render`, `music-render`, `insta-render` 각각에 Step 3과 **동일 패턴** 적용. 유일한 차이는 main.py의 워커 이름 문자열: +- `services/video-render/main.py`: `heartbeat_loop(hb_redis, "video-render", "render", worker.stats)` +- `services/music-render/main.py`: `heartbeat_loop(hb_redis, "music-render", "render", worker.stats)` +- `services/insta-render/main.py`: `heartbeat_loop(hb_redis, "insta-render", "render", worker.stats)` +- 각 `worker.py`에 `from _shared.heartbeat import WorkerStats, utc_now_iso` + `stats = WorkerStats()` + poll_once 카운터(ack→`jobs_done`, fail→`jobs_failed`, 둘 다 `last_job_at`/`busy` 갱신). music-render는 worker.py 구조가 더 복잡(12 job_type)하지만 hook 지점은 동일(dispatch 전 `busy=True`, ack/fail 후 카운터+`busy=False`). +- 각 `tests/test_worker.py`에 Step 1 테스트를 워커별 job_type으로 맞춰 추가. + +- [ ] **Step 6: Commit** +```bash +git add services/image-render services/video-render services/music-render services/insta-render +git commit -m "feat(render-workers): 4 render 워커 heartbeat 배선 + poll_once 카운터" +``` + +## Task A3: task-watcher heartbeat (mode 포함) + +**Files:** +- Modify: `services/task-watcher/watcher.py` +- Test: `services/task-watcher/tests/test_watcher.py` (신규) + +**Interfaces:** +- Consumes: `_shared.heartbeat.build_payload`, `WorkerStats` +- Produces: `worker:task-watcher:heartbeat` 값에 `state`=mode, `mode`=mode + +- [ ] **Step 1: Write the failing test** +```python +# services/task-watcher/tests/test_watcher.py +import json +from _shared.heartbeat import build_payload, WorkerStats + +def test_watcher_heartbeat_payload_carries_mode(): + payload = json.loads(build_payload("task-watcher", "watcher", "trading", WorkerStats(), extra={"mode": "trading"})) + assert payload["kind"] == "watcher" + assert payload["state"] == "trading" + assert payload["mode"] == "trading" +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest services/task-watcher/tests/test_watcher.py -v` +Expected: FAIL (`ModuleNotFoundError` 또는 import 경로 — task-watcher의 PYTHONPATH에 `_shared` 추가 필요. compose `PYTHONPATH=/app:/shared` 확인) + +- [ ] **Step 3: Write minimal implementation** (`watcher_loop`에 heartbeat SET 추가) +```python +# watcher.py 상단 import +from _shared.heartbeat import build_payload, WorkerStats +_HB_STATS = WorkerStats() +HEARTBEAT_KEY = "worker:task-watcher:heartbeat" +HEARTBEAT_TTL = 45 + +# watcher_loop while 본문 — mode 판정 직후 추가: + mode = current_mode(now, holidays) + if mode == "trading": + await redis.set(PAUSED_KEY, b"1", ex=PAUSED_TTL) + else: + await redis.delete(PAUSED_KEY) + # heartbeat (LOOP_INTERVAL=30s < TTL 45s) + await redis.set(HEARTBEAT_KEY, + build_payload("task-watcher", "watcher", mode, _HB_STATS, extra={"mode": mode}), + ex=HEARTBEAT_TTL) +``` +> 참고: watcher 루프 주기 30s < TTL 45s 라 만료 전 갱신됨. + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest services/task-watcher/tests/test_watcher.py -v` +Expected: PASS + +- [ ] **Step 5: Commit** +```bash +git add services/task-watcher +git commit -m "feat(task-watcher): heartbeat 발신 (state=mode, paused 이유 노출)" +``` + +## Task A4: ai_trade heartbeat (다른 런타임) + +**Files:** +- Modify: `ai_trade/requirements.txt` 또는 루트 `requirements.txt` (redis 추가 — 기존 미포함 시) +- Create: `ai_trade/heartbeat.py` (자체 미니 — 호스트 실행이라 `_shared` import 경로 다름) +- Modify: `ai_trade/main.py` (lifespan에 heartbeat 태스크) +- Test: `ai_trade/tests/test_heartbeat.py` + +**Interfaces:** +- Produces: `worker:ai_trade:heartbeat` (kind=`trader`, state=`market_open`|`market_closed`) +- Consumes: `ai_trade/scheduler.py`의 폴링 윈도우 판정 함수 (AI 세션이 정확한 함수명 확인 — web-ai CLAUDE.md상 `scheduler.py`가 polling window 판정 소유) + +- [ ] **Step 1: Write the failing test** +```python +# ai_trade/tests/test_heartbeat.py +import json +from heartbeat import build_trader_payload + +def test_trader_payload_market_open(): + p = json.loads(build_trader_payload("market_open", signals=2)) + assert p["name"] == "ai_trade" + assert p["kind"] == "trader" + assert p["state"] == "market_open" + assert p["ts"].endswith("Z") +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest ai_trade/tests/test_heartbeat.py -v` +Expected: FAIL (`ModuleNotFoundError: heartbeat`) + +- [ ] **Step 3: Write minimal implementation** +```python +# ai_trade/heartbeat.py +"""ai_trade heartbeat — NAS Redis로 worker:ai_trade:heartbeat SET. Global Constraints 계약 1.""" +from __future__ import annotations +import asyncio, datetime as dt, json, logging, os +import redis.asyncio as aioredis + +logger = logging.getLogger(__name__) +REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379") +KEY = "worker:ai_trade:heartbeat" +INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "15")) +TTL = int(os.getenv("HEARTBEAT_TTL", "45")) + + +def build_trader_payload(state: str, signals: int = 0) -> str: + return json.dumps({ + "name": "ai_trade", "kind": "trader", "state": state, + "ts": dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + "last_job_at": None, "jobs_done": signals, "jobs_failed": 0, + }) + + +async def heartbeat_loop(state_fn): + """state_fn() -> (state:str, signals:int). poll window는 호출자가 주입.""" + redis = aioredis.from_url(REDIS_URL, decode_responses=False) + try: + while True: + try: + state, signals = state_fn() + await redis.set(KEY, build_trader_payload(state, signals), ex=TTL) + except asyncio.CancelledError: + raise + except Exception: + logger.exception("ai_trade heartbeat 실패") + await asyncio.sleep(INTERVAL) + finally: + await redis.aclose() +``` + +`ai_trade/main.py` lifespan에 추가 (AI 세션이 실제 폴링 윈도우 판정으로 state_fn 구성): +```python +import heartbeat as _hb +# lifespan 내, poll_loop task 생성 부근: +def _trader_state(): + # scheduler의 폴링 윈도우 판정 결과로 market_open/market_closed 결정 + # (AI 세션: scheduler의 실제 윈도우 함수로 교체) + state = "market_open" if scheduler.is_polling_window() else "market_closed" + return state, len(state_singleton.signals) +hb_task = asyncio.create_task(_hb.heartbeat_loop(_trader_state)) +# 종료 시 hb_task.cancel() + await (CancelledError 무시) +``` + +`requirements.txt`에 `redis>=5.0` 추가(미포함 시). + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest ai_trade/tests/test_heartbeat.py -v` +Expected: PASS + +- [ ] **Step 5: Commit** +```bash +git add ai_trade/heartbeat.py ai_trade/main.py ai_trade/tests/test_heartbeat.py requirements.txt +git commit -m "feat(ai_trade): NAS Redis heartbeat (trader market_open/closed)" +``` + +--- + +# Part B — web-backend agent-office 집계 + 경보 (이 BE 세션 소유) + +> repo: `web-backend`. 테스트: agent-office는 pytest + pytest-asyncio. 로컬에서 pytest 실행 가능(docker 아님). + +## Task B1: agent-office에 redis 의존성 추가 + +**Files:** +- Modify: `agent-office/requirements.txt` +- Modify: `agent-office/app/config.py` +- Modify: `docker-compose.yml` (agent-office 블록 — `compose` 락 필요) + +**Interfaces:** +- Produces: `config.REDIS_URL`, agent-office 컨테이너에 `REDIS_URL` env + `depends_on: redis` + +- [ ] **Step 1: requirements.txt에 redis 추가** +``` +redis>=5.0 +``` +(파일 끝에 한 줄 추가) + +- [ ] **Step 2: config.py에 REDIS_URL + 임계 추가** +```python +# config.py 끝에 추가 +# Redis (node monitor) +REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379") +NODE_ALERT_DEADLETTER_THRESHOLD = int(os.getenv("NODE_ALERT_DEADLETTER_THRESHOLD", "1")) +``` + +- [ ] **Step 3: docker-compose.yml agent-office 블록 수정** (`acquire_lock("compose","BE")` 후) + +agent-office 서비스 블록의 `environment:`에 추가: +```yaml + - REDIS_URL=${REDIS_URL:-redis://redis:6379} +``` +`depends_on:`에 `redis` 추가(블록에 depends_on 없으면 신설): +```yaml + depends_on: + - redis +``` + +- [ ] **Step 4: Commit** +```bash +git add agent-office/requirements.txt agent-office/app/config.py docker-compose.yml +git commit -m "chore(agent-office): redis 의존성 + REDIS_URL/dead-letter 임계 설정" +``` + +## Task B2: node_monitor.collect_status + +**Files:** +- Create: `agent-office/app/node_monitor.py` +- Test: `agent-office/tests/test_node_monitor.py` + +**Interfaces:** +- Produces: `WORKER_REGISTRY`(list of dict), `async def collect_status(redis=None) -> dict` (계약 2 스키마) +- Consumes: `config.REDIS_URL` + +- [ ] **Step 1: Write the failing test** +```python +# agent-office/tests/test_node_monitor.py +import json, pytest +from app import node_monitor + +class FakeRedis: + """worker heartbeat + queue llen + scan_iter 흉내.""" + def __init__(self, kv=None, lists=None): + self._kv = kv or {} # key(str) -> bytes + self._lists = lists or {} # key(str) -> length(int) + async def get(self, key): + return self._kv.get(key) + async def llen(self, key): + return self._lists.get(key, 0) + async def scan_iter(self, match=None): + prefix = match.rstrip("*") + for k in list(self._lists): + if k.startswith(prefix): + yield k + +def _hb(name, kind, state, **extra): + return json.dumps({"name": name, "kind": kind, "state": state, "ts": "2026-06-29T00:00:00Z", + "last_job_at": None, "jobs_done": 0, "jobs_failed": 0, **extra}).encode() + +@pytest.mark.asyncio +async def test_alive_worker_healthy_link(): + r = FakeRedis(kv={"worker:image-render:heartbeat": _hb("image-render","render","idle")}) + st = await node_monitor.collect_status(redis=r) + img = next(w for w in st["workers"] if w["name"] == "image-render") + assert img["alive"] is True and img["state"] == "idle" + link = next(l for l in st["links"] if l["to"] == "image-render") + assert link["status"] == "healthy" and link["type"] == "redis-queue" + +@pytest.mark.asyncio +async def test_missing_heartbeat_is_dead_and_down(): + r = FakeRedis() # heartbeat 없음 + st = await node_monitor.collect_status(redis=r) + img = next(w for w in st["workers"] if w["name"] == "image-render") + assert img["alive"] is False + link = next(l for l in st["links"] if l["to"] == "image-render") + assert link["status"] == "down" + +@pytest.mark.asyncio +async def test_dead_letter_makes_degraded(): + r = FakeRedis(kv={"worker:video-render:heartbeat": _hb("video-render","render","idle")}, + lists={"dead_letter:queue:video-render": 2}) + st = await node_monitor.collect_status(redis=r) + vid = next(w for w in st["workers"] if w["name"] == "video-render") + assert vid["dead_letter"] == 2 + link = next(l for l in st["links"] if l["to"] == "video-render") + assert link["status"] == "degraded" + +@pytest.mark.asyncio +async def test_paused_reason_from_watcher(): + r = FakeRedis(kv={"queue:paused": b"1", + "worker:task-watcher:heartbeat": _hb("task-watcher","watcher","trading",mode="trading")}) + st = await node_monitor.collect_status(redis=r) + assert st["paused"] is True and st["paused_reason"] == "trading" + +@pytest.mark.asyncio +async def test_trader_http_pull_link(): + r = FakeRedis(kv={"worker:ai_trade:heartbeat": _hb("ai_trade","trader","market_open")}) + st = await node_monitor.collect_status(redis=r) + link = next(l for l in st["links"] if l["from"] == "ai_trade") + assert link["type"] == "http-pull" and link["status"] == "healthy" +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest agent-office/tests/test_node_monitor.py -v` +Expected: FAIL with `ModuleNotFoundError: No module named 'app.node_monitor'` + +- [ ] **Step 3: Write minimal implementation** +```python +# agent-office/app/node_monitor.py +"""분산 워커 상태 집계 (read-only). Global Constraints 계약 2 스키마 생성.""" +from __future__ import annotations +import datetime as dt, json, logging +import redis.asyncio as aioredis +from .config import REDIS_URL + +logger = logging.getLogger("agent-office.node_monitor") + +WORKER_REGISTRY = [ + {"name": "music-render", "kind": "render", "queue": "queue:music-render"}, + {"name": "video-render", "kind": "render", "queue": "queue:video-render"}, + {"name": "image-render", "kind": "render", "queue": "queue:image-render"}, + {"name": "insta-render", "kind": "render", "queue": "queue:insta-render"}, + {"name": "task-watcher", "kind": "watcher", "queue": None}, + {"name": "ai_trade", "kind": "trader", "queue": None}, +] + +_redis = None +def _get_redis(): + global _redis + if _redis is None: + _redis = aioredis.from_url(REDIS_URL, decode_responses=False) + return _redis + + +def _beat_age(ts_str, now): + try: + beat = dt.datetime.fromisoformat(ts_str.replace("Z", "+00:00")) + return max(0, int((now - beat).total_seconds())) + except Exception: + return None + + +def _render_link_status(w): + if not w["alive"]: + return "down" + if w["state"] == "paused": + return "paused" + if w["dead_letter"] > 0: + return "degraded" + return "healthy" + + +async def collect_status(redis=None) -> dict: + r = redis or _get_redis() + now = dt.datetime.now(dt.timezone.utc) + out = {"redis_ok": True, "paused": False, "paused_reason": None, + "generated_at": now.strftime("%Y-%m-%dT%H:%M:%SZ"), + "workers": [], "links": []} + try: + out["paused"] = (await r.get("queue:paused")) == b"1" + except Exception: + logger.exception("redis 접근 실패") + out["redis_ok"] = False + return out + + for w in WORKER_REGISTRY: + info = {"name": w["name"], "kind": w["kind"], "alive": False, "state": None, + "last_beat_age_s": None, "queue_depth": 0, "dead_letter": 0, + "processing": 0, "jobs_done": 0, "jobs_failed": 0, "last_job_at": None} + raw = await r.get(f"worker:{w['name']}:heartbeat") + if raw: + try: + hb = json.loads(raw) + info.update(alive=True, state=hb.get("state"), + jobs_done=hb.get("jobs_done", 0), jobs_failed=hb.get("jobs_failed", 0), + last_job_at=hb.get("last_job_at"), + last_beat_age_s=_beat_age(hb.get("ts", ""), now)) + if w["kind"] == "watcher" and hb.get("mode"): + out["paused_reason"] = hb["mode"] + except json.JSONDecodeError: + logger.warning("heartbeat JSON 파싱 실패 name=%s", w["name"]) + if w["queue"]: + info["queue_depth"] = await r.llen(w["queue"]) + info["dead_letter"] = await r.llen(f"dead_letter:{w['queue']}") + proc = 0 + async for key in r.scan_iter(match=f"processing:{w['queue']}:*"): + proc += await r.llen(key) + info["processing"] = proc + out["workers"].append(info) + + for w in out["workers"]: + if w["kind"] == "trader": + out["links"].append({"from": "ai_trade", "to": "nas-stock", "type": "http-pull", + "status": "healthy" if w["alive"] else "down"}) + elif w["kind"] == "render": + out["links"].append({"from": "nas", "to": w["name"], "type": "redis-queue", + "status": _render_link_status(w)}) + if out["paused"] and not out["paused_reason"]: + out["paused_reason"] = "trading" + return out +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest agent-office/tests/test_node_monitor.py -v` +Expected: PASS (5 passed) + +- [ ] **Step 5: Commit** +```bash +git add agent-office/app/node_monitor.py agent-office/tests/test_node_monitor.py +git commit -m "feat(agent-office): node_monitor.collect_status (heartbeat+큐+dead-letter 집계)" +``` + +## Task B3: GET /api/agent-office/nodes 엔드포인트 + +**Files:** +- Modify: `agent-office/app/main.py` +- Test: `agent-office/tests/test_nodes_endpoint.py` + +**Interfaces:** +- Consumes: `node_monitor.collect_status` +- Produces: `GET /api/agent-office/nodes` → 계약 2 JSON + +- [ ] **Step 1: Write the failing test** +```python +# agent-office/tests/test_nodes_endpoint.py +import pytest +from fastapi.testclient import TestClient + +@pytest.fixture +def client(monkeypatch): + from app import main + async def fake_collect(redis=None): + return {"redis_ok": True, "paused": False, "paused_reason": None, + "generated_at": "2026-06-29T00:00:00Z", "workers": [], "links": []} + monkeypatch.setattr("app.node_monitor.collect_status", fake_collect) + return TestClient(main.app) + +def test_nodes_endpoint_returns_contract(client): + resp = client.get("/api/agent-office/nodes") + assert resp.status_code == 200 + body = resp.json() + assert set(["redis_ok","paused","workers","links"]).issubset(body) +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest agent-office/tests/test_nodes_endpoint.py -v` +Expected: FAIL with 404 (route 없음) + +- [ ] **Step 3: Write minimal implementation** (`main.py` 엔드포인트 추가 — `/states` 근처) +```python +@app.get("/api/agent-office/nodes") +async def nodes_status(): + from .node_monitor import collect_status + return await collect_status() +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest agent-office/tests/test_nodes_endpoint.py -v` +Expected: PASS + +- [ ] **Step 5: Commit** +```bash +git add agent-office/app/main.py agent-office/tests/test_nodes_endpoint.py +git commit -m "feat(agent-office): GET /api/agent-office/nodes 엔드포인트" +``` + +## Task B4: 노드 헬스 경보 cron + +**Files:** +- Modify: `agent-office/app/node_monitor.py` (`check_and_alert` 추가) +- Modify: `agent-office/app/scheduler.py` (interval job 등록) +- Test: `agent-office/tests/test_node_monitor.py` (경보 전이 테스트 추가) + +**Interfaces:** +- Consumes: `collect_status`, `telegram.messaging.send_raw`, `db.add_log` +- Produces: `async def check_and_alert(status=None) -> list[str]` (발송된 경보 텍스트 목록 — 테스트용 반환) + +- [ ] **Step 1: Write the failing test** +```python +# test_node_monitor.py 에 추가 +import app.node_monitor as nm + +@pytest.mark.asyncio +async def test_alert_on_alive_to_dead(monkeypatch): + sent = [] + async def fake_send_raw(text, **kw): sent.append(text); return {"ok": True} + monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw) + monkeypatch.setattr("app.db.add_log", lambda *a, **k: None) + nm._node_state.clear(); nm._dl_notified.clear() + alive = {"workers": [{"name":"image-render","alive":True,"dead_letter":0}], "links": []} + dead = {"workers": [{"name":"image-render","alive":False,"dead_letter":0}], "links": []} + await nm.check_and_alert(status=alive) # 첫 관측 — 경보 없음 + assert sent == [] + await nm.check_and_alert(status=dead) # alive→dead 전이 + assert any("다운" in t for t in sent) + +@pytest.mark.asyncio +async def test_alert_on_dead_letter_growth(monkeypatch): + sent = [] + async def fake_send_raw(text, **kw): sent.append(text); return {"ok": True} + monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw) + monkeypatch.setattr("app.db.add_log", lambda *a, **k: None) + nm._node_state.clear(); nm._dl_notified.clear() + s = {"workers": [{"name":"video-render","alive":True,"dead_letter":2}], "links": []} + await nm.check_and_alert(status=s) + assert any("dead-letter" in t for t in sent) +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest agent-office/tests/test_node_monitor.py -v` +Expected: FAIL (`AttributeError: module has no attribute '_node_state'`/`check_and_alert`) + +- [ ] **Step 3: Write minimal implementation** (`node_monitor.py`에 추가) +```python +from .config import NODE_ALERT_DEADLETTER_THRESHOLD + +_node_state: dict[str, bool] = {} # name -> 직전 alive +_dl_notified: dict[str, int] = {} # name -> 직전 알린 dead_letter 수 + + +async def check_and_alert(status=None) -> list[str]: + from .telegram.messaging import send_raw + from .db import add_log + st = status or await collect_status() + sent: list[str] = [] + for w in st["workers"]: + name, alive = w["name"], w.get("alive", False) + prev = _node_state.get(name) + if prev is True and not alive: + text = f"🔴 [{name}] 워커 다운" + if (await send_raw(text=text)).get("ok"): + add_log("node_monitor", f"{name} 다운", "warning"); sent.append(text) + elif prev is False and alive: + text = f"🟢 [{name}] 워커 복구" + if (await send_raw(text=text)).get("ok"): + add_log("node_monitor", f"{name} 복구", "info"); sent.append(text) + _node_state[name] = alive + dl = w.get("dead_letter", 0) + if dl >= NODE_ALERT_DEADLETTER_THRESHOLD and dl != _dl_notified.get(name, 0): + text = f"❌ [{name}] 실패 누적 {dl}건 (dead-letter)" + if (await send_raw(text=text)).get("ok"): + add_log("node_monitor", f"{name} dead-letter {dl}", "warning"); sent.append(text) + _dl_notified[name] = dl + elif dl == 0: + _dl_notified.pop(name, None) + return sent +``` +> 함정: 첫 관측(prev=None)엔 경보 없음 — 부팅 시 false alarm 방지. alive→dead 전이만 잡으므로 "2주 사일런트 사망"(살아있다 죽음) 시나리오는 포착, 부팅 시 이미 dead인 워커는 미경보(Phase 2에서 보강). + +`scheduler.py` — import + job 등록: +```python +from . import node_monitor # 상단 + +async def _run_node_health_check(): + await node_monitor.check_and_alert() + +# init_scheduler() 안, _poll_pipelines 근처: + scheduler.add_job(_run_node_health_check, "interval", seconds=60, id="node_health_check", replace_existing=True) +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest agent-office/tests/ -v` +Expected: PASS (전체) + +- [ ] **Step 5: Commit** +```bash +git add agent-office/app/node_monitor.py agent-office/app/scheduler.py agent-office/tests/test_node_monitor.py +git commit -m "feat(agent-office): 노드 헬스 1분 cron + 텔레그램 경보(다운/복구/dead-letter)" +``` + +## Task B5: 배포 + 운영 검증 + +**Files:** (코드 변경 없음 — 배포·검증) + +- [ ] **Step 1: nas-deploy 락 획득 + push 배포** +```bash +# acquire_lock("nas-deploy","BE") 후 +git push # Gitea webhook → deployer 자동 배포 (agent-office rebuild: redis 의존성 반영) +``` + +- [ ] **Step 2: 운영 검증** (배포 완료 후) +```bash +# /nodes 응답 확인 (워커 미배포 상태면 전부 alive=false 예상) +curl -s https://gahusb.synology.me/api/agent-office/nodes | python -m json.tool +``` +Expected: `redis_ok:true`, workers 6종 노출. Part A 배포 전이면 alive 전부 false(정상 — heartbeat 아직 없음). + +- [ ] **Step 3: release_lock("nas-deploy")** + +--- + +# Part C — web-ui Three.js 대시보드 (FE 세션 소유) + +> repo: `web-ui`. React + Vite. 빌드는 로컬에서만(NAS Celeron 빌드 금지). 배포는 `npm run release:nas`. +> **Three.js 시각화(C4)는 `designer` 스킬을 활성화해 구현** — 창의적 비주얼이라 line-by-line 사전 코드 대신 데이터 계약 + 상태→비주얼 매핑 + 수용 기준으로 명세한다. + +## Task C1: 의존성 + 라우트 + 네비 등록 + +**Files:** +- Modify: `package.json` (three, @react-three/fiber, @react-three/drei) +- Modify: `src/routes.jsx` (lazy route + navLinks 엔트리) + +- [ ] **Step 1: 의존성 설치** +```bash +npm install three @react-three/fiber @react-three/drei +``` + +- [ ] **Step 2: routes.jsx에 route 추가** (`appRoutes` 배열에 — agent-office 패턴과 동일 lazy) +```jsx + { + path: 'infra', + lazy: () => import('./pages/infra/InfraMonitor'), + }, +``` + +- [ ] **Step 3: navLinks에 엔트리 추가** +```jsx + { + id: 'infra', + label: 'Infra', + path: '/infra', + subtitle: 'NODE PIPELINE', + description: 'NAS↔Windows 워커 파이프라인 실시간 상태', + icon: 🛰️, + accent: '#22d3ee', + }, +``` + +- [ ] **Step 4: Commit** +```bash +git add package.json package-lock.json src/routes.jsx +git commit -m "feat(infra): three.js 의존성 + /infra 라우트·네비 등록" +``` + +## Task C2: api 헬퍼 + useNodeStatus 폴링 훅 + +**Files:** +- Modify: `src/api.js` (`getNodeStatus` 추가) +- Create: `src/pages/infra/useNodeStatus.js` +- Test: `src/pages/infra/useNodeStatus.test.js` (Vitest — 프로젝트 테스트 러너 확인) + +**Interfaces:** +- Produces: `getNodeStatus()` → 계약 2 객체, `useNodeStatus(intervalMs=3000)` → `{data, error, loading}` + +- [ ] **Step 1: Write the failing test** +```js +// src/pages/infra/useNodeStatus.test.js +import { renderHook, waitFor } from '@testing-library/react'; +import { describe, it, expect, vi } from 'vitest'; +import { useNodeStatus } from './useNodeStatus'; +import * as api from '../../api'; + +describe('useNodeStatus', () => { + it('fetches node status', async () => { + vi.spyOn(api, 'getNodeStatus').mockResolvedValue({ redis_ok: true, workers: [], links: [] }); + const { result } = renderHook(() => useNodeStatus(99999)); + await waitFor(() => expect(result.current.data).toBeTruthy()); + expect(result.current.data.redis_ok).toBe(true); + }); +}); +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `npm test -- useNodeStatus` +Expected: FAIL (`useNodeStatus`/`getNodeStatus` 미정의) + +- [ ] **Step 3: Write minimal implementation** + +`src/api.js`에 추가 (기존 fetch 헬퍼 패턴 따라 — 상대경로): +```js +export async function getNodeStatus() { + const res = await fetch('/api/agent-office/nodes'); + if (!res.ok) throw new Error(`nodes ${res.status}`); + return res.json(); +} +``` +`src/pages/infra/useNodeStatus.js`: +```js +import { useEffect, useState, useRef } from 'react'; +import { getNodeStatus } from '../../api'; + +export function useNodeStatus(intervalMs = 3000) { + const [data, setData] = useState(null); + const [error, setError] = useState(null); + const [loading, setLoading] = useState(true); + const timer = useRef(null); + + useEffect(() => { + let alive = true; + async function tick() { + try { + const d = await getNodeStatus(); + if (alive) { setData(d); setError(null); } + } catch (e) { + if (alive) setError(e); + } finally { + if (alive) setLoading(false); + } + } + tick(); + timer.current = setInterval(tick, intervalMs); + return () => { alive = false; clearInterval(timer.current); }; + }, [intervalMs]); + + return { data, error, loading }; +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `npm test -- useNodeStatus` +Expected: PASS + +- [ ] **Step 5: Commit** +```bash +git add src/api.js src/pages/infra/useNodeStatus.js src/pages/infra/useNodeStatus.test.js +git commit -m "feat(infra): getNodeStatus + useNodeStatus 3초 폴링 훅" +``` + +## Task C3: 2D 폴백 패널 (먼저 동작하는 deliverable) + +**Files:** +- Create: `src/pages/infra/InfraMonitor.jsx` (먼저 2D 카드 뷰) +- Create: `src/pages/infra/WorkerCard.jsx` +- Create: `src/pages/infra/statusVisual.js` (상태→색/라벨 매핑 — Three.js와 공유) +- Test: `src/pages/infra/statusVisual.test.js` + +**Interfaces:** +- Produces: `linkColor(status)`, `workerStateLabel(worker)` — C4 Three.js가 재사용 + +- [ ] **Step 1: Write the failing test** +```js +// src/pages/infra/statusVisual.test.js +import { describe, it, expect } from 'vitest'; +import { linkColor, workerStateLabel } from './statusVisual'; + +describe('statusVisual', () => { + it('maps link status to color', () => { + expect(linkColor('healthy')).toBe('#22c55e'); + expect(linkColor('down')).toBe('#ef4444'); + expect(linkColor('paused')).toBe('#f59e0b'); + expect(linkColor('degraded')).toBe('#eab308'); + }); + it('labels dead worker', () => { + expect(workerStateLabel({ alive: false })).toMatch(/다운|down/i); + }); +}); +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `npm test -- statusVisual` +Expected: FAIL (모듈 없음) + +- [ ] **Step 3: Write minimal implementation** + +`src/pages/infra/statusVisual.js`: +```js +export const LINK_COLORS = { + healthy: '#22c55e', paused: '#f59e0b', degraded: '#eab308', down: '#ef4444', +}; +export function linkColor(status) { return LINK_COLORS[status] || '#64748b'; } + +export function workerStateLabel(w) { + if (!w.alive) return '🔴 다운'; + if (w.state === 'paused') return '⏸ 작업중(트레이딩)'; + if (w.state === 'busy') return '⚙️ 처리 중'; + if (w.state === 'market_open') return '📈 장중'; + if (w.state === 'market_closed') return '🌙 휴장'; + return '🟢 대기'; +} +``` +`WorkerCard.jsx` + `InfraMonitor.jsx`: `useNodeStatus()`로 받은 `data.workers`를 카드 그리드로 렌더(이름·상태라벨·queue_depth·dead_letter·last_beat_age_s). `data.redis_ok===false`면 "집계 서버/Redis 연결 끊김" 배너. (표준 React 카드 — designer 스킬 불필요) + +- [ ] **Step 4: Run test to verify it passes** + +Run: `npm test -- statusVisual` +Expected: PASS + +- [ ] **Step 5: Commit** +```bash +git add src/pages/infra/ +git commit -m "feat(infra): 2D 워커 상태 패널 + statusVisual 매핑" +``` + +## Task C4: Three.js 파이프라인 시각화 (designer 스킬) + +**Files:** +- Create: `src/pages/infra/PipelineScene.jsx` (r3f Canvas 토폴로지) +- Modify: `src/pages/infra/InfraMonitor.jsx` (3D Scene + 2D 패널 토글) + +**구현 방식**: 이 Task는 **`designer` 스킬을 활성화**해 구현한다. 아래는 designer가 따라야 할 정확한 데이터 계약·상태 매핑·수용 기준이다. + +- [ ] **Step 1: designer 스킬 활성화** — `Skill(designer)` 호출 후 아래 명세로 진행. + +- [ ] **Step 2: 토폴로지 + 데이터 계약** + - 입력: `useNodeStatus()`의 `data`(계약 2). `data.links`로 파이프라인, `data.workers`로 노드 상태. + - 노드 배치: **좌측** NAS 서버(박스) + 게이트웨이 라벨 / **중앙** Redis 큐 버스(글로우 코어) / **우측** Windows 노드(박스) + 워커 sub-node 6개(`music/video/image/insta-render`, `task-watcher`, `ai_trade`). + - 파이프라인(튜브/라인): render 4종 = NAS→Redis→워커 경로(`type:redis-queue`). `ai_trade` = NAS stock⇄ai_trade 직결(`type:http-pull`, Redis 버스 우회). + - 색은 `statusVisual.linkColor(link.status)` 재사용. + +- [ ] **Step 3: 상태별 애니메이션 (수용 기준)** + - `healthy`: 시안/그린 튜브 + 파티클이 흐름(흐르는 통신 — 사용자 핵심 요구). `state==='busy'`면 파티클 속도↑. + - `paused`: 앰버, 파티클 정지/감속 + "⏸ 작업중(트레이딩)" 라벨. + - `down`: 빨강 + 흐름 정지 + 끊긴 지점 스파크/단절 비주얼 + ⚠ 아이콘 + "last beat Xs ago". + - `degraded`: 튜브에 ❌ dead-letter 카운트 뱃지. + - `redis_ok===false`: 중앙 버스 전체 빨강 + "집계 서버 연결 끊김" 오버레이. + - 각 워커 노드 HUD: 이름, 상태 라벨(`workerStateLabel`), queue_depth, dead_letter. + +- [ ] **Step 4: 폴백 + 검증** + - WebGL 미지원/모바일: C3의 2D 패널로 자동 폴백(토글 버튼 제공). + - 검증: `npm run dev`로 4가지 상태를 mock data로 강제 주입(`useNodeStatus`를 stub)해 healthy/paused/down/degraded 비주얼이 명확히 구분되는지 육안 확인. RC 환경이면 스크린샷/녹화로 사용자 확인. + +- [ ] **Step 5: Commit** +```bash +git add src/pages/infra/PipelineScene.jsx src/pages/infra/InfraMonitor.jsx +git commit -m "feat(infra): three.js NAS↔Windows 파이프라인 시각화 (상태별 흐름/장애)" +``` + +- [ ] **Step 6: 배포** +```bash +npm run build && npm run release:nas +``` + +--- + +## 실행 순서 & 세션 협업 (co-gahusb) + +1. **선행 게이트**: 본 plan의 Global Constraints(계약 2개)를 co-gahusb `post_message`로 FE/AI 세션에 공유 → 3 Part 병렬 시작 가능. +2. **Part B(BE, 이 세션)** 먼저 진행 가능 — FakeRedis 테스트로 워커 없이도 완결. B5 배포 후 `/nodes`가 `alive:false`로라도 응답. +3. **Part A(AI 세션)** 완료·워커 재배포 시 `/nodes`에 alive:true 채워짐. +4. **Part C(FE 세션)**는 `/nodes` 계약만으로 mock 개발 가능 → 실데이터 연동. +5. 공유 리소스: B1의 docker-compose 변경 = `compose` 락, B5 배포 = `nas-deploy` 락. + +## Self-Review (작성자 점검) + +- **스펙 커버리지**: G1(heartbeat 6종)=A1~A4 / G2(큐·dead-letter·processing·paused 집계)=B2 / G3(텔레그램 경보)=B4 / G4(Three.js /infra)=C1~C4. 계약1=A1·Global, 계약2=B2·Global. ✓ 누락 없음. +- **Placeholder 스캔**: C4는 designer 스킬 위임이나 데이터 계약·상태 매핑·수용 기준을 구체 명시(placeholder 아님). 그 외 전 Task에 실제 코드 포함. ✓ +- **타입 일관성**: `WorkerStats`(A1)↔worker.stats(A2), `collect_status` 반환(B2)↔`check_and_alert` 입력(B4)·`/nodes`(B3)·`useNodeStatus`(C2)·`linkColor`/`workerStateLabel`(C3 재사용 by C4) 일치. heartbeat 키/값(계약1)↔node_monitor 파싱(B2) 일치. ✓