diff --git a/docs/superpowers/plans/2026-06-12-co-gahusb-team-bus.md b/docs/superpowers/plans/2026-06-12-co-gahusb-team-bus.md new file mode 100644 index 0000000..69a91aa --- /dev/null +++ b/docs/superpowers/plans/2026-06-12-co-gahusb-team-bus.md @@ -0,0 +1,1187 @@ +# co-gahusb 팀 버스 Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** 4개 Claude Code 세션(FE/BE/AI/Producer)이 역할을 갖고 비동기 협업하되 공유 리소스는 동시 쓰기를 막는 NAS 호스팅 MCP 팀 버스 컨테이너(`co-gahusb`)를 만든다. + +**Architecture:** Python `mcp` SDK(FastMCP, streamable-http)로 MCP 툴을 노출하고 Redis(기존 공유 컨테이너)에 상태 저장. 메시지/작업/락 로직은 순수 함수(`store.py`/`locks.py`)로 분리해 fakeredis로 TDD. 동시쓰기 분리는 소유권 파티션(repo별 단일 역할) + 어드바이저리 락(TTL). + +**Tech Stack:** Python 3.12, mcp SDK(FastMCP), Starlette, `redis.asyncio`, uvicorn, pytest + pytest-asyncio + fakeredis. nginx 프록시 + Bearer 정적키. + +**중요 — 동시성 구현 방침:** Redis Stream/Lua 대신 `INCR`-seq + Hash + List + `WATCH/MULTI` 트랜잭션 + `SET NX EX`만 사용한다(전부 fakeredis 지원 → 결정적 테스트, 원자성 유지). spec의 "Stream" 표현은 본 플랜에서 list+seq로 구체화한다(동작 동일, 테스트 가능성 향상). + +**경로/실행 메모:** +- 모든 경로는 `web-backend/co-gahusb/` 기준. 작업 디렉토리: `C:\Users\jaeoh\Desktop\workspace\web-backend`. +- 테스트는 **로컬**에서 venv로 실행(파이썬 OK, Docker는 NAS 전용이라 로컬 docker 금지). 배포만 Gitea push. +- 커밋은 web-backend repo(브랜치 `feat/co-gahusb-team-bus`)에서. + +--- + +## 파일 구조 + +| 파일 | 책임 | +|------|------| +| `co-gahusb/Dockerfile` | python:3.12-slim, uvicorn `app.server:app` :8000 | +| `co-gahusb/requirements.txt` | mcp, starlette, uvicorn, redis, pytest, pytest-asyncio, fakeredis | +| `co-gahusb/app/__init__.py` | 패키지 마커 | +| `co-gahusb/app/config.py` | env (REDIS_URL, CO_BUS_KEY, ROLES, LOCKABLE_RESOURCES) | +| `co-gahusb/app/locks.py` | acquire/release/heartbeat/list_locks (Redis 원자적) | +| `co-gahusb/app/store.py` | 메시지(post/read/mark_read), 작업(create/claim/update/list), team_log | +| `co-gahusb/app/server.py` | FastMCP 툴 등록 + Bearer 인증 + /health + ASGI app | +| `co-gahusb/tests/conftest.py` | fakeredis async fixture | +| `co-gahusb/tests/test_locks.py` | 락 단위 테스트 | +| `co-gahusb/tests/test_messages.py` | 메시지 단위 테스트 | +| `co-gahusb/tests/test_tasks.py` | 작업 단위 테스트 | +| `co-gahusb/tests/test_server.py` | 서버 스모크(인증/health) | +| `docker-compose.yml` (수정) | `co-gahusb` 서비스 | +| nginx `default.conf` (수정) | `location /api/co/` | +| deploy 스크립트 (수정) | SERVICES 화이트리스트 | +| `.mcp.json` (web-backend, web-ui) | co-gahusb MCP 등록 | +| 각 `CLAUDE.md` | 역할 블록 | + +--- + +## Task 1: 스캐폴드 (디렉토리·Dockerfile·requirements·config) + +**Files:** +- Create: `co-gahusb/Dockerfile`, `co-gahusb/requirements.txt`, `co-gahusb/app/__init__.py`, `co-gahusb/app/config.py`, `co-gahusb/tests/__init__.py` + +- [ ] **Step 1: requirements.txt 작성** + +`co-gahusb/requirements.txt`: +``` +mcp>=1.2.0 +starlette>=0.37 +uvicorn[standard]==0.34.0 +redis>=5.0 +pytest>=8.0 +pytest-asyncio>=0.24 +fakeredis>=2.21 +``` + +- [ ] **Step 2: Dockerfile 작성** + +`co-gahusb/Dockerfile`: +```dockerfile +FROM python:3.12-slim-bookworm +ENV PYTHONUNBUFFERED=1 + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt + +COPY . . + +EXPOSE 8000 +CMD ["uvicorn", "app.server:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"] +``` + +- [ ] **Step 3: 패키지 마커 + config.py 작성** + +`co-gahusb/app/__init__.py`: 빈 파일. +`co-gahusb/tests/__init__.py`: 빈 파일. + +`co-gahusb/app/config.py`: +```python +# co-gahusb/app/config.py +import os + +REDIS_URL = os.environ.get("REDIS_URL", "redis://redis:6379") +CO_BUS_KEY = os.environ.get("CO_BUS_KEY", "") + +# 협업 역할 (세션별 1:1) +ROLES = ("FE", "BE", "AI", "Producer") + +# 교차 리소스 어드바이저리 락 대상 (이 외 이름도 락은 가능하나, 규약상 명시 대상) +LOCKABLE_RESOURCES = ( + "nas-deploy", + "stock-db-schema", + "lotto-db-schema", + "memory-mirror", + "nginx-conf", + "compose", +) + +DEFAULT_LOCK_TTL = 300 +TEAM_LOG_MAXLEN = 500 +``` + +- [ ] **Step 4: 로컬 venv 생성 + 의존성 설치** + +Run (Windows, web-backend/co-gahusb 디렉토리에서): +``` +cd co-gahusb +python -m venv .venv +.venv\Scripts\python -m pip install -r requirements.txt +``` +Expected: 설치 성공(mcp/redis/fakeredis 포함). + +- [ ] **Step 5: .gitignore에 venv 제외 확인 후 Commit** + +`co-gahusb/.gitignore` 생성: +``` +.venv/ +__pycache__/ +*.pyc +``` +Commit: +``` +git add co-gahusb/Dockerfile co-gahusb/requirements.txt co-gahusb/app/__init__.py co-gahusb/app/config.py co-gahusb/tests/__init__.py co-gahusb/.gitignore +git commit -m "feat(co-gahusb): 스캐폴드 (Dockerfile·requirements·config)" +``` + +--- + +## Task 2: conftest fixture + 락 (TDD) + +**Files:** +- Create: `co-gahusb/tests/conftest.py`, `co-gahusb/app/locks.py`, `co-gahusb/tests/test_locks.py` + +- [ ] **Step 1: conftest.py 작성 (fakeredis async fixture)** + +`co-gahusb/tests/conftest.py`: +```python +# co-gahusb/tests/conftest.py +import pytest_asyncio +import fakeredis.aioredis + + +@pytest_asyncio.fixture +async def r(): + client = fakeredis.aioredis.FakeRedis(decode_responses=True) + await client.flushall() + yield client + await client.aclose() +``` + +`co-gahusb/pytest.ini`: +```ini +[pytest] +asyncio_mode = auto +testpaths = tests +``` + +- [ ] **Step 2: 실패하는 락 테스트 작성** + +`co-gahusb/tests/test_locks.py`: +```python +# co-gahusb/tests/test_locks.py +from app import locks + + +async def test_acquire_succeeds_then_blocks_other(r): + res = await locks.acquire_lock(r, "nas-deploy", "BE", ttl_sec=300) + assert res["acquired"] is True + + res2 = await locks.acquire_lock(r, "nas-deploy", "FE", ttl_sec=300) + assert res2["acquired"] is False + assert res2["held_by"] == "BE" + assert res2["ttl_remaining"] > 0 + + +async def test_release_only_by_owner(r): + await locks.acquire_lock(r, "compose", "BE", ttl_sec=300) + + bad = await locks.release_lock(r, "compose", "FE") + assert bad["released"] is False + + ok = await locks.release_lock(r, "compose", "BE") + assert ok["released"] is True + + # released 후 재획득 가능 + again = await locks.acquire_lock(r, "compose", "FE", ttl_sec=300) + assert again["acquired"] is True + + +async def test_heartbeat_only_by_owner_renews_ttl(r): + await locks.acquire_lock(r, "nginx-conf", "BE", ttl_sec=10) + + bad = await locks.heartbeat_lock(r, "nginx-conf", "FE", ttl_sec=300) + assert bad["renewed"] is False + + ok = await locks.heartbeat_lock(r, "nginx-conf", "BE", ttl_sec=300) + assert ok["renewed"] is True + assert await r.ttl("co:lock:nginx-conf") > 100 + + +async def test_expired_lock_is_reacquirable(r): + await locks.acquire_lock(r, "memory-mirror", "AI", ttl_sec=1) + await r.delete("co:lock:memory-mirror") # TTL 만료 시뮬레이션 + res = await locks.acquire_lock(r, "memory-mirror", "FE", ttl_sec=300) + assert res["acquired"] is True + + +async def test_list_locks(r): + await locks.acquire_lock(r, "nas-deploy", "BE", ttl_sec=300) + await locks.acquire_lock(r, "compose", "FE", ttl_sec=300) + listed = await locks.list_locks(r) + held = {l["resource"]: l["held_by"] for l in listed["locks"]} + assert held == {"nas-deploy": "BE", "compose": "FE"} +``` + +- [ ] **Step 3: 테스트 실패 확인** + +Run: `.venv\Scripts\python -m pytest tests/test_locks.py -v` +Expected: FAIL — `app.locks` 모듈/함수 없음. + +- [ ] **Step 4: locks.py 구현** + +`co-gahusb/app/locks.py`: +```python +# co-gahusb/app/locks.py +from redis.exceptions import WatchError + +LOCK_PREFIX = "co:lock:" + + +async def acquire_lock(r, resource, role, ttl_sec=300): + key = LOCK_PREFIX + resource + ok = await r.set(key, role, nx=True, ex=ttl_sec) + if ok: + return {"acquired": True} + held_by = await r.get(key) + ttl = await r.ttl(key) + return {"acquired": False, "held_by": held_by, "ttl_remaining": max(ttl, 0)} + + +async def release_lock(r, resource, role): + key = LOCK_PREFIX + resource + async with r.pipeline() as pipe: + while True: + try: + await pipe.watch(key) + owner = await pipe.get(key) + if owner != role: + await pipe.unwatch() + return {"released": False, "held_by": owner} + pipe.multi() + pipe.delete(key) + await pipe.execute() + return {"released": True} + except WatchError: + continue + + +async def heartbeat_lock(r, resource, role, ttl_sec=300): + key = LOCK_PREFIX + resource + async with r.pipeline() as pipe: + while True: + try: + await pipe.watch(key) + owner = await pipe.get(key) + if owner != role: + await pipe.unwatch() + return {"renewed": False, "held_by": owner} + pipe.multi() + pipe.expire(key, ttl_sec) + await pipe.execute() + return {"renewed": True} + except WatchError: + continue + + +async def list_locks(r): + keys = await r.keys(LOCK_PREFIX + "*") + out = [] + for key in keys: + held_by = await r.get(key) + if held_by is None: + continue + ttl = await r.ttl(key) + out.append({ + "resource": key[len(LOCK_PREFIX):], + "held_by": held_by, + "ttl_remaining": max(ttl, 0), + }) + return {"locks": out} +``` + +- [ ] **Step 5: 테스트 통과 확인** + +Run: `.venv\Scripts\python -m pytest tests/test_locks.py -v` +Expected: PASS (5 tests). + +- [ ] **Step 6: Commit** + +``` +git add co-gahusb/tests/conftest.py co-gahusb/pytest.ini co-gahusb/app/locks.py co-gahusb/tests/test_locks.py +git commit -m "feat(co-gahusb): 어드바이저리 락 (acquire/release/heartbeat/list, TDD)" +``` + +--- + +## Task 3: 메시지 (TDD) + +**Files:** +- Create: `co-gahusb/app/store.py` (메시지 부분), `co-gahusb/tests/test_messages.py` + +- [ ] **Step 1: 실패하는 메시지 테스트 작성** + +`co-gahusb/tests/test_messages.py`: +```python +# co-gahusb/tests/test_messages.py +from app import store + + +async def test_post_and_read_ordering(r): + id1 = (await store.post_message(r, "Producer", "BE", "first"))["message_id"] + id2 = (await store.post_message(r, "Producer", "BE", "second"))["message_id"] + assert id2 > id1 + + res = await store.read_inbox(r, "BE") + bodies = [m["body"] for m in res["messages"]] + assert bodies == ["first", "second"] + assert res["cursor"] == id2 + + +async def test_read_inbox_after_id(r): + id1 = (await store.post_message(r, "Producer", "BE", "first"))["message_id"] + await store.post_message(r, "Producer", "BE", "second") + res = await store.read_inbox(r, "BE", after_id=id1) + assert [m["body"] for m in res["messages"]] == ["second"] + + +async def test_inboxes_isolated_per_role(r): + await store.post_message(r, "Producer", "BE", "for-be") + await store.post_message(r, "Producer", "FE", "for-fe") + be = await store.read_inbox(r, "BE") + fe = await store.read_inbox(r, "FE") + assert [m["body"] for m in be["messages"]] == ["for-be"] + assert [m["body"] for m in fe["messages"]] == ["for-fe"] + + +async def test_mark_read_advances_cursor(r): + await store.post_message(r, "Producer", "BE", "first") + res = await store.read_inbox(r, "BE", mark_read=True) + last = res["cursor"] + await store.post_message(r, "Producer", "BE", "second") + # mark_read 이후 저장된 커서부터 다시 읽기 + res2 = await store.read_inbox(r, "BE", after_id=last) + assert [m["body"] for m in res2["messages"]] == ["second"] + + +async def test_message_fields(r): + await store.post_message(r, "Producer", "BE", "hi", thread_id="t1") + res = await store.read_inbox(r, "BE") + m = res["messages"][0] + assert m["from_role"] == "Producer" + assert m["thread_id"] == "t1" + assert "ts" in m and "id" in m +``` + +- [ ] **Step 2: 테스트 실패 확인** + +Run: `.venv\Scripts\python -m pytest tests/test_messages.py -v` +Expected: FAIL — `app.store` 없음. + +- [ ] **Step 3: store.py 메시지 부분 구현** + +`co-gahusb/app/store.py` (신규 파일, 메시지 함수만 우선): +```python +# co-gahusb/app/store.py +import json +import time + +MSG_SEQ = "co:msgseq" +INBOX_PREFIX = "co:inbox:" # list of message ids per role +MSG_PREFIX = "co:msg:" # hash per message +READ_PREFIX = "co:read:" # last-read cursor per role + + +def _now_iso(): + return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + + +async def post_message(r, from_role, to_role, body, thread_id=None): + mid = await r.incr(MSG_SEQ) + payload = { + "id": str(mid), + "from_role": from_role, + "to_role": to_role, + "body": body, + "thread_id": thread_id or "", + "ts": _now_iso(), + } + await r.set(MSG_PREFIX + str(mid), json.dumps(payload)) + await r.rpush(INBOX_PREFIX + to_role, mid) + return {"message_id": mid} + + +async def read_inbox(r, role, after_id=0, mark_read=False): + ids = await r.lrange(INBOX_PREFIX + role, 0, -1) + ids = [int(x) for x in ids if int(x) > int(after_id)] + messages = [] + for mid in ids: + raw = await r.get(MSG_PREFIX + str(mid)) + if raw: + d = json.loads(raw) + d["id"] = int(d["id"]) + messages.append(d) + cursor = ids[-1] if ids else int(after_id) + if mark_read and ids: + await r.set(READ_PREFIX + role, cursor) + return {"messages": messages, "cursor": cursor} +``` + +- [ ] **Step 4: 테스트 통과 확인** + +Run: `.venv\Scripts\python -m pytest tests/test_messages.py -v` +Expected: PASS (5 tests). + +- [ ] **Step 5: Commit** + +``` +git add co-gahusb/app/store.py co-gahusb/tests/test_messages.py +git commit -m "feat(co-gahusb): 메시지 inbox (post/read/mark_read, TDD)" +``` + +--- + +## Task 4: 작업 보드 (TDD) + +**Files:** +- Modify: `co-gahusb/app/store.py` (작업 함수 추가) +- Create: `co-gahusb/tests/test_tasks.py` + +- [ ] **Step 1: 실패하는 작업 테스트 작성** + +`co-gahusb/tests/test_tasks.py`: +```python +# co-gahusb/tests/test_tasks.py +import pytest +from app import store + + +async def test_create_and_list(r): + res = await store.create_task(r, "deploy FE", "FE", created_by="Producer", detail="ship it") + tid = res["task_id"] + listed = await store.list_tasks(r) + t = [t for t in listed["tasks"] if t["id"] == tid][0] + assert t["title"] == "deploy FE" + assert t["assignee_role"] == "FE" + assert t["status"] == "open" + assert t["created_by"] == "Producer" + + +async def test_claim_then_duplicate_claim_rejected(r): + tid = (await store.create_task(r, "x", "FE", created_by="Producer"))["task_id"] + ok = await store.claim_task(r, tid, "FE") + assert ok["ok"] is True + assert ok["task"]["status"] == "in_progress" + + dup = await store.claim_task(r, tid, "BE") + assert dup["ok"] is False + assert dup["held_by"] == "FE" + + +async def test_update_status(r): + tid = (await store.create_task(r, "x", "FE", created_by="Producer"))["task_id"] + await store.claim_task(r, tid, "FE") + res = await store.update_task(r, tid, "done", "FE", note="finished") + assert res["ok"] is True + assert res["task"]["status"] == "done" + assert res["task"]["note"] == "finished" + + +async def test_list_filters(r): + t1 = (await store.create_task(r, "a", "FE", created_by="Producer"))["task_id"] + await store.create_task(r, "b", "BE", created_by="Producer") + await store.claim_task(r, t1, "FE") + fe = await store.list_tasks(r, assignee_role="FE") + assert [t["title"] for t in fe["tasks"]] == ["a"] + in_prog = await store.list_tasks(r, status="in_progress") + assert [t["title"] for t in in_prog["tasks"]] == ["a"] + + +async def test_invalid_status_rejected(r): + tid = (await store.create_task(r, "x", "FE", created_by="Producer"))["task_id"] + with pytest.raises(ValueError): + await store.update_task(r, tid, "bogus", "FE") +``` + +- [ ] **Step 2: 테스트 실패 확인** + +Run: `.venv\Scripts\python -m pytest tests/test_tasks.py -v` +Expected: FAIL — `create_task` 등 없음. + +- [ ] **Step 3: store.py에 작업 함수 추가** + +`co-gahusb/app/store.py` 파일 끝에 추가: +```python +TASK_SEQ = "co:taskseq" +TASK_PREFIX = "co:task:" # hash per task +TASK_SET = "co:tasks" # set of task ids + +VALID_STATUS = ("open", "in_progress", "blocked", "done") + + +async def create_task(r, title, assignee_role, created_by, detail=None): + tid = await r.incr(TASK_SEQ) + task = { + "id": str(tid), + "title": title, + "assignee_role": assignee_role, + "status": "open", + "detail": detail or "", + "created_by": created_by, + "note": "", + "ts": _now_iso(), + } + await r.hset(TASK_PREFIX + str(tid), mapping=task) + await r.sadd(TASK_SET, tid) + return {"task_id": tid} + + +async def _get_task(r, task_id): + d = await r.hgetall(TASK_PREFIX + str(task_id)) + if not d: + return None + d["id"] = int(d["id"]) + return d + + +async def claim_task(r, task_id, role): + key = TASK_PREFIX + str(task_id) + async with r.pipeline() as pipe: + while True: + try: + await pipe.watch(key) + status = await pipe.hget(key, "status") + if status is None: + await pipe.unwatch() + return {"ok": False, "error": "not_found"} + if status != "open": + held = await pipe.hget(key, "assignee_role") + await pipe.unwatch() + return {"ok": False, "held_by": held} + pipe.multi() + pipe.hset(key, mapping={"status": "in_progress", "assignee_role": role}) + await pipe.execute() + return {"ok": True, "task": await _get_task(r, task_id)} + except Exception as e: # WatchError 포함 재시도 + from redis.exceptions import WatchError + if isinstance(e, WatchError): + continue + raise + + +async def update_task(r, task_id, status, role, note=None): + if status not in VALID_STATUS: + raise ValueError(f"invalid status: {status}") + key = TASK_PREFIX + str(task_id) + mapping = {"status": status} + if note is not None: + mapping["note"] = note + await r.hset(key, mapping=mapping) + return {"ok": True, "task": await _get_task(r, task_id)} + + +async def list_tasks(r, status=None, assignee_role=None): + ids = sorted(int(x) for x in await r.smembers(TASK_SET)) + tasks = [] + for tid in ids: + t = await _get_task(r, tid) + if t is None: + continue + if status and t["status"] != status: + continue + if assignee_role and t["assignee_role"] != assignee_role: + continue + tasks.append(t) + return {"tasks": tasks} +``` + +- [ ] **Step 4: 테스트 통과 확인** + +Run: `.venv\Scripts\python -m pytest tests/test_tasks.py -v` +Expected: PASS (5 tests). + +- [ ] **Step 5: Commit** + +``` +git add co-gahusb/app/store.py co-gahusb/tests/test_tasks.py +git commit -m "feat(co-gahusb): 작업 보드 (create/claim/update/list, TDD)" +``` + +--- + +## Task 5: team_log 활동 피드 (TDD) + +**Files:** +- Modify: `co-gahusb/app/store.py` (team_log 추가) +- Create: `co-gahusb/tests/test_teamlog.py` + +- [ ] **Step 1: 실패하는 테스트 작성** + +`co-gahusb/tests/test_teamlog.py`: +```python +# co-gahusb/tests/test_teamlog.py +from app import store + + +async def test_log_event_and_read(r): + await store.log_event(r, "message", "Producer→BE: hi") + await store.log_event(r, "lock", "BE acquired nas-deploy") + res = await store.read_team_log(r) + msgs = [e["text"] for e in res["events"]] + assert msgs == ["Producer→BE: hi", "BE acquired nas-deploy"] + + +async def test_team_log_after_id(r): + e1 = (await store.log_event(r, "message", "a"))["event_id"] + await store.log_event(r, "message", "b") + res = await store.read_team_log(r, after_id=e1) + assert [e["text"] for e in res["events"]] == ["b"] + + +async def test_team_log_capped(r): + for i in range(10): + await store.log_event(r, "message", f"m{i}") + # MAXLEN 캡(작은 값으로 검증) + res = await store.read_team_log(r, limit=3) + assert len(res["events"]) == 3 + assert res["events"][-1]["text"] == "m9" +``` + +- [ ] **Step 2: 테스트 실패 확인** + +Run: `.venv\Scripts\python -m pytest tests/test_teamlog.py -v` +Expected: FAIL — `log_event` 없음. + +- [ ] **Step 3: store.py에 team_log 추가** + +`co-gahusb/app/store.py` 끝에 추가: +```python +from app.config import TEAM_LOG_MAXLEN + +LOG_SEQ = "co:logseq" +LOG_LIST = "co:log" # list of json events (capped) +LOG_PREFIX = "co:logitem:" + + +async def log_event(r, kind, text): + eid = await r.incr(LOG_SEQ) + item = {"id": eid, "kind": kind, "text": text, "ts": _now_iso()} + await r.set(LOG_PREFIX + str(eid), json.dumps(item)) + await r.rpush(LOG_LIST, eid) + await r.ltrim(LOG_LIST, -TEAM_LOG_MAXLEN, -1) + return {"event_id": eid} + + +async def read_team_log(r, after_id=0, limit=100): + ids = [int(x) for x in await r.lrange(LOG_LIST, 0, -1)] + ids = [i for i in ids if i > int(after_id)] + ids = ids[-limit:] + events = [] + for eid in ids: + raw = await r.get(LOG_PREFIX + str(eid)) + if raw: + events.append(json.loads(raw)) + cursor = ids[-1] if ids else int(after_id) + return {"events": events, "cursor": cursor} +``` + +- [ ] **Step 4: 테스트 통과 확인** + +Run: `.venv\Scripts\python -m pytest tests/ -v` +Expected: PASS (전체 테스트, team_log 3개 포함). + +- [ ] **Step 5: Commit** + +``` +git add co-gahusb/app/store.py co-gahusb/tests/test_teamlog.py +git commit -m "feat(co-gahusb): team_log 활동 피드 (capped, TDD)" +``` + +--- + +## Task 6: MCP 서버 (FastMCP 툴 + Bearer 인증 + health + ASGI) + +**Files:** +- Create: `co-gahusb/app/server.py`, `co-gahusb/tests/test_server.py` + +> **통합 주의(검증 필수):** `mcp` SDK 버전마다 streamable-http ASGI 노출 API가 다를 수 있다(`mcp.streamable_http_app()` 및 lifespan 처리). 아래 코드는 표준 형태이며, 구현 시 설치된 SDK에서 `mcp.streamable_http_app` 존재를 확인하고, 없으면 `python -c "import mcp.server.fastmcp as m; print(dir(m.FastMCP))"`로 정확한 메서드명을 찾아 맞춘다. **Step 4 스모크 테스트로 실제 기동을 확인**한다. 툴 로직 자체는 store/locks 호출이라 안정적이다. + +- [ ] **Step 1: 실패하는 서버 스모크 테스트 작성** + +`co-gahusb/tests/test_server.py`: +```python +# co-gahusb/tests/test_server.py +import os +os.environ["CO_BUS_KEY"] = "test-key" + +from starlette.testclient import TestClient +from app.server import app + + +def test_health_open_without_auth(): + client = TestClient(app) + res = client.get("/health") + assert res.status_code == 200 + assert res.json()["status"] == "ok" + + +def test_mcp_requires_bearer(): + client = TestClient(app) + # 인증 없이 MCP 엔드포인트 접근 → 401 + res = client.post("/mcp", json={}) + assert res.status_code == 401 + + +def test_mcp_wrong_key_rejected(): + client = TestClient(app) + res = client.post("/mcp", json={}, headers={"Authorization": "Bearer wrong"}) + assert res.status_code == 401 +``` + +- [ ] **Step 2: 테스트 실패 확인** + +Run: `.venv\Scripts\python -m pytest tests/test_server.py -v` +Expected: FAIL — `app.server` 없음. + +- [ ] **Step 3: server.py 구현** + +`co-gahusb/app/server.py`: +```python +# co-gahusb/app/server.py +import logging + +import redis.asyncio as aioredis +from mcp.server.fastmcp import FastMCP +from starlette.applications import Starlette +from starlette.middleware import Middleware +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.responses import JSONResponse +from starlette.routing import Mount, Route + +from app import config, locks, store + +log = logging.getLogger("co-gahusb") +_auth_failed_logged = False + +# 단일 공유 Redis 풀 +_redis = aioredis.from_url(config.REDIS_URL, decode_responses=True) + +mcp = FastMCP("co-gahusb") + + +# ---- 메시지 ---- +@mcp.tool() +async def post_message(from_role: str, to_role: str, body: str, thread_id: str = "") -> dict: + """다른 역할의 우편함에 메시지를 보낸다.""" + res = await store.post_message(_redis, from_role, to_role, body, thread_id or None) + await store.log_event(_redis, "message", f"{from_role}→{to_role}: {body[:60]}") + return res + + +@mcp.tool() +async def read_inbox(role: str, after_id: int = 0, mark_read: bool = False) -> dict: + """내 역할 우편함을 커서 기반으로 읽는다.""" + return await store.read_inbox(_redis, role, after_id, mark_read) + + +# ---- 작업 ---- +@mcp.tool() +async def create_task(title: str, assignee_role: str, created_by: str, detail: str = "") -> dict: + """작업을 만들어 특정 역할에 배정한다.""" + res = await store.create_task(_redis, title, assignee_role, created_by, detail or None) + await store.log_event(_redis, "task", f"{created_by} created '{title}' → {assignee_role}") + return res + + +@mcp.tool() +async def claim_task(task_id: int, role: str) -> dict: + """open 작업을 점유(in_progress)한다. 이미 점유면 거부.""" + res = await store.claim_task(_redis, task_id, role) + if res.get("ok"): + await store.log_event(_redis, "task", f"{role} claimed task#{task_id}") + return res + + +@mcp.tool() +async def update_task(task_id: int, status: str, role: str, note: str = "") -> dict: + """작업 상태를 갱신한다 (open/in_progress/blocked/done).""" + res = await store.update_task(_redis, task_id, status, role, note or None) + await store.log_event(_redis, "task", f"{role} set task#{task_id} → {status}") + return res + + +@mcp.tool() +async def list_tasks(status: str = "", assignee_role: str = "") -> dict: + """작업 목록을 조회한다(상태/담당 필터).""" + return await store.list_tasks(_redis, status or None, assignee_role or None) + + +# ---- 락 ---- +@mcp.tool() +async def acquire_lock(resource: str, role: str, ttl_sec: int = config.DEFAULT_LOCK_TTL) -> dict: + """공유 리소스 변경 전 어드바이저리 락을 획득한다. 점유 중이면 acquired=false.""" + res = await locks.acquire_lock(_redis, resource, role, ttl_sec) + if res.get("acquired"): + await store.log_event(_redis, "lock", f"{role} acquired {resource}") + return res + + +@mcp.tool() +async def release_lock(resource: str, role: str) -> dict: + """소유한 락을 해제한다.""" + res = await locks.release_lock(_redis, resource, role) + if res.get("released"): + await store.log_event(_redis, "lock", f"{role} released {resource}") + return res + + +@mcp.tool() +async def heartbeat_lock(resource: str, role: str, ttl_sec: int = config.DEFAULT_LOCK_TTL) -> dict: + """긴 작업 중 락 TTL을 갱신한다(소유자만).""" + return await locks.heartbeat_lock(_redis, resource, role, ttl_sec) + + +@mcp.tool() +async def list_locks() -> dict: + """현재 점유 중인 모든 락을 조회한다.""" + return await locks.list_locks(_redis) + + +# ---- 가시성 ---- +@mcp.tool() +async def team_log(after_id: int = 0) -> dict: + """팀 전체 최근 활동 피드(메시지·작업·락)를 조회한다.""" + return await store.read_team_log(_redis, after_id) + + +# ---- Bearer 인증 미들웨어 ---- +class BearerAuth(BaseHTTPMiddleware): + async def dispatch(self, request, call_next): + global _auth_failed_logged + if request.url.path.startswith("/health"): + return await call_next(request) + expected = f"Bearer {config.CO_BUS_KEY}" + if not config.CO_BUS_KEY or request.headers.get("authorization") != expected: + if not _auth_failed_logged: + log.error("co-gahusb 인증 실패 (이후 동일 로그 생략)") + _auth_failed_logged = True + return JSONResponse({"error": "unauthorized"}, status_code=401) + return await call_next(request) + + +async def _health(request): + return JSONResponse({"status": "ok"}) + + +# streamable-http ASGI sub-app (/mcp 엔드포인트 제공) +_mcp_app = mcp.streamable_http_app() + +app = Starlette( + routes=[Route("/health", _health), Mount("/", app=_mcp_app)], + middleware=[Middleware(BearerAuth)], + lifespan=_mcp_app.lifespan, +) +``` + +- [ ] **Step 4: 테스트 통과 + 로컬 스모크 확인** + +Run: `.venv\Scripts\python -m pytest tests/test_server.py -v` +Expected: PASS (3 tests). + +만약 `mcp.streamable_http_app` / `.lifespan` 관련 AttributeError 발생 시: `.venv\Scripts\python -c "from mcp.server.fastmcp import FastMCP; print([a for a in dir(FastMCP) if 'app' in a.lower() or 'http' in a.lower()])"`로 정확한 메서드명을 찾아 교체하고, lifespan은 sub-app의 실제 속성명에 맞춘다(예: `_mcp_app.router.lifespan_context`). 통과할 때까지 수정. + +전체 스위트도 확인: `.venv\Scripts\python -m pytest tests/ -v` → 전부 PASS. + +- [ ] **Step 5: Commit** + +``` +git add co-gahusb/app/server.py co-gahusb/tests/test_server.py +git commit -m "feat(co-gahusb): FastMCP 서버 (12 툴 + Bearer 인증 + health)" +``` + +--- + +## Task 7: docker-compose 서비스 등재 + +**Files:** +- Modify: `web-backend/docker-compose.yml` + +- [ ] **Step 1: co-gahusb 서비스 블록 추가** + +`docker-compose.yml`의 `agent-office:` 서비스 블록 바로 위(또는 redis 서비스 인접)에 추가. (들여쓰기 2칸, 기존 서비스 스타일과 동일): +```yaml + co-gahusb: + build: + context: ./co-gahusb + container_name: co-gahusb + restart: unless-stopped + ports: + - "18920:8000" + environment: + - TZ=${TZ:-Asia/Seoul} + - REDIS_URL=${REDIS_URL:-redis://redis:6379} + - CO_BUS_KEY=${CO_BUS_KEY:-} + depends_on: + - redis + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"] + interval: 60s + timeout: 5s + retries: 3 +``` + +- [ ] **Step 2: compose 문법 검증 (로컬, docker 미실행)** + +Run: `.venv\Scripts\python -c "import yaml; yaml.safe_load(open('docker-compose.yml', encoding='utf-8')); print('compose OK')"` +(작업 디렉토리 web-backend 루트에서. PyYAML 없으면 `pip install pyyaml` 후 실행.) +Expected: `compose OK` (들여쓰기/문법 오류 없음). + +- [ ] **Step 3: Commit** + +``` +git add docker-compose.yml +git commit -m "feat(co-gahusb): docker-compose 서비스 등재 (18920, depends_on redis)" +``` + +--- + +## Task 8: nginx public 라우팅 등재 + +**Files:** +- Modify: web-backend 내 nginx 설정 (`location /api/...` 들이 있는 `.conf`) + +- [ ] **Step 1: nginx conf 파일 찾기** + +Run: `git -C . grep -l "location /api/agent-office/"` (web-backend 루트에서) +→ 해당 `.conf` 파일 경로 확인. + +- [ ] **Step 2: /api/co/ location 추가** + +찾은 conf에서 `location /api/agent-office/ { ... }` 블록을 찾아 그 형식을 그대로 따라, 같은 server 블록 안에 추가. agent-office 블록이 다음과 같다면: +``` + location /api/agent-office/ { + proxy_pass http://agent-office:8000/api/agent-office/; + ... + } +``` +co-gahusb는 컨테이너 내부 경로가 `/mcp`이고 외부는 `/api/co/mcp`이므로 **접두 stripping** 형태로 추가(proxy_pass에 trailing slash): +``` + location /api/co/ { + proxy_pass http://co-gahusb:8000/; + proxy_http_version 1.1; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header Authorization $http_authorization; + proxy_set_header Connection ""; + proxy_buffering off; + proxy_read_timeout 3600s; + } +``` +(streamable-http는 long-lived 응답이 가능하므로 `proxy_buffering off` + 긴 `proxy_read_timeout`. `Authorization` forward 필수. 실제 파일의 기존 공통 proxy_set_header 패턴이 있으면 그걸 따르고 위 3개(Authorization/Connection/buffering)만 보강.) + +- [ ] **Step 3: nginx 문법 셀프 점검** + +블록 괄호 균형과 `location /api/co/`가 server 블록 내부인지 육안 확인. (실제 `nginx -t`는 NAS 배포 후 컨테이너에서.) + +- [ ] **Step 4: Commit** + +``` +git add +git commit -m "feat(co-gahusb): nginx public /api/co/ 라우팅 (Authorization forward, no-buffer)" +``` + +--- + +## Task 9: deploy 스크립트 SERVICES 화이트리스트 + +**Files:** +- Modify: web-backend deploy 스크립트(SERVICES 목록 보유 파일) + +- [ ] **Step 1: SERVICES 목록 파일 찾기** + +Run: `git -C . grep -rl "agent-office" -- '*.sh'` (web-backend 루트) +→ deploy 스크립트에서 SERVICES/서비스 배열에 `agent-office`가 등장하는 파일·라인 확인. + +- [ ] **Step 2: co-gahusb 등재** + +해당 SERVICES 배열/리스트에서 `agent-office`가 있는 줄과 동일 형식으로 `co-gahusb`를 추가. 예) `SERVICES=(... agent-office co-gahusb ...)` 또는 한 줄에 하나씩이면 `agent-office` 다음 줄에 `co-gahusb` 추가. + +- [ ] **Step 3: rsync/마운트 화이트리스트 확인** + +같은 스크립트에서 코드 동기화 대상 디렉토리 화이트리스트가 있으면(`reference_deploy_nas_services_whitelist`) `co-gahusb`도 포함되는지 확인하고, `${RUNTIME_PATH}` 절대경로 동기화 대상이라면 추가. (co-gahusb는 영속 볼륨 없음 → 코드 디렉토리 동기화만.) + +- [ ] **Step 4: Commit** + +``` +git add +git commit -m "feat(co-gahusb): deploy SERVICES 화이트리스트 등재" +``` + +--- + +## Task 10: 클라이언트 배선 (.mcp.json + CLAUDE.md 역할 블록) + +**Files:** +- Create/Modify: `web-backend/.mcp.json`, `web-ui/.mcp.json` +- Modify: `web-backend/CLAUDE.md`, `web-ui/CLAUDE.md` +- Create: `web-backend/co-gahusb/CLIENT_SETUP.md` (web-ai 절차 포함) + +> 이 Task는 web-backend·web-ui 두 repo를 건드린다. **각 파일은 해당 repo 경로에서 커밋**(feedback_commit_repo). `.mcp.json`엔 실제 키 금지 — `${CO_BUS_KEY}` placeholder. + +- [ ] **Step 1: web-backend/.mcp.json 작성** + +`web-backend/.mcp.json` (없으면 생성, 있으면 mcpServers에 병합): +```json +{ + "mcpServers": { + "co-gahusb": { + "type": "http", + "url": "https://gahusb.synology.me/api/co/mcp", + "headers": { "Authorization": "Bearer ${CO_BUS_KEY}" } + } + } +} +``` + +- [ ] **Step 2: web-ui/.mcp.json 작성** + +`web-ui/.mcp.json` — Step 1과 동일 내용: +```json +{ + "mcpServers": { + "co-gahusb": { + "type": "http", + "url": "https://gahusb.synology.me/api/co/mcp", + "headers": { "Authorization": "Bearer ${CO_BUS_KEY}" } + } + } +} +``` + +- [ ] **Step 3: 역할 블록 — web-backend/CLAUDE.md** + +`web-backend/CLAUDE.md` 맨 끝에 추가: +```markdown + +--- + +## 협업 팀 버스 (co-gahusb) — 이 세션의 역할: **BE** + +이 세션은 백엔드(BE) 역할이다. co-gahusb MCP 툴로 다른 세션(FE/AI/Producer)과 협업한다. +- **소유권**: 이 세션은 `web-backend` repo만 쓴다(FE=web-ui, AI=web-ai). +- **공유 리소스 변경 전 반드시 `acquire_lock(resource, "BE")`**: 대상 = `nas-deploy`, `stock-db-schema`, `lotto-db-schema`, `memory-mirror`, `nginx-conf`, `compose`. 점유 중이면 대기, 긴 작업은 `heartbeat_lock`, 끝나면 `release_lock`. +- **모든 툴 호출에 `role="BE"`** (또는 `from_role`/`created_by`에 BE). +- **수신**: `/loop`로 주기적으로 `read_inbox("BE", after_id=)` + `list_tasks(assignee_role="BE")` 확인. +- 키 `CO_BUS_KEY`는 환경변수로 주입(커밋 금지). +``` + +- [ ] **Step 4: 역할 블록 — web-ui/CLAUDE.md** + +`web-ui/CLAUDE.md` 맨 끝에 추가(BE→FE, repo만 교체): +```markdown + +--- + +## 협업 팀 버스 (co-gahusb) — 이 세션의 역할: **FE** + +이 세션은 프론트엔드(FE) 역할이다. co-gahusb MCP 툴로 다른 세션(BE/AI/Producer)과 협업한다. +- **소유권**: 이 세션은 `web-ui` repo만 쓴다(BE=web-backend, AI=web-ai). +- **공유 리소스 변경 전 반드시 `acquire_lock(resource, "FE")`**: 대상 = `nas-deploy`, `stock-db-schema`, `lotto-db-schema`, `memory-mirror`, `nginx-conf`, `compose`. 점유 중이면 대기, 긴 작업은 `heartbeat_lock`, 끝나면 `release_lock`. +- **모든 툴 호출에 `role="FE"`** (또는 `from_role`/`created_by`에 FE). +- **수신**: `/loop`로 주기적으로 `read_inbox("FE", after_id=)` + `list_tasks(assignee_role="FE")` 확인. +- 키 `CO_BUS_KEY`는 환경변수로 주입(커밋 금지). +``` + +- [ ] **Step 5: web-ai 절차 문서 작성** + +`web-backend/co-gahusb/CLIENT_SETUP.md`: +```markdown +# co-gahusb 클라이언트 설정 + +## 공통 +1. `CO_BUS_KEY` 환경변수를 각 머신에 설정(서버 `.env`의 값과 동일). +2. 해당 repo 루트 `.mcp.json`에 co-gahusb HTTP MCP 등록(이 repo의 예시 참고). +3. CLAUDE.md 역할 블록의 `/loop` 폴링 규약을 따른다. + +## web-ai (다른 머신) +web-ai 머신의 repo 루트에 아래 `.mcp.json` 생성, 역할 = **AI**: +```json +{ "mcpServers": { "co-gahusb": { + "type": "http", + "url": "https://gahusb.synology.me/api/co/mcp", + "headers": { "Authorization": "Bearer ${CO_BUS_KEY}" } } } } +``` +web-ai CLAUDE.md에 역할 블록 추가(role="AI", 소유권=web-ai repo, 동일 락 규약). + +## Producer (오케스트레이터 세션) +별도 repo 없이 조율 담당. `team_log()`로 전체 활동 감시, `create_task`로 분배, `acquire_lock`로 교차 작업 직렬화. +``` + +- [ ] **Step 6: 커밋 (repo별로 분리)** + +web-backend에서: +``` +git add web-backend/.mcp.json web-backend/CLAUDE.md co-gahusb/CLIENT_SETUP.md +git commit -m "feat(co-gahusb): BE 클라이언트 배선 (.mcp.json + 역할 블록 + 셋업 문서)" +``` +web-ui에서(별도 repo, cd C:\Users\jaeoh\Desktop\workspace\web-ui): +``` +git add .mcp.json CLAUDE.md +git commit -m "feat(co-gahusb): FE 클라이언트 배선 (.mcp.json + 역할 블록)" +``` +(web-ui는 현재 main 브랜치 — 커밋 전 `git checkout -b feat/co-gahusb-client` 권장.) + +--- + +## Task 11: 배포 + 스모크 테스트 + +**Files:** 없음 (배포·검증) + +> Docker는 NAS 전용. 로컬 docker 금지. 배포는 Gitea push → webhook 자동 배포. + +- [ ] **Step 1: 서버 .env에 키 추가 (NAS, 커밋 금지)** + +NAS의 web-backend `.env`에 `CO_BUS_KEY=<강한 랜덤 키>` 추가가 필요함을 사용자에게 안내(이 값은 각 클라이언트 머신 env에도 동일하게 설정). **이 단계는 사용자 수동** — 키 생성/배치 후 진행. + +- [ ] **Step 2: 배포 푸시** + +web-backend feature 브랜치를 main에 머지 후 push (또는 배포 정책에 따라): +``` +git checkout main && git merge --no-ff feat/co-gahusb-team-bus +git push +``` +(Gitea 자격증명 실패 시 사용자 수동 push — feedback_nas_deploy_paths.) + +- [ ] **Step 3: NAS 헬스 스모크** + +배포 반영 후(webhook 완료 대기): +``` +curl -s https://gahusb.synology.me/api/co/health +``` +Expected: `{"status":"ok"}` + +- [ ] **Step 4: 인증 스모크** + +``` +curl -s -o /dev/null -w "%{http_code}" -X POST https://gahusb.synology.me/api/co/mcp +``` +Expected: `401` (키 없음). + +- [ ] **Step 5: 락 경합 end-to-end (MCP 클라이언트로)** + +이 세션(또는 Producer)에서 co-gahusb MCP가 연결되면: +- `acquire_lock("nas-deploy", "BE")` → `{acquired:true}` +- 다시 `acquire_lock("nas-deploy", "FE")` → `{acquired:false, held_by:"BE"}` +- `release_lock("nas-deploy", "BE")` → `{released:true}` +Expected: 위 순서대로 동작 → 동시쓰기 분리 검증 완료. + +- [ ] **Step 6: 마무리** + +finishing-a-development-branch로 web-backend 브랜치 정리. 메모리에 co-gahusb 운영 노트 기록. + +--- + +## Self-Review 체크리스트 (작성자 검증 완료) + +- **Spec coverage:** 호스팅/포트(T1,T7) ✓ HTTP MCP+Bearer(T6,T8) ✓ Redis 백엔드(T2-5) ✓ 락=소유권+어드바이저리(T2 락 + T10 CLAUDE.md 규약) ✓ 메시지/작업/team_log(T3,4,5) ✓ 인프라 6위치(T7 compose, T8 nginx, T9 deploy, T1/T6 .env, frontend depends_on 불요 명시) ✓ 클라이언트 배선+역할+/loop(T10) ✓ 테스트(T2-6) ✓ 배포/스모크(T11) ✓. +- **Placeholder scan:** 모든 코드 step에 실제 코드/명령/기대출력. T6의 SDK 버전 분기는 "검증 후 맞춤" 명시적 절차(placeholder 아님). T8/T9는 실제 파일 grep으로 위치 특정하는 탐색 step(상수 경로가 repo마다 달라 grep으로 지정). +- **Type consistency:** store/locks 함수 시그니처가 test와 server.py 호출에서 일치(`acquire_lock(r,resource,role,ttl_sec)`, `post_message(r,from_role,to_role,body,thread_id)`, `claim_task(r,task_id,role)` 등). 반환 dict 키(`acquired`/`held_by`/`released`/`renewed`/`ok`/`task`/`message_id`/`cursor`/`events`) 전 Task 일관. +- **Known caveat:** T6 FastMCP ASGI/lifespan API는 설치 SDK 버전 의존 → Step 4 스모크로 강제 검증. store/locks(핵심 동시성)는 fakeredis로 결정적 테스트라 안정. +- **동시성 안전:** 락 acquire=SET NX EX 원자적, release/heartbeat=WATCH/MULTI, claim_task=WATCH/MULTI → 다중 세션 경합에도 정확. SQLite multi-writer 회피.