Compare commits
10 Commits
c7036212e2
...
94cddccaa7
| Author | SHA1 | Date | |
|---|---|---|---|
| 94cddccaa7 | |||
| b49cc14ef3 | |||
| 5d5ff27d29 | |||
| 2a0090a1d4 | |||
| ea1f0d103d | |||
| a3ae85cde1 | |||
| 363e95c5a9 | |||
| c69b18243b | |||
| f0fad05f2d | |||
| ed8ffdf343 |
@@ -21,7 +21,7 @@
|
|||||||
## 1. 프로젝트 개요
|
## 1. 프로젝트 개요
|
||||||
|
|
||||||
Synology NAS 기반의 개인 웹 플랫폼 백엔드 모노레포.
|
Synology NAS 기반의 개인 웹 플랫폼 백엔드 모노레포.
|
||||||
- **서비스 14개**: lotto, stock, music-lab, video-lab, image-lab, insta-lab, realestate-lab, agent-office, tarot-lab, saju-lab, personal, packs-lab, travel-proxy, deployer
|
- **서비스 15개**: lotto, stock, music-lab, video-lab, image-lab, insta-lab, realestate-lab, agent-office, tarot-lab, saju-lab, personal, packs-lab, travel-proxy, co-gahusb, deployer
|
||||||
- **공유 인프라**: `_shared/access_log` 모듈 (5개 서비스 공유), `redis` (music/video/image/insta-lab 큐 공유)
|
- **공유 인프라**: `_shared/access_log` 모듈 (5개 서비스 공유), `redis` (music/video/image/insta-lab 큐 공유)
|
||||||
- **렌더/생성 위임**: music/video/image/insta의 무거운 생성·렌더는 **Windows AI 워커**(`web-ai` 별도 레포)가 담당. NAS 서비스는 Redis 큐 push + 결과 webhook 수신만 한다.
|
- **렌더/생성 위임**: music/video/image/insta의 무거운 생성·렌더는 **Windows AI 워커**(`web-ai` 별도 레포)가 담당. NAS 서비스는 Redis 큐 push + 결과 webhook 수신만 한다.
|
||||||
- **프론트엔드**: 별도 레포 (React + Vite SPA), 빌드 산출물만 NAS에 배포
|
- **프론트엔드**: 별도 레포 (React + Vite SPA), 빌드 산출물만 NAS에 배포
|
||||||
@@ -80,7 +80,8 @@ Synology NAS 기반의 개인 웹 플랫폼 백엔드 모노레포.
|
|||||||
| `packs-lab` | 18950 | NAS 자료 다운로드 자동화 (DSM 공유 링크 + 5GB 업로드, Vercel SaaS와 HMAC 통신) |
|
| `packs-lab` | 18950 | NAS 자료 다운로드 자동화 (DSM 공유 링크 + 5GB 업로드, Vercel SaaS와 HMAC 통신) |
|
||||||
| `personal` | 18850 | 개인 서비스 (포트폴리오·블로그·투두 통합) |
|
| `personal` | 18850 | 개인 서비스 (포트폴리오·블로그·투두 통합) |
|
||||||
| `travel-proxy` | 19000 | 여행 사진 API + 썸네일 생성 |
|
| `travel-proxy` | 19000 | 여행 사진 API + 썸네일 생성 |
|
||||||
| `redis` | 6379 | 비동기 큐 (music/video/image/insta-lab 공유) |
|
| `co-gahusb` | 18920 | 세션 간 협업 팀 버스 (FastMCP streamable-http + Redis, Bearer `CO_BUS_KEY`, DNS-rebinding 보호 off) |
|
||||||
|
| `redis` | 6379 | 비동기 큐 (music/video/image/insta-lab + co-gahusb 공유) |
|
||||||
| `frontend` (nginx) | 8080 | 정적 SPA 서빙 + API 리버스 프록시 |
|
| `frontend` (nginx) | 8080 | 정적 SPA 서빙 + API 리버스 프록시 |
|
||||||
| `webpage-deployer` | 19010 | Gitea Webhook 수신 → 자동 배포 |
|
| `webpage-deployer` | 19010 | Gitea Webhook 수신 → 자동 배포 |
|
||||||
|
|
||||||
@@ -106,6 +107,7 @@ Synology NAS 기반의 개인 웹 플랫폼 백엔드 모노레포.
|
|||||||
| `/api/blog/` | `personal:8000` | 블로그 API |
|
| `/api/blog/` | `personal:8000` | 블로그 API |
|
||||||
| `/api/profile/` | `personal:8000` | 포트폴리오 API |
|
| `/api/profile/` | `personal:8000` | 포트폴리오 API |
|
||||||
| `/api/agent-office/` | `agent-office:8000` | AI 에이전트 오피스 API + WebSocket (86400s) |
|
| `/api/agent-office/` | `agent-office:8000` | AI 에이전트 오피스 API + WebSocket (86400s) |
|
||||||
|
| `/api/co/` | `co-gahusb:8000/` | MCP 팀 버스 (trailing-slash strip → `/mcp`, `Authorization` forward, `proxy_buffering off`, 3600s) |
|
||||||
| `/api/packs/upload` | `packs-lab:8000` | 5GB multipart 업로드 (`client_max_body_size 5G`, `proxy_request_buffering off`, **1800s** timeout) |
|
| `/api/packs/upload` | `packs-lab:8000` | 5GB multipart 업로드 (`client_max_body_size 5G`, `proxy_request_buffering off`, **1800s** timeout) |
|
||||||
| `/api/packs/` | `packs-lab:8000` | 다운로드/list |
|
| `/api/packs/` | `packs-lab:8000` | 다운로드/list |
|
||||||
| `/api/internal/insta/` | `insta-lab:8000` | Windows 워커 webhook (nginx IP 화이트리스트 + 앱 `X-Internal-Key`) |
|
| `/api/internal/insta/` | `insta-lab:8000` | Windows 워커 webhook (nginx IP 화이트리스트 + 앱 `X-Internal-Key`) |
|
||||||
|
|||||||
@@ -51,3 +51,9 @@ AGENT_CONTAINER_MAP: dict[str, tuple[str, int, _re.Pattern]] = {
|
|||||||
"insta": ("insta-lab", 8000, _re.compile(r"^/api/insta")),
|
"insta": ("insta-lab", 8000, _re.compile(r"^/api/insta")),
|
||||||
"realestate": ("realestate-lab", 8000, _re.compile(r"^/api/realestate")),
|
"realestate": ("realestate-lab", 8000, _re.compile(r"^/api/realestate")),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Redis (node monitor)
|
||||||
|
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379")
|
||||||
|
NODE_ALERT_DEADLETTER_THRESHOLD = int(os.getenv("NODE_ALERT_DEADLETTER_THRESHOLD", "1"))
|
||||||
|
# heartbeat TTL(45s)의 2배 — 키가 남아있어도 age>90s면 dead 판정
|
||||||
|
NODE_STALE_THRESHOLD_SEC = int(os.getenv("NODE_STALE_THRESHOLD_SEC", "90"))
|
||||||
|
|||||||
@@ -187,6 +187,11 @@ async def telegram_webhook(data: dict):
|
|||||||
def all_states():
|
def all_states():
|
||||||
return {"agents": get_all_agent_states()}
|
return {"agents": get_all_agent_states()}
|
||||||
|
|
||||||
|
@app.get("/api/agent-office/nodes")
|
||||||
|
async def nodes_status():
|
||||||
|
from .node_monitor import collect_status
|
||||||
|
return await collect_status()
|
||||||
|
|
||||||
@app.get("/api/agent-office/agents/{agent_id}/token-usage")
|
@app.get("/api/agent-office/agents/{agent_id}/token-usage")
|
||||||
def agent_token_usage(agent_id: str, days: int = 1):
|
def agent_token_usage(agent_id: str, days: int = 1):
|
||||||
from .db import get_token_usage_stats
|
from .db import get_token_usage_stats
|
||||||
|
|||||||
147
agent-office/app/node_monitor.py
Normal file
147
agent-office/app/node_monitor.py
Normal file
@@ -0,0 +1,147 @@
|
|||||||
|
"""분산 워커 상태 집계 (read-only). Global Constraints 계약 2 스키마 생성."""
|
||||||
|
from __future__ import annotations
|
||||||
|
import datetime as dt, json, logging
|
||||||
|
import redis.asyncio as aioredis
|
||||||
|
from .config import REDIS_URL, NODE_ALERT_DEADLETTER_THRESHOLD, NODE_STALE_THRESHOLD_SEC
|
||||||
|
|
||||||
|
logger = logging.getLogger("agent-office.node_monitor")
|
||||||
|
|
||||||
|
_node_state: dict[str, bool] = {} # name -> 직전 alive
|
||||||
|
_dl_notified: dict[str, int] = {} # name -> 직전 알린 dead_letter 수
|
||||||
|
|
||||||
|
WORKER_REGISTRY = [
|
||||||
|
{"name": "music-render", "kind": "render", "queue": "queue:music-render"},
|
||||||
|
{"name": "video-render", "kind": "render", "queue": "queue:video-render"},
|
||||||
|
{"name": "image-render", "kind": "render", "queue": "queue:image-render"},
|
||||||
|
{"name": "insta-render", "kind": "render", "queue": "queue:insta-render"},
|
||||||
|
{"name": "task-watcher", "kind": "watcher", "queue": None},
|
||||||
|
{"name": "ai_trade", "kind": "trader", "queue": None},
|
||||||
|
]
|
||||||
|
|
||||||
|
_redis = None
|
||||||
|
def _get_redis():
|
||||||
|
global _redis
|
||||||
|
if _redis is None:
|
||||||
|
_redis = aioredis.from_url(REDIS_URL, decode_responses=False)
|
||||||
|
return _redis
|
||||||
|
|
||||||
|
|
||||||
|
def _beat_age(ts_str, now):
|
||||||
|
try:
|
||||||
|
beat = dt.datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
|
||||||
|
return max(0, int((now - beat).total_seconds()))
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _render_link_status(w):
|
||||||
|
if not w["alive"]:
|
||||||
|
return "down"
|
||||||
|
if w["state"] == "paused":
|
||||||
|
return "paused"
|
||||||
|
if w["dead_letter"] > 0:
|
||||||
|
return "degraded"
|
||||||
|
return "healthy"
|
||||||
|
|
||||||
|
|
||||||
|
async def collect_status(redis=None) -> dict:
|
||||||
|
r = redis or _get_redis()
|
||||||
|
now = dt.datetime.now(dt.timezone.utc)
|
||||||
|
out = {"redis_ok": True, "paused": False, "paused_reason": None,
|
||||||
|
"generated_at": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
|
||||||
|
"workers": [], "links": []}
|
||||||
|
try:
|
||||||
|
out["paused"] = (await r.get("queue:paused")) == b"1"
|
||||||
|
except Exception:
|
||||||
|
logger.exception("redis 접근 실패")
|
||||||
|
out["redis_ok"] = False
|
||||||
|
return out
|
||||||
|
|
||||||
|
for w in WORKER_REGISTRY:
|
||||||
|
try:
|
||||||
|
info = {"name": w["name"], "kind": w["kind"], "alive": False, "state": None,
|
||||||
|
"last_beat_age_s": None, "queue_depth": 0, "dead_letter": 0,
|
||||||
|
"processing": 0, "jobs_done": 0, "jobs_failed": 0, "last_job_at": None}
|
||||||
|
raw = await r.get(f"worker:{w['name']}:heartbeat")
|
||||||
|
if raw:
|
||||||
|
try:
|
||||||
|
hb = json.loads(raw)
|
||||||
|
age = _beat_age(hb.get("ts") or "", now)
|
||||||
|
info["last_beat_age_s"] = age
|
||||||
|
info["alive"] = age is not None and age <= NODE_STALE_THRESHOLD_SEC
|
||||||
|
info["state"] = hb.get("state")
|
||||||
|
info["jobs_done"] = hb.get("jobs_done", 0)
|
||||||
|
info["jobs_failed"] = hb.get("jobs_failed", 0)
|
||||||
|
info["last_job_at"] = hb.get("last_job_at")
|
||||||
|
if w["kind"] == "watcher" and hb.get("mode"):
|
||||||
|
out["paused_reason"] = hb["mode"]
|
||||||
|
except (json.JSONDecodeError, UnicodeDecodeError):
|
||||||
|
logger.warning("heartbeat JSON 파싱 실패 name=%s", w["name"])
|
||||||
|
if w["queue"]:
|
||||||
|
info["queue_depth"] = await r.llen(w["queue"])
|
||||||
|
info["dead_letter"] = await r.llen(f"dead_letter:{w['queue']}")
|
||||||
|
proc = 0
|
||||||
|
async for key in r.scan_iter(match=f"processing:{w['queue']}:*"):
|
||||||
|
proc += await r.llen(key)
|
||||||
|
info["processing"] = proc
|
||||||
|
out["workers"].append(info)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("워커 상태 수집 실패 name=%s", w["name"])
|
||||||
|
out["redis_ok"] = False
|
||||||
|
break
|
||||||
|
|
||||||
|
for w in out["workers"]:
|
||||||
|
if w["kind"] == "trader":
|
||||||
|
out["links"].append({"from": "ai_trade", "to": "nas-stock", "type": "http-pull",
|
||||||
|
"status": "healthy" if w["alive"] else "down"})
|
||||||
|
elif w["kind"] == "render":
|
||||||
|
out["links"].append({"from": "nas", "to": w["name"], "type": "redis-queue",
|
||||||
|
"status": _render_link_status(w)})
|
||||||
|
if out["paused"] and not out["paused_reason"]:
|
||||||
|
out["paused_reason"] = "trading"
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
async def check_and_alert(status=None) -> list[str]:
|
||||||
|
"""워커 상태를 점검해 다운/복구/dead-letter 전이를 텔레그램으로 경보한다.
|
||||||
|
|
||||||
|
첫 관측(prev=None)엔 경보 없음 — 부팅 시 false alarm 방지.
|
||||||
|
반환값: 실제로 전송된 경보 텍스트 목록 (테스트용).
|
||||||
|
"""
|
||||||
|
from .telegram.messaging import send_raw
|
||||||
|
from .db import add_log
|
||||||
|
try:
|
||||||
|
st = status or await collect_status()
|
||||||
|
except Exception:
|
||||||
|
logger.exception("collect_status 예외")
|
||||||
|
return []
|
||||||
|
sent: list[str] = []
|
||||||
|
for w in st["workers"]:
|
||||||
|
name = w["name"]
|
||||||
|
alive = w.get("alive", False)
|
||||||
|
prev = _node_state.get(name)
|
||||||
|
transition_send_failed = False
|
||||||
|
if prev is True and not alive:
|
||||||
|
text = f"🔴 [{name}] 워커 다운"
|
||||||
|
if (await send_raw(text=text)).get("ok"):
|
||||||
|
add_log("node_monitor", f"{name} 다운", "warning"); sent.append(text)
|
||||||
|
else:
|
||||||
|
transition_send_failed = True
|
||||||
|
elif prev is False and alive:
|
||||||
|
text = f"🟢 [{name}] 워커 복구"
|
||||||
|
if (await send_raw(text=text)).get("ok"):
|
||||||
|
add_log("node_monitor", f"{name} 복구", "info"); sent.append(text)
|
||||||
|
else:
|
||||||
|
transition_send_failed = True
|
||||||
|
if not transition_send_failed:
|
||||||
|
_node_state[name] = alive
|
||||||
|
dl = w.get("dead_letter", 0)
|
||||||
|
if dl >= NODE_ALERT_DEADLETTER_THRESHOLD and dl != _dl_notified.get(name, 0):
|
||||||
|
text = f"❌ [{name}] 실패 누적 {dl}건 (dead-letter)"
|
||||||
|
if (await send_raw(text=text)).get("ok"):
|
||||||
|
add_log("node_monitor", f"{name} dead-letter {dl}", "warning")
|
||||||
|
sent.append(text)
|
||||||
|
_dl_notified[name] = dl
|
||||||
|
elif dl == 0:
|
||||||
|
_dl_notified.pop(name, None)
|
||||||
|
return sent
|
||||||
@@ -4,6 +4,7 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|||||||
|
|
||||||
from .agents import AGENT_REGISTRY
|
from .agents import AGENT_REGISTRY
|
||||||
from .db import delete_old_logs
|
from .db import delete_old_logs
|
||||||
|
from . import node_monitor
|
||||||
|
|
||||||
scheduler = AsyncIOScheduler(timezone="Asia/Seoul")
|
scheduler = AsyncIOScheduler(timezone="Asia/Seoul")
|
||||||
|
|
||||||
@@ -98,6 +99,9 @@ async def _poll_pipelines():
|
|||||||
if agent:
|
if agent:
|
||||||
await agent.poll_state_changes()
|
await agent.poll_state_changes()
|
||||||
|
|
||||||
|
async def _run_node_health_check():
|
||||||
|
await node_monitor.check_and_alert()
|
||||||
|
|
||||||
def _cleanup_old_logs():
|
def _cleanup_old_logs():
|
||||||
n = delete_old_logs(days=90)
|
n = delete_old_logs(days=90)
|
||||||
if n:
|
if n:
|
||||||
@@ -142,5 +146,6 @@ def init_scheduler():
|
|||||||
scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=10, id="youtube_research")
|
scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=10, id="youtube_research")
|
||||||
scheduler.add_job(_send_youtube_weekly_report, "cron", day_of_week="mon", hour=8, minute=0, id="youtube_weekly_report")
|
scheduler.add_job(_send_youtube_weekly_report, "cron", day_of_week="mon", hour=8, minute=0, id="youtube_weekly_report")
|
||||||
scheduler.add_job(_poll_pipelines, "interval", seconds=30, id="pipeline_poll")
|
scheduler.add_job(_poll_pipelines, "interval", seconds=30, id="pipeline_poll")
|
||||||
|
scheduler.add_job(_run_node_health_check, "interval", seconds=60, id="node_health_check", replace_existing=True)
|
||||||
scheduler.add_job(_cleanup_old_logs, "cron", hour=3, minute=0, id="cleanup_old_logs", replace_existing=True)
|
scheduler.add_job(_cleanup_old_logs, "cron", hour=3, minute=0, id="cleanup_old_logs", replace_existing=True)
|
||||||
scheduler.start()
|
scheduler.start()
|
||||||
|
|||||||
@@ -7,3 +7,4 @@ respx>=0.21
|
|||||||
pytest-asyncio>=0.23
|
pytest-asyncio>=0.23
|
||||||
google-api-python-client>=2.100.0
|
google-api-python-client>=2.100.0
|
||||||
pytrends>=4.9.2
|
pytrends>=4.9.2
|
||||||
|
redis>=5.0
|
||||||
|
|||||||
196
agent-office/tests/test_node_monitor.py
Normal file
196
agent-office/tests/test_node_monitor.py
Normal file
@@ -0,0 +1,196 @@
|
|||||||
|
# agent-office/tests/test_node_monitor.py
|
||||||
|
import datetime as dt
|
||||||
|
import json, pytest
|
||||||
|
from app import node_monitor
|
||||||
|
import app.node_monitor as nm
|
||||||
|
|
||||||
|
class FakeRedis:
|
||||||
|
"""worker heartbeat + queue llen + scan_iter 흉내."""
|
||||||
|
def __init__(self, kv=None, lists=None):
|
||||||
|
self._kv = kv or {} # key(str) -> bytes
|
||||||
|
self._lists = lists or {} # key(str) -> length(int)
|
||||||
|
async def get(self, key):
|
||||||
|
return self._kv.get(key)
|
||||||
|
async def llen(self, key):
|
||||||
|
return self._lists.get(key, 0)
|
||||||
|
async def scan_iter(self, match=None):
|
||||||
|
prefix = match.rstrip("*")
|
||||||
|
for k in list(self._lists):
|
||||||
|
if k.startswith(prefix):
|
||||||
|
yield k
|
||||||
|
|
||||||
|
def _hb(name, kind, state, ts=None, **extra):
|
||||||
|
"""heartbeat 페이로드 생성. ts 기본값은 현재 시각(신선한 heartbeat)."""
|
||||||
|
if ts is None:
|
||||||
|
ts = dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||||
|
return json.dumps({"name": name, "kind": kind, "state": state, "ts": ts,
|
||||||
|
"last_job_at": None, "jobs_done": 0, "jobs_failed": 0, **extra}).encode()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_alive_worker_healthy_link():
|
||||||
|
r = FakeRedis(kv={"worker:image-render:heartbeat": _hb("image-render","render","idle")})
|
||||||
|
st = await node_monitor.collect_status(redis=r)
|
||||||
|
img = next(w for w in st["workers"] if w["name"] == "image-render")
|
||||||
|
assert img["alive"] is True and img["state"] == "idle"
|
||||||
|
link = next(l for l in st["links"] if l["to"] == "image-render")
|
||||||
|
assert link["status"] == "healthy" and link["type"] == "redis-queue"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_missing_heartbeat_is_dead_and_down():
|
||||||
|
r = FakeRedis() # heartbeat 없음
|
||||||
|
st = await node_monitor.collect_status(redis=r)
|
||||||
|
img = next(w for w in st["workers"] if w["name"] == "image-render")
|
||||||
|
assert img["alive"] is False
|
||||||
|
link = next(l for l in st["links"] if l["to"] == "image-render")
|
||||||
|
assert link["status"] == "down"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_dead_letter_makes_degraded():
|
||||||
|
r = FakeRedis(kv={"worker:video-render:heartbeat": _hb("video-render","render","idle")},
|
||||||
|
lists={"dead_letter:queue:video-render": 2})
|
||||||
|
st = await node_monitor.collect_status(redis=r)
|
||||||
|
vid = next(w for w in st["workers"] if w["name"] == "video-render")
|
||||||
|
assert vid["dead_letter"] == 2
|
||||||
|
link = next(l for l in st["links"] if l["to"] == "video-render")
|
||||||
|
assert link["status"] == "degraded"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_paused_reason_from_watcher():
|
||||||
|
r = FakeRedis(kv={"queue:paused": b"1",
|
||||||
|
"worker:task-watcher:heartbeat": _hb("task-watcher","watcher","trading",mode="trading")})
|
||||||
|
st = await node_monitor.collect_status(redis=r)
|
||||||
|
assert st["paused"] is True and st["paused_reason"] == "trading"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_trader_http_pull_link():
|
||||||
|
r = FakeRedis(kv={"worker:ai_trade:heartbeat": _hb("ai_trade","trader","market_open")})
|
||||||
|
st = await node_monitor.collect_status(redis=r)
|
||||||
|
link = next(l for l in st["links"] if l["from"] == "ai_trade")
|
||||||
|
assert link["type"] == "http-pull" and link["status"] == "healthy"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_paused_no_watcher_heartbeat_fallback_reason():
|
||||||
|
"""paused=True인데 watcher heartbeat 없으면 paused_reason == 'trading' 폴백."""
|
||||||
|
r = FakeRedis(kv={"queue:paused": b"1"}) # watcher heartbeat 없음
|
||||||
|
st = await node_monitor.collect_status(redis=r)
|
||||||
|
assert st["paused"] is True
|
||||||
|
assert st["paused_reason"] == "trading"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_processing_count_image_render():
|
||||||
|
"""processing:<queue>:<worker_id> 리스트가 있으면 processing 필드에 합산된다."""
|
||||||
|
worker_id = "abc123"
|
||||||
|
proc_key = f"processing:queue:image-render:{worker_id}"
|
||||||
|
r = FakeRedis(
|
||||||
|
kv={"worker:image-render:heartbeat": _hb("image-render", "render", "busy")},
|
||||||
|
lists={proc_key: 3},
|
||||||
|
)
|
||||||
|
st = await node_monitor.collect_status(redis=r)
|
||||||
|
img = next(w for w in st["workers"] if w["name"] == "image-render")
|
||||||
|
assert img["processing"] == 3
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_llen_exception_returns_redis_ok_false():
|
||||||
|
"""워커 루프 중 llen 예외 발생 시 예외를 전파하지 않고 redis_ok=False 반환 (Blocker 회귀)."""
|
||||||
|
class BrokenLlenRedis(FakeRedis):
|
||||||
|
async def llen(self, key):
|
||||||
|
raise ConnectionError("Redis 연결 끊김")
|
||||||
|
|
||||||
|
r = BrokenLlenRedis(
|
||||||
|
kv={"worker:music-render:heartbeat": _hb("music-render", "render", "idle")}
|
||||||
|
)
|
||||||
|
st = await node_monitor.collect_status(redis=r)
|
||||||
|
assert st["redis_ok"] is False
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_alert_on_alive_to_dead(monkeypatch):
|
||||||
|
sent = []
|
||||||
|
async def fake_send_raw(text, **kw): sent.append(text); return {"ok": True}
|
||||||
|
monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
|
||||||
|
monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
|
||||||
|
nm._node_state.clear(); nm._dl_notified.clear()
|
||||||
|
alive = {"workers": [{"name":"image-render","alive":True,"dead_letter":0}], "links": []}
|
||||||
|
dead = {"workers": [{"name":"image-render","alive":False,"dead_letter":0}], "links": []}
|
||||||
|
await nm.check_and_alert(status=alive) # 첫 관측 — 경보 없음
|
||||||
|
assert sent == []
|
||||||
|
await nm.check_and_alert(status=dead) # alive→dead 전이
|
||||||
|
assert any("다운" in t for t in sent)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_alert_on_dead_letter_growth(monkeypatch):
|
||||||
|
sent = []
|
||||||
|
async def fake_send_raw(text, **kw): sent.append(text); return {"ok": True}
|
||||||
|
monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
|
||||||
|
monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
|
||||||
|
nm._node_state.clear(); nm._dl_notified.clear()
|
||||||
|
s = {"workers": [{"name":"video-render","alive":True,"dead_letter":2}], "links": []}
|
||||||
|
await nm.check_and_alert(status=s)
|
||||||
|
assert any("dead-letter" in t for t in sent)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_dl_notified_not_updated_on_telegram_failure(monkeypatch):
|
||||||
|
"""텔레그램 실패(ok=False) 시 _dl_notified 갱신 안 됨 → 다음 사이클에서 재시도."""
|
||||||
|
calls = []
|
||||||
|
async def fake_send_raw(text, **kw):
|
||||||
|
calls.append(text)
|
||||||
|
if len(calls) == 1:
|
||||||
|
return {"ok": False} # 첫 호출: 텔레그램 다운
|
||||||
|
return {"ok": True} # 두 번째 호출: 성공
|
||||||
|
monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
|
||||||
|
monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
|
||||||
|
nm._node_state.clear(); nm._dl_notified.clear()
|
||||||
|
s = {"workers": [{"name": "video-render", "alive": True, "dead_letter": 2}], "links": []}
|
||||||
|
# 첫 호출: 텔레그램 다운 → ok=False → _dl_notified 갱신 안 됨
|
||||||
|
result1 = await nm.check_and_alert(status=s)
|
||||||
|
assert result1 == []
|
||||||
|
assert nm._dl_notified.get("video-render", 0) == 0
|
||||||
|
# 두 번째 호출: 같은 dl=2 → _dl_notified 미갱신으로 조건 재만족 → 재시도 발송
|
||||||
|
result2 = await nm.check_and_alert(status=s)
|
||||||
|
assert any("dead-letter" in t for t in result2)
|
||||||
|
assert nm._dl_notified.get("video-render") == 2
|
||||||
|
|
||||||
|
|
||||||
|
# ── I1: staleness 판정 신규 테스트 ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_stale_heartbeat_is_dead():
|
||||||
|
"""heartbeat 키가 존재해도 ts가 90s 초과면 alive=False (staleness 판정)."""
|
||||||
|
stale_ts = (dt.datetime.now(dt.timezone.utc) - dt.timedelta(seconds=300)).strftime(
|
||||||
|
"%Y-%m-%dT%H:%M:%SZ"
|
||||||
|
)
|
||||||
|
r = FakeRedis(kv={"worker:image-render:heartbeat": _hb("image-render", "render", "idle", ts=stale_ts)})
|
||||||
|
st = await node_monitor.collect_status(redis=r)
|
||||||
|
img = next(w for w in st["workers"] if w["name"] == "image-render")
|
||||||
|
assert img["alive"] is False
|
||||||
|
link = next(l for l in st["links"] if l["to"] == "image-render")
|
||||||
|
assert link["status"] == "down"
|
||||||
|
|
||||||
|
|
||||||
|
# ── I2: 전이 발송 실패 시 재시도 회귀 테스트 ──────────────────────────────────
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_transition_send_failure_retries_next_cycle(monkeypatch):
|
||||||
|
"""alive→dead 전이 시 send_raw 실패하면 _node_state 갱신 안 됨 → 다음 사이클 재시도."""
|
||||||
|
calls = []
|
||||||
|
async def fake_send_raw(text, **kw):
|
||||||
|
calls.append(text)
|
||||||
|
if len(calls) == 1:
|
||||||
|
return {"ok": False} # 첫 호출: 텔레그램 다운
|
||||||
|
return {"ok": True} # 두 번째 호출: 성공
|
||||||
|
monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
|
||||||
|
monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
|
||||||
|
nm._node_state.clear(); nm._dl_notified.clear()
|
||||||
|
alive = {"workers": [{"name": "music-render", "alive": True, "dead_letter": 0}], "links": []}
|
||||||
|
dead = {"workers": [{"name": "music-render", "alive": False, "dead_letter": 0}], "links": []}
|
||||||
|
# 첫 관측: baseline 설정(전이 없음)
|
||||||
|
await nm.check_and_alert(status=alive)
|
||||||
|
assert nm._node_state.get("music-render") is True
|
||||||
|
# alive→dead 전이, send_raw 실패 → _node_state 갱신 안 됨
|
||||||
|
result1 = await nm.check_and_alert(status=dead)
|
||||||
|
assert result1 == [] # 경보 미발송
|
||||||
|
assert nm._node_state.get("music-render") is True # 여전히 True
|
||||||
|
# 두 번째 사이클: 동일 dead, send_raw 성공 → 경보 발송
|
||||||
|
result2 = await nm.check_and_alert(status=dead)
|
||||||
|
assert any("다운" in t for t in result2)
|
||||||
|
assert nm._node_state.get("music-render") is False # 이제 갱신
|
||||||
18
agent-office/tests/test_nodes_endpoint.py
Normal file
18
agent-office/tests/test_nodes_endpoint.py
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
# agent-office/tests/test_nodes_endpoint.py
|
||||||
|
import pytest
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def client(monkeypatch):
|
||||||
|
from app import main
|
||||||
|
async def fake_collect(redis=None):
|
||||||
|
return {"redis_ok": True, "paused": False, "paused_reason": None,
|
||||||
|
"generated_at": "2026-06-29T00:00:00Z", "workers": [], "links": []}
|
||||||
|
monkeypatch.setattr("app.node_monitor.collect_status", fake_collect)
|
||||||
|
return TestClient(main.app)
|
||||||
|
|
||||||
|
def test_nodes_endpoint_returns_contract(client):
|
||||||
|
resp = client.get("/api/agent-office/nodes")
|
||||||
|
assert resp.status_code == 200
|
||||||
|
body = resp.json()
|
||||||
|
assert set(["redis_ok","paused","workers","links"]).issubset(body)
|
||||||
@@ -268,6 +268,7 @@ services:
|
|||||||
- CONVERSATION_HISTORY_LIMIT=${CONVERSATION_HISTORY_LIMIT:-20}
|
- CONVERSATION_HISTORY_LIMIT=${CONVERSATION_HISTORY_LIMIT:-20}
|
||||||
- CONVERSATION_RATE_PER_MIN=${CONVERSATION_RATE_PER_MIN:-6}
|
- CONVERSATION_RATE_PER_MIN=${CONVERSATION_RATE_PER_MIN:-6}
|
||||||
- YOUTUBE_DATA_API_KEY=${YOUTUBE_DATA_API_KEY:-}
|
- YOUTUBE_DATA_API_KEY=${YOUTUBE_DATA_API_KEY:-}
|
||||||
|
- REDIS_URL=${REDIS_URL:-redis://redis:6379}
|
||||||
volumes:
|
volumes:
|
||||||
- ${RUNTIME_PATH:-.}/data/agent-office:/app/data
|
- ${RUNTIME_PATH:-.}/data/agent-office:/app/data
|
||||||
depends_on:
|
depends_on:
|
||||||
@@ -275,6 +276,7 @@ services:
|
|||||||
- music-lab
|
- music-lab
|
||||||
- insta-lab
|
- insta-lab
|
||||||
- realestate-lab
|
- realestate-lab
|
||||||
|
- redis
|
||||||
healthcheck:
|
healthcheck:
|
||||||
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
|
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
|
||||||
interval: 60s
|
interval: 60s
|
||||||
|
|||||||
1105
docs/superpowers/plans/2026-06-29-worker-observability.md
Normal file
1105
docs/superpowers/plans/2026-06-29-worker-observability.md
Normal file
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,207 @@
|
|||||||
|
# 분산 워커 관측 시스템 (Distributed Worker Observability) — 설계 문서
|
||||||
|
|
||||||
|
> 작성일: 2026-06-29 · 작성 세션: BE (web-backend 소유)
|
||||||
|
> 대상 repo 3종: `web-ai`(워커) · `web-backend`(NAS 집계/경보) · `web-ui`(Three.js 대시보드)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 1. 문제 정의 (Problem)
|
||||||
|
|
||||||
|
NAS 백엔드의 음악/영상/이미지/인스타 생성은 **무거운 작업을 Windows AI 머신(192.168.45.59)의 WSL2 Docker 워커**에 위임한다. NAS 게이트웨이(`music/video/image/insta-lab`)가 Redis 큐(`queue:<svc>-render`)에 job을 push하면, Windows 워커가 BLMOVE로 꺼내 처리하고 `/api/internal/<svc>/update` webhook으로 결과를 회신한다. 트레이딩봇 `ai_trade`(:8001)는 별도로 NAS stock(:18500)에서 HTTP pull을 한다.
|
||||||
|
|
||||||
|
**핵심 문제: 이 분산 워커들이 살아있는지 NAS·사용자가 알 길이 없다.**
|
||||||
|
- 각 워커에 로컬 `/health` 엔드포인트가 있으나 Windows 머신 안에서만 접근 가능.
|
||||||
|
- 실제 사고: `insta-render` 워커가 redis 블로킹 read 버그로 **2026-05-22 ~ 06-08 약 2주간 사일런트로 죽어 있었고**(모든 슬레이트 draft 정지) 아무도 몰랐다. 일감이 없을 때의 "한가함"과 "죽음"을 구분할 수단이 없었던 것이 근본 원인.
|
||||||
|
|
||||||
|
## 2. 목표 / 비목표 (Goals / Non-goals)
|
||||||
|
|
||||||
|
**목표 (Phase 1)**
|
||||||
|
- G1. 6개 워커(`music/video/image/insta-render` + `task-watcher` + `ai_trade`)의 생사·상태를 NAS에서 인지.
|
||||||
|
- G2. 큐 깊이·실패(dead-letter)·고아작업(processing)·일시정지(paused) 상태를 집계.
|
||||||
|
- G3. 상태 전이(다운/복구/실패누적)를 텔레그램으로 자동 경보.
|
||||||
|
- G4. web-ui 신규 페이지 `/infra`에서 NAS↔Windows 파이프라인을 **Three.js로 시각화** — 정상이면 통신이 흐르는 애니메이션, 장애면 해당 구간을 끊김/빨강으로 표시.
|
||||||
|
|
||||||
|
**비목표 (Phase 2 이후로 보류)**
|
||||||
|
- 원격 제어(워커 재시작, 큐 pause/resume, dead-letter 재처리) — Windows 머신 제어가 필요해 보안·구현 복잡도 큼.
|
||||||
|
- GPU 사용률(VRAM) 모니터링, stuck-task 자동 감지, WebSocket 라이브 푸시.
|
||||||
|
- 다중 노드 확장(현재 Windows 노드 1대).
|
||||||
|
|
||||||
|
## 3. 아키텍처 & 토폴로지
|
||||||
|
|
||||||
|
```
|
||||||
|
web-backend (NAS, 192.168.45.54) Windows 노드 (192.168.45.59)
|
||||||
|
┌──────────────────────────────────┐ ┌────────────────────────────────────┐
|
||||||
|
│ music-lab ─┐ │ ① job │ WSL2 Docker: │
|
||||||
|
│ video-lab ─┤ │ push │ ┌─ music-render │
|
||||||
|
│ image-lab ─┼─► [ Redis 큐 버스 ]═╪══════════╪══►├─ video-render (ReliableQueue) │
|
||||||
|
│ insta-lab ─┘ queue:*-render │ │ ├─ image-render │
|
||||||
|
│ queue:paused │◄═════════╪═══├─ insta-render │
|
||||||
|
│ │ ② webhook│ └─ task-watcher (paused 토글) │
|
||||||
|
│ agent-office │◄─────────╪── 각 워커 → worker:<name>:heartbeat│
|
||||||
|
│ ├─ node_monitor (집계) │◄─heartbeat (Redis SET, TTL 45s) │
|
||||||
|
│ └─ scheduler (1분 경보 cron) │ │ │
|
||||||
|
│ │ │ Windows 호스트(WSL 밖): │
|
||||||
|
│ stock (:18500) ◄── HTTP pull ────╪──────────╪── ai_trade (:8001) ─ heartbeat ───►│
|
||||||
|
└──────────────┬───────────────────┘ └────────────────────────────────────┘
|
||||||
|
│ GET /api/agent-office/nodes (FE 2~3초 폴링)
|
||||||
|
▼
|
||||||
|
web-ui /infra ← Three.js 파이프라인 시각화
|
||||||
|
```
|
||||||
|
|
||||||
|
**설계 기반(이미 존재하는 자산)**
|
||||||
|
- 워커들은 이미 NAS Redis(`redis://192.168.45.54:6379`)에 BLMOVE로 연결 → heartbeat도 같은 Redis에 SET하면 방화벽/인바운드 포트 불필요, `queue:paused`여도 heartbeat는 계속 뛰므로 "정지 중이지만 살아있음"과 "죽음"을 구분 가능.
|
||||||
|
- `_shared/reliable_queue.py`(ReliableQueue)가 이미 `processing:queue:<svc>-render:<worker_id>` 리스트와 `dead_letter:queue:<svc>-render` 리스트를 Redis에 남김 → 집계기가 **신규 워커 코드 없이** 큐 깊이·실패·고아작업을 읽을 수 있음.
|
||||||
|
|
||||||
|
**채택하지 않은 대안**
|
||||||
|
- 집계기를 게이트웨이 중 하나에 배치 → "어느 게이트웨이가 전체 노드 상태를 소유하나"가 의미상 어색. `agent-office`가 ops 브레인(텔레그램·스케줄러·WebSocket·서비스 로그 수집 보유)이라 의미상 정확.
|
||||||
|
- NAS→워커 HTTP `/health` 폴링 → 워커별 포트 노출 + NAS→Windows 인바운드 접속 필요. Redis heartbeat가 단방향(워커→Redis)이라 더 단순.
|
||||||
|
- 라이브 갱신을 WebSocket으로 → Phase 1은 2~3초 폴링으로 충분(단순). WebSocket은 Phase 2 강화.
|
||||||
|
|
||||||
|
## 4. 컴포넌트 설계
|
||||||
|
|
||||||
|
### 4.1 web-ai — heartbeat 생산자 (AI 세션 소유)
|
||||||
|
|
||||||
|
**4.1.1 render 워커 4종 (`services/*-render/`)**
|
||||||
|
- 신규 공용 모듈 `services/_shared/heartbeat.py`:
|
||||||
|
- `async def heartbeat_loop(redis, name, stats, interval=15, ttl=45)` — `interval`초마다 `worker:<name>:heartbeat` 키에 JSON 값을 `SET ... EX ttl`.
|
||||||
|
- 값 스키마는 §5.1 참조. 죽으면 키가 TTL 만료 → 집계기가 "missing = dead" 판정.
|
||||||
|
- 각 워커 `main.py` lifespan에서 `worker_loop`와 함께 `heartbeat_loop` 태스크 spawn.
|
||||||
|
- `state` 산정: `queue:paused`가 set이면 `paused`, 현재 job 처리 중이면 `busy`, 아니면 `idle`. 처리 중 여부와 카운터(`jobs_done`/`jobs_failed`/`last_job_at`)는 `poll_once`가 갱신하는 모듈 레벨 `stats` 객체로 추적.
|
||||||
|
- TTL=45s = interval(15s)의 3배 → 1~2회 누락은 dead로 오판하지 않음.
|
||||||
|
|
||||||
|
**4.1.2 task-watcher (`services/task-watcher/`)**
|
||||||
|
- `watcher_loop`에 동일 heartbeat 추가. `worker:task-watcher:heartbeat`에 `state` + 현재 `mode`(`trading`/`free`)를 함께 발행 → 대시보드가 paused의 **이유**("작업중(트레이딩)")를 표시.
|
||||||
|
|
||||||
|
**4.1.3 ai_trade (`ai_trade/`) — 다른 런타임**
|
||||||
|
- ai_trade는 Windows **호스트**에서 직접 uvicorn 실행(WSL Docker 아님), NAS Redis 큐에 연결되어 있지 않음(현재 NAS stock으로 HTTP pull만).
|
||||||
|
- 변경: `redis.asyncio` 의존성 추가 → `main.py` lifespan에 heartbeat 태스크 추가 → 같은 NAS Redis(`192.168.45.54:6379`)에 `worker:ai_trade:heartbeat` SET.
|
||||||
|
- Redis는 Windows 머신에서 이미 도달 가능(render 워커들이 같은 호스트에서 BLMOVE 중).
|
||||||
|
- heartbeat 로직은 ~10줄이므로 `ai_trade` 자체 미니 헬퍼로 둔다(`_shared` import 경로 의존 회피 — render 워커는 컨테이너 PYTHONPATH로 `_shared` 접근, ai_trade는 호스트 실행이라 경로가 다름). **계약(키 스키마)만 동일**하면 코드 공유 불필요.
|
||||||
|
- `state` 의미가 다름: render 워커의 idle/busy/paused가 아니라 `market_open`(poll_loop 활성·신호 생성 중) / `market_closed`(휴장·장외 idle). **task-watcher의 `queue:paused`와 무관**(트레이딩은 일시정지 대상 아님).
|
||||||
|
- 토폴로지 표현: Redis 큐 버스가 아니라 **HTTP pull 파이프라인**(ai_trade ⇄ NAS stock :18500)으로 별도 표시.
|
||||||
|
|
||||||
|
### 4.2 web-backend / agent-office — 집계기 + 경보 (이 BE 세션 소유)
|
||||||
|
|
||||||
|
**4.2.1 Redis 클라이언트 추가**
|
||||||
|
- `agent-office`는 현재 Redis 미사용 → `requirements.txt`에 `redis>=5.0`(asyncio) 추가, `docker-compose.yml` agent-office 블록에 `REDIS_URL` 환경변수 + `depends_on: redis` 추가.
|
||||||
|
|
||||||
|
**4.2.2 `app/node_monitor.py` 신규**
|
||||||
|
- 워커 레지스트리(상수): 각 워커의 `name`, 연관 `queue`(있으면), `internal webhook` 경로, 토폴로지 link 타입(`redis-queue` | `http-pull`).
|
||||||
|
- `async def collect_status() -> dict`:
|
||||||
|
- 각 워커: `GET worker:<name>:heartbeat` → 존재하면 `alive=True` + JSON 파싱 + `last_beat_age_s = now - ts`; 없으면 `alive=False`(dead).
|
||||||
|
- 각 render 큐: `LLEN queue:<svc>-render`(depth), `LLEN dead_letter:queue:<svc>-render`, `processing:queue:<svc>-render:*` 키 스캔으로 in-flight 수.
|
||||||
|
- `GET queue:paused` + TTL → paused 플래그 + reason(task-watcher heartbeat의 mode).
|
||||||
|
- Redis 연결 실패 → `redis_ok=False`(전 구간 degrade).
|
||||||
|
- link 상태 합성(§5.2).
|
||||||
|
- 응답 스키마는 §5.2.
|
||||||
|
|
||||||
|
**4.2.3 엔드포인트**
|
||||||
|
- `GET /api/agent-office/nodes` → `collect_status()`. nginx `/api/agent-office/` 이미 라우팅됨 → **nginx 변경 불필요**.
|
||||||
|
|
||||||
|
**4.2.4 경보 cron (scheduler)**
|
||||||
|
- `_run_node_health_check` (APScheduler, 1분 간격):
|
||||||
|
- 직전 상태 `_node_state`(인메모리 dict)와 비교:
|
||||||
|
- `alive → dead`: 🔴 `<name> 워커 다운 (last beat Xs ago)`
|
||||||
|
- `dead → alive`: 🟢 `<name> 워커 복구`
|
||||||
|
- `dead_letter` 카운트가 임계(`NODE_ALERT_DEADLETTER_THRESHOLD`, 기본 1) 신규 초과: ❌ `<queue> 실패 누적 N건`
|
||||||
|
- `_notified` 패턴(기존 `youtube_publisher.poll_state_changes` 재사용)으로 스팸 방지, 복구 시 재알림 가능하도록 set 차집합.
|
||||||
|
- 텔레그램 발송은 agent-office 기존 봇 재사용.
|
||||||
|
|
||||||
|
### 4.3 web-ui — Three.js 대시보드 (FE 세션 소유)
|
||||||
|
|
||||||
|
- 신규 의존성: `three` + `@react-three/fiber` + `@react-three/drei`(React 코드베이스이므로 r3f가 관용적).
|
||||||
|
- 신규 라우트 `/infra`(Router.jsx) + Nav 등록.
|
||||||
|
- `pages/infra/InfraMonitor.jsx`:
|
||||||
|
- r3f `<Canvas>` 토폴로지 — 좌측 NAS(게이트웨이 sub-node) / 중앙 Redis 큐 버스(글로우 코어) / 우측 Windows 노드(워커 sub-node). ai_trade는 별도 HTTP-pull 파이프라인.
|
||||||
|
- 노드 간 파이프라인(튜브) + 상태별 머티리얼/애니메이션(§6).
|
||||||
|
- `useNodeStatus` 훅: `GET /api/agent-office/nodes`를 2~3초 폴링 → 상태를 시각 상태로 매핑(`src/api.js`에 헬퍼 추가).
|
||||||
|
- **2D 폴백**: WebGL 미지원/모바일 대비 카드·테이블 요약 뷰 토글.
|
||||||
|
- 실제 구현 시 `designer` 스킬 활성화(브레인스토밍 단계에서는 금지).
|
||||||
|
|
||||||
|
## 5. 잠그는 계약 (Contracts)
|
||||||
|
|
||||||
|
> 3 세션이 독립 병렬 작업하려면 이 두 스키마만 고정하면 된다.
|
||||||
|
|
||||||
|
### 5.1 Heartbeat 키 스키마
|
||||||
|
|
||||||
|
- **키**: `worker:<name>:heartbeat` (name ∈ `music-render`, `video-render`, `image-render`, `insta-render`, `task-watcher`, `ai_trade`)
|
||||||
|
- **값**(JSON 문자열), `SET ... EX 45`:
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"name": "image-render",
|
||||||
|
"kind": "render", // "render" | "watcher" | "trader"
|
||||||
|
"state": "idle", // render: idle|busy|paused / watcher: trading|free / trader: market_open|market_closed
|
||||||
|
"ts": "2026-06-29T12:34:56Z", // UTC ISO8601 (heartbeat 발신 시각)
|
||||||
|
"last_job_at": "2026-06-29T12:30:00Z", // nullable
|
||||||
|
"jobs_done": 42,
|
||||||
|
"jobs_failed": 1,
|
||||||
|
"mode": "free" // task-watcher 전용(paused 이유), 그 외 생략 가능
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 5.2 `/api/agent-office/nodes` 응답 스키마
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"redis_ok": true,
|
||||||
|
"paused": false,
|
||||||
|
"paused_reason": "trading", // queue:paused가 set일 때 task-watcher mode
|
||||||
|
"generated_at": "2026-06-29T12:34:57Z",
|
||||||
|
"workers": [
|
||||||
|
{
|
||||||
|
"name": "image-render", "kind": "render",
|
||||||
|
"alive": true, "state": "idle", "last_beat_age_s": 3,
|
||||||
|
"queue_depth": 0, "dead_letter": 0, "processing": 0,
|
||||||
|
"jobs_done": 42, "jobs_failed": 1, "last_job_at": "2026-06-29T12:30:00Z"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"links": [
|
||||||
|
{ "from": "nas", "to": "image-render", "type": "redis-queue", "status": "healthy" },
|
||||||
|
{ "from": "ai_trade", "to": "nas-stock", "type": "http-pull", "status": "healthy" }
|
||||||
|
]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
- `link.status` ∈ `healthy` | `paused` | `down` | `degraded`. 산정: 워커 dead → `down`; paused → `paused`; dead_letter>0 → `degraded`; redis_ok=false → 전 링크 `down`.
|
||||||
|
|
||||||
|
## 6. 시각화 상태 (Three.js)
|
||||||
|
|
||||||
|
| 상태 | 파이프라인(튜브) | 노드 |
|
||||||
|
|------|------------------|------|
|
||||||
|
| **정상 idle** | 시안/그린, 파티클이 NAS→워커→NAS 루프로 흐름(느림) | 초록 글로우 + 큐깊이/처리수 HUD |
|
||||||
|
| **정상 busy** | 파티클 빠르게 흐름 | "처리 중 N" |
|
||||||
|
| **일시정지 paused** | 앰버, 파티클 느려짐/정지 | "⏸ 작업중(트레이딩)" 라벨 |
|
||||||
|
| **장애 dead / link down** | 빨강, 흐름 멈춤, 끊긴 지점 스파크/단절 | 빨강 + ⚠ 경고, "last beat Xs ago" |
|
||||||
|
| **실패누적 dead-letter>0** | 해당 튜브 ❌ 뱃지 | dead-letter 카운트 강조 |
|
||||||
|
| **Redis/집계기 다운** | 중앙 버스 전체 빨강 | "집계 서버 연결 끊김" 오버레이 |
|
||||||
|
|
||||||
|
- ai_trade의 HTTP-pull 파이프라인은 큐 흐름이 아닌 pull 방향(ai_trade→NAS stock) 파티클로 구분 표현. `market_closed`는 정상 idle과 동일 톤(휴장은 장애 아님).
|
||||||
|
|
||||||
|
## 7. 에러 처리
|
||||||
|
|
||||||
|
- heartbeat TTL 만료 = dead 판정(권위 신호). 큐가 비어 일감이 없어도 heartbeat가 살아있으면 alive로 정확 판정(2주 사일런트 사고 재발 방지).
|
||||||
|
- Redis 다운 → `/nodes`가 `redis_ok=false` 반환(500 아님) → 대시보드가 전 구간 degrade 표시.
|
||||||
|
- agent-office 다운 → FE 폴링 실패 → "집계 서버 연결 끊김" 오버레이.
|
||||||
|
- 집계기는 read-only(Redis에 쓰지 않음) → 워커 동작에 영향 0.
|
||||||
|
|
||||||
|
## 8. 테스트
|
||||||
|
|
||||||
|
- **web-ai**: `heartbeat.py` 단위 테스트(fakeredis/mock) — 발신 주기·TTL·state 전이·카운터. ai_trade heartbeat 별도 테스트.
|
||||||
|
- **web-backend**: `node_monitor.collect_status` 테스트(mock redis: 키 존재/만료/큐 깊이/dead-letter 케이스) + 경보 전이 테스트(alive→dead→alive, dead-letter 증가). TDD 적용.
|
||||||
|
- **web-ui**: `InfraMonitor` 컴포넌트가 mock 상태로 렌더 + 상태→색상 매핑 단위 테스트(r3f는 렌더 스모크 수준).
|
||||||
|
|
||||||
|
## 9. 단계 (Phasing)
|
||||||
|
|
||||||
|
- **Phase 1 (본 스펙 전체)**: 6 워커(render 4 + task-watcher + ai_trade) heartbeat / `/nodes` API / 텔레그램 경보 / Three.js `/infra` 대시보드.
|
||||||
|
- **Phase 2 (후속)**: GPU 사용률(VRAM 16GB 경합 가시화), stuck-task 감지, WebSocket 라이브 푸시, 원격 제어(워커 재시작·pause/resume·dead-letter 재처리).
|
||||||
|
|
||||||
|
## 10. 세션 분담 & 협업 (co-gahusb)
|
||||||
|
|
||||||
|
- **소유권**: BE(이 세션)=web-backend, AI 세션=web-ai, FE 세션=web-ui. 각자 자기 repo만 커밋.
|
||||||
|
- **선행 게이트**: §5의 두 계약(heartbeat 키 스키마 + `/nodes` 응답 스키마)을 먼저 확정·공유 → 3 세션 병렬 진행.
|
||||||
|
- **공유 리소스 락**: agent-office 의존성/compose 변경은 `compose` 락, nginx 무변경(불필요). 배포는 `nas-deploy` 락.
|
||||||
|
- BE 작업: agent-office redis 추가 + `node_monitor.py` + `/nodes` + 경보 cron + 본 메모리 기록. AI/FE 작업은 co-gahusb 태스크로 배분.
|
||||||
|
|
||||||
|
## 11. 메모리 갱신 계획
|
||||||
|
|
||||||
|
- 신규 cross-cutting 메모리 `infra_distributed_workers.md` 작성: 큐 계약 / webhook 계약 / ReliableQueue 키 / heartbeat 키 스키마 / task-watcher paused / node_monitor·`/nodes`·경보. `MEMORY.md` 인덱스 등재.
|
||||||
|
- 관련 서비스 메모리(`service_video/image/music/insta`)에 heartbeat·관측 추가 사실을 cross-link.
|
||||||
|
```
|
||||||
Reference in New Issue
Block a user