merge: co-gahusb 세션 협업 팀 버스 (MCP + Redis + 어드바이저리 락)

- FastMCP streamable-http 서버(12툴) + Bearer 인증 + Redis 백엔드
- 메시지/작업보드/락/team_log, 동시쓰기 분리(소유권 파티션 + 락)
- compose(18920)/nginx(/api/co/)/deploy 등재 + 클라이언트 배선
- 22 테스트 (전부 통과)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-12 07:51:00 +09:00
25 changed files with 2006 additions and 4 deletions

9
.mcp.json Normal file
View File

@@ -0,0 +1,9 @@
{
"mcpServers": {
"co-gahusb": {
"type": "http",
"url": "https://gahusb.synology.me/api/co/mcp",
"headers": { "Authorization": "Bearer ${CO_BUS_KEY}" }
}
}
}

View File

@@ -485,3 +485,14 @@ Gitea Webhook 수신 → 자동 배포. HMAC SHA256 검증(`X-Gitea-Signature`
- **렌더/생성 워커 분리**: music/video/image/insta 무거운 작업은 Windows `web-ai` 워커. NAS 코드의 `*_provider.py`/`card_renderer.py`가 DEPRECATED stub면 실 로직은 web-ai 쪽이 authoritative - **렌더/생성 워커 분리**: music/video/image/insta 무거운 작업은 Windows `web-ai` 워커. NAS 코드의 `*_provider.py`/`card_renderer.py`가 DEPRECATED stub면 실 로직은 web-ai 쪽이 authoritative
- **Playwright Dockerfile**: bookworm 고정 + 수동 chromium deps, `--with-deps` 금지 (`feedback_playwright_dockerfile.md`) - **Playwright Dockerfile**: bookworm 고정 + 수동 chromium deps, `--with-deps` 금지 (`feedback_playwright_dockerfile.md`)
- **lab 네이밍**: `-lab`은 개발/연구 단계에만, 정식 서비스엔 미사용 (`feedback_lab_naming.md`) - **lab 네이밍**: `-lab`은 개발/연구 단계에만, 정식 서비스엔 미사용 (`feedback_lab_naming.md`)
---
## 협업 팀 버스 (co-gahusb) — 이 세션의 역할: **BE**
이 세션은 백엔드(BE) 역할이다. co-gahusb MCP 툴로 다른 세션(FE/AI/Producer)과 협업한다.
- **소유권**: 이 세션은 `web-backend` repo만 쓴다(FE=web-ui, AI=web-ai).
- **공유 리소스 변경 전 반드시 `acquire_lock(resource, "BE")`**: 대상 = `nas-deploy`, `stock-db-schema`, `lotto-db-schema`, `memory-mirror`, `nginx-conf`, `compose`. 점유 중이면 대기, 긴 작업은 `heartbeat_lock`, 끝나면 `release_lock`.
- **모든 툴 호출에 `role="BE"`** (또는 `from_role`/`created_by`에 BE).
- **수신**: `/loop`로 주기적으로 `read_inbox("BE", after_id=<last>)` + `list_tasks(assignee_role="BE")` 확인.
-`CO_BUS_KEY`는 환경변수로 주입(커밋 금지).

3
co-gahusb/.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
.venv/
__pycache__/
*.pyc

19
co-gahusb/CLIENT_SETUP.md Normal file
View File

@@ -0,0 +1,19 @@
# co-gahusb 클라이언트 설정
## 공통
1. `CO_BUS_KEY` 환경변수를 각 머신에 설정(서버 `.env`의 값과 동일).
2. 해당 repo 루트 `.mcp.json`에 co-gahusb HTTP MCP 등록(이 repo의 예시 참고).
3. CLAUDE.md 역할 블록의 `/loop` 폴링 규약을 따른다.
## web-ai (다른 머신)
web-ai 머신의 repo 루트에 아래 `.mcp.json` 생성, 역할 = **AI**:
```json
{ "mcpServers": { "co-gahusb": {
"type": "http",
"url": "https://gahusb.synology.me/api/co/mcp",
"headers": { "Authorization": "Bearer ${CO_BUS_KEY}" } } } }
```
web-ai CLAUDE.md에 역할 블록 추가(role="AI", 소유권=web-ai repo, 동일 락 규약).
## Producer (오케스트레이터 세션)
별도 repo 없이 조율 담당. `team_log()`로 전체 활동 감시, `create_task`로 분배, `acquire_lock`로 교차 작업 직렬화.

12
co-gahusb/Dockerfile Normal file
View File

@@ -0,0 +1,12 @@
FROM python:3.12-slim-bookworm
ENV PYTHONUNBUFFERED=1
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "app.server:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]

View File

21
co-gahusb/app/config.py Normal file
View File

@@ -0,0 +1,21 @@
# co-gahusb/app/config.py
import os
REDIS_URL = os.environ.get("REDIS_URL", "redis://redis:6379")
CO_BUS_KEY = os.environ.get("CO_BUS_KEY", "")
# 협업 역할 (세션별 1:1)
ROLES = ("FE", "BE", "AI", "Producer")
# 교차 리소스 어드바이저리 락 대상 (이 외 이름도 락은 가능하나, 규약상 명시 대상)
LOCKABLE_RESOURCES = (
"nas-deploy",
"stock-db-schema",
"lotto-db-schema",
"memory-mirror",
"nginx-conf",
"compose",
)
DEFAULT_LOCK_TTL = 300
TEAM_LOG_MAXLEN = 500

66
co-gahusb/app/locks.py Normal file
View File

@@ -0,0 +1,66 @@
# co-gahusb/app/locks.py
from redis.exceptions import WatchError
LOCK_PREFIX = "co:lock:"
async def acquire_lock(r, resource, role, ttl_sec=300):
key = LOCK_PREFIX + resource
ok = await r.set(key, role, nx=True, ex=ttl_sec)
if ok:
return {"acquired": True}
held_by = await r.get(key)
ttl = await r.ttl(key)
return {"acquired": False, "held_by": held_by, "ttl_remaining": max(ttl, 0)}
async def release_lock(r, resource, role):
key = LOCK_PREFIX + resource
async with r.pipeline() as pipe:
while True:
try:
await pipe.watch(key)
owner = await pipe.get(key)
if owner != role:
await pipe.unwatch()
return {"released": False, "held_by": owner}
pipe.multi()
pipe.delete(key)
await pipe.execute()
return {"released": True}
except WatchError:
continue
async def heartbeat_lock(r, resource, role, ttl_sec=300):
key = LOCK_PREFIX + resource
async with r.pipeline() as pipe:
while True:
try:
await pipe.watch(key)
owner = await pipe.get(key)
if owner != role:
await pipe.unwatch()
return {"renewed": False, "held_by": owner}
pipe.multi()
pipe.expire(key, ttl_sec)
await pipe.execute()
return {"renewed": True}
except WatchError:
continue
async def list_locks(r):
keys = await r.keys(LOCK_PREFIX + "*")
out = []
for key in keys:
held_by = await r.get(key)
if held_by is None:
continue
ttl = await r.ttl(key)
out.append({
"resource": key[len(LOCK_PREFIX):],
"held_by": held_by,
"ttl_remaining": max(ttl, 0),
})
return {"locks": out}

132
co-gahusb/app/server.py Normal file
View File

@@ -0,0 +1,132 @@
# co-gahusb/app/server.py
import logging
import redis.asyncio as aioredis
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route
from app import config, locks, store
log = logging.getLogger("co-gahusb")
_auth_failed_logged = False
_redis = aioredis.from_url(config.REDIS_URL, decode_responses=True)
mcp = FastMCP("co-gahusb")
# ---- 메시지 ----
@mcp.tool()
async def post_message(from_role: str, to_role: str, body: str, thread_id: str = "") -> dict:
"""다른 역할의 우편함에 메시지를 보낸다."""
res = await store.post_message(_redis, from_role, to_role, body, thread_id or None)
await store.log_event(_redis, "message", f"{from_role}{to_role}: {body[:60]}")
return res
@mcp.tool()
async def read_inbox(role: str, after_id: int = 0, mark_read: bool = False) -> dict:
"""내 역할 우편함을 커서 기반으로 읽는다."""
return await store.read_inbox(_redis, role, after_id, mark_read)
# ---- 작업 ----
@mcp.tool()
async def create_task(title: str, assignee_role: str, created_by: str, detail: str = "") -> dict:
"""작업을 만들어 특정 역할에 배정한다."""
res = await store.create_task(_redis, title, assignee_role, created_by, detail or None)
await store.log_event(_redis, "task", f"{created_by} created '{title}'{assignee_role}")
return res
@mcp.tool()
async def claim_task(task_id: int, role: str) -> dict:
"""open 작업을 점유(in_progress)한다. 이미 점유면 거부."""
res = await store.claim_task(_redis, task_id, role)
if res.get("ok"):
await store.log_event(_redis, "task", f"{role} claimed task#{task_id}")
return res
@mcp.tool()
async def update_task(task_id: int, status: str, role: str, note: str = "") -> dict:
"""작업 상태를 갱신한다 (open/in_progress/blocked/done)."""
res = await store.update_task(_redis, task_id, status, role, note or None)
await store.log_event(_redis, "task", f"{role} set task#{task_id}{status}")
return res
@mcp.tool()
async def list_tasks(status: str = "", assignee_role: str = "") -> dict:
"""작업 목록을 조회한다(상태/담당 필터)."""
return await store.list_tasks(_redis, status or None, assignee_role or None)
# ---- 락 ----
@mcp.tool()
async def acquire_lock(resource: str, role: str, ttl_sec: int = config.DEFAULT_LOCK_TTL) -> dict:
"""공유 리소스 변경 전 어드바이저리 락을 획득한다. 점유 중이면 acquired=false."""
res = await locks.acquire_lock(_redis, resource, role, ttl_sec)
if res.get("acquired"):
await store.log_event(_redis, "lock", f"{role} acquired {resource}")
return res
@mcp.tool()
async def release_lock(resource: str, role: str) -> dict:
"""소유한 락을 해제한다."""
res = await locks.release_lock(_redis, resource, role)
if res.get("released"):
await store.log_event(_redis, "lock", f"{role} released {resource}")
return res
@mcp.tool()
async def heartbeat_lock(resource: str, role: str, ttl_sec: int = config.DEFAULT_LOCK_TTL) -> dict:
"""긴 작업 중 락 TTL을 갱신한다(소유자만)."""
return await locks.heartbeat_lock(_redis, resource, role, ttl_sec)
@mcp.tool()
async def list_locks() -> dict:
"""현재 점유 중인 모든 락을 조회한다."""
return await locks.list_locks(_redis)
# ---- 가시성 ----
@mcp.tool()
async def team_log(after_id: int = 0) -> dict:
"""팀 전체 최근 활동 피드(메시지·작업·락)를 조회한다."""
return await store.read_team_log(_redis, after_id)
# ---- Bearer 인증 미들웨어 ----
class BearerAuth(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
global _auth_failed_logged
if request.url.path.startswith("/health"):
return await call_next(request)
expected = f"Bearer {config.CO_BUS_KEY}"
if not config.CO_BUS_KEY or request.headers.get("authorization") != expected:
if not _auth_failed_logged:
log.error("co-gahusb 인증 실패 (이후 동일 로그 생략)")
_auth_failed_logged = True
return JSONResponse({"error": "unauthorized"}, status_code=401)
return await call_next(request)
async def _health(request):
return JSONResponse({"status": "ok"})
_mcp_app = mcp.streamable_http_app()
app = Starlette(
routes=[Route("/health", _health), Mount("/", app=_mcp_app)],
middleware=[Middleware(BearerAuth)],
lifespan=_mcp_app.router.lifespan_context,
)

157
co-gahusb/app/store.py Normal file
View File

@@ -0,0 +1,157 @@
# co-gahusb/app/store.py
import json
import time
from app.config import TEAM_LOG_MAXLEN
MSG_SEQ = "co:msgseq"
INBOX_PREFIX = "co:inbox:" # list of message ids per role
MSG_PREFIX = "co:msg:" # hash per message
READ_PREFIX = "co:read:" # last-read cursor per role
def _now_iso():
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
async def post_message(r, from_role, to_role, body, thread_id=None):
mid = await r.incr(MSG_SEQ)
payload = {
"id": str(mid),
"from_role": from_role,
"to_role": to_role,
"body": body,
"thread_id": thread_id or "",
"ts": _now_iso(),
}
await r.set(MSG_PREFIX + str(mid), json.dumps(payload))
await r.rpush(INBOX_PREFIX + to_role, mid)
return {"message_id": mid}
async def read_inbox(r, role, after_id=0, mark_read=False):
ids = await r.lrange(INBOX_PREFIX + role, 0, -1)
ids = [int(x) for x in ids if int(x) > int(after_id)]
messages = []
for mid in ids:
raw = await r.get(MSG_PREFIX + str(mid))
if raw:
d = json.loads(raw)
d["id"] = int(d["id"])
messages.append(d)
cursor = ids[-1] if ids else int(after_id)
if mark_read and ids:
await r.set(READ_PREFIX + role, cursor)
return {"messages": messages, "cursor": cursor}
TASK_SEQ = "co:taskseq"
TASK_PREFIX = "co:task:" # hash per task
TASK_SET = "co:tasks" # set of task ids
VALID_STATUS = ("open", "in_progress", "blocked", "done")
async def create_task(r, title, assignee_role, created_by, detail=None):
tid = await r.incr(TASK_SEQ)
task = {
"id": str(tid),
"title": title,
"assignee_role": assignee_role,
"status": "open",
"detail": detail or "",
"created_by": created_by,
"note": "",
"ts": _now_iso(),
}
await r.hset(TASK_PREFIX + str(tid), mapping=task)
await r.sadd(TASK_SET, tid)
return {"task_id": tid}
async def _get_task(r, task_id):
d = await r.hgetall(TASK_PREFIX + str(task_id))
if not d:
return None
d["id"] = int(d["id"])
return d
async def claim_task(r, task_id, role):
key = TASK_PREFIX + str(task_id)
async with r.pipeline() as pipe:
while True:
try:
await pipe.watch(key)
status = await pipe.hget(key, "status")
if status is None:
await pipe.unwatch()
return {"ok": False, "error": "not_found"}
if status != "open":
held = await pipe.hget(key, "assignee_role")
await pipe.unwatch()
return {"ok": False, "held_by": held}
pipe.multi()
pipe.hset(key, mapping={"status": "in_progress", "assignee_role": role})
await pipe.execute()
return {"ok": True, "task": await _get_task(r, task_id)}
except Exception as e:
from redis.exceptions import WatchError
if isinstance(e, WatchError):
continue
raise
async def update_task(r, task_id, status, role, note=None):
if status not in VALID_STATUS:
raise ValueError(f"invalid status: {status}")
key = TASK_PREFIX + str(task_id)
if not await r.exists(key):
return {"ok": False, "error": "not_found"}
mapping = {"status": status}
if note is not None:
mapping["note"] = note
await r.hset(key, mapping=mapping)
return {"ok": True, "task": await _get_task(r, task_id)}
async def list_tasks(r, status=None, assignee_role=None):
ids = sorted(int(x) for x in await r.smembers(TASK_SET))
tasks = []
for tid in ids:
t = await _get_task(r, tid)
if t is None:
continue
if status and t["status"] != status:
continue
if assignee_role and t["assignee_role"] != assignee_role:
continue
tasks.append(t)
return {"tasks": tasks}
LOG_SEQ = "co:logseq"
LOG_LIST = "co:log" # list of event ids (capped)
LOG_PREFIX = "co:logitem:"
async def log_event(r, kind, text):
eid = await r.incr(LOG_SEQ)
item = {"id": eid, "kind": kind, "text": text, "ts": _now_iso()}
await r.set(LOG_PREFIX + str(eid), json.dumps(item))
await r.rpush(LOG_LIST, eid)
await r.ltrim(LOG_LIST, -TEAM_LOG_MAXLEN, -1)
return {"event_id": eid}
async def read_team_log(r, after_id=0, limit=100):
ids = [int(x) for x in await r.lrange(LOG_LIST, 0, -1)]
ids = [i for i in ids if i > int(after_id)]
ids = ids[-limit:]
events = []
for eid in ids:
raw = await r.get(LOG_PREFIX + str(eid))
if raw:
events.append(json.loads(raw))
cursor = ids[-1] if ids else int(after_id)
return {"events": events, "cursor": cursor}

3
co-gahusb/pytest.ini Normal file
View File

@@ -0,0 +1,3 @@
[pytest]
asyncio_mode = auto
testpaths = tests

View File

@@ -0,0 +1,7 @@
mcp>=1.2.0
starlette>=0.37
uvicorn[standard]==0.34.0
redis>=5.0
pytest>=8.0
pytest-asyncio>=0.24
fakeredis>=2.21

View File

View File

@@ -0,0 +1,11 @@
# co-gahusb/tests/conftest.py
import pytest_asyncio
import fakeredis.aioredis
@pytest_asyncio.fixture
async def r():
client = fakeredis.aioredis.FakeRedis(decode_responses=True)
await client.flushall()
yield client
await client.aclose()

View File

@@ -0,0 +1,51 @@
# co-gahusb/tests/test_locks.py
from app import locks
async def test_acquire_succeeds_then_blocks_other(r):
res = await locks.acquire_lock(r, "nas-deploy", "BE", ttl_sec=300)
assert res["acquired"] is True
res2 = await locks.acquire_lock(r, "nas-deploy", "FE", ttl_sec=300)
assert res2["acquired"] is False
assert res2["held_by"] == "BE"
assert res2["ttl_remaining"] > 0
async def test_release_only_by_owner(r):
await locks.acquire_lock(r, "compose", "BE", ttl_sec=300)
bad = await locks.release_lock(r, "compose", "FE")
assert bad["released"] is False
ok = await locks.release_lock(r, "compose", "BE")
assert ok["released"] is True
again = await locks.acquire_lock(r, "compose", "FE", ttl_sec=300)
assert again["acquired"] is True
async def test_heartbeat_only_by_owner_renews_ttl(r):
await locks.acquire_lock(r, "nginx-conf", "BE", ttl_sec=10)
bad = await locks.heartbeat_lock(r, "nginx-conf", "FE", ttl_sec=300)
assert bad["renewed"] is False
ok = await locks.heartbeat_lock(r, "nginx-conf", "BE", ttl_sec=300)
assert ok["renewed"] is True
assert await r.ttl("co:lock:nginx-conf") > 100
async def test_expired_lock_is_reacquirable(r):
await locks.acquire_lock(r, "memory-mirror", "AI", ttl_sec=1)
await r.delete("co:lock:memory-mirror")
res = await locks.acquire_lock(r, "memory-mirror", "FE", ttl_sec=300)
assert res["acquired"] is True
async def test_list_locks(r):
await locks.acquire_lock(r, "nas-deploy", "BE", ttl_sec=300)
await locks.acquire_lock(r, "compose", "FE", ttl_sec=300)
listed = await locks.list_locks(r)
held = {l["resource"]: l["held_by"] for l in listed["locks"]}
assert held == {"nas-deploy": "BE", "compose": "FE"}

View File

@@ -0,0 +1,47 @@
# co-gahusb/tests/test_messages.py
from app import store
async def test_post_and_read_ordering(r):
id1 = (await store.post_message(r, "Producer", "BE", "first"))["message_id"]
id2 = (await store.post_message(r, "Producer", "BE", "second"))["message_id"]
assert id2 > id1
res = await store.read_inbox(r, "BE")
bodies = [m["body"] for m in res["messages"]]
assert bodies == ["first", "second"]
assert res["cursor"] == id2
async def test_read_inbox_after_id(r):
id1 = (await store.post_message(r, "Producer", "BE", "first"))["message_id"]
await store.post_message(r, "Producer", "BE", "second")
res = await store.read_inbox(r, "BE", after_id=id1)
assert [m["body"] for m in res["messages"]] == ["second"]
async def test_inboxes_isolated_per_role(r):
await store.post_message(r, "Producer", "BE", "for-be")
await store.post_message(r, "Producer", "FE", "for-fe")
be = await store.read_inbox(r, "BE")
fe = await store.read_inbox(r, "FE")
assert [m["body"] for m in be["messages"]] == ["for-be"]
assert [m["body"] for m in fe["messages"]] == ["for-fe"]
async def test_mark_read_advances_cursor(r):
await store.post_message(r, "Producer", "BE", "first")
res = await store.read_inbox(r, "BE", mark_read=True)
last = res["cursor"]
await store.post_message(r, "Producer", "BE", "second")
res2 = await store.read_inbox(r, "BE", after_id=last)
assert [m["body"] for m in res2["messages"]] == ["second"]
async def test_message_fields(r):
await store.post_message(r, "Producer", "BE", "hi", thread_id="t1")
res = await store.read_inbox(r, "BE")
m = res["messages"][0]
assert m["from_role"] == "Producer"
assert m["thread_id"] == "t1"
assert "ts" in m and "id" in m

View File

@@ -0,0 +1,25 @@
# co-gahusb/tests/test_server.py
import os
os.environ["CO_BUS_KEY"] = "test-key"
from starlette.testclient import TestClient
from app.server import app
def test_health_open_without_auth():
client = TestClient(app)
res = client.get("/health")
assert res.status_code == 200
assert res.json()["status"] == "ok"
def test_mcp_requires_bearer():
client = TestClient(app)
res = client.post("/mcp", json={})
assert res.status_code == 401
def test_mcp_wrong_key_rejected():
client = TestClient(app)
res = client.post("/mcp", json={}, headers={"Authorization": "Bearer wrong"})
assert res.status_code == 401

View File

@@ -0,0 +1,56 @@
# co-gahusb/tests/test_tasks.py
import pytest
from app import store
async def test_create_and_list(r):
res = await store.create_task(r, "deploy FE", "FE", created_by="Producer", detail="ship it")
tid = res["task_id"]
listed = await store.list_tasks(r)
t = [t for t in listed["tasks"] if t["id"] == tid][0]
assert t["title"] == "deploy FE"
assert t["assignee_role"] == "FE"
assert t["status"] == "open"
assert t["created_by"] == "Producer"
async def test_claim_then_duplicate_claim_rejected(r):
tid = (await store.create_task(r, "x", "FE", created_by="Producer"))["task_id"]
ok = await store.claim_task(r, tid, "FE")
assert ok["ok"] is True
assert ok["task"]["status"] == "in_progress"
dup = await store.claim_task(r, tid, "BE")
assert dup["ok"] is False
assert dup["held_by"] == "FE"
async def test_update_status(r):
tid = (await store.create_task(r, "x", "FE", created_by="Producer"))["task_id"]
await store.claim_task(r, tid, "FE")
res = await store.update_task(r, tid, "done", "FE", note="finished")
assert res["ok"] is True
assert res["task"]["status"] == "done"
assert res["task"]["note"] == "finished"
async def test_list_filters(r):
t1 = (await store.create_task(r, "a", "FE", created_by="Producer"))["task_id"]
await store.create_task(r, "b", "BE", created_by="Producer")
await store.claim_task(r, t1, "FE")
fe = await store.list_tasks(r, assignee_role="FE")
assert [t["title"] for t in fe["tasks"]] == ["a"]
in_prog = await store.list_tasks(r, status="in_progress")
assert [t["title"] for t in in_prog["tasks"]] == ["a"]
async def test_invalid_status_rejected(r):
tid = (await store.create_task(r, "x", "FE", created_by="Producer"))["task_id"]
with pytest.raises(ValueError):
await store.update_task(r, tid, "bogus", "FE")
async def test_update_nonexistent_task_returns_not_found(r):
res = await store.update_task(r, 999, "done", "FE")
assert res["ok"] is False
assert res["error"] == "not_found"

View File

@@ -0,0 +1,25 @@
# co-gahusb/tests/test_teamlog.py
from app import store
async def test_log_event_and_read(r):
await store.log_event(r, "message", "Producer→BE: hi")
await store.log_event(r, "lock", "BE acquired nas-deploy")
res = await store.read_team_log(r)
msgs = [e["text"] for e in res["events"]]
assert msgs == ["Producer→BE: hi", "BE acquired nas-deploy"]
async def test_team_log_after_id(r):
e1 = (await store.log_event(r, "message", "a"))["event_id"]
await store.log_event(r, "message", "b")
res = await store.read_team_log(r, after_id=e1)
assert [e["text"] for e in res["events"]] == ["b"]
async def test_team_log_capped(r):
for i in range(10):
await store.log_event(r, "message", f"m{i}")
res = await store.read_team_log(r, limit=3)
assert len(res["events"]) == 3
assert res["events"][-1]["text"] == "m9"

View File

@@ -221,6 +221,25 @@ services:
timeout: 5s timeout: 5s
retries: 3 retries: 3
co-gahusb:
build:
context: ./co-gahusb
container_name: co-gahusb
restart: unless-stopped
ports:
- "18920:8000"
environment:
- TZ=${TZ:-Asia/Seoul}
- REDIS_URL=${REDIS_URL:-redis://redis:6379}
- CO_BUS_KEY=${CO_BUS_KEY:-}
depends_on:
- redis
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 60s
timeout: 5s
retries: 3
agent-office: agent-office:
build: build:
context: ./agent-office context: ./agent-office

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,127 @@
# co-gahusb — 세션 간 협업 팀 버스 설계
작성일: 2026-06-12
대상 repo: `web-backend` (서버) + `web-ui`/`web-ai` (클라이언트 배선)
목적: 독립 실행되는 4개 Claude Code 세션(FE/BE/AI/Producer)이 역할을 갖고 비동기로 소통·협업하되, 공유 DB/리소스는 동시 쓰기를 방지한다.
## 배경
web-ui / web-backend / web-ai 세션은 각각 독립 프로세스라 서로의 컨텍스트를 못 본다. 협업하려면 세 곳(서로 다른 머신 포함)에서 닿는 공유 메시지 버스가 필요하다. 사용자가 방식 B(독립 MCP 서버)를 선택했고, 민감한 공유 영역의 동시 쓰기 분리를 핵심 요구로 명시했다.
## 결정 사항 (브레인스토밍 확정)
- 호스팅: 신규 독립 컨테이너 **`co-gahusb`**, NAS, 포트 **18920**(18900 agent-office 옆, 미사용 확인).
- 전송/인증: **HTTP streamable MCP** + 정적 **Bearer 키**([[reference_webai_auth_pattern]] 재사용). nginx `/api/co/``co-gahusb:18920`, `Authorization` forward.
- 백엔드: **Redis**(기존 공유 컨테이너 `redis://redis:6379`). 전 연산 원자적 → SQLite multi-writer 함정([[reference_sqlite_concurrency]]) 회피.
- 동시쓰기 분리: **소유권 파티션 + 어드바이저리 락**.
- 역할: web-ui=FE, web-backend=BE, web-ai=AI, 이 세션=Producer.
- 수신: 각 세션 **/loop 폴링**(`read_inbox` + `list_tasks`).
## 아키텍처
```
[FE 세션 web-ui] [BE 세션 web-backend] [AI 세션 web-ai(다른 머신)] [Producer 세션]
\ | / /
\ | / /
──────── .mcp.json HTTP + Bearer ───────────────────────────────
nginx /api/co/ (Authorization forward)
co-gahusb:18920 (FastMCP streamable-http)
Redis (원자적 연산)
```
서버 구현: **Python `mcp` SDK(FastMCP) + streamable-http transport**(모든 lab이 FastAPI/Python 스택과 일관). 단일 책임 모듈로 분리:
- `app/server.py` — FastMCP 인스턴스 + 툴 등록 + ASGI 앱(streamable-http) + Bearer 인증 미들웨어
- `app/store.py` — Redis 데이터 액세스 레이어(메시지/작업/락), 전 함수 원자적
- `app/locks.py` — 락 Lua 스크립트(소유자 확인 후 release/heartbeat)
- `app/models.py` — 입출력 dataclass/스키마
- `app/config.py` — env(REDIS_URL, CO_BUS_KEY, 포트)
## MCP 툴 표면 (MVP — YAGNI)
| 분류 | 툴 | 시그니처 → 반환 |
|------|-----|------|
| 메시지 | `post_message` | `(from_role, to_role, body, thread_id?)``{message_id}` |
| 메시지 | `read_inbox` | `(role, after_id?, mark_read?=false)``{messages:[{id, from_role, body, thread_id, ts}], cursor}` |
| 작업 | `create_task` | `(title, assignee_role, detail?, created_by)``{task_id}` |
| 작업 | `claim_task` | `(task_id, role)``{ok, task}` (이미 claim 시 `{ok:false, held_by}`) |
| 작업 | `update_task` | `(task_id, status, role, note?)``{ok, task}` (status ∈ open/in_progress/blocked/done) |
| 작업 | `list_tasks` | `(status?, assignee_role?)``{tasks:[...]}` |
| 락 | `acquire_lock` | `(resource, role, ttl_sec=300)``{acquired, held_by?, ttl_remaining?}` |
| 락 | `release_lock` | `(resource, role)``{released}` (소유자 아니면 `{released:false}`) |
| 락 | `heartbeat_lock` | `(resource, role, ttl_sec=300)``{renewed}` (소유자만) |
| 락 | `list_locks` | `()``{locks:[{resource, held_by, ttl_remaining}]}` |
| 가시성 | `team_log` | `(after_id?)``{events:[...], cursor}` (최근 활동 피드) |
## Redis 데이터 모델 (전부 원자적)
- **메시지**: `co:inbox:{role}` = Redis **Stream**. `post_message`=XADD, `read_inbox`=XREAD(`after_id` 커서, 비파괴). `mark_read``co:read:{role}` 키에 마지막 id 저장.
- **작업**: `co:task:{id}` Hash(title/assignee/status/detail/created_by/ts), `co:tasks` Set(id 목록), `INCR co:taskseq`로 id. `claim_task`/`update_task`는 **Lua 스크립트**로 read-modify-write 원자화(중복 claim/경합 방지).
- **락**: 획득 = `SET co:lock:{resource} {role} NX EX {ttl}`(원자적). `release_lock`/`heartbeat_lock` = **Lua**로 `GET` 소유자 일치 확인 후 `DEL`/`EXPIRE`(check-and-act 원자화 → 남의 락 조작 불가).
- **활동로그**: `co:log` = 캡트 Stream(`XADD ... MAXLEN ~ 500`). 메시지·작업·락 이벤트 기록 → Producer 오버사이트.
## 동시 쓰기 분리 (핵심 요구)
**1차 — 정적 소유권 파티션** (락 불필요한 자연 분리):
- `web-ui` → FE만, `web-backend` → BE만, `web-ai` → AI만 쓰기. 각 세션은 자기 repo만 편집 → git 충돌 원천 차단.
**2차 — 교차 리소스 어드바이저리 락** (여러 역할이 건드릴 수 있는 민감 영역만):
- 예약 resource 명: `nas-deploy`, `stock-db-schema`, `lotto-db-schema`, `memory-mirror`(web-ui↔web-ai 미러), `nginx-conf`, `compose`.
- 규약: 위 리소스 변경 전 `acquire_lock` 필수. 점유 중이면 `{acquired:false, held_by, ttl_remaining}` → 대기. **TTL 자동 해제로 세션 사망 시 데드락 방지**, 긴 작업은 `heartbeat_lock` 갱신.
- 어드바이저리(협조적): 버스는 FS를 강제 잠그지 않음 → 각 세션 CLAUDE.md에 "공유 리소스 = 락 먼저" 규약 명문화로 강제.
## 클라이언트 배선
- 각 repo `.mcp.json`:
```json
{ "mcpServers": { "co-gahusb": {
"type": "http",
"url": "https://gahusb.synology.me/api/co/mcp",
"headers": { "Authorization": "Bearer ${CO_BUS_KEY}" } } } }
```
(키는 커밋 금지 — 각 머신 env/로컬에서 주입. `.mcp.json`엔 placeholder, 실제 키는 `.env`/환경변수.)
- 각 repo CLAUDE.md에 역할 블록 추가: "너는 역할 X / 모든 co-gahusb 툴에 role=X / 공유 리소스 변경 전 acquire_lock / `/loop`로 inbox·tasks 폴링".
- web-ai는 다른 머신 → 해당 머신에서 `.mcp.json` 적용(스펙에 절차 명시).
## 인프라 등재 (신규 컨테이너 추가 의무 위치 — [[reference_nas_url_routing]], [[reference_deploy_nas_services_whitelist]])
1. `docker-compose.yml` — `co-gahusb` 서비스(build, `REDIS_URL`, `depends_on: redis`, `CO_BUS_KEY` env, `${RUNTIME_PATH}` 볼륨 불요(상태는 Redis)).
2. nginx `default.conf` — **public `location /api/co/`** 추가(7번째 등재 규칙; `/api/internal/` 불필요).
3. deploy 스크립트 SERVICES 화이트리스트에 `co-gahusb` 등재.
4. `${RUNTIME_PATH}` 절대경로 — 본 서비스는 영속 볼륨 없음(Redis 백엔드)이라 코드 디렉토리만.
5. frontend `depends_on` — 불필요(백엔드 전용 서비스).
6. `.env` — `CO_BUS_KEY` 추가(커밋 금지).
## 에러 / 엣지 처리
- 인증 실패 → 401, 1회만 ERROR 로그 후 조용([[reference_webai_auth_pattern]]).
- 락 획득 실패 → 예외 아닌 `{acquired:false, held_by, ttl_remaining}` 정상 반환.
- 만료 락 → Redis TTL 자동 소멸(별도 GC 불필요).
- 알 수 없는 role/resource → 명시적 에러 메시지.
- Redis 연결 실패 → 503 + 명확한 메시지.
## 테스트 (TDD, pytest + fakeredis)
- **락**: 두 역할 같은 resource 획득 → 2번째 거부 / TTL 만료 후 획득 / 소유자 아닌 release·heartbeat 거부 / heartbeat 갱신 후 ttl 증가.
- **메시지**: XADD 순서대로 `after_id` 커서 읽기 / mark_read 후 재읽기 시 제외 / 다른 role 우편함 격리.
- **작업**: create→claim(중복 claim 거부)→update status 전이 / list 필터.
- **인증**: 키 일치 통과 / 불일치 401.
- **team_log**: 이벤트 기록 + MAXLEN 캡.
## 구현 순서 (phase)
1. 스캐폴드: 디렉토리/Dockerfile/requirements/config (기존 lab 구조 미러)
2. `store.py` + `locks.py` (TDD, fakeredis) — 락 → 메시지 → 작업 → team_log
3. `server.py` — FastMCP 툴 등록 + Bearer 인증 + ASGI
4. 인프라 등재 6위치 (compose/nginx/deploy/env)
5. 클라이언트 배선: web-ui·web-backend `.mcp.json` + CLAUDE.md 역할 블록 (web-ai는 절차 문서화)
6. 배포(Gitea push → webhook) + 스모크 테스트(헬스/인증/락 경합)
## 비범위 (YAGNI)
- 실시간 push(텔레그램) — 후속. 우선 /loop 폴링.
- SQLite 감사로그 — Redis 캡트 스트림으로 충분.
- 웹 대시보드 — agent-office 오버사이트와 추후 통합 여지.
- 락의 FS 레벨 강제 — 어드바이저리로 충분(세션은 협조적).

View File

@@ -400,6 +400,20 @@ server {
proxy_pass http://$saju_backend$request_uri; proxy_pass http://$saju_backend$request_uri;
} }
# co-gahusb — FastMCP streamable-http bus
# Authorization forward required (Bearer key auth), no buffering, long read timeout
# trailing slash on proxy_pass strips /api/co/ prefix: /api/co/mcp → /mcp
location /api/co/ {
proxy_pass http://co-gahusb:8000/;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Authorization $http_authorization;
proxy_set_header Connection "";
proxy_buffering off;
proxy_read_timeout 3600s;
}
# agent-office API + WebSocket # agent-office API + WebSocket
location /api/agent-office/ { location /api/agent-office/ {
resolver 127.0.0.11 valid=10s; resolver 127.0.0.11 valid=10s;

View File

@@ -2,7 +2,7 @@
set -euo pipefail set -euo pipefail
# ── 서비스 목록 (한 곳에서만 관리) ── # ── 서비스 목록 (한 곳에서만 관리) ──
SERVICES="lotto travel-proxy deployer stock music-lab insta-lab realestate-lab agent-office personal packs-lab video-lab image-lab tarot-lab saju-lab nginx scripts _shared" SERVICES="lotto travel-proxy deployer stock music-lab insta-lab realestate-lab co-gahusb agent-office personal packs-lab video-lab image-lab tarot-lab saju-lab nginx scripts _shared"
# 1. 자동 감지: Docker 컨테이너 내부인가? # 1. 자동 감지: Docker 컨테이너 내부인가?
if [ -d "/repo" ] && [ -d "/runtime" ]; then if [ -d "/repo" ] && [ -d "/runtime" ]; then

View File

@@ -15,13 +15,13 @@ flock -n 200 || { echo "Deploy already running, skipping"; exit 0; }
# ── 서비스 목록 (한 곳에서만 관리) ── # ── 서비스 목록 (한 곳에서만 관리) ──
# docker compose 서비스명 (deployer 제외 — 자기 자신을 재빌드하면 스크립트 중단) # docker compose 서비스명 (deployer 제외 — 자기 자신을 재빌드하면 스크립트 중단)
BUILD_TARGETS="lotto travel-proxy stock music-lab insta-lab realestate-lab agent-office personal packs-lab video-lab image-lab tarot-lab saju-lab frontend" BUILD_TARGETS="lotto travel-proxy stock music-lab insta-lab realestate-lab co-gahusb agent-office personal packs-lab video-lab image-lab tarot-lab saju-lab frontend"
# 컨테이너 이름 (고아 정리용 — blog-lab은 폐기 대상으로 정리 리스트에 유지) # 컨테이너 이름 (고아 정리용 — blog-lab은 폐기 대상으로 정리 리스트에 유지)
CONTAINER_NAMES="lotto stock music-lab insta-lab blog-lab realestate-lab agent-office personal packs-lab travel-proxy video-lab image-lab tarot-lab saju-lab frontend" CONTAINER_NAMES="lotto stock music-lab insta-lab blog-lab realestate-lab co-gahusb agent-office personal packs-lab travel-proxy video-lab image-lab tarot-lab saju-lab frontend"
# Infra 서비스 (image-based, 영속 데이터 보존을 위해 stop/rm 없이 up만) # Infra 서비스 (image-based, 영속 데이터 보존을 위해 stop/rm 없이 up만)
INFRA_SERVICES="redis" INFRA_SERVICES="redis"
# 헬스체크 대상 # 헬스체크 대상
HEALTH_ENDPOINTS="lotto stock travel-proxy music-lab insta-lab realestate-lab agent-office personal packs-lab video-lab image-lab tarot-lab saju-lab redis" HEALTH_ENDPOINTS="lotto stock travel-proxy music-lab insta-lab realestate-lab co-gahusb agent-office personal packs-lab video-lab image-lab tarot-lab saju-lab redis"
# data 디렉토리 (packs-lab은 별도 media/packs 사용) # data 디렉토리 (packs-lab은 별도 media/packs 사용)
DATA_DIRS="music stock insta realestate agent-office personal video image tarot saju" DATA_DIRS="music stock insta realestate agent-office personal video image tarot saju"