web-page-backend/docs/superpowers/plans/2026-06-29-worker-observability.md
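gahusb c69b18243b docs: add implementation plan for the distributed worker observability system (3-repo TDD plan)
Part A (web-ai heartbeat) / Part B (agent-office aggregation + alerts) / Part C (web-ui
Three.js dashboard). Each Part can be run and tested independently; two contracts are locked as Global
Constraints.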

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-06-29 17:33:16 +09:00

1106 lines
44 KiB
Markdown
# Distributed Worker Observability System Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
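**Goal:** Observe the liveness, queue, and failure state of the six NAS↔Windows distributed workers via heartbeats; agent-office aggregates them and sends Telegram alerts, and web-ui visualizes them at `/infra` as a Three.js pipeline.
**Architecture:** Each worker pushes `worker:<name>:heartbeat` (TTL 45s) to the NAS Redis → agent-office `node_monitor` reads the heartbeats plus ReliableQueue's `processing:*`/`dead_letter:*` and `queue:paused`, and exposes them via `GET /api/agent-office/nodes` → a 1-minute cron sends a Telegram alert on each state transition → web-ui polls every 3 seconds and visualizes the topology.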
**Tech Stack:** Python 3.12 / FastAPI / redis.asyncio / APScheduler (web-backend·web-ai), React + Vite / three + @react-three/fiber + drei (web-ui).
## Global Constraints
These two contracts are the locking conventions that let the three Parts work independently and in parallel. Every Task uses these values exactly as written.
**[Contract 1] Heartbeat key schema**
- Key: `worker:<name>:heartbeat` (name ∈ `music-render`, `video-render`, `image-render`, `insta-render`, `task-watcher`, `ai_trade`)
- Command: `SET worker:<name>:heartbeat <json> EX 45`
- Value (JSON): `{"name": str, "kind": "render"|"watcher"|"trader", "state": str, "ts": "<UTC ISO8601 ...Z>", "last_job_at": str|null, "jobs_done": int, "jobs_failed": int, "mode": str(optional, task-watcher only)}`
- `state` by kind: render=`idle`|`busy`|`paused` / watcher=`trading`|`free` / trader=`market_open`|`market_closed`
- Send interval 15 seconds, TTL 45 seconds (3×). The redis client assumes `decode_responses=False` (bytes), the same as the existing workers.
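
A consumer of Contract 1 may want to reject malformed heartbeats early. The following is a minimal sketch, not part of the plan's code: `validate_heartbeat`, `ALLOWED_STATES`, and `REQUIRED_FIELDS` are hypothetical names, and the checks only mirror the fields and per-kind states listed above.

```python
import json

# Per-kind allowed states, copied from the heartbeat contract above.
ALLOWED_STATES = {
    "render": {"idle", "busy", "paused"},
    "watcher": {"trading", "free"},
    "trader": {"market_open", "market_closed"},
}
REQUIRED_FIELDS = ("name", "kind", "state", "ts", "last_job_at", "jobs_done", "jobs_failed")


def validate_heartbeat(raw):
    """Return a list of problems in a raw heartbeat value (bytes or str).

    Hypothetical helper: an empty list means the payload matches the contract.
    """
    try:
        hb = json.loads(raw)  # json.loads accepts bytes, matching decode_responses=False
    except ValueError as exc:  # JSONDecodeError and UnicodeDecodeError are both ValueErrors
        return [f"not valid JSON: {exc}"]
    if not isinstance(hb, dict):
        return ["payload is not a JSON object"]
    problems = [f"missing field: {f}" for f in REQUIRED_FIELDS if f not in hb]
    kind = hb.get("kind")
    if not isinstance(kind, str) or kind not in ALLOWED_STATES:
        problems.append(f"unknown kind: {kind!r}")
    elif hb.get("state") not in ALLOWED_STATES[kind]:
        problems.append(f"state {hb.get('state')!r} not allowed for kind {kind!r}")
    if isinstance(hb.get("ts"), str) and not hb["ts"].endswith("Z"):
        problems.append("ts must be UTC ISO8601 ending in Z")
    return problems
```

A check like this would fit in the aggregator's read path, where a bad payload should be logged rather than crash the status endpoint.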
**[Contract 2] `GET /api/agent-office/nodes` response schema**
```json
{ "redis_ok": true, "paused": false, "paused_reason": "trading", "generated_at": "...Z",
"workers": [{"name":"image-render","kind":"render","alive":true,"state":"idle",
"last_beat_age_s":3,"queue_depth":0,"dead_letter":0,"processing":0,
"jobs_done":42,"jobs_failed":1,"last_job_at":"...Z"}],
"links": [{"from":"nas","to":"image-render","type":"redis-queue","status":"healthy"}] }
```
- `links[].status` ∈ `healthy`|`paused`|`down`|`degraded`, `links[].type` ∈ `redis-queue`|`http-pull`
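**Common rules**: Never commit `.env`. Docker runs only on the NAS (no local docker commands). Each repo commits only from its own path. The NAS Redis address is `redis://192.168.45.54:6379` on the worker side and `redis://redis:6379` inside containers on the NAS side.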
---
# Part A: web-ai worker heartbeat (owned by the AI session)
> repo: `web-ai`. Tests: use the system Python (`C:\Users\jaeoh\AppData\Local\Programs\Python\Python312\python.exe -m pytest`), because the `.venv` breaks on the Korean-character path (see web-ai CLAUDE.md).
## Task A1: Shared heartbeat module
**Files:**
- Create: `services/_shared/heartbeat.py`
- Test: `services/_shared/tests/test_heartbeat.py`
**Interfaces:**
- Produces: `class WorkerStats`(busy:bool, jobs_done:int, jobs_failed:int, last_job_at:str|None), `def utc_now_iso()->str`, `def build_payload(name,kind,state,stats,extra=None)->str`, `async def render_state(redis,stats,paused_key="queue:paused")->str`, `async def heartbeat_loop(redis,name,kind,stats,*,interval=15,ttl=45,paused_key="queue:paused",state_fn=None)`
- [ ] **Step 1: Write the failing test**
```python
# services/_shared/tests/test_heartbeat.py
import json
import pytest
from _shared.heartbeat import WorkerStats, build_payload, render_state


def test_build_payload_has_contract_fields():
    s = WorkerStats(); s.jobs_done = 3; s.last_job_at = "2026-06-29T00:00:00Z"
    payload = json.loads(build_payload("image-render", "render", "idle", s))
    assert payload["name"] == "image-render"
    assert payload["kind"] == "render"
    assert payload["state"] == "idle"
    assert payload["jobs_done"] == 3
    assert payload["last_job_at"] == "2026-06-29T00:00:00Z"
    assert payload["ts"].endswith("Z")


def test_build_payload_merges_extra():
    payload = json.loads(build_payload("task-watcher", "watcher", "free", WorkerStats(), extra={"mode": "free"}))
    assert payload["mode"] == "free"


class _FakeRedis:
    def __init__(self, paused): self._paused = paused
    async def get(self, key): return b"1" if self._paused else None


@pytest.mark.asyncio
async def test_render_state_paused_overrides_busy():
    s = WorkerStats(); s.busy = True
    assert await render_state(_FakeRedis(paused=True), s) == "paused"


@pytest.mark.asyncio
async def test_render_state_busy_then_idle():
    s = WorkerStats(); s.busy = True
    assert await render_state(_FakeRedis(paused=False), s) == "busy"
    s.busy = False
    assert await render_state(_FakeRedis(paused=False), s) == "idle"
```
- [ ] **Step 2: Run test to verify it fails**
Run: `python -m pytest services/_shared/tests/test_heartbeat.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named '_shared.heartbeat'`
- [ ] **Step 3: Write minimal implementation**
```python
# services/_shared/heartbeat.py
"""Distributed worker heartbeat: SET worker:<name>:heartbeat with a TTL. See Global Constraints, contract 1."""
from __future__ import annotations
import asyncio, datetime as dt, json, logging, os

logger = logging.getLogger(__name__)
DEFAULT_INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "15"))
DEFAULT_TTL = int(os.getenv("HEARTBEAT_TTL", "45"))


class WorkerStats:
    """Mutable counters updated by worker_loop and read by heartbeat_loop."""
    def __init__(self):
        self.busy = False
        self.jobs_done = 0
        self.jobs_failed = 0
        self.last_job_at = None  # ISO str | None


def utc_now_iso() -> str:
    return dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def build_payload(name: str, kind: str, state: str, stats: WorkerStats, extra: dict | None = None) -> str:
    payload = {
        "name": name, "kind": kind, "state": state, "ts": utc_now_iso(),
        "last_job_at": stats.last_job_at,
        "jobs_done": stats.jobs_done, "jobs_failed": stats.jobs_failed,
    }
    if extra:
        payload.update(extra)
    return json.dumps(payload)


async def render_state(redis, stats: WorkerStats, paused_key: str = "queue:paused") -> str:
    # The paused flag wins over busy; values are bytes (decode_responses=False).
    if await redis.get(paused_key) == b"1":
        return "paused"
    return "busy" if stats.busy else "idle"


async def heartbeat_loop(redis, name, kind, stats, *, interval=DEFAULT_INTERVAL,
                         ttl=DEFAULT_TTL, paused_key="queue:paused", state_fn=None):
    key = f"worker:{name}:heartbeat"
    logger.info("heartbeat started name=%s ttl=%ds", name, ttl)
    while True:
        try:
            if state_fn is not None:
                state, extra = await state_fn(redis, stats)
            else:
                state, extra = await render_state(redis, stats, paused_key), None
            await redis.set(key, build_payload(name, kind, state, stats, extra), ex=ttl)
        except asyncio.CancelledError:
            raise
        except Exception:
            # A failed beat is logged and retried on the next tick; the loop never dies.
            logger.exception("heartbeat send failed name=%s", name)
        await asyncio.sleep(interval)
```
- [ ] **Step 4: Run test to verify it passes**
Run: `python -m pytest services/_shared/tests/test_heartbeat.py -v`
Expected: PASS (4 passed)
- [ ] **Step 5: Commit**
```bash
git add services/_shared/heartbeat.py services/_shared/tests/test_heartbeat.py
git commit -m "feat(_shared): worker heartbeat module (worker:<name>:heartbeat TTL SET)"
```
## Task A2: Wire heartbeat into the image-render worker (+ apply the same pattern to the other 3 render workers)
**Files:**
- Modify: `services/image-render/worker.py` (add stats + poll_once counters)
- Modify: `services/image-render/main.py` (heartbeat_loop task in lifespan)
- Test: `services/image-render/tests/test_worker.py` (add poll_once counter verification)
**Interfaces:**
- Consumes: `_shared.heartbeat.WorkerStats`, `heartbeat_loop`, `utc_now_iso`
- Produces: `worker.stats` (module-level WorkerStats), which main.py passes to the heartbeat
- [ ] **Step 1: Write the failing test** (on success, poll_once increments jobs_done and resets busy)
```python
# append to services/image-render/tests/test_worker.py
import asyncio, pytest
import worker


class _OneJobQueue:
    def __init__(self): self.acked = False
    async def dequeue(self, timeout=5):
        if self.acked: return None
        return ({"job_type": "flux_generation", "task_id": "t1", "params": {}}, b"raw")
    async def ack(self, raw): self.acked = True
    async def fail(self, raw, payload): pass


@pytest.mark.asyncio
async def test_poll_once_increments_jobs_done(monkeypatch):
    worker.stats.jobs_done = 0
    monkeypatch.setattr(worker, "run_flux_generation", lambda task_id, params: None)
    handled = await worker.poll_once(_OneJobQueue())
    assert handled is True
    assert worker.stats.jobs_done == 1
    assert worker.stats.busy is False
    assert worker.stats.last_job_at is not None
```
- [ ] **Step 2: Run test to verify it fails**
Run: `python -m pytest services/image-render/tests/test_worker.py::test_poll_once_increments_jobs_done -v`
Expected: FAIL with `AttributeError: module 'worker' has no attribute 'stats'`
- [ ] **Step 3: Write minimal implementation**
`worker.py`: add the import and a module-level stats, and replace `poll_once`:
```python
from _shared.heartbeat import WorkerStats, utc_now_iso  # add to the top import block

stats = WorkerStats()  # module level (near the REDIS_URL constant)


async def poll_once(queue: ReliableQueue) -> bool:
    result = await queue.dequeue(timeout=5)
    if result is None:
        return False
    payload, raw = result
    stats.busy = True
    try:
        await asyncio.to_thread(_dispatch, payload)
    except Exception:
        logger.exception("dispatch unhandled exception task_id=%s", payload.get("task_id"))
        await queue.fail(raw, payload)
        stats.jobs_failed += 1
        stats.last_job_at = utc_now_iso()
        stats.busy = False
        return True
    await queue.ack(raw)
    stats.jobs_done += 1
    stats.last_job_at = utc_now_iso()
    stats.busy = False
    return True
```
`main.py`: add the heartbeat task to lifespan:
```python
import os
import redis.asyncio as aioredis
from _shared.heartbeat import heartbeat_loop


@asynccontextmanager
async def lifespan(app: FastAPI):
    worker_task = asyncio.create_task(worker.worker_loop())
    hb_redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://192.168.45.54:6379"), decode_responses=False)
    hb_task = asyncio.create_task(heartbeat_loop(hb_redis, "image-render", "render", worker.stats))
    logger.info("image-render lifespan started")
    try:
        yield
    finally:
        # Cancel both background tasks and wait for them to unwind.
        for t in (worker_task, hb_task):
            t.cancel()
            try:
                await t
            except asyncio.CancelledError:
                pass
        await hb_redis.aclose()
        logger.info("image-render lifespan stopped")
```
- [ ] **Step 4: Run test to verify it passes**
Run: `python -m pytest services/image-render/tests/ -v`
Expected: PASS (existing + 1 new)
- [ ] **Step 5: Apply the same pattern to the other 3 render workers**
Apply the **same pattern** as Step 3 to each of `video-render`, `music-render`, and `insta-render`. The only difference is the worker name string in main.py:
- `services/video-render/main.py`: `heartbeat_loop(hb_redis, "video-render", "render", worker.stats)`
- `services/music-render/main.py`: `heartbeat_loop(hb_redis, "music-render", "render", worker.stats)`
- `services/insta-render/main.py`: `heartbeat_loop(hb_redis, "insta-render", "render", worker.stats)`
- Each `worker.py`: `from _shared.heartbeat import WorkerStats, utc_now_iso` + `stats = WorkerStats()` + poll_once counters (ack→`jobs_done`, fail→`jobs_failed`, both update `last_job_at`/`busy`). music-render's worker.py has a more complex structure (12 job_types), but the hook points are the same (`busy=True` before dispatch; counters + `busy=False` after ack/fail).
- Each `tests/test_worker.py`: add the Step 1 test, adapted to that worker's job_type.
- [ ] **Step 6: Commit**
```bash
git add services/image-render services/video-render services/music-render services/insta-render
git commit -m "feat(render-workers): wire heartbeat into 4 render workers + poll_once counters"
```
## Task A3: task-watcher heartbeat (including mode)
**Files:**
- Modify: `services/task-watcher/watcher.py`
- Test: `services/task-watcher/tests/test_watcher.py` (new)
**Interfaces:**
- Consumes: `_shared.heartbeat.build_payload`, `WorkerStats`
- Produces: `state`=mode and `mode`=mode in the `worker:task-watcher:heartbeat` value
- [ ] **Step 1: Write the failing test**
```python
# services/task-watcher/tests/test_watcher.py
import json
from _shared.heartbeat import build_payload, WorkerStats


def test_watcher_heartbeat_payload_carries_mode():
    payload = json.loads(build_payload("task-watcher", "watcher", "trading", WorkerStats(), extra={"mode": "trading"}))
    assert payload["kind"] == "watcher"
    assert payload["state"] == "trading"
    assert payload["mode"] == "trading"
```
- [ ] **Step 2: Run test to verify it fails**
Run: `python -m pytest services/task-watcher/tests/test_watcher.py -v`
Expected: FAIL (`ModuleNotFoundError` or another import-path error; `_shared` must be on task-watcher's PYTHONPATH. Check that compose sets `PYTHONPATH=/app:/shared`)
- [ ] **Step 3: Write minimal implementation** (add the heartbeat SET to `watcher_loop`)
```python
# top-of-file imports in watcher.py
from _shared.heartbeat import build_payload, WorkerStats

_HB_STATS = WorkerStats()
HEARTBEAT_KEY = "worker:task-watcher:heartbeat"
HEARTBEAT_TTL = 45

# inside the watcher_loop while body, right after the mode decision:
        mode = current_mode(now, holidays)
        if mode == "trading":
            await redis.set(PAUSED_KEY, b"1", ex=PAUSED_TTL)
        else:
            await redis.delete(PAUSED_KEY)
        # heartbeat (LOOP_INTERVAL=30s < TTL 45s)
        await redis.set(HEARTBEAT_KEY,
                        build_payload("task-watcher", "watcher", mode, _HB_STATS, extra={"mode": mode}),
                        ex=HEARTBEAT_TTL)
```
> Note: the watcher loop period (30s) is shorter than the TTL (45s), so the key is refreshed before it expires.
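
The margin between the send interval and the TTL determines how many consecutive heartbeats can be lost before the key expires and the worker is reported dead. As a rough sketch (the helper name `tolerated_misses` is hypothetical, and the model ignores network latency and clock jitter):

```python
def tolerated_misses(interval_s: int, ttl_s: int) -> int:
    """Consecutive missed beats a heartbeat key survives, for integer seconds.

    After a successful SET at t=0 the key lives until t=ttl_s. If the next k
    beats are lost, the refresh arrives at (k + 1) * interval_s, which must be
    strictly before ttl_s. A result of -1 means the key expires between beats
    even when nothing is lost.
    """
    if interval_s <= 0 or ttl_s <= 0:
        raise ValueError("interval_s and ttl_s must be positive")
    return (ttl_s - 1) // interval_s - 1


print(tolerated_misses(15, 45))  # 15s senders (render workers, ai_trade) → 1
print(tolerated_misses(30, 45))  # task-watcher loop → 0
```

Under this model the 15s/45s senders survive one lost beat with margin, while the 30s task-watcher loop has no slack: a single failed SET, or an iteration that overruns by more than 15 seconds, would make it look dead until the next successful beat.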
- [ ] **Step 4: Run test to verify it passes**
Run: `python -m pytest services/task-watcher/tests/test_watcher.py -v`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add services/task-watcher
git commit -m "feat(task-watcher): emit heartbeat (state=mode, exposes pause reason)"
```
## Task A4: ai_trade heartbeat (different runtime)
**Files:**
- Modify: `ai_trade/requirements.txt` or the root `requirements.txt` (add redis if it is not already listed)
- Create: `ai_trade/heartbeat.py` (a self-contained mini module; ai_trade runs on the host, so the `_shared` import path differs)
- Modify: `ai_trade/main.py` (heartbeat task in lifespan)
- Test: `ai_trade/tests/test_heartbeat.py`
**Interfaces:**
- Produces: `worker:ai_trade:heartbeat` (kind=`trader`, state=`market_open`|`market_closed`)
- Consumes: the polling-window decision function in `ai_trade/scheduler.py` (the AI session confirms the exact function name; per web-ai CLAUDE.md, `scheduler.py` owns the polling-window decision)
- [ ] **Step 1: Write the failing test**
```python
# ai_trade/tests/test_heartbeat.py
import json
from heartbeat import build_trader_payload


def test_trader_payload_market_open():
    p = json.loads(build_trader_payload("market_open", signals=2))
    assert p["name"] == "ai_trade"
    assert p["kind"] == "trader"
    assert p["state"] == "market_open"
    assert p["ts"].endswith("Z")
```
- [ ] **Step 2: Run test to verify it fails**
Run: `python -m pytest ai_trade/tests/test_heartbeat.py -v`
Expected: FAIL (`ModuleNotFoundError: heartbeat`)
- [ ] **Step 3: Write minimal implementation**
```python
# ai_trade/heartbeat.py
"""ai_trade heartbeat: SET worker:ai_trade:heartbeat on the NAS Redis. See Global Constraints, contract 1."""
from __future__ import annotations
import asyncio, datetime as dt, json, logging, os
import redis.asyncio as aioredis

logger = logging.getLogger(__name__)
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
KEY = "worker:ai_trade:heartbeat"
INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "15"))
TTL = int(os.getenv("HEARTBEAT_TTL", "45"))


def build_trader_payload(state: str, signals: int = 0) -> str:
    return json.dumps({
        "name": "ai_trade", "kind": "trader", "state": state,
        "ts": dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "last_job_at": None, "jobs_done": signals, "jobs_failed": 0,
    })


async def heartbeat_loop(state_fn):
    """state_fn() -> (state:str, signals:int). The caller injects the poll-window logic."""
    redis = aioredis.from_url(REDIS_URL, decode_responses=False)
    try:
        while True:
            try:
                state, signals = state_fn()
                await redis.set(KEY, build_trader_payload(state, signals), ex=TTL)
            except asyncio.CancelledError:
                raise
            except Exception:
                logger.exception("ai_trade heartbeat failed")
            await asyncio.sleep(INTERVAL)
    finally:
        await redis.aclose()
```
Add to the `ai_trade/main.py` lifespan (the AI session builds state_fn from the actual polling-window decision):
```python
import heartbeat as _hb

# inside lifespan, near where the poll_loop task is created:
def _trader_state():
    # decide market_open/market_closed from the scheduler's polling-window result
    # (AI session: replace with the scheduler's actual window function)
    state = "market_open" if scheduler.is_polling_window() else "market_closed"
    return state, len(state_singleton.signals)

hb_task = asyncio.create_task(_hb.heartbeat_loop(_trader_state))
# on shutdown: hb_task.cancel() + await (ignore CancelledError)
```
Add `redis>=5.0` to `requirements.txt` (if it is not already listed).
- [ ] **Step 4: Run test to verify it passes**
Run: `python -m pytest ai_trade/tests/test_heartbeat.py -v`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add ai_trade/heartbeat.py ai_trade/main.py ai_trade/tests/test_heartbeat.py requirements.txt
git commit -m "feat(ai_trade): NAS Redis heartbeat (trader market_open/closed)"
```
---
# Part B: web-backend agent-office aggregation + alerts (owned by this BE session)
> repo: `web-backend`. Tests: agent-office uses pytest + pytest-asyncio. pytest can be run locally (not in docker).
## Task B1: Add the redis dependency to agent-office
**Files:**
- Modify: `agent-office/requirements.txt`
- Modify: `agent-office/app/config.py`
- Modify: `docker-compose.yml` (agent-office block; requires the `compose` lock)
**Interfaces:**
- Produces: `config.REDIS_URL`, plus a `REDIS_URL` env and `depends_on: redis` on the agent-office container
- [ ] **Step 1: Add redis to requirements.txt**
```
redis>=5.0
```
(append one line at the end of the file)
- [ ] **Step 2: Add REDIS_URL + the alert threshold to config.py**
```python
# append to the end of config.py
# Redis (node monitor)
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379")
NODE_ALERT_DEADLETTER_THRESHOLD = int(os.getenv("NODE_ALERT_DEADLETTER_THRESHOLD", "1"))
```
- [ ] **Step 3: Edit the agent-office block in docker-compose.yml** (after `acquire_lock("compose","BE")`)
Add to `environment:` in the agent-office service block:
```yaml
- REDIS_URL=${REDIS_URL:-redis://redis:6379}
```
Add `redis` to `depends_on:` (create `depends_on` if the block does not have one):
```yaml
depends_on:
- redis
```
- [ ] **Step 4: Commit**
```bash
git add agent-office/requirements.txt agent-office/app/config.py docker-compose.yml
git commit -m "chore(agent-office): redis dependency + REDIS_URL/dead-letter threshold config"
```
## Task B2: node_monitor.collect_status
**Files:**
- Create: `agent-office/app/node_monitor.py`
- Test: `agent-office/tests/test_node_monitor.py`
**Interfaces:**
- Produces: `WORKER_REGISTRY`(list of dict), `async def collect_status(redis=None) -> dict` (Contract 2 schema)
- Consumes: `config.REDIS_URL`
- [ ] **Step 1: Write the failing test**
```python
# agent-office/tests/test_node_monitor.py
import json, pytest
from app import node_monitor


class FakeRedis:
    """Mimics worker heartbeat GETs, queue llen, and scan_iter."""
    def __init__(self, kv=None, lists=None):
        self._kv = kv or {}        # key(str) -> bytes
        self._lists = lists or {}  # key(str) -> length(int)
    async def get(self, key):
        return self._kv.get(key)
    async def llen(self, key):
        return self._lists.get(key, 0)
    async def scan_iter(self, match=None):
        prefix = match.rstrip("*")
        for k in list(self._lists):
            if k.startswith(prefix):
                yield k


def _hb(name, kind, state, **extra):
    return json.dumps({"name": name, "kind": kind, "state": state, "ts": "2026-06-29T00:00:00Z",
                       "last_job_at": None, "jobs_done": 0, "jobs_failed": 0, **extra}).encode()


@pytest.mark.asyncio
async def test_alive_worker_healthy_link():
    r = FakeRedis(kv={"worker:image-render:heartbeat": _hb("image-render","render","idle")})
    st = await node_monitor.collect_status(redis=r)
    img = next(w for w in st["workers"] if w["name"] == "image-render")
    assert img["alive"] is True and img["state"] == "idle"
    link = next(l for l in st["links"] if l["to"] == "image-render")
    assert link["status"] == "healthy" and link["type"] == "redis-queue"


@pytest.mark.asyncio
async def test_missing_heartbeat_is_dead_and_down():
    r = FakeRedis()  # no heartbeat
    st = await node_monitor.collect_status(redis=r)
    img = next(w for w in st["workers"] if w["name"] == "image-render")
    assert img["alive"] is False
    link = next(l for l in st["links"] if l["to"] == "image-render")
    assert link["status"] == "down"


@pytest.mark.asyncio
async def test_dead_letter_makes_degraded():
    r = FakeRedis(kv={"worker:video-render:heartbeat": _hb("video-render","render","idle")},
                  lists={"dead_letter:queue:video-render": 2})
    st = await node_monitor.collect_status(redis=r)
    vid = next(w for w in st["workers"] if w["name"] == "video-render")
    assert vid["dead_letter"] == 2
    link = next(l for l in st["links"] if l["to"] == "video-render")
    assert link["status"] == "degraded"


@pytest.mark.asyncio
async def test_paused_reason_from_watcher():
    r = FakeRedis(kv={"queue:paused": b"1",
                      "worker:task-watcher:heartbeat": _hb("task-watcher","watcher","trading",mode="trading")})
    st = await node_monitor.collect_status(redis=r)
    assert st["paused"] is True and st["paused_reason"] == "trading"


@pytest.mark.asyncio
async def test_trader_http_pull_link():
    r = FakeRedis(kv={"worker:ai_trade:heartbeat": _hb("ai_trade","trader","market_open")})
    st = await node_monitor.collect_status(redis=r)
    link = next(l for l in st["links"] if l["from"] == "ai_trade")
    assert link["type"] == "http-pull" and link["status"] == "healthy"
```
- [ ] **Step 2: Run test to verify it fails**
Run: `python -m pytest agent-office/tests/test_node_monitor.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named 'app.node_monitor'`
- [ ] **Step 3: Write minimal implementation**
```python
# agent-office/app/node_monitor.py
"""Distributed worker status aggregation (read-only). Builds the Global Constraints contract 2 schema."""
from __future__ import annotations
import datetime as dt, json, logging
import redis.asyncio as aioredis
from .config import REDIS_URL

logger = logging.getLogger("agent-office.node_monitor")

WORKER_REGISTRY = [
    {"name": "music-render", "kind": "render", "queue": "queue:music-render"},
    {"name": "video-render", "kind": "render", "queue": "queue:video-render"},
    {"name": "image-render", "kind": "render", "queue": "queue:image-render"},
    {"name": "insta-render", "kind": "render", "queue": "queue:insta-render"},
    {"name": "task-watcher", "kind": "watcher", "queue": None},
    {"name": "ai_trade", "kind": "trader", "queue": None},
]

_redis = None


def _get_redis():
    global _redis
    if _redis is None:
        _redis = aioredis.from_url(REDIS_URL, decode_responses=False)
    return _redis


def _beat_age(ts_str, now):
    try:
        beat = dt.datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
        return max(0, int((now - beat).total_seconds()))
    except Exception:
        return None


def _render_link_status(w):
    # Precedence: down > paused > degraded > healthy.
    if not w["alive"]:
        return "down"
    if w["state"] == "paused":
        return "paused"
    if w["dead_letter"] > 0:
        return "degraded"
    return "healthy"


async def collect_status(redis=None) -> dict:
    r = redis or _get_redis()
    now = dt.datetime.now(dt.timezone.utc)
    out = {"redis_ok": True, "paused": False, "paused_reason": None,
           "generated_at": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
           "workers": [], "links": []}
    try:
        out["paused"] = (await r.get("queue:paused")) == b"1"
    except Exception:
        logger.exception("redis access failed")
        out["redis_ok"] = False
        return out
    for w in WORKER_REGISTRY:
        info = {"name": w["name"], "kind": w["kind"], "alive": False, "state": None,
                "last_beat_age_s": None, "queue_depth": 0, "dead_letter": 0,
                "processing": 0, "jobs_done": 0, "jobs_failed": 0, "last_job_at": None}
        raw = await r.get(f"worker:{w['name']}:heartbeat")
        if raw:
            try:
                hb = json.loads(raw)
                info.update(alive=True, state=hb.get("state"),
                            jobs_done=hb.get("jobs_done", 0), jobs_failed=hb.get("jobs_failed", 0),
                            last_job_at=hb.get("last_job_at"),
                            last_beat_age_s=_beat_age(hb.get("ts", ""), now))
                if w["kind"] == "watcher" and hb.get("mode"):
                    out["paused_reason"] = hb["mode"]
            except json.JSONDecodeError:
                logger.warning("heartbeat JSON parse failed name=%s", w["name"])
        if w["queue"]:
            info["queue_depth"] = await r.llen(w["queue"])
            info["dead_letter"] = await r.llen(f"dead_letter:{w['queue']}")
            proc = 0
            async for key in r.scan_iter(match=f"processing:{w['queue']}:*"):
                proc += await r.llen(key)
            info["processing"] = proc
        out["workers"].append(info)
    for w in out["workers"]:
        if w["kind"] == "trader":
            out["links"].append({"from": "ai_trade", "to": "nas-stock", "type": "http-pull",
                                 "status": "healthy" if w["alive"] else "down"})
        elif w["kind"] == "render":
            out["links"].append({"from": "nas", "to": w["name"], "type": "redis-queue",
                                 "status": _render_link_status(w)})
    # Fall back to "trading" when paused but the watcher heartbeat gave no reason.
    if out["paused"] and not out["paused_reason"]:
        out["paused_reason"] = "trading"
    return out
```
- [ ] **Step 4: Run test to verify it passes**
Run: `python -m pytest agent-office/tests/test_node_monitor.py -v`
Expected: PASS (5 passed)
- [ ] **Step 5: Commit**
```bash
git add agent-office/app/node_monitor.py agent-office/tests/test_node_monitor.py
git commit -m "feat(agent-office): node_monitor.collect_status (aggregates heartbeat + queue + dead-letter)"
```
## Task B3: GET /api/agent-office/nodes endpoint
**Files:**
- Modify: `agent-office/app/main.py`
- Test: `agent-office/tests/test_nodes_endpoint.py`
**Interfaces:**
- Consumes: `node_monitor.collect_status`
- Produces: `GET /api/agent-office/nodes` → Contract 2 JSON
- [ ] **Step 1: Write the failing test**
```python
# agent-office/tests/test_nodes_endpoint.py
import pytest
from fastapi.testclient import TestClient


@pytest.fixture
def client(monkeypatch):
    from app import main
    async def fake_collect(redis=None):
        return {"redis_ok": True, "paused": False, "paused_reason": None,
                "generated_at": "2026-06-29T00:00:00Z", "workers": [], "links": []}
    monkeypatch.setattr("app.node_monitor.collect_status", fake_collect)
    return TestClient(main.app)


def test_nodes_endpoint_returns_contract(client):
    resp = client.get("/api/agent-office/nodes")
    assert resp.status_code == 200
    body = resp.json()
    assert set(["redis_ok","paused","workers","links"]).issubset(body)
```
- [ ] **Step 2: Run test to verify it fails**
Run: `python -m pytest agent-office/tests/test_nodes_endpoint.py -v`
Expected: FAIL with 404 (the route does not exist yet)
- [ ] **Step 3: Write minimal implementation** (add the endpoint to `main.py`, near `/states`)
```python
@app.get("/api/agent-office/nodes")
async def nodes_status():
    # Imported inside the handler so tests can monkeypatch node_monitor.collect_status.
    from .node_monitor import collect_status
    return await collect_status()
```
- [ ] **Step 4: Run test to verify it passes**
Run: `python -m pytest agent-office/tests/test_nodes_endpoint.py -v`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add agent-office/app/main.py agent-office/tests/test_nodes_endpoint.py
git commit -m "feat(agent-office): GET /api/agent-office/nodes endpoint"
```
## Task B4: Node health alert cron
**Files:**
- Modify: `agent-office/app/node_monitor.py` (add `check_and_alert`)
- Modify: `agent-office/app/scheduler.py` (register the interval job)
- Test: `agent-office/tests/test_node_monitor.py` (add alert-transition tests)
**Interfaces:**
- Consumes: `collect_status`, `telegram.messaging.send_raw`, `db.add_log`
- Produces: `async def check_and_alert(status=None) -> list[str]` (list of alert texts that were sent; returned for testing)
- [ ] **Step 1: Write the failing test**
```python
# append to test_node_monitor.py
import app.node_monitor as nm


@pytest.mark.asyncio
async def test_alert_on_alive_to_dead(monkeypatch):
    sent = []
    async def fake_send_raw(text, **kw): sent.append(text); return {"ok": True}
    monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
    monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
    nm._node_state.clear(); nm._dl_notified.clear()
    alive = {"workers": [{"name":"image-render","alive":True,"dead_letter":0}], "links": []}
    dead = {"workers": [{"name":"image-render","alive":False,"dead_letter":0}], "links": []}
    await nm.check_and_alert(status=alive)  # first observation: no alert
    assert sent == []
    await nm.check_and_alert(status=dead)   # alive→dead transition
    assert any("다운" in t for t in sent)   # "다운" ("down") appears in the alert text


@pytest.mark.asyncio
async def test_alert_on_dead_letter_growth(monkeypatch):
    sent = []
    async def fake_send_raw(text, **kw): sent.append(text); return {"ok": True}
    monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
    monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
    nm._node_state.clear(); nm._dl_notified.clear()
    s = {"workers": [{"name":"video-render","alive":True,"dead_letter":2}], "links": []}
    await nm.check_and_alert(status=s)
    assert any("dead-letter" in t for t in sent)
```
- [ ] **Step 2: Run test to verify it fails**
Run: `python -m pytest agent-office/tests/test_node_monitor.py -v`
Expected: FAIL (`AttributeError: module has no attribute '_node_state'`/`check_and_alert`)
- [ ] **Step 3: Write minimal implementation** (add to `node_monitor.py`)
```python
from .config import NODE_ALERT_DEADLETTER_THRESHOLD

_node_state: dict[str, bool] = {}   # name -> previous alive
_dl_notified: dict[str, int] = {}   # name -> dead_letter count last alerted on


async def check_and_alert(status=None) -> list[str]:
    from .telegram.messaging import send_raw
    from .db import add_log
    st = status or await collect_status()
    sent: list[str] = []
    for w in st["workers"]:
        name, alive = w["name"], w.get("alive", False)
        prev = _node_state.get(name)
        if prev is True and not alive:
            text = f"🔴 [{name}] 워커 다운"  # alert text: "worker down"
            if (await send_raw(text=text)).get("ok"):
                add_log("node_monitor", f"{name} 다운", "warning"); sent.append(text)
        elif prev is False and alive:
            text = f"🟢 [{name}] 워커 복구"  # alert text: "worker recovered"
            if (await send_raw(text=text)).get("ok"):
                add_log("node_monitor", f"{name} 복구", "info"); sent.append(text)
        _node_state[name] = alive
        dl = w.get("dead_letter", 0)
        if dl >= NODE_ALERT_DEADLETTER_THRESHOLD and dl != _dl_notified.get(name, 0):
            text = f"❌ [{name}] 실패 누적 {dl}건 (dead-letter)"  # alert text: "{dl} accumulated failures"
            if (await send_raw(text=text)).get("ok"):
                add_log("node_monitor", f"{name} dead-letter {dl}", "warning"); sent.append(text)
            _dl_notified[name] = dl
        elif dl == 0:
            _dl_notified.pop(name, None)
    return sent
```
> Pitfall: the first observation (prev=None) never alerts, which prevents false alarms at boot. Because only the alive→dead transition is caught, the "two-week silent death" scenario (alive, then dies) is detected, but a worker that is already dead at boot raises no alert (to be strengthened in Phase 2).
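
The transition rule in this note can be isolated as a pure function, which makes the boot-time behavior easy to see. This is an illustrative sketch only; `classify_transition` is a hypothetical name and is not part of the plan's `check_and_alert`.

```python
from typing import Optional


def classify_transition(prev: Optional[bool], alive: bool) -> Optional[str]:
    """Map (previous alive, current alive) to an alert kind, or None for no alert.

    prev is None on the first observation after boot, which never alerts.
    """
    if prev is True and not alive:
        return "down"
    if prev is False and alive:
        return "recovered"
    return None


# Replay a sequence of observations for one worker, starting from an empty state.
prev = None
alerts = []
for alive in [False, False, True, True, False]:
    kind = classify_transition(prev, alive)
    if kind:
        alerts.append(kind)
    prev = alive
print(alerts)  # → ['recovered', 'down']
```

The replay starts with a worker that is already dead at boot: nothing fires until it recovers, which is exactly the gap the note defers to Phase 2.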
`scheduler.py`: import + job registration:
```python
from . import node_monitor  # at the top


async def _run_node_health_check():
    await node_monitor.check_and_alert()


# inside init_scheduler(), near _poll_pipelines:
scheduler.add_job(_run_node_health_check, "interval", seconds=60, id="node_health_check", replace_existing=True)
```
- [ ] **Step 4: Run test to verify it passes**
Run: `python -m pytest agent-office/tests/ -v`
Expected: PASS (all tests)
- [ ] **Step 5: Commit**
```bash
git add agent-office/app/node_monitor.py agent-office/app/scheduler.py agent-office/tests/test_node_monitor.py
git commit -m "feat(agent-office): 1-minute node health cron + Telegram alerts (down/recovery/dead-letter)"
```
## Task B5: Deploy + operational verification
**Files:** (no code changes; deploy and verify only)
- [ ] **Step 1: Acquire the nas-deploy lock + deploy via push**
```bash
# after acquire_lock("nas-deploy","BE")
git push # Gitea webhook → deployer auto-deploys (agent-office rebuild picks up the redis dependency)
```
- [ ] **Step 2: Operational verification** (after the deploy completes)
```bash
# check the /nodes response (if the workers are not deployed yet, expect alive=false for all)
curl -s https://gahusb.synology.me/api/agent-office/nodes | python -m json.tool
```
Expected: `redis_ok:true` and all 6 workers listed. Before Part A is deployed, every `alive` is false (this is normal; no heartbeats exist yet).
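
The manual check could also be scripted. The following is a minimal sketch that inspects an already-fetched response body; `summarize_nodes` and `EXPECTED_WORKERS` are hypothetical names, the worker names are the six from the heartbeat contract, and fetching the URL is left out so the example runs offline.

```python
EXPECTED_WORKERS = {
    "music-render", "video-render", "image-render",
    "insta-render", "task-watcher", "ai_trade",
}


def summarize_nodes(body: dict) -> dict:
    """Summarize a /nodes response: Redis health, missing workers, dead workers."""
    workers = {w["name"]: w for w in body.get("workers", [])}
    return {
        "redis_ok": bool(body.get("redis_ok")),
        "missing": sorted(EXPECTED_WORKERS - set(workers)),
        "dead": sorted(n for n, w in workers.items() if not w.get("alive")),
    }


# Example: the state expected right after Part B deploys, before Part A does.
sample = {
    "redis_ok": True,
    "workers": [{"name": n, "alive": False} for n in sorted(EXPECTED_WORKERS)],
    "links": [],
}
summary = summarize_nodes(sample)
print(summary["redis_ok"], summary["missing"], len(summary["dead"]))  # → True [] 6
```

A non-empty `missing` list would point at a registry or deploy problem, whereas a full `dead` list before Part A ships is the expected state.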
- [ ] **Step 3: release_lock("nas-deploy")**
---
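# Part C: web-ui Three.js dashboard (owned by the FE session)
> repo: `web-ui`. React + Vite. Build locally only (never build on the NAS Celeron). Deploy with `npm run release:nas`.
> **Implement the Three.js visualization (C4) with the `designer` skill enabled.** Because it is creative visual work, it is specified by a data contract + a state→visual mapping + acceptance criteria rather than by line-by-line code written in advance.
## Task C1: Dependencies + route + nav registration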
**Files:**
- Modify: `package.json` (three, @react-three/fiber, @react-three/drei)
- Modify: `src/routes.jsx` (lazy route + navLinks entry)
- [ ] **Step 1: Install dependencies**
```bash
npm install three @react-three/fiber @react-three/drei
```
- [ ] **Step 2: Add the route to routes.jsx** (in the `appRoutes` array; same lazy pattern as agent-office)
```jsx
{
  path: 'infra',
  lazy: () => import('./pages/infra/InfraMonitor'),
},
```
- [ ] **Step 3: Add an entry to navLinks**
```jsx
{
  id: 'infra',
  label: 'Infra',
  path: '/infra',
  subtitle: 'NODE PIPELINE',
  description: 'Real-time status of the NAS↔Windows worker pipeline',
  icon: <span style={{fontSize:'1.2em'}}>🛰</span>,
  accent: '#22d3ee',
},
```
- [ ] **Step 4: Commit**
```bash
git add package.json package-lock.json src/routes.jsx
git commit -m "feat(infra): three.js dependencies + /infra route and nav registration"
```
## Task C2: api helper + useNodeStatus polling hook
**Files:**
- Modify: `src/api.js` (add `getNodeStatus`)
- Create: `src/pages/infra/useNodeStatus.js`
- Test: `src/pages/infra/useNodeStatus.test.js` (Vitest; confirm the project's test runner)
**Interfaces:**
- Produces: `getNodeStatus()` → Contract 2 object, `useNodeStatus(intervalMs=3000)` → `{data, error, loading}`
- [ ] **Step 1: Write the failing test**
```js
// src/pages/infra/useNodeStatus.test.js
import { renderHook, waitFor } from '@testing-library/react';
import { describe, it, expect, vi } from 'vitest';
import { useNodeStatus } from './useNodeStatus';
import * as api from '../../api';

describe('useNodeStatus', () => {
  it('fetches node status', async () => {
    vi.spyOn(api, 'getNodeStatus').mockResolvedValue({ redis_ok: true, workers: [], links: [] });
    // Long interval so only the initial fetch runs during the test
    const { result } = renderHook(() => useNodeStatus(99999));
    await waitFor(() => expect(result.current.data).toBeTruthy());
    expect(result.current.data.redis_ok).toBe(true);
  });
});
```
- [ ] **Step 2: Run test to verify it fails**
Run: `npm test -- useNodeStatus`
Expected: FAIL (`useNodeStatus`/`getNodeStatus` not defined)
- [ ] **Step 3: Write minimal implementation**
Add to `src/api.js` (follow the existing fetch helper pattern; relative path):
```js
export async function getNodeStatus() {
  const res = await fetch('/api/agent-office/nodes');
  if (!res.ok) throw new Error(`nodes ${res.status}`);
  return res.json();
}
```
`src/pages/infra/useNodeStatus.js`:
```js
import { useEffect, useState, useRef } from 'react';
import { getNodeStatus } from '../../api';

export function useNodeStatus(intervalMs = 3000) {
  const [data, setData] = useState(null);
  const [error, setError] = useState(null);
  const [loading, setLoading] = useState(true);
  const timer = useRef(null);

  useEffect(() => {
    // Guards against state updates after unmount or an interval change
    let alive = true;

    async function tick() {
      try {
        const d = await getNodeStatus();
        if (alive) { setData(d); setError(null); }
      } catch (e) {
        // The last successful data is kept; only the error is recorded
        if (alive) setError(e);
      } finally {
        if (alive) setLoading(false);
      }
    }

    tick(); // fetch immediately, then poll
    timer.current = setInterval(tick, intervalMs);
    return () => { alive = false; clearInterval(timer.current); };
  }, [intervalMs]);

  return { data, error, loading };
}
```
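The state transitions the hook performs can be written down as a pure reducer, which may help when reasoning about what the dashboard shows during an outage. This is only an illustrative sketch: `initialState` and `reduceNodeStatus` are hypothetical names, and the hook itself uses three separate `useState` calls rather than a reducer.

```js
// Illustrative only: the hook's three state variables modeled as one object.
const initialState = { data: null, error: null, loading: true };

function reduceNodeStatus(state, event) {
  switch (event.type) {
    case 'success':
      // Mirrors setData(d); setError(null); setLoading(false)
      return { data: event.data, error: null, loading: false };
    case 'failure':
      // Mirrors setError(e); setLoading(false). The previous data is kept,
      // so the UI can keep showing the last known topology.
      return { ...state, error: event.error, loading: false };
    default:
      return state;
  }
}
```

One consequence worth noting: after a failed poll, `data` and `error` are both non-null, so a consumer that wants to flag stale data has to check `error` explicitly.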
- [ ] **Step 4: Run test to verify it passes**
Run: `npm test -- useNodeStatus`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add src/api.js src/pages/infra/useNodeStatus.js src/pages/infra/useNodeStatus.test.js
git commit -m "feat(infra): getNodeStatus + useNodeStatus 3-second polling hook"
```
## Task C3: 2D fallback panel (the first working deliverable)
**Files:**
- Create: `src/pages/infra/InfraMonitor.jsx` (2D card view first)
- Create: `src/pages/infra/WorkerCard.jsx`
- Create: `src/pages/infra/statusVisual.js` (state→color/label mapping; shared with Three.js)
- Test: `src/pages/infra/statusVisual.test.js`
**Interfaces:**
- Produces: `linkColor(status)`, `workerStateLabel(worker)` (reused by the C4 Three.js scene)
- [ ] **Step 1: Write the failing test**
```js
// src/pages/infra/statusVisual.test.js
import { describe, it, expect } from 'vitest';
import { linkColor, workerStateLabel } from './statusVisual';

describe('statusVisual', () => {
  it('maps link status to color', () => {
    expect(linkColor('healthy')).toBe('#22c55e');
    expect(linkColor('down')).toBe('#ef4444');
    expect(linkColor('paused')).toBe('#f59e0b');
    expect(linkColor('degraded')).toBe('#eab308');
  });

  it('labels dead worker', () => {
    expect(workerStateLabel({ alive: false })).toMatch(/다운|down/i);
  });
});
```
- [ ] **Step 2: Run test to verify it fails**
Run: `npm test -- statusVisual`
Expected: FAIL (module not found)
- [ ] **Step 3: Write minimal implementation**
`src/pages/infra/statusVisual.js`:
```js
export const LINK_COLORS = {
  healthy: '#22c55e',
  paused: '#f59e0b',
  degraded: '#eab308',
  down: '#ef4444',
};

// Unknown statuses fall back to a neutral slate color
export function linkColor(status) {
  return LINK_COLORS[status] || '#64748b';
}

export function workerStateLabel(w) {
  if (!w.alive) return '🔴 다운';
  if (w.state === 'paused') return '⏸ 작업중(트레이딩)';
  if (w.state === 'busy') return '⚙️ 처리 중';
  if (w.state === 'market_open') return '📈 장중';
  if (w.state === 'market_closed') return '🌙 휴장';
  return '🟢 대기';
}
```
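`WorkerCard.jsx` + `InfraMonitor.jsx`: render the `data.workers` array from `useNodeStatus()` as a card grid (name, state label, queue_depth, dead_letter, last_beat_age_s). When `data.redis_ok===false`, show a "집계 서버/Redis 연결 끊김" ("aggregation server/Redis disconnected") banner. (Standard React cards; the designer skill is not needed.)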
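Since the card components are described only in prose, one possible way to keep them thin is to derive the display values in a small pure function. The sketch below is an assumption about how that could look: `toCardModel` and `showRedisBanner` are hypothetical names, `labelFor` stands in for `workerStateLabel`, and the "n/a" / "Ns ago" formatting is a placeholder choice.

```js
// Hypothetical view-model helper for WorkerCard (names are illustrative).
// labelFor is injected so this sketch stays self-contained; in the project it
// would be workerStateLabel from statusVisual.js.
function toCardModel(worker, labelFor) {
  return {
    title: worker.name,
    label: labelFor(worker),
    queueDepth: worker.queue_depth ?? 0,
    deadLetter: worker.dead_letter ?? 0,
    // A worker that has never sent a heartbeat may have no age at all
    lastBeat: worker.last_beat_age_s == null
      ? 'n/a'
      : `${Math.round(worker.last_beat_age_s)}s ago`,
  };
}

// The banner shows only when a payload exists and explicitly reports redis_ok=false.
function showRedisBanner(data) {
  return Boolean(data) && data.redis_ok === false;
}
```

`InfraMonitor` could then map `data.workers` through `toCardModel` and pass each result to `WorkerCard` as props.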
- [ ] **Step 4: Run test to verify it passes**
Run: `npm test -- statusVisual`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add src/pages/infra/
git commit -m "feat(infra): 2D worker status panel + statusVisual mapping"
```
## Task C4: Three.js pipeline visualization (designer skill)
**Files:**
- Create: `src/pages/infra/PipelineScene.jsx` (r3f Canvas topology)
- Modify: `src/pages/infra/InfraMonitor.jsx` (3D scene + 2D panel toggle)
**Implementation approach**: implement this Task with the **`designer` skill activated**. What follows is the exact data contract, state mapping, and acceptance criteria the designer must follow.
- [ ] **Step 1: Activate the designer skill.** Call `Skill(designer)`, then proceed with the spec below.
- [ ] **Step 2: Topology + data contract**
  - Input: `data` from `useNodeStatus()` (Contract 2). `data.links` drives the pipelines; `data.workers` drives node state.
  - Node layout: **left** NAS server (box) + gateway label / **center** Redis queue bus (glowing core) / **right** Windows node (box) + 6 worker sub-nodes (`music/video/image/insta-render`, `task-watcher`, `ai_trade`).
  - Pipelines (tubes/lines): the 4 render workers use the NAS→Redis→worker path (`type:redis-queue`). `ai_trade` is a direct NAS stock⇄ai_trade link (`type:http-pull`, bypassing the Redis bus).
  - Colors reuse `statusVisual.linkColor(link.status)`.
- [ ] **Step 3: Per-state animation (acceptance criteria)**
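  - `healthy`: cyan/green tube with flowing particles (visible traffic flow is the user's core requirement). When `state==='busy'`, particle speed increases.
  - `paused`: amber, particles stopped or slowed, plus the "⏸ 작업중(트레이딩)" label.
  - `down`: red, flow stopped, a spark/break visual at the disconnect point, a ⚠ icon, and "last beat Xs ago".
  - `degraded`: a ❌ dead-letter count badge on the tube.
  - `redis_ok===false`: the entire center bus turns red, with a "집계 서버 연결 끊김" ("aggregation server disconnected") overlay.
  - Per-worker node HUD: name, state label (`workerStateLabel`), queue_depth, dead_letter.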
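- [ ] **Step 4: Fallback + verification**
  - No WebGL support / mobile: fall back automatically to the C3 2D panel (and provide a toggle button).
  - Verification: run `npm run dev` and force the four states with mock data (stub `useNodeStatus`), then confirm by eye that the healthy/paused/down/degraded visuals are clearly distinguishable. In an RC environment, get user confirmation via screenshots or a recording.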
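For the mock-data verification described above, a small fixture factory is one way to force each state. The sketch below is hypothetical: `mockNodeStatus` and `SCENARIOS` are illustrative names, `'idle'` is an assumed idle state value, and the payload includes only the fields this plan references, so it should be checked against the full Contract 2 definition before use.

```js
// Per-scenario overrides applied on top of a healthy baseline.
const SCENARIOS = {
  healthy: { worker: {}, link: { status: 'healthy' } },
  paused: { worker: { state: 'paused' }, link: { status: 'paused' } },
  down: { worker: { alive: false, last_beat_age_s: 120 }, link: { status: 'down' } },
  degraded: { worker: { dead_letter: 3 }, link: { status: 'degraded' } },
};

// Hypothetical fixture factory: returns a /nodes-shaped payload for one scenario.
function mockNodeStatus(scenario) {
  const overrides = SCENARIOS[scenario];
  if (!overrides) throw new Error(`unknown scenario: ${scenario}`);
  const worker = {
    name: 'music-render',
    alive: true,
    state: 'idle', // assumed idle value; not confirmed by the plan
    queue_depth: 0,
    dead_letter: 0,
    last_beat_age_s: 2,
    ...overrides.worker,
  };
  const link = { type: 'redis-queue', ...overrides.link };
  return { redis_ok: true, workers: [worker], links: [link] };
}
```

A stubbed `useNodeStatus` could return `{ data: mockNodeStatus('down'), error: null, loading: false }` while cycling through the four scenario names.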
- [ ] **Step 5: Commit**
```bash
git add src/pages/infra/PipelineScene.jsx src/pages/infra/InfraMonitor.jsx
git commit -m "feat(infra): three.js NAS↔Windows pipeline visualization (per-state flow/failure)"
```
- [ ] **Step 6: Deploy**
```bash
npm run build && npm run release:nas
```
---
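## Execution order & session collaboration (co-gahusb)
1. **Prerequisite gate**: share this plan's Global Constraints (the 2 contracts) with the FE/AI sessions via co-gahusb `post_message` → the 3 Parts can then start in parallel.
2. **Part B (BE, this session)** can go first; the FakeRedis tests make it self-contained without any workers. After the B5 deploy, `/nodes` responds, even if only with `alive:false`.
3. Once **Part A (AI session)** is complete and the workers are redeployed, `/nodes` starts reporting alive:true.
4. **Part C (FE session)** can develop against mocks using only the `/nodes` contract, then wire up real data.
5. Shared resources: the docker-compose change in B1 = `compose` lock; the B5 deploy = `nas-deploy` lock.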
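## Self-Review (author check)
- **Spec coverage**: G1 (6 heartbeats) = A1~A4 / G2 (queue, dead-letter, processing, paused aggregation) = B2 / G3 (Telegram alerts) = B4 / G4 (Three.js /infra) = C1~C4. Contract 1 = A1 + Global, Contract 2 = B2 + Global. ✓ Nothing missing.
- **Placeholder scan**: C4 is delegated to the designer skill, but its data contract, state mapping, and acceptance criteria are spelled out concretely (not a placeholder). Every other Task includes real code. ✓
- **Type consistency**: `WorkerStats` (A1) ↔ worker.stats (A2); the `collect_status` return value (B2) ↔ the `check_and_alert` input (B4), `/nodes` (B3), `useNodeStatus` (C2), and `linkColor`/`workerStateLabel` (C3, reused by C4) all match. The heartbeat key/value (Contract 1) ↔ node_monitor parsing (B2) match. ✓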