# Distributed Worker Observability System Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Observe the liveness, queue, and failure state of the six distributed NAS↔Windows workers through heartbeats. agent-office aggregates them and sends Telegram alerts, and web-ui `/infra` visualizes them as a Three.js pipeline.

**Architecture:** Each worker pushes `worker:<name>:heartbeat` (TTL 45s) to the NAS Redis. The agent-office `node_monitor` reads the heartbeats, ReliableQueue's `processing:*`/`dead_letter:*` keys, and `queue:paused`, then exposes the result via `GET /api/agent-office/nodes`. A 1-minute cron sends a Telegram alert on each state transition. web-ui polls every 3 seconds and renders the topology.

**Tech Stack:** Python 3.12 / FastAPI / redis.asyncio / APScheduler (web-backend, web-ai), React + Vite / three + @react-three/fiber + drei (web-ui).

## Global Constraints

These two contracts are locked conventions. They let the three Parts proceed independently and in parallel. Every Task uses these values exactly as written.

**[Contract 1] Heartbeat key schema**

- Key: `worker:<name>:heartbeat` (name ∈ `music-render`, `video-render`, `image-render`, `insta-render`, `task-watcher`, `ai_trade`)
- Command: `SET worker:<name>:heartbeat <json> EX 45`
- Value (JSON): `{"name": str, "kind": "render"|"watcher"|"trader", "state": str, "ts": "<UTC ISO-8601 ending in Z>", "last_job_at": str|null, "jobs_done": int, "jobs_failed": int, "mode": str(optional, task-watcher only)}`
- `state` by kind: render=`idle`|`busy`|`paused` / watcher=`trading`|`free` / trader=`market_open`|`market_closed`
- Send interval 15s, TTL 45s (3×). Redis clients use `decode_responses=False` (bytes), the same as the existing workers.
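Below is a minimal sketch of a Contract 1 conformance check, for use in a unit test or a debugging script. `heartbeat_key` and `validate_heartbeat` are hypothetical helpers and are not part of any Task in this plan. The allowed names, kinds, and states are copied from the Contract 1 bullets.

```python
import json

# Allowed values copied from Contract 1.
WORKER_NAMES = {"music-render", "video-render", "image-render",
                "insta-render", "task-watcher", "ai_trade"}
STATES_BY_KIND = {
    "render": {"idle", "busy", "paused"},
    "watcher": {"trading", "free"},
    "trader": {"market_open", "market_closed"},
}
REQUIRED_FIELDS = ("name", "kind", "state", "ts", "last_job_at", "jobs_done", "jobs_failed")


def heartbeat_key(name: str) -> str:
    """Build the Redis key for a registered worker (hypothetical helper)."""
    if name not in WORKER_NAMES:
        raise ValueError(f"unknown worker: {name}")
    return f"worker:{name}:heartbeat"


def validate_heartbeat(raw: bytes) -> list[str]:
    """Return Contract 1 violations for a raw payload; an empty list means valid."""
    try:
        hb = json.loads(raw)  # json.loads accepts bytes directly
    except ValueError:
        return ["payload is not valid JSON"]
    if not isinstance(hb, dict):
        return ["payload is not a JSON object"]

    errors = [f"missing field: {f}" for f in REQUIRED_FIELDS if f not in hb]
    kind, state = hb.get("kind"), hb.get("state")
    if not isinstance(kind, str) or kind not in STATES_BY_KIND:
        errors.append(f"unknown kind: {kind!r}")
    elif not isinstance(state, str) or state not in STATES_BY_KIND[kind]:
        errors.append(f"state {state!r} not allowed for kind {kind!r}")
    ts = hb.get("ts")
    if not (isinstance(ts, str) and ts.endswith("Z")):
        errors.append("ts must be a UTC ISO-8601 string ending in 'Z'")
    if "mode" in hb and hb.get("name") != "task-watcher":
        errors.append("mode is only allowed for task-watcher")
    return errors
```

Because it takes the raw bytes a `decode_responses=False` client returns, you can call it directly on a `GET worker:<name>:heartbeat` result.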
**[Contract 2] `GET /api/agent-office/nodes` response schema**

```json
{
  "redis_ok": true, "paused": false, "paused_reason": "trading",
  "generated_at": "...Z",
  "workers": [{"name": "image-render", "kind": "render", "alive": true, "state": "idle",
               "last_beat_age_s": 3, "queue_depth": 0, "dead_letter": 0, "processing": 0,
               "jobs_done": 42, "jobs_failed": 1, "last_job_at": "...Z"}],
  "links": [{"from": "nas", "to": "image-render", "type": "redis-queue", "status": "healthy"}]
}
```

- `links[].status` ∈ `healthy`|`paused`|`down`|`degraded`, `links[].type` ∈ `redis-queue`|`http-pull`

**Common rules**: Never commit `.env`. Docker runs only on the NAS (no local docker commands). Each repo commits only within its own path. NAS Redis address: `redis://192.168.45.54:6379` on the worker side, `redis://redis:6379` inside containers on the NAS side.

---

# Part A: web-ai worker heartbeat (owned by the AI session)

> repo: `web-ai`. Tests: use the system Python (`C:\Users\jaeoh\AppData\Local\Programs\Python\Python312\python.exe -m pytest`), because `.venv` breaks on the Korean-character path (see web-ai CLAUDE.md).

## Task A1: Shared heartbeat module

**Files:**
- Create: `services/_shared/heartbeat.py`
- Test: `services/_shared/tests/test_heartbeat.py`

**Interfaces:**
- Produces: `class WorkerStats`(busy:bool, jobs_done:int, jobs_failed:int, last_job_at:str|None), `def utc_now_iso()->str`, `def build_payload(name,kind,state,stats,extra=None)->str`, `async def render_state(redis,stats,paused_key="queue:paused")->str`, `async def heartbeat_loop(redis,name,kind,stats,*,interval=15,ttl=45,paused_key="queue:paused",state_fn=None)`

- [ ] **Step 1: Write the failing test**

```python
# services/_shared/tests/test_heartbeat.py
import json
import pytest
from _shared.heartbeat import WorkerStats, build_payload, render_state


def test_build_payload_has_contract_fields():
    s = WorkerStats()
    s.jobs_done = 3
    s.last_job_at = "2026-06-29T00:00:00Z"
    payload = json.loads(build_payload("image-render", "render", "idle", s))
    assert payload["name"] == "image-render"
    assert payload["kind"] == "render"
    assert payload["state"] == "idle"
    assert payload["jobs_done"] == 3
    assert payload["last_job_at"] == "2026-06-29T00:00:00Z"
    assert payload["ts"].endswith("Z")


def test_build_payload_merges_extra():
    payload = json.loads(build_payload("task-watcher", "watcher", "free", WorkerStats(),
                                       extra={"mode": "free"}))
    assert payload["mode"] == "free"


class _FakeRedis:
    def __init__(self, paused):
        self._paused = paused

    async def get(self, key):
        return b"1" if self._paused else None


@pytest.mark.asyncio
async def test_render_state_paused_overrides_busy():
    s = WorkerStats()
    s.busy = True
    assert await render_state(_FakeRedis(paused=True), s) == "paused"


@pytest.mark.asyncio
async def test_render_state_busy_then_idle():
    s = WorkerStats()
    s.busy = True
    assert await render_state(_FakeRedis(paused=False), s) == "busy"
    s.busy = False
    assert await render_state(_FakeRedis(paused=False), s) == "idle"
```

- [ ] **Step 2: Run test to verify it fails**

Run: `python -m pytest services/_shared/tests/test_heartbeat.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named '_shared.heartbeat'`

- [ ] **Step 3: Write minimal implementation**

```python
# services/_shared/heartbeat.py
"""Distributed worker heartbeat: SET worker:<name>:heartbeat with a TTL.

Global Constraints, Contract 1.
"""
from __future__ import annotations

import asyncio
import datetime as dt
import json
import logging
import os

logger = logging.getLogger(__name__)

DEFAULT_INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "15"))
DEFAULT_TTL = int(os.getenv("HEARTBEAT_TTL", "45"))


class WorkerStats:
    """Mutable counters: updated by worker_loop, read by heartbeat_loop."""

    def __init__(self):
        self.busy = False
        self.jobs_done = 0
        self.jobs_failed = 0
        self.last_job_at = None  # ISO str | None


def utc_now_iso() -> str:
    return dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def build_payload(name: str, kind: str, state: str, stats: WorkerStats,
                  extra: dict | None = None) -> str:
    payload = {
        "name": name, "kind": kind, "state": state, "ts": utc_now_iso(),
        "last_job_at": stats.last_job_at,
        "jobs_done": stats.jobs_done, "jobs_failed": stats.jobs_failed,
    }
    if extra:
        payload.update(extra)
    return json.dumps(payload)


async def render_state(redis, stats: WorkerStats, paused_key: str = "queue:paused") -> str:
    # The task-watcher pause flag takes priority over the local busy flag.
    if await redis.get(paused_key) == b"1":
        return "paused"
    return "busy" if stats.busy else "idle"


async def heartbeat_loop(redis, name, kind, stats, *, interval=DEFAULT_INTERVAL,
                         ttl=DEFAULT_TTL, paused_key="queue:paused", state_fn=None):
    key = f"worker:{name}:heartbeat"
    logger.info("heartbeat 시작 name=%s ttl=%ds", name, ttl)
    while True:
        try:
            if state_fn is not None:
                state, extra = await state_fn(redis, stats)
            else:
                state, extra = await render_state(redis, stats, paused_key), None
            await redis.set(key, build_payload(name, kind, state, stats, extra), ex=ttl)
        except asyncio.CancelledError:
            raise
        except Exception:
            # A failed beat is logged; the loop keeps running.
            logger.exception("heartbeat 발신 실패 name=%s", name)
        await asyncio.sleep(interval)
```

- [ ] **Step 4: Run test to verify it passes**

Run: `python -m pytest services/_shared/tests/test_heartbeat.py -v`
Expected: PASS (4 passed)

- [ ] **Step 5: Commit**

```bash
git add services/_shared/heartbeat.py services/_shared/tests/test_heartbeat.py
git commit -m "feat(_shared): 워커 heartbeat 모듈 (worker:<name>:heartbeat TTL SET)"
```

## Task A2: Wire the heartbeat into the image-render worker (+ apply the same pattern to the other 3 render workers)

**Files:**
- Modify: `services/image-render/worker.py` (add stats + poll_once counters)
- Modify: `services/image-render/main.py` (heartbeat_loop task in lifespan)
- Test: `services/image-render/tests/test_worker.py` (add a poll_once counter check)

**Interfaces:**
- Consumes: `_shared.heartbeat.WorkerStats`, `heartbeat_loop`, `utc_now_iso`
- Produces: `worker.stats` (module-level WorkerStats), which main.py passes to the heartbeat

- [ ] **Step 1: Write the failing test** (on success, poll_once increments jobs_done and resets busy)

```python
# append to services/image-render/tests/test_worker.py
import pytest
import worker


class _OneJobQueue:
    def __init__(self):
        self.acked = False

    async def dequeue(self, timeout=5):
        if self.acked:
            return None
        return ({"job_type": "flux_generation", "task_id": "t1", "params": {}}, b"raw")

    async def ack(self, raw):
        self.acked = True

    async def fail(self, raw, payload):
        pass


@pytest.mark.asyncio
async def test_poll_once_increments_jobs_done(monkeypatch):
    worker.stats.jobs_done = 0
    monkeypatch.setattr(worker, "run_flux_generation", lambda task_id, params: None)
    handled = await worker.poll_once(_OneJobQueue())
    assert handled is True
    assert worker.stats.jobs_done == 1
    assert worker.stats.busy is False
    assert worker.stats.last_job_at is not None
```

- [ ] **Step 2: Run test to verify it fails**

Run: `python -m pytest services/image-render/tests/test_worker.py::test_poll_once_increments_jobs_done -v`
Expected: FAIL with `AttributeError: module 'worker' has no attribute 'stats'`

- [ ] **Step 3: Write minimal implementation**

`worker.py`: add the import and a module-level `stats`, then replace `poll_once`:

```python
from _shared.heartbeat import WorkerStats, utc_now_iso  # add to the top-level import block

stats = WorkerStats()  # module level (near the REDIS_URL constant)


async def poll_once(queue: ReliableQueue) -> bool:
    result = await queue.dequeue(timeout=5)
    if result is None:
        return False
    payload, raw = result
    stats.busy = True
    try:
        await asyncio.to_thread(_dispatch, payload)
    except Exception:
        logger.exception("dispatch unhandled exception task_id=%s", payload.get("task_id"))
        await queue.fail(raw, payload)
        stats.jobs_failed += 1
        stats.last_job_at = utc_now_iso()
        stats.busy = False
        return True
    await queue.ack(raw)
    stats.jobs_done += 1
    stats.last_job_at = utc_now_iso()
    stats.busy = False
    return True
```

`main.py`: add a heartbeat task to the lifespan:

```python
# asyncio, asynccontextmanager, FastAPI, worker and logger are presumably already imported in main.py
import asyncio
import os
from contextlib import asynccontextmanager

import redis.asyncio as aioredis
from fastapi import FastAPI

from _shared.heartbeat import heartbeat_loop


@asynccontextmanager
async def lifespan(app: FastAPI):
    worker_task = asyncio.create_task(worker.worker_loop())
    hb_redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://192.168.45.54:6379"),
                                 decode_responses=False)
    hb_task = asyncio.create_task(heartbeat_loop(hb_redis, "image-render", "render", worker.stats))
    logger.info("image-render lifespan 시작")
    try:
        yield
    finally:
        for t in (worker_task, hb_task):
            t.cancel()
            try:
                await t
            except asyncio.CancelledError:
                pass
        await hb_redis.aclose()
        logger.info("image-render lifespan 종료")
```

- [ ] **Step 4: Run test to verify it passes**

Run: `python -m pytest services/image-render/tests/ -v`
Expected: PASS (existing tests + 1 new)

- [ ] **Step 5: Apply the same pattern to the remaining 3 render workers**

Apply the **same pattern** as Step 3 to each of `video-render`, `music-render`, and `insta-render`. The only difference is the worker-name string in main.py:

- `services/video-render/main.py`: `heartbeat_loop(hb_redis, "video-render", "render", worker.stats)`
- `services/music-render/main.py`: `heartbeat_loop(hb_redis, "music-render", "render", worker.stats)`
- `services/insta-render/main.py`: `heartbeat_loop(hb_redis, "insta-render", "render", worker.stats)`
- In each `worker.py`: `from _shared.heartbeat import WorkerStats, utc_now_iso` + `stats = WorkerStats()` + poll_once counters (ack→`jobs_done`, fail→`jobs_failed`, and both update `last_job_at`/`busy`). music-render's worker.py is more complex (12 job_types), but the hook points are the same: set `busy=True` before dispatch, then update the counters and set `busy=False` after ack/fail.
- In each `tests/test_worker.py`, add the Step 1 test adapted to that worker's job_type.
- [ ] **Step 6: Commit**

```bash
git add services/image-render services/video-render services/music-render services/insta-render
git commit -m "feat(render-workers): 4 render 워커 heartbeat 배선 + poll_once 카운터"
```

## Task A3: task-watcher heartbeat (including mode)

**Files:**
- Modify: `services/task-watcher/watcher.py`
- Test: `services/task-watcher/tests/test_watcher.py` (new)

**Interfaces:**
- Consumes: `_shared.heartbeat.build_payload`, `WorkerStats`
- Produces: `state`=mode and `mode`=mode in the `worker:task-watcher:heartbeat` value

- [ ] **Step 1: Write the failing test**

```python
# services/task-watcher/tests/test_watcher.py
import json
from _shared.heartbeat import build_payload, WorkerStats


def test_watcher_heartbeat_payload_carries_mode():
    payload = json.loads(build_payload("task-watcher", "watcher", "trading", WorkerStats(),
                                       extra={"mode": "trading"}))
    assert payload["kind"] == "watcher"
    assert payload["state"] == "trading"
    assert payload["mode"] == "trading"
```

- [ ] **Step 2: Run test to verify it fails**

Run: `python -m pytest services/task-watcher/tests/test_watcher.py -v`
Expected: FAIL (`ModuleNotFoundError` or an import-path error. `_shared` must be on task-watcher's PYTHONPATH, so check that compose sets `PYTHONPATH=/app:/shared`.)

- [ ] **Step 3: Write minimal implementation** (add a heartbeat SET to `watcher_loop`)

```python
# watcher.py: top-level imports
from _shared.heartbeat import build_payload, WorkerStats

_HB_STATS = WorkerStats()
HEARTBEAT_KEY = "worker:task-watcher:heartbeat"
HEARTBEAT_TTL = 45

# watcher_loop while-body: add right after the mode decision
mode = current_mode(now, holidays)
if mode == "trading":
    await redis.set(PAUSED_KEY, b"1", ex=PAUSED_TTL)
else:
    await redis.delete(PAUSED_KEY)
# heartbeat (LOOP_INTERVAL=30s < TTL 45s)
await redis.set(HEARTBEAT_KEY,
                build_payload("task-watcher", "watcher", mode, _HB_STATS, extra={"mode": mode}),
                ex=HEARTBEAT_TTL)
```

> Note: the watcher loop period (30s) is shorter than the TTL (45s), so the key is refreshed before it expires.
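The note above can be made concrete. This sketch assumes that only the heartbeat SET refreshes the key and ignores Redis and network latency. Under those assumptions, a key written at t=0 survives k consecutive missed beats only if the next successful SET, at (k+1)·interval, lands before the TTL expires. `heartbeat_headroom` is a hypothetical helper that computes this headroom for both loop periods used in this plan.

```python
def heartbeat_headroom(interval_s: int, ttl_s: int) -> tuple[int, int]:
    """Return (max_delay_s, missed_beats_tolerated) for integer-second settings.

    max_delay_s: a single beat must arrive less than this many seconds late (ttl - interval).
    missed_beats_tolerated: largest k with (k + 1) * interval < ttl.
    """
    if interval_s <= 0 or ttl_s <= interval_s:
        raise ValueError("need 0 < interval < ttl")
    max_delay_s = ttl_s - interval_s
    missed = (ttl_s - 1) // interval_s - 1
    return max_delay_s, missed


for label, interval in (("render workers / ai_trade", 15), ("task-watcher", 30)):
    delay, missed = heartbeat_headroom(interval, 45)
    print(f"{label}: interval={interval}s ttl=45s -> "
          f"max delay <{delay}s, missed beats tolerated {missed}")
```

With these numbers, the 3× TTL gives the 15s senders about one missed beat of slack. task-watcher's 30s loop tolerates no missed beat at all: an iteration that runs about 15s late briefly shows it as down. If `LOOP_INTERVAL` is a sleep between iterations rather than a fixed period, the per-iteration work time eats into that margin as well.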
- [ ] **Step 4: Run test to verify it passes**

Run: `python -m pytest services/task-watcher/tests/test_watcher.py -v`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add services/task-watcher
git commit -m "feat(task-watcher): heartbeat 발신 (state=mode, paused 이유 노출)"
```

## Task A4: ai_trade heartbeat (different runtime)

**Files:**
- Modify: `ai_trade/requirements.txt` or the root `requirements.txt` (add redis if not already present)
- Create: `ai_trade/heartbeat.py` (a self-contained mini version: ai_trade runs on the host, so the `_shared` import path differs)
- Modify: `ai_trade/main.py` (heartbeat task in lifespan)
- Test: `ai_trade/tests/test_heartbeat.py`

**Interfaces:**
- Produces: `worker:ai_trade:heartbeat` (kind=`trader`, state=`market_open`|`market_closed`)
- Consumes: the polling-window check in `ai_trade/scheduler.py`. The AI session confirms the exact function name; per web-ai CLAUDE.md, `scheduler.py` owns the polling-window decision.

- [ ] **Step 1: Write the failing test**

```python
# ai_trade/tests/test_heartbeat.py
import json
from heartbeat import build_trader_payload


def test_trader_payload_market_open():
    p = json.loads(build_trader_payload("market_open", signals=2))
    assert p["name"] == "ai_trade"
    assert p["kind"] == "trader"
    assert p["state"] == "market_open"
    assert p["ts"].endswith("Z")
```

- [ ] **Step 2: Run test to verify it fails**

Run: `python -m pytest ai_trade/tests/test_heartbeat.py -v`
Expected: FAIL (`ModuleNotFoundError: heartbeat`)

- [ ] **Step 3: Write minimal implementation**

```python
# ai_trade/heartbeat.py
"""ai_trade heartbeat: SET worker:ai_trade:heartbeat on the NAS Redis.

Global Constraints, Contract 1.
"""
from __future__ import annotations

import asyncio
import datetime as dt
import json
import logging
import os

import redis.asyncio as aioredis

logger = logging.getLogger(__name__)

REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
KEY = "worker:ai_trade:heartbeat"
INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "15"))
TTL = int(os.getenv("HEARTBEAT_TTL", "45"))


def build_trader_payload(state: str, signals: int = 0) -> str:
    return json.dumps({
        "name": "ai_trade", "kind": "trader", "state": state,
        "ts": dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "last_job_at": None, "jobs_done": signals, "jobs_failed": 0,
    })


async def heartbeat_loop(state_fn):
    """state_fn() -> (state: str, signals: int). The caller injects the polling-window logic."""
    redis = aioredis.from_url(REDIS_URL, decode_responses=False)
    try:
        while True:
            try:
                state, signals = state_fn()
                await redis.set(KEY, build_trader_payload(state, signals), ex=TTL)
            except asyncio.CancelledError:
                raise
            except Exception:
                logger.exception("ai_trade heartbeat 실패")
            await asyncio.sleep(INTERVAL)
    finally:
        await redis.aclose()
```

Add to the `ai_trade/main.py` lifespan (the AI session builds `state_fn` from the actual polling-window check):

```python
import heartbeat as _hb

# inside lifespan, near where the poll_loop task is created:
def _trader_state():
    # Decide market_open/market_closed from the scheduler's polling-window check
    # (AI session: replace with the scheduler's actual window function)
    state = "market_open" if scheduler.is_polling_window() else "market_closed"
    return state, len(state_singleton.signals)

hb_task = asyncio.create_task(_hb.heartbeat_loop(_trader_state))
# on shutdown: hb_task.cancel() + await it (ignoring CancelledError)
```

Add `redis>=5.0.1` to `requirements.txt` if it is not already present (`aclose()` is available from redis-py 5.0.1).
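The `main.py` snippet leaves the shutdown step as a comment. Here is a minimal, self-contained sketch of that step, assuming the lifespan keeps a reference to `hb_task`. It cancels the task, then awaits it while suppressing `CancelledError`, so the loop's `finally` block (which closes the Redis client) runs before the app exits. `fake_heartbeat_loop` is a hypothetical stand-in for `heartbeat.heartbeat_loop` that records beats in a list instead of writing to Redis.

```python
import asyncio
import contextlib


async def fake_heartbeat_loop(state_fn, beats: list, closed: list, interval: float = 0.01):
    """Stand-in for ai_trade heartbeat_loop: same loop/cleanup shape, no Redis."""
    try:
        while True:
            state, signals = state_fn()
            beats.append((state, signals))
            await asyncio.sleep(interval)
    finally:
        closed.append(True)  # where the real loop calls `await redis.aclose()`


async def run_lifespan_once() -> tuple[list, list, bool]:
    beats, closed = [], []
    hb_task = asyncio.create_task(
        fake_heartbeat_loop(lambda: ("market_closed", 0), beats, closed))
    await asyncio.sleep(0.05)  # the app would be serving requests here
    # Shutdown: cancel, then await so the loop's finally block runs before returning.
    hb_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await hb_task
    return beats, closed, hb_task.cancelled()


if __name__ == "__main__":
    beats, closed, cancelled = asyncio.run(run_lifespan_once())
    print(len(beats) > 0, closed, cancelled)  # True [True] True
```

Calling `cancel()` without awaiting the task would let the app exit before `aclose()` runs. That is why the await is part of the pattern.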
- [ ] **Step 4: Run test to verify it passes**

Run: `python -m pytest ai_trade/tests/test_heartbeat.py -v`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add ai_trade/heartbeat.py ai_trade/main.py ai_trade/tests/test_heartbeat.py requirements.txt
git commit -m "feat(ai_trade): NAS Redis heartbeat (trader market_open/closed)"
```

---

# Part B: web-backend agent-office aggregation + alerts (owned by this BE session)

> repo: `web-backend`. Tests: agent-office uses pytest + pytest-asyncio. pytest can run locally (not in docker).

## Task B1: Add the redis dependency to agent-office

**Files:**
- Modify: `agent-office/requirements.txt`
- Modify: `agent-office/app/config.py`
- Modify: `docker-compose.yml` (agent-office block; requires the `compose` lock)

**Interfaces:**
- Produces: `config.REDIS_URL`, plus the `REDIS_URL` env and `depends_on: redis` on the agent-office container

- [ ] **Step 1: Add redis to requirements.txt**

```
redis>=5.0.1
```

(append as one line at the end of the file)

- [ ] **Step 2: Add REDIS_URL + the alert threshold to config.py**

```python
# append to the end of config.py
# Redis (node monitor)
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379")
NODE_ALERT_DEADLETTER_THRESHOLD = int(os.getenv("NODE_ALERT_DEADLETTER_THRESHOLD", "1"))
```

- [ ] **Step 3: Modify the agent-office block in docker-compose.yml** (after `acquire_lock("compose","BE")`)

Add to the agent-office service block's `environment:`:

```yaml
- REDIS_URL=${REDIS_URL:-redis://redis:6379}
```

Add `redis` to `depends_on:` (create `depends_on` if the block has none):

```yaml
depends_on:
  - redis
```

- [ ] **Step 4: Commit**

```bash
git add agent-office/requirements.txt agent-office/app/config.py docker-compose.yml
git commit -m "chore(agent-office): redis 의존성 + REDIS_URL/dead-letter 임계 설정"
```

## Task B2: node_monitor.collect_status

**Files:**
- Create: `agent-office/app/node_monitor.py`
- Test: `agent-office/tests/test_node_monitor.py`

**Interfaces:**
- Produces: `WORKER_REGISTRY` (list of dict), `async def collect_status(redis=None) -> dict` (Contract 2 schema)
- Consumes: `config.REDIS_URL`

- [ ] **Step 1: Write the failing test**

```python
# agent-office/tests/test_node_monitor.py
import json
import pytest
from app import node_monitor


class FakeRedis:
    """Mimics worker heartbeat GET + queue LLEN + SCAN_ITER."""

    def __init__(self, kv=None, lists=None):
        self._kv = kv or {}        # key(str) -> bytes
        self._lists = lists or {}  # key(str) -> length(int)

    async def get(self, key):
        return self._kv.get(key)

    async def llen(self, key):
        return self._lists.get(key, 0)

    async def scan_iter(self, match=None):
        prefix = match.rstrip("*")
        for k in list(self._lists):
            if k.startswith(prefix):
                yield k


def _hb(name, kind, state, **extra):
    return json.dumps({"name": name, "kind": kind, "state": state,
                       "ts": "2026-06-29T00:00:00Z", "last_job_at": None,
                       "jobs_done": 0, "jobs_failed": 0, **extra}).encode()


@pytest.mark.asyncio
async def test_alive_worker_healthy_link():
    r = FakeRedis(kv={"worker:image-render:heartbeat": _hb("image-render", "render", "idle")})
    st = await node_monitor.collect_status(redis=r)
    img = next(w for w in st["workers"] if w["name"] == "image-render")
    assert img["alive"] is True and img["state"] == "idle"
    link = next(l for l in st["links"] if l["to"] == "image-render")
    assert link["status"] == "healthy" and link["type"] == "redis-queue"


@pytest.mark.asyncio
async def test_missing_heartbeat_is_dead_and_down():
    r = FakeRedis()  # no heartbeat
    st = await node_monitor.collect_status(redis=r)
    img = next(w for w in st["workers"] if w["name"] == "image-render")
    assert img["alive"] is False
    link = next(l for l in st["links"] if l["to"] == "image-render")
    assert link["status"] == "down"


@pytest.mark.asyncio
async def test_dead_letter_makes_degraded():
    r = FakeRedis(kv={"worker:video-render:heartbeat": _hb("video-render", "render", "idle")},
                  lists={"dead_letter:queue:video-render": 2})
    st = await node_monitor.collect_status(redis=r)
    vid = next(w for w in st["workers"] if w["name"] == "video-render")
    assert vid["dead_letter"] == 2
    link = next(l for l in st["links"] if l["to"] == "video-render")
    assert link["status"] == "degraded"


@pytest.mark.asyncio
async def test_paused_reason_from_watcher():
    r = FakeRedis(kv={"queue:paused": b"1",
                      "worker:task-watcher:heartbeat": _hb("task-watcher", "watcher", "trading",
                                                           mode="trading")})
    st = await node_monitor.collect_status(redis=r)
    assert st["paused"] is True and st["paused_reason"] == "trading"


@pytest.mark.asyncio
async def test_trader_http_pull_link():
    r = FakeRedis(kv={"worker:ai_trade:heartbeat": _hb("ai_trade", "trader", "market_open")})
    st = await node_monitor.collect_status(redis=r)
    link = next(l for l in st["links"] if l["from"] == "ai_trade")
    assert link["type"] == "http-pull" and link["status"] == "healthy"
```

- [ ] **Step 2: Run test to verify it fails**

Run: `python -m pytest agent-office/tests/test_node_monitor.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named 'app.node_monitor'`

- [ ] **Step 3: Write minimal implementation**

```python
# agent-office/app/node_monitor.py
"""Distributed worker status aggregation (read-only). Builds the Global Constraints Contract 2 schema."""
from __future__ import annotations

import datetime as dt
import json
import logging

import redis.asyncio as aioredis

from .config import REDIS_URL

logger = logging.getLogger("agent-office.node_monitor")

WORKER_REGISTRY = [
    {"name": "music-render", "kind": "render", "queue": "queue:music-render"},
    {"name": "video-render", "kind": "render", "queue": "queue:video-render"},
    {"name": "image-render", "kind": "render", "queue": "queue:image-render"},
    {"name": "insta-render", "kind": "render", "queue": "queue:insta-render"},
    {"name": "task-watcher", "kind": "watcher", "queue": None},
    {"name": "ai_trade", "kind": "trader", "queue": None},
]

_redis = None


def _get_redis():
    global _redis
    if _redis is None:
        _redis = aioredis.from_url(REDIS_URL, decode_responses=False)
    return _redis


def _beat_age(ts_str, now):
    try:
        beat = dt.datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
        return max(0, int((now - beat).total_seconds()))
    except Exception:
        return None


def _render_link_status(w):
    if not w["alive"]:
        return "down"
    if w["state"] == "paused":
        return "paused"
    if w["dead_letter"] > 0:
        return "degraded"
    return "healthy"


async def collect_status(redis=None) -> dict:
    r = redis or _get_redis()
    now = dt.datetime.now(dt.timezone.utc)
    out = {"redis_ok": True, "paused": False, "paused_reason": None,
           "generated_at": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
           "workers": [], "links": []}
    try:
        out["paused"] = (await r.get("queue:paused")) == b"1"
    except Exception:
        logger.exception("redis 접근 실패")
        out["redis_ok"] = False
        return out

    for w in WORKER_REGISTRY:
        info = {"name": w["name"], "kind": w["kind"], "alive": False, "state": None,
                "last_beat_age_s": None, "queue_depth": 0, "dead_letter": 0, "processing": 0,
                "jobs_done": 0, "jobs_failed": 0, "last_job_at": None}
        raw = await r.get(f"worker:{w['name']}:heartbeat")
        if raw:
            try:
                hb = json.loads(raw)
                info.update(alive=True, state=hb.get("state"),
                            jobs_done=hb.get("jobs_done", 0),
                            jobs_failed=hb.get("jobs_failed", 0),
                            last_job_at=hb.get("last_job_at"),
                            last_beat_age_s=_beat_age(hb.get("ts", ""), now))
                if w["kind"] == "watcher" and hb.get("mode"):
                    out["paused_reason"] = hb["mode"]
            except json.JSONDecodeError:
                logger.warning("heartbeat JSON 파싱 실패 name=%s", w["name"])
        if w["queue"]:
            info["queue_depth"] = await r.llen(w["queue"])
            info["dead_letter"] = await r.llen(f"dead_letter:{w['queue']}")
            proc = 0
            async for key in r.scan_iter(match=f"processing:{w['queue']}:*"):
                proc += await r.llen(key)
            info["processing"] = proc
        out["workers"].append(info)

    for w in out["workers"]:
        if w["kind"] == "trader":
            out["links"].append({"from": "ai_trade", "to": "nas-stock", "type": "http-pull",
                                 "status": "healthy" if w["alive"] else "down"})
        elif w["kind"] == "render":
            out["links"].append({"from": "nas", "to": w["name"], "type": "redis-queue",
                                 "status": _render_link_status(w)})

    if out["paused"] and not out["paused_reason"]:
        out["paused_reason"] = "trading"
    return out
```

- [ ] **Step 4: Run test to verify it passes**

Run: `python -m pytest agent-office/tests/test_node_monitor.py -v`
Expected: PASS (5 passed)

- [ ] **Step 5: Commit**

```bash
git add agent-office/app/node_monitor.py agent-office/tests/test_node_monitor.py
git commit -m "feat(agent-office): node_monitor.collect_status (heartbeat+큐+dead-letter 집계)"
```

## Task B3: GET /api/agent-office/nodes endpoint

**Files:**
- Modify: `agent-office/app/main.py`
- Test: `agent-office/tests/test_nodes_endpoint.py`

**Interfaces:**
- Consumes: `node_monitor.collect_status`
- Produces: `GET /api/agent-office/nodes` → Contract 2 JSON

- [ ] **Step 1: Write the failing test**

```python
# agent-office/tests/test_nodes_endpoint.py
import pytest
from fastapi.testclient import TestClient


@pytest.fixture
def client(monkeypatch):
    from app import main

    async def fake_collect(redis=None):
        return {"redis_ok": True, "paused": False, "paused_reason": None,
                "generated_at": "2026-06-29T00:00:00Z", "workers": [], "links": []}

    monkeypatch.setattr("app.node_monitor.collect_status", fake_collect)
    return TestClient(main.app)


def test_nodes_endpoint_returns_contract(client):
    resp = client.get("/api/agent-office/nodes")
    assert resp.status_code == 200
    body = resp.json()
    assert {"redis_ok", "paused", "workers", "links"}.issubset(body)
```

- [ ] **Step 2: Run test to verify it fails**

Run: `python -m pytest agent-office/tests/test_nodes_endpoint.py -v`
Expected: FAIL with 404 (route does not exist yet)

- [ ] **Step 3: Write minimal implementation** (add the endpoint in `main.py`, near `/states`)

```python
@app.get("/api/agent-office/nodes")
async def nodes_status():
    # Imported at call time, so tests can monkeypatch node_monitor.collect_status.
    from .node_monitor import collect_status
    return await collect_status()
```

- [ ] **Step 4: Run test to verify it passes**

Run: `python -m pytest agent-office/tests/test_nodes_endpoint.py -v`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add agent-office/app/main.py agent-office/tests/test_nodes_endpoint.py
git commit -m "feat(agent-office): GET /api/agent-office/nodes 엔드포인트"
```

## Task B4: Node health alert cron

**Files:**
- Modify: `agent-office/app/node_monitor.py` (add `check_and_alert`)
- Modify: `agent-office/app/scheduler.py` (register an interval job)
- Test: `agent-office/tests/test_node_monitor.py` (add alert-transition tests)

**Interfaces:**
- Consumes: `collect_status`, `telegram.messaging.send_raw`, `db.add_log`
- Produces: `async def check_and_alert(status=None) -> list[str]` (texts of the alerts actually sent; returned for tests)

- [ ] **Step 1: Write the failing test**

```python
# append to test_node_monitor.py
import app.node_monitor as nm


@pytest.mark.asyncio
async def test_alert_on_alive_to_dead(monkeypatch):
    sent = []

    async def fake_send_raw(text, **kw):
        sent.append(text)
        return {"ok": True}

    monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
    monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
    nm._node_state.clear()
    nm._dl_notified.clear()
    alive = {"workers": [{"name": "image-render", "alive": True, "dead_letter": 0}], "links": []}
    dead = {"workers": [{"name": "image-render", "alive": False, "dead_letter": 0}], "links": []}
    await nm.check_and_alert(status=alive)  # first observation: no alert
    assert sent == []
    await nm.check_and_alert(status=dead)   # alive→dead transition
    assert any("다운" in t for t in sent)


@pytest.mark.asyncio
async def test_alert_on_dead_letter_growth(monkeypatch):
    sent = []

    async def fake_send_raw(text, **kw):
        sent.append(text)
        return {"ok": True}

    monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
    monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
    nm._node_state.clear()
    nm._dl_notified.clear()
    s = {"workers": [{"name": "video-render", "alive": True, "dead_letter": 2}], "links": []}
    await nm.check_and_alert(status=s)
    assert any("dead-letter" in t for t in sent)
```

- [ ] **Step 2: Run test to verify it fails**

Run: `python -m pytest agent-office/tests/test_node_monitor.py -v`
Expected: FAIL (`AttributeError: module has no attribute '_node_state'`/`check_and_alert`)

- [ ] **Step 3: Write minimal implementation** (add to `node_monitor.py`)

```python
from .config import NODE_ALERT_DEADLETTER_THRESHOLD

_node_state: dict[str, bool] = {}  # name -> alive at the previous check
_dl_notified: dict[str, int] = {}  # name -> dead_letter count last alerted


async def check_and_alert(status=None) -> list[str]:
    from .telegram.messaging import send_raw
    from .db import add_log

    st = status or await collect_status()
    sent: list[str] = []
    for w in st["workers"]:
        name, alive = w["name"], w.get("alive", False)
        prev = _node_state.get(name)
        if prev is True and not alive:
            text = f"🔴 [{name}] 워커 다운"
            if (await send_raw(text=text)).get("ok"):
                add_log("node_monitor", f"{name} 다운", "warning")
                sent.append(text)
        elif prev is False and alive:
            text = f"🟢 [{name}] 워커 복구"
            if (await send_raw(text=text)).get("ok"):
                add_log("node_monitor", f"{name} 복구", "info")
                sent.append(text)
        _node_state[name] = alive

        dl = w.get("dead_letter", 0)
        if dl >= NODE_ALERT_DEADLETTER_THRESHOLD and dl != _dl_notified.get(name, 0):
            text = f"❌ [{name}] 실패 누적 {dl}건 (dead-letter)"
            if (await send_raw(text=text)).get("ok"):
                add_log("node_monitor", f"{name} dead-letter {dl}", "warning")
                sent.append(text)
            _dl_notified[name] = dl
        elif dl == 0:
            _dl_notified.pop(name, None)
    return sent
```

> Pitfall: the first observation of a worker (prev=None) never alerts. This prevents false alarms at boot. Because only the alive→dead transition is caught, the "silent death after two weeks" scenario (alive, then dies) is covered. A worker that is already dead at boot is not alerted; Phase 2 will cover that case.

`scheduler.py`: import + job registration:

```python
from . import node_monitor  # top of file


async def _run_node_health_check():
    await node_monitor.check_and_alert()

# inside init_scheduler(), near _poll_pipelines:
scheduler.add_job(_run_node_health_check, "interval", seconds=60,
                  id="node_health_check", replace_existing=True)
```

- [ ] **Step 4: Run test to verify it passes**

Run: `python -m pytest agent-office/tests/ -v`
Expected: PASS (all)

- [ ] **Step 5: Commit**

```bash
git add agent-office/app/node_monitor.py agent-office/app/scheduler.py agent-office/tests/test_node_monitor.py
git commit -m "feat(agent-office): 노드 헬스 1분 cron + 텔레그램 경보(다운/복구/dead-letter)"
```

## Task B5: Deploy + operational verification

**Files:** (no code changes; deployment and verification only)

- [ ] **Step 1: Acquire the nas-deploy lock + push to deploy**

```bash
# after acquire_lock("nas-deploy","BE")
git push
# Gitea webhook → deployer deploys automatically (the agent-office rebuild picks up the redis dependency)
```

- [ ] **Step 2: Operational verification** (after deployment completes)

```bash
# Check the /nodes response (if the workers are not deployed yet, expect alive=false everywhere)
curl -s https://gahusb.synology.me/api/agent-office/nodes | python -m json.tool
```

Expected: `redis_ok:true`, with all 6 workers listed. Before Part A is deployed, every `alive` is false. This is expected because no heartbeats exist yet.

- [ ] **Step 3: release_lock("nas-deploy")**

---

# Part C: web-ui Three.js dashboard (owned by the FE session)

> repo: `web-ui`. React + Vite. Build locally only (never build on the NAS Celeron). Deploy with `npm run release:nas`.

> **Implement the Three.js visualization (C4) with the `designer` skill activated.** It is a creative visual, so it is specified by a data contract, a state→visual mapping, and acceptance criteria rather than by line-by-line code written in advance.
## Task C1: Dependencies + route + nav registration

**Files:**
- Modify: `package.json` (three, @react-three/fiber, @react-three/drei)
- Modify: `src/routes.jsx` (lazy route + navLinks entry)

- [ ] **Step 1: Install dependencies**

```bash
npm install three @react-three/fiber @react-three/drei
```

- [ ] **Step 2: Add the route to routes.jsx**

Add it to the `appRoutes` array, using the same lazy pattern as agent-office. `InfraMonitor.jsx` must export whatever that pattern expects. For example, React Router's route-level `lazy` expects a named `Component` export.

```jsx
{
  path: 'infra',
  lazy: () => import('./pages/infra/InfraMonitor'),
},
```

- [ ] **Step 3: Add the navLinks entry**

```jsx
{
  id: 'infra',
  label: 'Infra',
  path: '/infra',
  subtitle: 'NODE PIPELINE',
  description: 'NAS↔Windows 워커 파이프라인 실시간 상태',
  icon: '🛰️', // must be a quoted string: a bare emoji is a syntax error
  accent: '#22d3ee',
},
```

- [ ] **Step 4: Commit**

```bash
git add package.json package-lock.json src/routes.jsx
git commit -m "feat(infra): three.js dependencies + /infra route and nav registration"
```

## Task C2: api helper + useNodeStatus polling hook

**Files:**
- Modify: `src/api.js` (add `getNodeStatus`)
- Create: `src/pages/infra/useNodeStatus.js`
- Test: `src/pages/infra/useNodeStatus.test.js` (Vitest). Confirm the project's test runner, and confirm that a DOM environment such as jsdom is configured for `renderHook`.

**Interfaces:**
- Produces: `getNodeStatus()` → contract 2 object, `useNodeStatus(intervalMs=3000)` → `{data, error, loading}`

- [ ] **Step 1: Write the failing test**

```js
// src/pages/infra/useNodeStatus.test.js
import { renderHook, waitFor } from '@testing-library/react';
import { describe, it, expect, vi } from 'vitest';
import { useNodeStatus } from './useNodeStatus';
import * as api from '../../api';

describe('useNodeStatus', () => {
  it('fetches node status', async () => {
    vi.spyOn(api, 'getNodeStatus').mockResolvedValue({ redis_ok: true, workers: [], links: [] });
    // A huge interval means only the immediate first poll runs during the test.
    const { result } = renderHook(() => useNodeStatus(99999));
    await waitFor(() => expect(result.current.data).toBeTruthy());
    expect(result.current.data.redis_ok).toBe(true);
  });
});
```

- [ ] **Step 2: Run test to verify it fails**

Run: `npm test -- useNodeStatus`
Expected: FAIL (`useNodeStatus`/`getNodeStatus` not defined)

- [ ] **Step 3: Write minimal implementation**

Add to `src/api.js`, following the existing fetch-helper pattern with a relative path:

```js
export async function getNodeStatus() {
  const res = await fetch('/api/agent-office/nodes');
  if (!res.ok) throw new Error(`nodes ${res.status}`);
  return res.json();
}
```

`src/pages/infra/useNodeStatus.js`:

```js
import { useEffect, useState, useRef } from 'react';
import { getNodeStatus } from '../../api';

export function useNodeStatus(intervalMs = 3000) {
  const [data, setData] = useState(null);
  const [error, setError] = useState(null);
  const [loading, setLoading] = useState(true);
  const timer = useRef(null);

  useEffect(() => {
    let alive = true; // prevents setState after unmount

    async function tick() {
      try {
        const d = await getNodeStatus();
        if (alive) { setData(d); setError(null); }
      } catch (e) {
        // Keep the last good data; only surface the error.
        if (alive) setError(e);
      } finally {
        if (alive) setLoading(false);
      }
    }

    tick(); // poll immediately, then every intervalMs
    timer.current = setInterval(tick, intervalMs);
    return () => { alive = false; clearInterval(timer.current); };
  }, [intervalMs]);

  return { data, error, loading };
}
```

- [ ] **Step 4: Run test to verify it passes**

Run: `npm test -- useNodeStatus`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add src/api.js src/pages/infra/useNodeStatus.js src/pages/infra/useNodeStatus.test.js
git commit -m "feat(infra): getNodeStatus + useNodeStatus 3-second polling hook"
```

## Task C3: 2D fallback panel (the first working deliverable)

**Files:**
- Create: `src/pages/infra/InfraMonitor.jsx` (2D card view first)
- Create: `src/pages/infra/WorkerCard.jsx`
- Create: `src/pages/infra/statusVisual.js` (state→color/label mapping, shared with Three.js)
- Test: `src/pages/infra/statusVisual.test.js`

**Interfaces:**
- Produces: `linkColor(status)` and `workerStateLabel(worker)`, both reused by the C4 Three.js scene

- [ ] **Step 1: Write the failing test**

```js
// src/pages/infra/statusVisual.test.js
import { describe, it, expect } from 'vitest';
import { linkColor, workerStateLabel } from './statusVisual';

describe('statusVisual', () => {
  it('maps link status to color', () => {
    expect(linkColor('healthy')).toBe('#22c55e');
    expect(linkColor('down')).toBe('#ef4444');
    expect(linkColor('paused')).toBe('#f59e0b');
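    expect(linkColor('degraded')).toBe('#eab308');
  });
  it('labels dead worker', () => {
    expect(workerStateLabel({ alive: false })).toMatch(/다운|down/i);
  });
});
```

- [ ] **Step 2: Run test to verify it fails**

Run: `npm test -- statusVisual`
Expected: FAIL (module not found)

- [ ] **Step 3: Write minimal implementation**

`src/pages/infra/statusVisual.js`:

```js
// Link status → color (contract 2: healthy | paused | degraded | down).
export const LINK_COLORS = {
  healthy: '#22c55e',
  paused: '#f59e0b',
  degraded: '#eab308',
  down: '#ef4444',
};

export function linkColor(status) {
  return LINK_COLORS[status] || '#64748b'; // unknown status → neutral slate
}

// Worker → display label: liveness first, then contract-1 state values.
// NOTE: task-watcher states (`trading`/`free`) are not mapped and fall through to the idle label.
export function workerStateLabel(w) {
  if (!w.alive) return '🔴 다운';
  if (w.state === 'paused') return '⏸ 작업중(트레이딩)';
  if (w.state === 'busy') return '⚙️ 처리 중';
  if (w.state === 'market_open') return '📈 장중';
  if (w.state === 'market_closed') return '🌙 휴장';
  return '🟢 대기';
}
```

`WorkerCard.jsx` + `InfraMonitor.jsx`: take `data.workers` from `useNodeStatus()` and render it as a card grid. Each card shows the name, state label, queue_depth, dead_letter and last_beat_age_s.
- Until the first poll returns, `data` is `null`; guard for that case.
- When `data.redis_ok===false`, show a "집계 서버/Redis 연결 끊김" (aggregator/Redis disconnected) banner.
- These are standard React cards; the designer skill is not needed.

- [ ] **Step 4: Run test to verify it passes**

Run: `npm test -- statusVisual`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add src/pages/infra/
git commit -m "feat(infra): 2D worker status panel + statusVisual mapping"
```

## Task C4: Three.js pipeline visualization (designer skill)

**Files:**
- Create: `src/pages/infra/PipelineScene.jsx` (r3f Canvas topology)
- Modify: `src/pages/infra/InfraMonitor.jsx` (toggle between the 3D scene and the 2D panel)

**Implementation approach**: implement this Task with the **`designer` skill active**. Below are the exact data contract, state mapping and acceptance criteria the designer must follow.

- [ ] **Step 1: Activate the designer skill.** Call `Skill(designer)`, then proceed with the spec below.
- [ ] **Step 2: Topology + data contract**
  - Input: `data` from `useNodeStatus()` (contract 2). `data.links` drives the pipelines and `data.workers` drives node state.
  - Node layout:
    - **Left**: NAS server (box) + gateway label.
    - **Center**: Redis queue bus (glowing core).
    - **Right**: Windows node (box) + 6 worker sub-nodes (`music/video/image/insta-render`, `task-watcher`, `ai_trade`).
  - Pipelines (tubes/lines):
    - The 4 render workers use the NAS→Redis→worker path (`type:redis-queue`).
    - `ai_trade` links NAS stock⇄ai_trade directly (`type:http-pull`, bypassing the Redis bus).
  - Colors reuse `statusVisual.linkColor(link.status)`.
- [ ] **Step 3: Per-state animation (acceptance criteria)**
  - `healthy`: cyan/green tube with particles flowing along it. Visible data flow is the user's core requirement. When the worker's `state==='busy'`, the particles speed up.
  - `paused`: amber. Particles stop or slow down, and a "⏸ 작업중(트레이딩)" label appears.
  - `down`: red. The flow stops, and the break point shows sparks or a severed visual. Add a ⚠ icon and "last beat Xs ago".
  - `degraded`: a ❌ badge on the tube showing the dead-letter count.
  - `redis_ok===false`: the whole central bus turns red, under a "집계 서버 연결 끊김" (aggregator disconnected) overlay.
  - HUD on each worker node: name, state label (`workerStateLabel`), queue_depth, dead_letter.
- [ ] **Step 4: Fallback + verification**
  - No WebGL support or on mobile: fall back automatically to the C3 2D panel, with a toggle button.
  - Verification steps:
    1. Run `npm run dev`.
    2. Stub `useNodeStatus` to force-inject each of the 4 states as mock data.
    3. Confirm visually that healthy/paused/down/degraded are clearly distinguishable.
    4. In an RC environment, confirm with the user via screenshots or a recording.
- [ ] **Step 5: Commit**

```bash
git add src/pages/infra/PipelineScene.jsx src/pages/infra/InfraMonitor.jsx
git commit -m "feat(infra): three.js NAS↔Windows pipeline visualization (per-state flow/failure)"
```

- [ ] **Step 6: Deploy**

```bash
npm run build && npm run release:nas
```

---

## Execution order & session collaboration (co-gahusb)

1. **Prerequisite gate**: share this plan's Global Constraints (the 2 contracts) with the FE/AI sessions via co-gahusb `post_message`. After that, the 3 Parts can start in parallel.
2. **Part B (BE, this session)** can go first. FakeRedis tests let it complete without any workers. After B5 deploys, `/nodes` responds, even if every worker shows `alive:false`.
3. Once **Part A (AI session)** is done and the workers are redeployed, `/nodes` starts reporting alive:true.
4. **Part C (FE session)** can develop against mocks using only the `/nodes` contract, then switch to live data.
5. Shared resources:
   - B1's docker-compose change takes the `compose` lock.
   - B5's deployment takes the `nas-deploy` lock.

## Self-Review (author check)

- **Spec coverage**:
  - G1 (heartbeats from 6 workers) = A1~A4.
  - G2 (queue, dead-letter, processing, paused aggregation) = B2.
  - G3 (Telegram alerts) = B4.
  - G4 (Three.js /infra) = C1~C4.
  - Contract 1 = A1 + Global; contract 2 = B2 + Global.
  - ✓ Nothing missing.
- **Placeholder scan**: C4 delegates to the designer skill but spells out the data contract, state mapping and acceptance criteria, so it is not a placeholder. Every other Task contains real code. ✓
- **Type consistency**:
  - `WorkerStats` (A1) ↔ worker.stats (A2) agree.
  - The `collect_status` return value (B2) matches the `check_and_alert` input (B4), `/nodes` (B3), `useNodeStatus` (C2) and `linkColor`/`workerStateLabel` (C3, reused by C4).
  - The heartbeat key/value (contract 1) ↔ node_monitor parsing (B2) agree.
  - ✓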
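The type-consistency check above is done by reading the code. A minimal sketch of a runtime counterpart could validate a `/nodes` payload against contract 2 as written in Global Constraints. The payload could be a Part C mock or the parsed `curl` output from B5 Step 2. Assumptions:

- `validateNodes` is a hypothetical helper, not one of the plan's files.
- Heartbeat-derived numbers may be `null` for workers with no live heartbeat. This is an assumption; contract 2 only shows the populated case.

```js
// Hypothetical runtime check of a GET /api/agent-office/nodes payload against contract 2.
// Returns a list of problems; an empty list means the payload matches the contract shape.
const WORKER_NAMES = ['music-render', 'video-render', 'image-render', 'insta-render', 'task-watcher', 'ai_trade'];
const LINK_STATUSES = new Set(['healthy', 'paused', 'down', 'degraded']);
const LINK_TYPES = new Set(['redis-queue', 'http-pull']);
// Read from Redis queues, so always present as numbers.
const REQUIRED_NUMBERS = ['queue_depth', 'dead_letter', 'processing'];
// Read from the heartbeat; assumed to be null when no heartbeat key exists.
const NULLABLE_NUMBERS = ['last_beat_age_s', 'jobs_done', 'jobs_failed'];

function validateNodes(payload) {
  if (typeof payload !== 'object' || payload === null) return ['payload is not an object'];
  const problems = [];

  for (const key of ['redis_ok', 'paused']) {
    if (typeof payload[key] !== 'boolean') problems.push(`${key} must be a boolean`);
  }
  if (!Array.isArray(payload.workers) || !Array.isArray(payload.links)) {
    problems.push('workers and links must both be arrays');
    return problems;
  }

  // All six contract-1 workers should be listed, alive or not.
  const names = new Set(payload.workers.map((w) => w.name));
  for (const name of WORKER_NAMES) {
    if (!names.has(name)) problems.push(`missing worker ${name}`);
  }

  for (const w of payload.workers) {
    if (typeof w.alive !== 'boolean') problems.push(`${w.name}: alive must be a boolean`);
    for (const f of REQUIRED_NUMBERS) {
      if (typeof w[f] !== 'number') problems.push(`${w.name}: ${f} must be a number`);
    }
    for (const f of NULLABLE_NUMBERS) {
      if (w[f] !== null && typeof w[f] !== 'number') problems.push(`${w.name}: ${f} must be a number or null`);
    }
  }

  for (const l of payload.links) {
    const id = `${l.from}->${l.to}`;
    if (!LINK_STATUSES.has(l.status)) problems.push(`link ${id}: unknown status ${l.status}`);
    if (!LINK_TYPES.has(l.type)) problems.push(`link ${id}: unknown type ${l.type}`);
  }
  return problems;
}
```

The function collects every problem instead of throwing on the first one. That makes all contract drift between Parts A, B and C visible in a single run.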