# Distributed Worker Observability System Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Observe the liveness, queue, and failure state of the six NAS↔Windows distributed workers via heartbeats; have agent-office aggregate that state and send Telegram alerts; and visualize it as a Three.js pipeline at `/infra` in web-ui.

**Architecture:** Each worker pushes `worker:<name>:heartbeat` (TTL 45s) to the NAS Redis → agent-office's `node_monitor` reads the heartbeats plus ReliableQueue's `processing:*`/`dead_letter:*` keys and `queue:paused`, and exposes the result at `GET /api/agent-office/nodes` → a 1-minute cron sends a Telegram alert on state transitions → web-ui polls every 3 seconds and renders the topology.

**Tech Stack:** Python 3.12 / FastAPI / redis.asyncio / APScheduler (web-backend, web-ai), React + Vite / three + @react-three/fiber + drei (web-ui).

## Global Constraints

These two contracts are locked so that the three Parts can be worked on independently and in parallel. Every Task uses these values exactly as written.

**[Contract 1] Heartbeat key schema**

- Key: `worker:<name>:heartbeat` (name ∈ `music-render`, `video-render`, `image-render`, `insta-render`, `task-watcher`, `ai_trade`)
- Command: `SET worker:<name>:heartbeat <json> EX 45`
- Value (JSON): `{"name": str, "kind": "render"|"watcher"|"trader", "state": str, "ts": "<UTC ISO8601 ...Z>", "last_job_at": str|null, "jobs_done": int, "jobs_failed": int, "mode": str (optional, task-watcher only)}`
- `state` by kind: render=`idle`|`busy`|`paused` / watcher=`trading`|`free` / trader=`market_open`|`market_closed`
- Send interval 15 s, TTL 45 s (3× the interval). The redis client uses `decode_responses=False` (bytes), the same as the existing workers.

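As a rough illustration of Contract 1, the sketch below checks a raw heartbeat value against the field list and the per-kind `state` sets given above. The names `validate_heartbeat`, `STATES_BY_KIND`, and `REQUIRED_FIELDS` are hypothetical and not part of the plan; only the field names and state values come from the contract.

```python
import json

# Allowed states per worker kind, copied from Contract 1.
STATES_BY_KIND = {
    "render": {"idle", "busy", "paused"},
    "watcher": {"trading", "free"},
    "trader": {"market_open", "market_closed"},
}
REQUIRED_FIELDS = ("name", "kind", "state", "ts", "last_job_at", "jobs_done", "jobs_failed")


def validate_heartbeat(raw: bytes) -> list[str]:
    """Return a list of contract violations; an empty list means the payload conforms."""
    try:
        hb = json.loads(raw)
    except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
        return ["not valid JSON"]
    if not isinstance(hb, dict):
        return ["payload is not a JSON object"]

    errors = [f"missing field: {f}" for f in REQUIRED_FIELDS if f not in hb]

    kind, state = hb.get("kind"), hb.get("state")
    allowed = STATES_BY_KIND.get(kind) if isinstance(kind, str) else None
    if allowed is None:
        errors.append(f"unknown kind: {kind!r}")
    elif state not in allowed:
        errors.append(f"state {state!r} not allowed for kind {kind!r}")

    ts = hb.get("ts")
    if not (isinstance(ts, str) and ts.endswith("Z")):
        errors.append("ts must be a UTC ISO 8601 string ending in 'Z'")
    return errors
```

A check like this could run in a test or in the aggregator before trusting a payload; the plan itself does not require it.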
**[Contract 2] `GET /api/agent-office/nodes` response schema**

```json
{ "redis_ok": true, "paused": false, "paused_reason": "trading", "generated_at": "...Z",
  "workers": [{"name":"image-render","kind":"render","alive":true,"state":"idle",
               "last_beat_age_s":3,"queue_depth":0,"dead_letter":0,"processing":0,
               "jobs_done":42,"jobs_failed":1,"last_job_at":"...Z"}],
  "links": [{"from":"nas","to":"image-render","type":"redis-queue","status":"healthy"}] }
```

- `links[].status` ∈ `healthy`|`paused`|`down`|`degraded`, `links[].type` ∈ `redis-queue`|`http-pull`

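For consumers that want static typing, Contract 2 could be written down as `TypedDict`s along the following lines. This is a sketch, not part of the plan: the type names and the `down_links` helper are hypothetical, and the `Optional` fields are inferred from the `None` defaults that `collect_status` uses in Task B2.

```python
from typing import Literal, Optional, TypedDict

LinkStatus = Literal["healthy", "paused", "down", "degraded"]
LinkType = Literal["redis-queue", "http-pull"]

# "from" is a Python keyword, so the functional TypedDict form is required here.
Link = TypedDict("Link", {"from": str, "to": str, "type": LinkType, "status": LinkStatus})


class Worker(TypedDict):
    name: str
    kind: str
    alive: bool
    state: Optional[str]
    last_beat_age_s: Optional[int]
    queue_depth: int
    dead_letter: int
    processing: int
    jobs_done: int
    jobs_failed: int
    last_job_at: Optional[str]


class NodesResponse(TypedDict):
    redis_ok: bool
    paused: bool
    paused_reason: Optional[str]
    generated_at: str
    workers: list[Worker]
    links: list[Link]


def down_links(resp: NodesResponse) -> list[str]:
    """Return the `to` endpoint of every link whose status is 'down'."""
    return [link["to"] for link in resp["links"] if link["status"] == "down"]
```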
**Common rules**: Never commit `.env`. Docker runs only on the NAS (no local docker commands). Each repo commits only from its own path. NAS Redis address: `redis://192.168.45.54:6379` (worker side); inside containers: `redis://redis:6379` (NAS side).

---

# Part A: web-ai worker heartbeat (owned by the AI session)

> repo: `web-ai`. Tests: use the system Python (`C:\Users\jaeoh\AppData\Local\Programs\Python\Python312\python.exe -m pytest`), because the `.venv` breaks on the Korean-character path (per web-ai CLAUDE.md).

## Task A1: Shared heartbeat module

**Files:**

- Create: `services/_shared/heartbeat.py`
- Test: `services/_shared/tests/test_heartbeat.py`

**Interfaces:**

- Produces: `class WorkerStats`(busy:bool, jobs_done:int, jobs_failed:int, last_job_at:str|None), `def utc_now_iso()->str`, `def build_payload(name,kind,state,stats,extra=None)->str`, `async def render_state(redis,stats,paused_key="queue:paused")->str`, `async def heartbeat_loop(redis,name,kind,stats,*,interval=15,ttl=45,paused_key="queue:paused",state_fn=None)`

- [ ] **Step 1: Write the failing test**

```python
# services/_shared/tests/test_heartbeat.py
import json
import pytest
from _shared.heartbeat import WorkerStats, build_payload, render_state

def test_build_payload_has_contract_fields():
    s = WorkerStats(); s.jobs_done = 3; s.last_job_at = "2026-06-29T00:00:00Z"
    payload = json.loads(build_payload("image-render", "render", "idle", s))
    assert payload["name"] == "image-render"
    assert payload["kind"] == "render"
    assert payload["state"] == "idle"
    assert payload["jobs_done"] == 3
    assert payload["last_job_at"] == "2026-06-29T00:00:00Z"
    assert payload["ts"].endswith("Z")

def test_build_payload_merges_extra():
    payload = json.loads(build_payload("task-watcher", "watcher", "free", WorkerStats(), extra={"mode": "free"}))
    assert payload["mode"] == "free"

class _FakeRedis:
    def __init__(self, paused): self._paused = paused
    async def get(self, key): return b"1" if self._paused else None

@pytest.mark.asyncio
async def test_render_state_paused_overrides_busy():
    s = WorkerStats(); s.busy = True
    assert await render_state(_FakeRedis(paused=True), s) == "paused"

@pytest.mark.asyncio
async def test_render_state_busy_then_idle():
    s = WorkerStats(); s.busy = True
    assert await render_state(_FakeRedis(paused=False), s) == "busy"
    s.busy = False
    assert await render_state(_FakeRedis(paused=False), s) == "idle"
```

- [ ] **Step 2: Run test to verify it fails**

Run: `python -m pytest services/_shared/tests/test_heartbeat.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named '_shared.heartbeat'`

- [ ] **Step 3: Write minimal implementation**

```python
# services/_shared/heartbeat.py
"""Distributed worker heartbeat: SET worker:<name>:heartbeat with a TTL. Global Constraints, Contract 1."""
from __future__ import annotations
import asyncio, datetime as dt, json, logging, os

logger = logging.getLogger(__name__)
DEFAULT_INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "15"))
DEFAULT_TTL = int(os.getenv("HEARTBEAT_TTL", "45"))


class WorkerStats:
    """Mutable counters updated by worker_loop and read by heartbeat_loop."""
    def __init__(self):
        self.busy = False
        self.jobs_done = 0
        self.jobs_failed = 0
        self.last_job_at = None  # ISO str | None


def utc_now_iso() -> str:
    return dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def build_payload(name: str, kind: str, state: str, stats: WorkerStats, extra: dict | None = None) -> str:
    payload = {
        "name": name, "kind": kind, "state": state, "ts": utc_now_iso(),
        "last_job_at": stats.last_job_at,
        "jobs_done": stats.jobs_done, "jobs_failed": stats.jobs_failed,
    }
    if extra:
        payload.update(extra)
    return json.dumps(payload)


async def render_state(redis, stats: WorkerStats, paused_key: str = "queue:paused") -> str:
    # The global pause flag takes precedence over the worker's own busy flag.
    if await redis.get(paused_key) == b"1":
        return "paused"
    return "busy" if stats.busy else "idle"


async def heartbeat_loop(redis, name, kind, stats, *, interval=DEFAULT_INTERVAL,
                         ttl=DEFAULT_TTL, paused_key="queue:paused", state_fn=None):
    key = f"worker:{name}:heartbeat"
    logger.info("heartbeat 시작 name=%s ttl=%ds", name, ttl)
    while True:
        try:
            if state_fn is not None:
                state, extra = await state_fn(redis, stats)
            else:
                state, extra = await render_state(redis, stats, paused_key), None
            await redis.set(key, build_payload(name, kind, state, stats, extra), ex=ttl)
        except asyncio.CancelledError:
            raise
        except Exception:
            # A failed beat is logged and retried on the next tick; the loop never exits on its own.
            logger.exception("heartbeat 발신 실패 name=%s", name)
        await asyncio.sleep(interval)
```

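The `state_fn` hook in `heartbeat_loop` is awaited as `state_fn(redis, stats)` and must return a `(state, extra)` pair. The sketch below shows what such a callable might look like. `watcher_state_fn` and the `FakeRedis` stand-in are hypothetical, and deriving the watcher mode from `queue:paused` is an assumption made only for illustration (Task A3 in this plan sets the heartbeat directly rather than through `heartbeat_loop`).

```python
import asyncio


async def watcher_state_fn(redis, stats):
    """Hypothetical state_fn: map the queue:paused flag to a (state, extra) pair."""
    paused = await redis.get("queue:paused") == b"1"
    mode = "trading" if paused else "free"
    return mode, {"mode": mode}


class FakeRedis:
    """In-memory stand-in used only for this demonstration."""

    def __init__(self, paused: bool):
        self._paused = paused

    async def get(self, key):
        return b"1" if self._paused else None


# stats is unused by this particular state_fn, so None is passed for brevity.
trading = asyncio.run(watcher_state_fn(FakeRedis(paused=True), None))
free = asyncio.run(watcher_state_fn(FakeRedis(paused=False), None))
```

Passing `state_fn=watcher_state_fn` to `heartbeat_loop` would then merge `{"mode": ...}` into the payload through `build_payload`'s `extra` argument.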
- [ ] **Step 4: Run test to verify it passes**

Run: `python -m pytest services/_shared/tests/test_heartbeat.py -v`
Expected: PASS (4 passed)

- [ ] **Step 5: Commit**

```bash
git add services/_shared/heartbeat.py services/_shared/tests/test_heartbeat.py
git commit -m "feat(_shared): worker heartbeat module (worker:<name>:heartbeat TTL SET)"
```

## Task A2: Wire heartbeat into the image-render worker (+ apply the same pattern to the other 3 render workers)

**Files:**

- Modify: `services/image-render/worker.py` (add stats + poll_once counters)
- Modify: `services/image-render/main.py` (heartbeat_loop task in lifespan)
- Test: `services/image-render/tests/test_worker.py` (add poll_once counter verification)

**Interfaces:**

- Consumes: `_shared.heartbeat.WorkerStats`, `heartbeat_loop`, `utc_now_iso`
- Produces: `worker.stats` (module-level WorkerStats), which main.py passes to the heartbeat

- [ ] **Step 1: Write the failing test** (on success, poll_once increments jobs_done and resets busy)

```python
# add to services/image-render/tests/test_worker.py
import asyncio, pytest
import worker

class _OneJobQueue:
    def __init__(self): self.acked = False
    async def dequeue(self, timeout=5):
        if self.acked: return None
        return ({"job_type": "flux_generation", "task_id": "t1", "params": {}}, b"raw")
    async def ack(self, raw): self.acked = True
    async def fail(self, raw, payload): pass

@pytest.mark.asyncio
async def test_poll_once_increments_jobs_done(monkeypatch):
    worker.stats.jobs_done = 0
    monkeypatch.setattr(worker, "run_flux_generation", lambda task_id, params: None)
    handled = await worker.poll_once(_OneJobQueue())
    assert handled is True
    assert worker.stats.jobs_done == 1
    assert worker.stats.busy is False
    assert worker.stats.last_job_at is not None
```

- [ ] **Step 2: Run test to verify it fails**

Run: `python -m pytest services/image-render/tests/test_worker.py::test_poll_once_increments_jobs_done -v`
Expected: FAIL with `AttributeError: module 'worker' has no attribute 'stats'`

- [ ] **Step 3: Write minimal implementation**

`worker.py`: add the import and a module-level stats object, and replace `poll_once`:
```python
from _shared.heartbeat import WorkerStats, utc_now_iso  # add to the top import block

stats = WorkerStats()  # module level (near the REDIS_URL constant)

async def poll_once(queue: ReliableQueue) -> bool:
    result = await queue.dequeue(timeout=5)
    if result is None:
        return False
    payload, raw = result
    stats.busy = True
    try:
        await asyncio.to_thread(_dispatch, payload)
    except Exception:
        logger.exception("dispatch unhandled exception task_id=%s", payload.get("task_id"))
        await queue.fail(raw, payload)
        stats.jobs_failed += 1
        stats.last_job_at = utc_now_iso()
        stats.busy = False
        return True
    await queue.ack(raw)
    stats.jobs_done += 1
    stats.last_job_at = utc_now_iso()
    stats.busy = False
    return True
```

`main.py`: add the heartbeat task to lifespan:
```python
import os
import redis.asyncio as aioredis
from _shared.heartbeat import heartbeat_loop

@asynccontextmanager
async def lifespan(app: FastAPI):
    worker_task = asyncio.create_task(worker.worker_loop())
    hb_redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://192.168.45.54:6379"), decode_responses=False)
    hb_task = asyncio.create_task(heartbeat_loop(hb_redis, "image-render", "render", worker.stats))
    logger.info("image-render lifespan 시작")
    try:
        yield
    finally:
        # Cancel both background tasks and wait for each to finish unwinding.
        for t in (worker_task, hb_task):
            t.cancel()
            try:
                await t
            except asyncio.CancelledError:
                pass
        await hb_redis.aclose()
        logger.info("image-render lifespan 종료")
```

- [ ] **Step 4: Run test to verify it passes**

Run: `python -m pytest services/image-render/tests/ -v`
Expected: PASS (existing tests + 1 new)

- [ ] **Step 5: Apply the same pattern to the other 3 render workers**

Apply the **same pattern** as Step 3 to each of `video-render`, `music-render`, and `insta-render`. The only difference is the worker name string in main.py:
- `services/video-render/main.py`: `heartbeat_loop(hb_redis, "video-render", "render", worker.stats)`
- `services/music-render/main.py`: `heartbeat_loop(hb_redis, "music-render", "render", worker.stats)`
- `services/insta-render/main.py`: `heartbeat_loop(hb_redis, "insta-render", "render", worker.stats)`
- In each `worker.py`: `from _shared.heartbeat import WorkerStats, utc_now_iso` + `stats = WorkerStats()` + the poll_once counters (ack→`jobs_done`, fail→`jobs_failed`, both update `last_job_at`/`busy`). music-render's worker.py has a more complex structure (12 job_types), but the hook points are the same (`busy=True` before dispatch, counters + `busy=False` after ack/fail).
- In each `tests/test_worker.py`, add the Step 1 test adapted to that worker's job_type.

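Because the same three hook points recur in four workers, one way to keep them identical would be a small context manager. The following is only a sketch under that assumption: `track_job` is hypothetical and not part of the plan, and `WorkerStats` and `utc_now_iso` are re-declared here as minimal stand-ins for the `_shared.heartbeat` versions so that the example runs on its own. Unlike the plan's `poll_once`, this helper counts at the end of the `with` body and re-raises failures, so the caller would still need to call `queue.ack` or `queue.fail` itself.

```python
import datetime as dt
from contextlib import contextmanager


class WorkerStats:
    """Minimal stand-in for _shared.heartbeat.WorkerStats."""

    def __init__(self):
        self.busy = False
        self.jobs_done = 0
        self.jobs_failed = 0
        self.last_job_at = None


def utc_now_iso() -> str:
    return dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


@contextmanager
def track_job(stats: WorkerStats):
    """Hypothetical helper: mark busy during a job, then count success or failure."""
    stats.busy = True
    try:
        yield
    except Exception:
        stats.jobs_failed += 1
        raise
    else:
        stats.jobs_done += 1
    finally:
        # Runs on both paths, so busy can never stay stuck at True.
        stats.last_job_at = utc_now_iso()
        stats.busy = False


stats = WorkerStats()
with track_job(stats):
    pass  # a successful dispatch would run here

try:
    with track_job(stats):
        raise RuntimeError("simulated dispatch failure")
except RuntimeError:
    pass  # the caller would call queue.fail(...) here
```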
- [ ] **Step 6: Commit**

```bash
git add services/image-render services/video-render services/music-render services/insta-render
git commit -m "feat(render-workers): wire heartbeat into 4 render workers + poll_once counters"
```

## Task A3: task-watcher heartbeat (including mode)

**Files:**

- Modify: `services/task-watcher/watcher.py`
- Test: `services/task-watcher/tests/test_watcher.py` (new)

**Interfaces:**

- Consumes: `_shared.heartbeat.build_payload`, `WorkerStats`
- Produces: `state`=mode and `mode`=mode in the `worker:task-watcher:heartbeat` value

- [ ] **Step 1: Write the failing test**

```python
# services/task-watcher/tests/test_watcher.py
import json
from _shared.heartbeat import build_payload, WorkerStats

def test_watcher_heartbeat_payload_carries_mode():
    payload = json.loads(build_payload("task-watcher", "watcher", "trading", WorkerStats(), extra={"mode": "trading"}))
    assert payload["kind"] == "watcher"
    assert payload["state"] == "trading"
    assert payload["mode"] == "trading"
```

- [ ] **Step 2: Run test to verify it fails**

Run: `python -m pytest services/task-watcher/tests/test_watcher.py -v`
Expected: FAIL (`ModuleNotFoundError` or an import-path error: `_shared` must be added to task-watcher's PYTHONPATH. Check `PYTHONPATH=/app:/shared` in compose)

- [ ] **Step 3: Write minimal implementation** (add the heartbeat SET to `watcher_loop`)

```python
# top-of-file imports in watcher.py
from _shared.heartbeat import build_payload, WorkerStats
_HB_STATS = WorkerStats()
HEARTBEAT_KEY = "worker:task-watcher:heartbeat"
HEARTBEAT_TTL = 45

# inside the watcher_loop while body, added right after the mode decision:
mode = current_mode(now, holidays)
if mode == "trading":
    await redis.set(PAUSED_KEY, b"1", ex=PAUSED_TTL)
else:
    await redis.delete(PAUSED_KEY)
# heartbeat (LOOP_INTERVAL=30s < TTL 45s)
await redis.set(HEARTBEAT_KEY,
                build_payload("task-watcher", "watcher", mode, _HB_STATS, extra={"mode": mode}),
                ex=HEARTBEAT_TTL)
```
> Note: the watcher loop period (30s) is shorter than the TTL (45s), so the key is refreshed before it expires.

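The margin between send interval and TTL can be checked with a little arithmetic. The sketch below counts how many consecutive beats may be missed before the key expires, under the simplifying assumption that beats land exactly every `interval_s` seconds (it ignores SET latency and the time spent inside the loop body). `missed_beats_tolerated` is a hypothetical helper, not part of the plan.

```python
def missed_beats_tolerated(interval_s: int, ttl_s: int) -> int:
    """Consecutive missed SETs the key survives before a later beat refreshes it.

    A key written at t=0 expires at t=ttl_s. After k missed beats the next
    successful SET lands at t=(k+1)*interval_s, which must be before expiry.
    Returns -1 when the key expires even if no beat is missed.
    """
    if interval_s <= 0:
        raise ValueError("interval_s must be positive")
    k = 0
    while (k + 1) * interval_s < ttl_s:
        k += 1
    return k - 1


render_margin = missed_beats_tolerated(15, 45)   # render workers and ai_trade
watcher_margin = missed_beats_tolerated(30, 45)  # task-watcher loop

print(render_margin)   # 1
print(watcher_margin)  # 0
```

Under these assumptions the 15 s senders survive one missed beat, while the 30 s watcher loop has no slack: a single failed or delayed SET lets the key expire until the following iteration writes it again.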
- [ ] **Step 4: Run test to verify it passes**

Run: `python -m pytest services/task-watcher/tests/test_watcher.py -v`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add services/task-watcher
git commit -m "feat(task-watcher): send heartbeat (state=mode, exposes the pause reason)"
```

## Task A4: ai_trade heartbeat (different runtime)

**Files:**

- Modify: `ai_trade/requirements.txt` or the root `requirements.txt` (add redis if not already present)
- Create: `ai_trade/heartbeat.py` (its own mini module; it runs on the host, so the `_shared` import path differs)
- Modify: `ai_trade/main.py` (heartbeat task in lifespan)
- Test: `ai_trade/tests/test_heartbeat.py`

**Interfaces:**

- Produces: `worker:ai_trade:heartbeat` (kind=`trader`, state=`market_open`|`market_closed`)
- Consumes: the polling-window decision function in `ai_trade/scheduler.py` (the AI session must confirm the exact function name; per web-ai CLAUDE.md, `scheduler.py` owns the polling-window decision)

- [ ] **Step 1: Write the failing test**

```python
# ai_trade/tests/test_heartbeat.py
import json
from heartbeat import build_trader_payload

def test_trader_payload_market_open():
    p = json.loads(build_trader_payload("market_open", signals=2))
    assert p["name"] == "ai_trade"
    assert p["kind"] == "trader"
    assert p["state"] == "market_open"
    assert p["ts"].endswith("Z")
```

- [ ] **Step 2: Run test to verify it fails**

Run: `python -m pytest ai_trade/tests/test_heartbeat.py -v`
Expected: FAIL (`ModuleNotFoundError: heartbeat`)

- [ ] **Step 3: Write minimal implementation**

```python
# ai_trade/heartbeat.py
"""ai_trade heartbeat: SET worker:ai_trade:heartbeat on the NAS Redis. Global Constraints, Contract 1."""
from __future__ import annotations
import asyncio, datetime as dt, json, logging, os
import redis.asyncio as aioredis

logger = logging.getLogger(__name__)
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
KEY = "worker:ai_trade:heartbeat"
INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "15"))
TTL = int(os.getenv("HEARTBEAT_TTL", "45"))


def build_trader_payload(state: str, signals: int = 0) -> str:
    return json.dumps({
        "name": "ai_trade", "kind": "trader", "state": state,
        "ts": dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "last_job_at": None, "jobs_done": signals, "jobs_failed": 0,
    })


async def heartbeat_loop(state_fn):
    """state_fn() -> (state:str, signals:int). The caller injects the poll-window logic."""
    redis = aioredis.from_url(REDIS_URL, decode_responses=False)
    try:
        while True:
            try:
                state, signals = state_fn()
                await redis.set(KEY, build_trader_payload(state, signals), ex=TTL)
            except asyncio.CancelledError:
                raise
            except Exception:
                logger.exception("ai_trade heartbeat 실패")
            await asyncio.sleep(INTERVAL)
    finally:
        # Close the connection when the task is cancelled at shutdown.
        await redis.aclose()
```

Add to the lifespan in `ai_trade/main.py` (the AI session builds state_fn from the actual polling-window decision):
```python
import heartbeat as _hb
# inside lifespan, near where the poll_loop task is created:
def _trader_state():
    # Decide market_open/market_closed from the scheduler's polling-window result
    # (AI session: replace with the scheduler's actual window function)
    state = "market_open" if scheduler.is_polling_window() else "market_closed"
    return state, len(state_singleton.signals)
hb_task = asyncio.create_task(_hb.heartbeat_loop(_trader_state))
# on shutdown: hb_task.cancel() + await (ignore CancelledError)
```

Add `redis>=5.0` to `requirements.txt` (if not already present).

- [ ] **Step 4: Run test to verify it passes**

Run: `python -m pytest ai_trade/tests/test_heartbeat.py -v`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add ai_trade/heartbeat.py ai_trade/main.py ai_trade/tests/test_heartbeat.py requirements.txt
git commit -m "feat(ai_trade): NAS Redis heartbeat (trader market_open/closed)"
```

---

# Part B: web-backend agent-office aggregation + alerting (owned by this BE session)

> repo: `web-backend`. Tests: agent-office uses pytest + pytest-asyncio. pytest can be run locally (not in docker).

## Task B1: Add the redis dependency to agent-office

**Files:**

- Modify: `agent-office/requirements.txt`
- Modify: `agent-office/app/config.py`
- Modify: `docker-compose.yml` (agent-office block; requires the `compose` lock)

**Interfaces:**

- Produces: `config.REDIS_URL`, plus the `REDIS_URL` env and `depends_on: redis` on the agent-office container

- [ ] **Step 1: Add redis to requirements.txt**

```
redis>=5.0
```
(append one line at the end of the file)

- [ ] **Step 2: Add REDIS_URL + threshold to config.py**

```python
# append to the end of config.py
# Redis (node monitor)
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379")
NODE_ALERT_DEADLETTER_THRESHOLD = int(os.getenv("NODE_ALERT_DEADLETTER_THRESHOLD", "1"))
```

- [ ] **Step 3: Edit the agent-office block in docker-compose.yml** (after `acquire_lock("compose","BE")`)

Add to `environment:` in the agent-office service block:
```yaml
  - REDIS_URL=${REDIS_URL:-redis://redis:6379}
```
Add `redis` to `depends_on:` (create `depends_on` if the block has none):
```yaml
depends_on:
  - redis
```

- [ ] **Step 4: Commit**

```bash
git add agent-office/requirements.txt agent-office/app/config.py docker-compose.yml
git commit -m "chore(agent-office): redis dependency + REDIS_URL/dead-letter threshold settings"
```

## Task B2: node_monitor.collect_status

**Files:**

- Create: `agent-office/app/node_monitor.py`
- Test: `agent-office/tests/test_node_monitor.py`

**Interfaces:**

- Produces: `WORKER_REGISTRY` (list of dict), `async def collect_status(redis=None) -> dict` (Contract 2 schema)
- Consumes: `config.REDIS_URL`

- [ ] **Step 1: Write the failing test**

```python
# agent-office/tests/test_node_monitor.py
import json, pytest
from app import node_monitor

class FakeRedis:
    """Mimics worker heartbeat GET, queue llen, and scan_iter."""
    def __init__(self, kv=None, lists=None):
        self._kv = kv or {}        # key(str) -> bytes
        self._lists = lists or {}  # key(str) -> length(int)
    async def get(self, key):
        return self._kv.get(key)
    async def llen(self, key):
        return self._lists.get(key, 0)
    async def scan_iter(self, match=None):
        prefix = match.rstrip("*")
        for k in list(self._lists):
            if k.startswith(prefix):
                yield k

def _hb(name, kind, state, **extra):
    return json.dumps({"name": name, "kind": kind, "state": state, "ts": "2026-06-29T00:00:00Z",
                       "last_job_at": None, "jobs_done": 0, "jobs_failed": 0, **extra}).encode()

@pytest.mark.asyncio
async def test_alive_worker_healthy_link():
    r = FakeRedis(kv={"worker:image-render:heartbeat": _hb("image-render","render","idle")})
    st = await node_monitor.collect_status(redis=r)
    img = next(w for w in st["workers"] if w["name"] == "image-render")
    assert img["alive"] is True and img["state"] == "idle"
    link = next(l for l in st["links"] if l["to"] == "image-render")
    assert link["status"] == "healthy" and link["type"] == "redis-queue"

@pytest.mark.asyncio
async def test_missing_heartbeat_is_dead_and_down():
    r = FakeRedis()  # no heartbeat
    st = await node_monitor.collect_status(redis=r)
    img = next(w for w in st["workers"] if w["name"] == "image-render")
    assert img["alive"] is False
    link = next(l for l in st["links"] if l["to"] == "image-render")
    assert link["status"] == "down"

@pytest.mark.asyncio
async def test_dead_letter_makes_degraded():
    r = FakeRedis(kv={"worker:video-render:heartbeat": _hb("video-render","render","idle")},
                  lists={"dead_letter:queue:video-render": 2})
    st = await node_monitor.collect_status(redis=r)
    vid = next(w for w in st["workers"] if w["name"] == "video-render")
    assert vid["dead_letter"] == 2
    link = next(l for l in st["links"] if l["to"] == "video-render")
    assert link["status"] == "degraded"

@pytest.mark.asyncio
async def test_paused_reason_from_watcher():
    r = FakeRedis(kv={"queue:paused": b"1",
                      "worker:task-watcher:heartbeat": _hb("task-watcher","watcher","trading",mode="trading")})
    st = await node_monitor.collect_status(redis=r)
    assert st["paused"] is True and st["paused_reason"] == "trading"

@pytest.mark.asyncio
async def test_trader_http_pull_link():
    r = FakeRedis(kv={"worker:ai_trade:heartbeat": _hb("ai_trade","trader","market_open")})
    st = await node_monitor.collect_status(redis=r)
    link = next(l for l in st["links"] if l["from"] == "ai_trade")
    assert link["type"] == "http-pull" and link["status"] == "healthy"
```

- [ ] **Step 2: Run test to verify it fails**

Run: `python -m pytest agent-office/tests/test_node_monitor.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named 'app.node_monitor'`

- [ ] **Step 3: Write minimal implementation**

```python
# agent-office/app/node_monitor.py
"""Distributed worker status aggregation (read-only). Builds the Global Constraints Contract 2 schema."""
from __future__ import annotations
import datetime as dt, json, logging
import redis.asyncio as aioredis
from .config import REDIS_URL

logger = logging.getLogger("agent-office.node_monitor")

WORKER_REGISTRY = [
    {"name": "music-render", "kind": "render", "queue": "queue:music-render"},
    {"name": "video-render", "kind": "render", "queue": "queue:video-render"},
    {"name": "image-render", "kind": "render", "queue": "queue:image-render"},
    {"name": "insta-render", "kind": "render", "queue": "queue:insta-render"},
    {"name": "task-watcher", "kind": "watcher", "queue": None},
    {"name": "ai_trade", "kind": "trader", "queue": None},
]

_redis = None
def _get_redis():
    # Lazily create one shared client for the process.
    global _redis
    if _redis is None:
        _redis = aioredis.from_url(REDIS_URL, decode_responses=False)
    return _redis


def _beat_age(ts_str, now):
    try:
        beat = dt.datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
        return max(0, int((now - beat).total_seconds()))
    except Exception:
        return None


def _render_link_status(w):
    # Order matters: down > paused > degraded > healthy.
    if not w["alive"]:
        return "down"
    if w["state"] == "paused":
        return "paused"
    if w["dead_letter"] > 0:
        return "degraded"
    return "healthy"


async def collect_status(redis=None) -> dict:
    r = redis or _get_redis()
    now = dt.datetime.now(dt.timezone.utc)
    out = {"redis_ok": True, "paused": False, "paused_reason": None,
           "generated_at": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
           "workers": [], "links": []}
    try:
        out["paused"] = (await r.get("queue:paused")) == b"1"
    except Exception:
        logger.exception("redis 접근 실패")
        out["redis_ok"] = False
        return out

    for w in WORKER_REGISTRY:
        info = {"name": w["name"], "kind": w["kind"], "alive": False, "state": None,
                "last_beat_age_s": None, "queue_depth": 0, "dead_letter": 0,
                "processing": 0, "jobs_done": 0, "jobs_failed": 0, "last_job_at": None}
        raw = await r.get(f"worker:{w['name']}:heartbeat")
        if raw:
            try:
                hb = json.loads(raw)
                info.update(alive=True, state=hb.get("state"),
                            jobs_done=hb.get("jobs_done", 0), jobs_failed=hb.get("jobs_failed", 0),
                            last_job_at=hb.get("last_job_at"),
                            last_beat_age_s=_beat_age(hb.get("ts", ""), now))
                if w["kind"] == "watcher" and hb.get("mode"):
                    out["paused_reason"] = hb["mode"]
            except json.JSONDecodeError:
                logger.warning("heartbeat JSON 파싱 실패 name=%s", w["name"])
        if w["queue"]:
            info["queue_depth"] = await r.llen(w["queue"])
            info["dead_letter"] = await r.llen(f"dead_letter:{w['queue']}")
            proc = 0
            async for key in r.scan_iter(match=f"processing:{w['queue']}:*"):
                proc += await r.llen(key)
            info["processing"] = proc
        out["workers"].append(info)

    for w in out["workers"]:
        if w["kind"] == "trader":
            out["links"].append({"from": "ai_trade", "to": "nas-stock", "type": "http-pull",
                                 "status": "healthy" if w["alive"] else "down"})
        elif w["kind"] == "render":
            out["links"].append({"from": "nas", "to": w["name"], "type": "redis-queue",
                                 "status": _render_link_status(w)})
    if out["paused"] and not out["paused_reason"]:
        out["paused_reason"] = "trading"
    return out
```

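`collect_status` issues one `GET` per worker. If round trips ever become a concern for the 3-second poll, the six heartbeat reads could be batched with a single `MGET`, which redis-py exposes as `mget`. The sketch below is an optional variation, not what the plan prescribes: `fetch_heartbeats`, `WORKER_NAMES`, and the in-memory `FakeRedis` are hypothetical names used only to make the example runnable.

```python
import asyncio
import json

WORKER_NAMES = ["music-render", "video-render", "image-render",
                "insta-render", "task-watcher", "ai_trade"]


async def fetch_heartbeats(redis, names=WORKER_NAMES):
    """Read every heartbeat with one MGET; missing or unparsable values map to None."""
    keys = [f"worker:{n}:heartbeat" for n in names]
    raws = await redis.mget(keys)  # returns values in the same order as keys
    result = {}
    for name, raw in zip(names, raws):
        try:
            result[name] = json.loads(raw) if raw else None
        except ValueError:
            result[name] = None
    return result


class FakeRedis:
    """In-memory stand-in used only for this demonstration."""

    def __init__(self, kv):
        self._kv = kv

    async def mget(self, keys):
        return [self._kv.get(k) for k in keys]


fake = FakeRedis({
    "worker:image-render:heartbeat": b'{"state": "idle"}',
    "worker:ai_trade:heartbeat": b"not json",
})
beats = asyncio.run(fetch_heartbeats(fake))
```

With only six workers the per-key `GET` loop is unlikely to matter; the batched form mainly trades a little clarity for fewer round trips.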
- [ ] **Step 4: Run test to verify it passes**

Run: `python -m pytest agent-office/tests/test_node_monitor.py -v`
Expected: PASS (5 passed)

- [ ] **Step 5: Commit**

```bash
git add agent-office/app/node_monitor.py agent-office/tests/test_node_monitor.py
git commit -m "feat(agent-office): node_monitor.collect_status (heartbeat + queue + dead-letter aggregation)"
```

## Task B3: GET /api/agent-office/nodes endpoint

**Files:**

- Modify: `agent-office/app/main.py`
- Test: `agent-office/tests/test_nodes_endpoint.py`

**Interfaces:**

- Consumes: `node_monitor.collect_status`
- Produces: `GET /api/agent-office/nodes` → Contract 2 JSON

- [ ] **Step 1: Write the failing test**

```python
# agent-office/tests/test_nodes_endpoint.py
import pytest
from fastapi.testclient import TestClient

@pytest.fixture
def client(monkeypatch):
    from app import main
    async def fake_collect(redis=None):
        return {"redis_ok": True, "paused": False, "paused_reason": None,
                "generated_at": "2026-06-29T00:00:00Z", "workers": [], "links": []}
    monkeypatch.setattr("app.node_monitor.collect_status", fake_collect)
    return TestClient(main.app)

def test_nodes_endpoint_returns_contract(client):
    resp = client.get("/api/agent-office/nodes")
    assert resp.status_code == 200
    body = resp.json()
    assert set(["redis_ok","paused","workers","links"]).issubset(body)
```

- [ ] **Step 2: Run test to verify it fails**

Run: `python -m pytest agent-office/tests/test_nodes_endpoint.py -v`
Expected: FAIL with 404 (no route)

- [ ] **Step 3: Write minimal implementation** (add the endpoint to `main.py`, near `/states`)

```python
@app.get("/api/agent-office/nodes")
async def nodes_status():
    # Imported at call time so tests can monkeypatch app.node_monitor.collect_status.
    from .node_monitor import collect_status
    return await collect_status()
```

- [ ] **Step 4: Run test to verify it passes**

Run: `python -m pytest agent-office/tests/test_nodes_endpoint.py -v`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add agent-office/app/main.py agent-office/tests/test_nodes_endpoint.py
git commit -m "feat(agent-office): GET /api/agent-office/nodes endpoint"
```

## Task B4: Node health alert cron

**Files:**

- Modify: `agent-office/app/node_monitor.py` (add `check_and_alert`)
- Modify: `agent-office/app/scheduler.py` (register the interval job)
- Test: `agent-office/tests/test_node_monitor.py` (add alert-transition tests)

**Interfaces:**

- Consumes: `collect_status`, `telegram.messaging.send_raw`, `db.add_log`
- Produces: `async def check_and_alert(status=None) -> list[str]` (list of alert texts sent, returned for tests)

- [ ] **Step 1: Write the failing test**

```python
# add to test_node_monitor.py
import app.node_monitor as nm

@pytest.mark.asyncio
async def test_alert_on_alive_to_dead(monkeypatch):
    sent = []
    async def fake_send_raw(text, **kw): sent.append(text); return {"ok": True}
    monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
    monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
    nm._node_state.clear(); nm._dl_notified.clear()
    alive = {"workers": [{"name":"image-render","alive":True,"dead_letter":0}], "links": []}
    dead = {"workers": [{"name":"image-render","alive":False,"dead_letter":0}], "links": []}
    await nm.check_and_alert(status=alive)  # first observation: no alert
    assert sent == []
    await nm.check_and_alert(status=dead)   # alive→dead transition
    assert any("다운" in t for t in sent)

@pytest.mark.asyncio
async def test_alert_on_dead_letter_growth(monkeypatch):
    sent = []
    async def fake_send_raw(text, **kw): sent.append(text); return {"ok": True}
    monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
    monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
    nm._node_state.clear(); nm._dl_notified.clear()
    s = {"workers": [{"name":"video-render","alive":True,"dead_letter":2}], "links": []}
    await nm.check_and_alert(status=s)
    assert any("dead-letter" in t for t in sent)
```

- [ ] **Step 2: Run test to verify it fails**

Run: `python -m pytest agent-office/tests/test_node_monitor.py -v`
Expected: FAIL (`AttributeError: module has no attribute '_node_state'`/`check_and_alert`)

- [ ] **Step 3: Write minimal implementation** (`node_monitor.py`에 추가)
|
||
```python
from .config import NODE_ALERT_DEADLETTER_THRESHOLD

_node_state: dict[str, bool] = {}  # name -> alive flag from the previous check
_dl_notified: dict[str, int] = {}  # name -> dead_letter count last alerted on


async def check_and_alert(status=None) -> list[str]:
    # Imported inside the function, so the monkeypatched send_raw/add_log
    # from the tests are the ones resolved here.
    from .telegram.messaging import send_raw
    from .db import add_log

    st = status or await collect_status()
    sent: list[str] = []
    for w in st["workers"]:
        name, alive = w["name"], w.get("alive", False)
        prev = _node_state.get(name)
        if prev is True and not alive:
            text = f"🔴 [{name}] 워커 다운"
            if (await send_raw(text=text)).get("ok"):
                add_log("node_monitor", f"{name} 다운", "warning")
                sent.append(text)
        elif prev is False and alive:
            text = f"🟢 [{name}] 워커 복구"
            if (await send_raw(text=text)).get("ok"):
                add_log("node_monitor", f"{name} 복구", "info")
                sent.append(text)
        _node_state[name] = alive

        dl = w.get("dead_letter", 0)
        if dl >= NODE_ALERT_DEADLETTER_THRESHOLD and dl != _dl_notified.get(name, 0):
            text = f"❌ [{name}] 실패 누적 {dl}건 (dead-letter)"
            if (await send_raw(text=text)).get("ok"):
                add_log("node_monitor", f"{name} dead-letter {dl}", "warning")
                sent.append(text)
                _dl_notified[name] = dl
        elif dl == 0:
            _dl_notified.pop(name, None)
    return sent
```
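> Pitfall: the first observation (prev=None) raises no alert, which prevents false alarms at boot. Because only the alive→dead transition is caught, the "two-week silent death" scenario (a worker that was alive and then died) is detected, but a worker that is already dead at boot is not alerted (to be reinforced in Phase 2).
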
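
The transition rule described in the pitfall note can be checked in isolation. What follows is a minimal sketch and not part of `node_monitor.py`: `transition_alert` and `replay` are hypothetical helper names introduced only for illustration. They mirror the `prev`/`alive` branches of `check_and_alert` without sending anything. Replaying `[True, False, False, True]` produces one down event and one recovery event, while `[False, False]` (a worker already dead at boot) produces none, which is the gap the note defers to Phase 2.

```python
from typing import Optional


def transition_alert(prev: Optional[bool], alive: bool) -> Optional[str]:
    """Return "down", "recovered", or None for a single observation.

    Hypothetical helper: same branches as check_and_alert, no side effects.
    """
    if prev is True and not alive:
        return "down"
    if prev is False and alive:
        return "recovered"
    return None  # first observation (prev is None) or no change


def replay(observations: list[bool]) -> list[Optional[str]]:
    """Feed a sequence of alive flags through the rule, tracking prev."""
    prev: Optional[bool] = None
    events: list[Optional[str]] = []
    for alive in observations:
        events.append(transition_alert(prev, alive))
        prev = alive  # state is updated on every observation
    return events
```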
`scheduler.py` (import and job registration):
```python
from . import node_monitor  # at the top of the module


async def _run_node_health_check():
    await node_monitor.check_and_alert()


# Inside init_scheduler(), near _poll_pipelines:
scheduler.add_job(_run_node_health_check, "interval", seconds=60, id="node_health_check", replace_existing=True)
```

- [ ] **Step 4: Run test to verify it passes**

Run: `python -m pytest agent-office/tests/ -v`
Expected: PASS (entire suite)

- [ ] **Step 5: Commit**
```bash
git add agent-office/app/node_monitor.py agent-office/app/scheduler.py agent-office/tests/test_node_monitor.py
git commit -m "feat(agent-office): 노드 헬스 1분 cron + 텔레그램 경보(다운/복구/dead-letter)"
```

## Task B5: Deployment + production verification

**Files:** (no code changes; deployment and verification only)

- [ ] **Step 1: Acquire the nas-deploy lock + deploy via push**
```bash
# after acquire_lock("nas-deploy","BE")
git push   # Gitea webhook → deployer auto-deploys (agent-office rebuild: picks up the redis dependency)
```

- [ ] **Step 2: Production verification** (after the deployment completes)
```bash
# Check the /nodes response (if the workers are not deployed yet, expect alive=false for all)
curl -s https://gahusb.synology.me/api/agent-office/nodes | python -m json.tool
```
Expected: `redis_ok:true` and all 6 workers listed. If Part A has not been deployed yet, every `alive` is false (normal, since no heartbeat exists yet).

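
The Step 2 check can also be scripted. The following is a minimal sketch that assumes only the fields this plan mentions (`redis_ok`, `workers[].name`, `workers[].alive`) and the six worker names from Contract 1. `check_nodes_payload` is a hypothetical helper, and the sample payload is made up for illustration rather than captured from the server.

```python
import json

# Worker names fixed by Contract 1 (heartbeat key schema).
EXPECTED_WORKERS = {
    "music-render", "video-render", "image-render",
    "insta-render", "task-watcher", "ai_trade",
}


def check_nodes_payload(payload: dict) -> list[str]:
    """Return human-readable problems; an empty list means the check passed.

    Hypothetical helper: it only inspects fields named in this plan.
    """
    problems = []
    if payload.get("redis_ok") is not True:
        problems.append("redis_ok is not true")
    names = {w.get("name") for w in payload.get("workers", [])}
    missing = sorted(EXPECTED_WORKERS - names)
    if missing:
        problems.append("missing workers: " + ", ".join(missing))
    return problems


if __name__ == "__main__":
    # Illustrative payload for the "Part A not deployed yet" case:
    # every worker is listed, but none has a heartbeat.
    sample = {
        "redis_ok": True,
        "workers": [{"name": n, "alive": False} for n in sorted(EXPECTED_WORKERS)],
        "links": [],
    }
    print(json.dumps(check_nodes_payload(sample)))
```

In practice the payload would come from the `curl` output, for example by piping it into a script that reads it with `json.load(sys.stdin)`. Note that `alive: false` is deliberately not treated as a problem here, because it is the expected state before Part A is deployed.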
- [ ] **Step 3: release_lock("nas-deploy")**

---

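# Part C: web-ui Three.js dashboard (owned by the FE session)

> repo: `web-ui`. React + Vite. Build locally only (never build on the NAS Celeron). Deploy with `npm run release:nas`.
> **Implement the Three.js visualization (C4) with the `designer` skill enabled.** Because it is creative visual work, it is specified through a data contract, a state-to-visual mapping, and acceptance criteria rather than line-by-line code written in advance.
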
## Task C1: Dependencies + route + nav registration

**Files:**
- Modify: `package.json` (three, @react-three/fiber, @react-three/drei)
- Modify: `src/routes.jsx` (lazy route + navLinks entry)

- [ ] **Step 1: Install dependencies**
```bash
npm install three @react-three/fiber @react-three/drei
```

- [ ] **Step 2: Add the route to routes.jsx** (in the `appRoutes` array; lazy, same pattern as agent-office)
```jsx
{
  path: 'infra',
  lazy: () => import('./pages/infra/InfraMonitor'),
},
```

- [ ] **Step 3: Add an entry to navLinks**
```jsx
{
  id: 'infra',
  label: 'Infra',
  path: '/infra',
  subtitle: 'NODE PIPELINE',
  description: 'NAS↔Windows 워커 파이프라인 실시간 상태',
  icon: <span style={{fontSize:'1.2em'}}>🛰️</span>,
  accent: '#22d3ee',
},
```

- [ ] **Step 4: Commit**
```bash
git add package.json package-lock.json src/routes.jsx
git commit -m "feat(infra): three.js 의존성 + /infra 라우트·네비 등록"
```

## Task C2: API helper + useNodeStatus polling hook

**Files:**
- Modify: `src/api.js` (add `getNodeStatus`)
- Create: `src/pages/infra/useNodeStatus.js`
- Test: `src/pages/infra/useNodeStatus.test.js` (Vitest; confirm the project's test runner first)

**Interfaces:**
- Produces: `getNodeStatus()` → Contract 2 object, `useNodeStatus(intervalMs=3000)` → `{data, error, loading}`

- [ ] **Step 1: Write the failing test**
```js
// src/pages/infra/useNodeStatus.test.js
import { renderHook, waitFor } from '@testing-library/react';
import { describe, it, expect, vi } from 'vitest';
import { useNodeStatus } from './useNodeStatus';
import * as api from '../../api';

describe('useNodeStatus', () => {
  it('fetches node status', async () => {
    vi.spyOn(api, 'getNodeStatus').mockResolvedValue({ redis_ok: true, workers: [], links: [] });
    const { result } = renderHook(() => useNodeStatus(99999));
    await waitFor(() => expect(result.current.data).toBeTruthy());
    expect(result.current.data.redis_ok).toBe(true);
  });
});
```

- [ ] **Step 2: Run test to verify it fails**

Run: `npm test -- useNodeStatus`
Expected: FAIL (`useNodeStatus`/`getNodeStatus` not defined)

- [ ] **Step 3: Write minimal implementation**

Add to `src/api.js` (follow the existing fetch helper pattern, using a relative path):
```js
export async function getNodeStatus() {
  const res = await fetch('/api/agent-office/nodes');
  if (!res.ok) throw new Error(`nodes ${res.status}`);
  return res.json();
}
```
`src/pages/infra/useNodeStatus.js`:
```js
import { useEffect, useState, useRef } from 'react';
import { getNodeStatus } from '../../api';

export function useNodeStatus(intervalMs = 3000) {
  const [data, setData] = useState(null);
  const [error, setError] = useState(null);
  const [loading, setLoading] = useState(true);
  const timer = useRef(null);

  useEffect(() => {
    let alive = true; // guards against state updates after unmount
    async function tick() {
      try {
        const d = await getNodeStatus();
        if (alive) { setData(d); setError(null); }
      } catch (e) {
        if (alive) setError(e);
      } finally {
        if (alive) setLoading(false);
      }
    }
    tick(); // fetch once immediately, then poll
    timer.current = setInterval(tick, intervalMs);
    return () => { alive = false; clearInterval(timer.current); };
  }, [intervalMs]);

  return { data, error, loading };
}
```

- [ ] **Step 4: Run test to verify it passes**

Run: `npm test -- useNodeStatus`
Expected: PASS

- [ ] **Step 5: Commit**
```bash
git add src/api.js src/pages/infra/useNodeStatus.js src/pages/infra/useNodeStatus.test.js
git commit -m "feat(infra): getNodeStatus + useNodeStatus 3초 폴링 훅"
```

## Task C3: 2D fallback panel (the first working deliverable)

**Files:**
- Create: `src/pages/infra/InfraMonitor.jsx` (2D card view first)
- Create: `src/pages/infra/WorkerCard.jsx`
- Create: `src/pages/infra/statusVisual.js` (state → color/label mapping, shared with Three.js)
- Test: `src/pages/infra/statusVisual.test.js`

**Interfaces:**
- Produces: `linkColor(status)`, `workerStateLabel(worker)` (reused by the C4 Three.js scene)

- [ ] **Step 1: Write the failing test**
```js
// src/pages/infra/statusVisual.test.js
import { describe, it, expect } from 'vitest';
import { linkColor, workerStateLabel } from './statusVisual';

describe('statusVisual', () => {
  it('maps link status to color', () => {
    expect(linkColor('healthy')).toBe('#22c55e');
    expect(linkColor('down')).toBe('#ef4444');
    expect(linkColor('paused')).toBe('#f59e0b');
    expect(linkColor('degraded')).toBe('#eab308');
  });
  it('labels dead worker', () => {
    expect(workerStateLabel({ alive: false })).toMatch(/다운|down/i);
  });
});
```

- [ ] **Step 2: Run test to verify it fails**

Run: `npm test -- statusVisual`
Expected: FAIL (module not found)

- [ ] **Step 3: Write minimal implementation**

`src/pages/infra/statusVisual.js`:
```js
export const LINK_COLORS = {
  healthy: '#22c55e',
  paused: '#f59e0b',
  degraded: '#eab308',
  down: '#ef4444',
};

// Unknown statuses fall back to a neutral gray.
export function linkColor(status) {
  return LINK_COLORS[status] || '#64748b';
}

export function workerStateLabel(w) {
  if (!w.alive) return '🔴 다운';
  if (w.state === 'paused') return '⏸ 작업중(트레이딩)';
  if (w.state === 'busy') return '⚙️ 처리 중';
  if (w.state === 'market_open') return '📈 장중';
  if (w.state === 'market_closed') return '🌙 휴장';
  return '🟢 대기';
}
```
`WorkerCard.jsx` + `InfraMonitor.jsx`: render the `data.workers` returned by `useNodeStatus()` as a card grid (name, state label, queue_depth, dead_letter, last_beat_age_s). When `data.redis_ok===false`, show a "집계 서버/Redis 연결 끊김" (aggregation server/Redis disconnected) banner. (Standard React cards; the designer skill is not needed.)

- [ ] **Step 4: Run test to verify it passes**

Run: `npm test -- statusVisual`
Expected: PASS

- [ ] **Step 5: Commit**
```bash
git add src/pages/infra/
git commit -m "feat(infra): 2D 워커 상태 패널 + statusVisual 매핑"
```

## Task C4: Three.js pipeline visualization (designer skill)

**Files:**
- Create: `src/pages/infra/PipelineScene.jsx` (r3f Canvas topology)
- Modify: `src/pages/infra/InfraMonitor.jsx` (toggle between the 3D scene and the 2D panel)

**Implementation approach**: implement this Task with the **`designer` skill enabled**. What follows is the exact data contract, state mapping, and acceptance criteria the designer must follow.

- [ ] **Step 1: Enable the designer skill**: call `Skill(designer)`, then proceed with the spec below.

- [ ] **Step 2: Topology + data contract**
  - Input: `data` from `useNodeStatus()` (Contract 2). `data.links` drives the pipelines, `data.workers` drives node state.
  - Node layout: **left** NAS server (box) + gateway label / **center** Redis queue bus (glowing core) / **right** Windows node (box) + 6 worker sub-nodes (`music/video/image/insta-render`, `task-watcher`, `ai_trade`).
  - Pipelines (tubes/lines): the 4 render workers follow the NAS→Redis→worker path (`type:redis-queue`). `ai_trade` is a direct NAS stock⇄ai_trade link (`type:http-pull`, bypassing the Redis bus).
  - Colors reuse `statusVisual.linkColor(link.status)`.

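- [ ] **Step 3: Per-state animation (acceptance criteria)**
  - `healthy`: cyan/green tube with flowing particles (visible traffic flow is the user's core requirement). When `state==='busy'`, particle speed increases.
  - `paused`: amber, particles stopped or slowed, plus the "⏸ 작업중(트레이딩)" label.
  - `down`: red, flow stopped, a spark/break visual at the point of disconnection, a ⚠ icon, and "last beat Xs ago".
  - `degraded`: a ❌ dead-letter count badge on the tube.
  - `redis_ok===false`: the whole central bus turns red, with a "집계 서버 연결 끊김" (aggregation server disconnected) overlay.
  - HUD on each worker node: name, state label (`workerStateLabel`), queue_depth, dead_letter.
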
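- [ ] **Step 4: Fallback + verification**
  - No WebGL support / mobile: fall back automatically to the C3 2D panel (and provide a toggle button).
  - Verification: run `npm run dev` and force-inject mock data for the four states (by stubbing `useNodeStatus`), then confirm by eye that the healthy/paused/down/degraded visuals are clearly distinguishable. In an RC environment, have the user confirm through screenshots or recordings.
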
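
For the forced-injection check in Step 4, one fixture per link status is enough. The following is a minimal sketch that emits such fixtures as JSON for a stubbed `useNodeStatus`. It is written in Python only to match the backend side of this plan, `make_fixture` is a hypothetical helper, and all values are invented for visual testing. The link objects carry only `type` and `status` here; any other keys that Contract 2 defines would have to be added from the contract itself.

```python
import json

LINK_STATUSES = ("healthy", "paused", "degraded", "down")


def make_fixture(status: str, worker: str = "image-render") -> dict:
    """Build one mock /nodes payload whose single link has the given status.

    Hypothetical helper: field names are the ones this plan mentions,
    the values are made up.
    """
    if status not in LINK_STATUSES:
        raise ValueError(f"unknown link status: {status}")
    alive = status != "down"
    return {
        "redis_ok": True,
        "workers": [{
            "name": worker,
            "alive": alive,
            "state": "paused" if status == "paused" else "idle",
            "queue_depth": 0,
            "dead_letter": 2 if status == "degraded" else 0,
            "last_beat_age_s": 5 if alive else 120,
        }],
        "links": [{"type": "redis-queue", "status": status}],
    }


if __name__ == "__main__":
    # One JSON document per status, keyed by status name.
    print(json.dumps({s: make_fixture(s) for s in LINK_STATUSES}, indent=2))
```

The printed JSON can be saved next to the scene and returned from the stub, so each of the four visuals is exercised without a running backend. A fifth case with `redis_ok` set to false would cover the central-bus overlay.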
- [ ] **Step 5: Commit**
```bash
git add src/pages/infra/PipelineScene.jsx src/pages/infra/InfraMonitor.jsx
git commit -m "feat(infra): three.js NAS↔Windows 파이프라인 시각화 (상태별 흐름/장애)"
```

- [ ] **Step 6: Deploy**
```bash
npm run build && npm run release:nas
```

---

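## Execution order & session collaboration (co-gahusb)

1. **Prerequisite gate**: share this plan's Global Constraints (the 2 contracts) with the FE/AI sessions via co-gahusb `post_message` → the 3 Parts can then start in parallel.
2. **Part B (BE, this session)** can go first: the FakeRedis tests make it complete without any workers. After the B5 deployment, `/nodes` responds, even if only with `alive:false`.
3. **Part A (AI session)**: once it is finished and the workers are redeployed, `/nodes` fills in with `alive:true`.
4. **Part C (FE session)** can develop against mocks using only the `/nodes` contract, then switch to live data.
5. Shared resources: the docker-compose change in B1 = `compose` lock, the B5 deployment = `nas-deploy` lock.
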
## Self-Review (author's check)

- **Spec coverage**: G1 (6 heartbeats) = A1~A4 / G2 (queue, dead-letter, processing, paused aggregation) = B2 / G3 (Telegram alerts) = B4 / G4 (Three.js /infra) = C1~C4. Contract 1 = A1 + Global, Contract 2 = B2 + Global. ✓ Nothing missing.
- **Placeholder scan**: C4 is delegated to the designer skill, but its data contract, state mapping, and acceptance criteria are stated concretely (so it is not a placeholder). Every other Task includes real code. ✓
- **Type consistency**: `WorkerStats` (A1) ↔ worker.stats (A2); the `collect_status` return value (B2) ↔ `check_and_alert` input (B4), `/nodes` (B3), `useNodeStatus` (C2), and `linkColor`/`workerStateLabel` (defined in C3, reused by C4) all match. Heartbeat key/value (Contract 1) ↔ node_monitor parsing (B2) match. ✓