Files
web-page-backend/docs/superpowers/plans/2026-06-12-co-gahusb-team-bus.md
2026-06-12 01:31:06 +09:00

40 KiB

co-gahusb 팀 버스 Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: 4개 Claude Code 세션(FE/BE/AI/Producer)이 역할을 갖고 비동기 협업하되 공유 리소스는 동시 쓰기를 막는 NAS 호스팅 MCP 팀 버스 컨테이너(co-gahusb)를 만든다.

Architecture: Python mcp SDK(FastMCP, streamable-http)로 MCP 툴을 노출하고 Redis(기존 공유 컨테이너)에 상태 저장. 메시지/작업/락 로직은 순수 함수(store.py/locks.py)로 분리해 fakeredis로 TDD. 동시쓰기 분리는 소유권 파티션(repo별 단일 역할) + 어드바이저리 락(TTL).

Tech Stack: Python 3.12, mcp SDK(FastMCP), Starlette, redis.asyncio, uvicorn, pytest + pytest-asyncio + fakeredis. nginx 프록시 + Bearer 정적키.

중요 — 동시성 구현 방침: Redis Stream/Lua 대신 INCR-seq + Hash + List + WATCH/MULTI 트랜잭션 + SET NX EX만 사용한다(전부 fakeredis 지원 → 결정적 테스트, 원자성 유지). spec의 "Stream" 표현은 본 플랜에서 list+seq로 구체화한다(동작 동일, 테스트 가능성 향상).

경로/실행 메모:

  • 모든 경로는 web-backend/co-gahusb/ 기준. 작업 디렉토리: C:\Users\jaeoh\Desktop\workspace\web-backend.
  • 테스트는 로컬에서 venv로 실행(파이썬 OK, Docker는 NAS 전용이라 로컬 docker 금지). 배포만 Gitea push.
  • 커밋은 web-backend repo(브랜치 feat/co-gahusb-team-bus)에서.

파일 구조

파일 책임
co-gahusb/Dockerfile python:3.12-slim, uvicorn app.server:app :8000
co-gahusb/requirements.txt mcp, starlette, uvicorn, redis, pytest, pytest-asyncio, fakeredis
co-gahusb/app/__init__.py 패키지 마커
co-gahusb/app/config.py env (REDIS_URL, CO_BUS_KEY, ROLES, LOCKABLE_RESOURCES)
co-gahusb/app/locks.py acquire/release/heartbeat/list_locks (Redis 원자적)
co-gahusb/app/store.py 메시지(post/read/mark_read), 작업(create/claim/update/list), team_log
co-gahusb/app/server.py FastMCP 툴 등록 + Bearer 인증 + /health + ASGI app
co-gahusb/tests/conftest.py fakeredis async fixture
co-gahusb/tests/test_locks.py 락 단위 테스트
co-gahusb/tests/test_messages.py 메시지 단위 테스트
co-gahusb/tests/test_tasks.py 작업 단위 테스트
co-gahusb/tests/test_server.py 서버 스모크(인증/health)
docker-compose.yml (수정) co-gahusb 서비스
nginx default.conf (수정) location /api/co/
deploy 스크립트 (수정) SERVICES 화이트리스트
.mcp.json (web-backend, web-ui) co-gahusb MCP 등록
CLAUDE.md 역할 블록

Task 1: 스캐폴드 (디렉토리·Dockerfile·requirements·config)

Files:

  • Create: co-gahusb/Dockerfile, co-gahusb/requirements.txt, co-gahusb/app/__init__.py, co-gahusb/app/config.py, co-gahusb/tests/__init__.py

  • Step 1: requirements.txt 작성

co-gahusb/requirements.txt:

mcp>=1.2.0
starlette>=0.37
uvicorn[standard]==0.34.0
redis>=5.0
pytest>=8.0
pytest-asyncio>=0.24
fakeredis>=2.21
  • Step 2: Dockerfile 작성

co-gahusb/Dockerfile:

FROM python:3.12-slim-bookworm
ENV PYTHONUNBUFFERED=1

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt

COPY . .

EXPOSE 8000
CMD ["uvicorn", "app.server:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
  • Step 3: 패키지 마커 + config.py 작성

co-gahusb/app/__init__.py: 빈 파일. co-gahusb/tests/__init__.py: 빈 파일.

co-gahusb/app/config.py:

# co-gahusb/app/config.py
import os

REDIS_URL = os.environ.get("REDIS_URL", "redis://redis:6379")
CO_BUS_KEY = os.environ.get("CO_BUS_KEY", "")

# 협업 역할 (세션별 1:1)
ROLES = ("FE", "BE", "AI", "Producer")

# 교차 리소스 어드바이저리 락 대상 (이 외 이름도 락은 가능하나, 규약상 명시 대상)
LOCKABLE_RESOURCES = (
    "nas-deploy",
    "stock-db-schema",
    "lotto-db-schema",
    "memory-mirror",
    "nginx-conf",
    "compose",
)

DEFAULT_LOCK_TTL = 300
TEAM_LOG_MAXLEN = 500
  • Step 4: 로컬 venv 생성 + 의존성 설치

Run (Windows, web-backend/co-gahusb 디렉토리에서):

cd co-gahusb
python -m venv .venv
.venv\Scripts\python -m pip install -r requirements.txt

Expected: 설치 성공(mcp/redis/fakeredis 포함).

  • Step 5: .gitignore에 venv 제외 확인 후 Commit

co-gahusb/.gitignore 생성:

.venv/
__pycache__/
*.pyc

Commit:

git add co-gahusb/Dockerfile co-gahusb/requirements.txt co-gahusb/app/__init__.py co-gahusb/app/config.py co-gahusb/tests/__init__.py co-gahusb/.gitignore
git commit -m "feat(co-gahusb): 스캐폴드 (Dockerfile·requirements·config)"

Task 2: conftest fixture + 락 (TDD)

Files:

  • Create: co-gahusb/tests/conftest.py, co-gahusb/app/locks.py, co-gahusb/tests/test_locks.py

  • Step 1: conftest.py 작성 (fakeredis async fixture)

co-gahusb/tests/conftest.py:

# co-gahusb/tests/conftest.py
import pytest_asyncio
import fakeredis.aioredis


@pytest_asyncio.fixture
async def r():
    client = fakeredis.aioredis.FakeRedis(decode_responses=True)
    await client.flushall()
    yield client
    await client.aclose()

co-gahusb/pytest.ini:

[pytest]
asyncio_mode = auto
testpaths = tests
  • Step 2: 실패하는 락 테스트 작성

co-gahusb/tests/test_locks.py:

# co-gahusb/tests/test_locks.py
from app import locks


async def test_acquire_succeeds_then_blocks_other(r):
    res = await locks.acquire_lock(r, "nas-deploy", "BE", ttl_sec=300)
    assert res["acquired"] is True

    res2 = await locks.acquire_lock(r, "nas-deploy", "FE", ttl_sec=300)
    assert res2["acquired"] is False
    assert res2["held_by"] == "BE"
    assert res2["ttl_remaining"] > 0


async def test_release_only_by_owner(r):
    await locks.acquire_lock(r, "compose", "BE", ttl_sec=300)

    bad = await locks.release_lock(r, "compose", "FE")
    assert bad["released"] is False

    ok = await locks.release_lock(r, "compose", "BE")
    assert ok["released"] is True

    # released 후 재획득 가능
    again = await locks.acquire_lock(r, "compose", "FE", ttl_sec=300)
    assert again["acquired"] is True


async def test_heartbeat_only_by_owner_renews_ttl(r):
    await locks.acquire_lock(r, "nginx-conf", "BE", ttl_sec=10)

    bad = await locks.heartbeat_lock(r, "nginx-conf", "FE", ttl_sec=300)
    assert bad["renewed"] is False

    ok = await locks.heartbeat_lock(r, "nginx-conf", "BE", ttl_sec=300)
    assert ok["renewed"] is True
    assert await r.ttl("co:lock:nginx-conf") > 100


async def test_expired_lock_is_reacquirable(r):
    await locks.acquire_lock(r, "memory-mirror", "AI", ttl_sec=1)
    await r.delete("co:lock:memory-mirror")  # TTL 만료 시뮬레이션
    res = await locks.acquire_lock(r, "memory-mirror", "FE", ttl_sec=300)
    assert res["acquired"] is True


async def test_list_locks(r):
    await locks.acquire_lock(r, "nas-deploy", "BE", ttl_sec=300)
    await locks.acquire_lock(r, "compose", "FE", ttl_sec=300)
    listed = await locks.list_locks(r)
    held = {l["resource"]: l["held_by"] for l in listed["locks"]}
    assert held == {"nas-deploy": "BE", "compose": "FE"}
  • Step 3: 테스트 실패 확인

Run: .venv\Scripts\python -m pytest tests/test_locks.py -v Expected: FAIL — app.locks 모듈/함수 없음.

  • Step 4: locks.py 구현

co-gahusb/app/locks.py:

# co-gahusb/app/locks.py
from redis.exceptions import WatchError

LOCK_PREFIX = "co:lock:"


async def acquire_lock(r, resource, role, ttl_sec=300):
    key = LOCK_PREFIX + resource
    ok = await r.set(key, role, nx=True, ex=ttl_sec)
    if ok:
        return {"acquired": True}
    held_by = await r.get(key)
    ttl = await r.ttl(key)
    return {"acquired": False, "held_by": held_by, "ttl_remaining": max(ttl, 0)}


async def release_lock(r, resource, role):
    key = LOCK_PREFIX + resource
    async with r.pipeline() as pipe:
        while True:
            try:
                await pipe.watch(key)
                owner = await pipe.get(key)
                if owner != role:
                    await pipe.unwatch()
                    return {"released": False, "held_by": owner}
                pipe.multi()
                pipe.delete(key)
                await pipe.execute()
                return {"released": True}
            except WatchError:
                continue


async def heartbeat_lock(r, resource, role, ttl_sec=300):
    key = LOCK_PREFIX + resource
    async with r.pipeline() as pipe:
        while True:
            try:
                await pipe.watch(key)
                owner = await pipe.get(key)
                if owner != role:
                    await pipe.unwatch()
                    return {"renewed": False, "held_by": owner}
                pipe.multi()
                pipe.expire(key, ttl_sec)
                await pipe.execute()
                return {"renewed": True}
            except WatchError:
                continue


async def list_locks(r):
    keys = await r.keys(LOCK_PREFIX + "*")
    out = []
    for key in keys:
        held_by = await r.get(key)
        if held_by is None:
            continue
        ttl = await r.ttl(key)
        out.append({
            "resource": key[len(LOCK_PREFIX):],
            "held_by": held_by,
            "ttl_remaining": max(ttl, 0),
        })
    return {"locks": out}
  • Step 5: 테스트 통과 확인

Run: .venv\Scripts\python -m pytest tests/test_locks.py -v Expected: PASS (5 tests).

  • Step 6: Commit
git add co-gahusb/tests/conftest.py co-gahusb/pytest.ini co-gahusb/app/locks.py co-gahusb/tests/test_locks.py
git commit -m "feat(co-gahusb): 어드바이저리 락 (acquire/release/heartbeat/list, TDD)"

Task 3: 메시지 (TDD)

Files:

  • Create: co-gahusb/app/store.py (메시지 부분), co-gahusb/tests/test_messages.py

  • Step 1: 실패하는 메시지 테스트 작성

co-gahusb/tests/test_messages.py:

# co-gahusb/tests/test_messages.py
from app import store


async def test_post_and_read_ordering(r):
    id1 = (await store.post_message(r, "Producer", "BE", "first"))["message_id"]
    id2 = (await store.post_message(r, "Producer", "BE", "second"))["message_id"]
    assert id2 > id1

    res = await store.read_inbox(r, "BE")
    bodies = [m["body"] for m in res["messages"]]
    assert bodies == ["first", "second"]
    assert res["cursor"] == id2


async def test_read_inbox_after_id(r):
    id1 = (await store.post_message(r, "Producer", "BE", "first"))["message_id"]
    await store.post_message(r, "Producer", "BE", "second")
    res = await store.read_inbox(r, "BE", after_id=id1)
    assert [m["body"] for m in res["messages"]] == ["second"]


async def test_inboxes_isolated_per_role(r):
    await store.post_message(r, "Producer", "BE", "for-be")
    await store.post_message(r, "Producer", "FE", "for-fe")
    be = await store.read_inbox(r, "BE")
    fe = await store.read_inbox(r, "FE")
    assert [m["body"] for m in be["messages"]] == ["for-be"]
    assert [m["body"] for m in fe["messages"]] == ["for-fe"]


async def test_mark_read_advances_cursor(r):
    await store.post_message(r, "Producer", "BE", "first")
    res = await store.read_inbox(r, "BE", mark_read=True)
    last = res["cursor"]
    await store.post_message(r, "Producer", "BE", "second")
    # mark_read 이후 저장된 커서부터 다시 읽기
    res2 = await store.read_inbox(r, "BE", after_id=last)
    assert [m["body"] for m in res2["messages"]] == ["second"]


async def test_message_fields(r):
    await store.post_message(r, "Producer", "BE", "hi", thread_id="t1")
    res = await store.read_inbox(r, "BE")
    m = res["messages"][0]
    assert m["from_role"] == "Producer"
    assert m["thread_id"] == "t1"
    assert "ts" in m and "id" in m
  • Step 2: 테스트 실패 확인

Run: .venv\Scripts\python -m pytest tests/test_messages.py -v Expected: FAIL — app.store 없음.

  • Step 3: store.py 메시지 부분 구현

co-gahusb/app/store.py (신규 파일, 메시지 함수만 우선):

# co-gahusb/app/store.py
import json
import time

MSG_SEQ = "co:msgseq"
INBOX_PREFIX = "co:inbox:"      # list of message ids per role
MSG_PREFIX = "co:msg:"          # hash per message
READ_PREFIX = "co:read:"        # last-read cursor per role


def _now_iso():
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())


async def post_message(r, from_role, to_role, body, thread_id=None):
    mid = await r.incr(MSG_SEQ)
    payload = {
        "id": str(mid),
        "from_role": from_role,
        "to_role": to_role,
        "body": body,
        "thread_id": thread_id or "",
        "ts": _now_iso(),
    }
    await r.set(MSG_PREFIX + str(mid), json.dumps(payload))
    await r.rpush(INBOX_PREFIX + to_role, mid)
    return {"message_id": mid}


async def read_inbox(r, role, after_id=0, mark_read=False):
    ids = await r.lrange(INBOX_PREFIX + role, 0, -1)
    ids = [int(x) for x in ids if int(x) > int(after_id)]
    messages = []
    for mid in ids:
        raw = await r.get(MSG_PREFIX + str(mid))
        if raw:
            d = json.loads(raw)
            d["id"] = int(d["id"])
            messages.append(d)
    cursor = ids[-1] if ids else int(after_id)
    if mark_read and ids:
        await r.set(READ_PREFIX + role, cursor)
    return {"messages": messages, "cursor": cursor}
  • Step 4: 테스트 통과 확인

Run: .venv\Scripts\python -m pytest tests/test_messages.py -v Expected: PASS (5 tests).

  • Step 5: Commit
git add co-gahusb/app/store.py co-gahusb/tests/test_messages.py
git commit -m "feat(co-gahusb): 메시지 inbox (post/read/mark_read, TDD)"

Task 4: 작업 보드 (TDD)

Files:

  • Modify: co-gahusb/app/store.py (작업 함수 추가)

  • Create: co-gahusb/tests/test_tasks.py

  • Step 1: 실패하는 작업 테스트 작성

co-gahusb/tests/test_tasks.py:

# co-gahusb/tests/test_tasks.py
import pytest
from app import store


async def test_create_and_list(r):
    res = await store.create_task(r, "deploy FE", "FE", created_by="Producer", detail="ship it")
    tid = res["task_id"]
    listed = await store.list_tasks(r)
    t = [t for t in listed["tasks"] if t["id"] == tid][0]
    assert t["title"] == "deploy FE"
    assert t["assignee_role"] == "FE"
    assert t["status"] == "open"
    assert t["created_by"] == "Producer"


async def test_claim_then_duplicate_claim_rejected(r):
    tid = (await store.create_task(r, "x", "FE", created_by="Producer"))["task_id"]
    ok = await store.claim_task(r, tid, "FE")
    assert ok["ok"] is True
    assert ok["task"]["status"] == "in_progress"

    dup = await store.claim_task(r, tid, "BE")
    assert dup["ok"] is False
    assert dup["held_by"] == "FE"


async def test_update_status(r):
    tid = (await store.create_task(r, "x", "FE", created_by="Producer"))["task_id"]
    await store.claim_task(r, tid, "FE")
    res = await store.update_task(r, tid, "done", "FE", note="finished")
    assert res["ok"] is True
    assert res["task"]["status"] == "done"
    assert res["task"]["note"] == "finished"


async def test_list_filters(r):
    t1 = (await store.create_task(r, "a", "FE", created_by="Producer"))["task_id"]
    await store.create_task(r, "b", "BE", created_by="Producer")
    await store.claim_task(r, t1, "FE")
    fe = await store.list_tasks(r, assignee_role="FE")
    assert [t["title"] for t in fe["tasks"]] == ["a"]
    in_prog = await store.list_tasks(r, status="in_progress")
    assert [t["title"] for t in in_prog["tasks"]] == ["a"]


async def test_invalid_status_rejected(r):
    tid = (await store.create_task(r, "x", "FE", created_by="Producer"))["task_id"]
    with pytest.raises(ValueError):
        await store.update_task(r, tid, "bogus", "FE")
  • Step 2: 테스트 실패 확인

Run: .venv\Scripts\python -m pytest tests/test_tasks.py -v Expected: FAIL — create_task 등 없음.

  • Step 3: store.py에 작업 함수 추가

co-gahusb/app/store.py 파일 끝에 추가:

TASK_SEQ = "co:taskseq"
TASK_PREFIX = "co:task:"        # hash per task
TASK_SET = "co:tasks"           # set of task ids

VALID_STATUS = ("open", "in_progress", "blocked", "done")


async def create_task(r, title, assignee_role, created_by, detail=None):
    tid = await r.incr(TASK_SEQ)
    task = {
        "id": str(tid),
        "title": title,
        "assignee_role": assignee_role,
        "status": "open",
        "detail": detail or "",
        "created_by": created_by,
        "note": "",
        "ts": _now_iso(),
    }
    await r.hset(TASK_PREFIX + str(tid), mapping=task)
    await r.sadd(TASK_SET, tid)
    return {"task_id": tid}


async def _get_task(r, task_id):
    d = await r.hgetall(TASK_PREFIX + str(task_id))
    if not d:
        return None
    d["id"] = int(d["id"])
    return d


async def claim_task(r, task_id, role):
    key = TASK_PREFIX + str(task_id)
    async with r.pipeline() as pipe:
        while True:
            try:
                await pipe.watch(key)
                status = await pipe.hget(key, "status")
                if status is None:
                    await pipe.unwatch()
                    return {"ok": False, "error": "not_found"}
                if status != "open":
                    held = await pipe.hget(key, "assignee_role")
                    await pipe.unwatch()
                    return {"ok": False, "held_by": held}
                pipe.multi()
                pipe.hset(key, mapping={"status": "in_progress", "assignee_role": role})
                await pipe.execute()
                return {"ok": True, "task": await _get_task(r, task_id)}
            except Exception as e:  # WatchError 포함 재시도
                from redis.exceptions import WatchError
                if isinstance(e, WatchError):
                    continue
                raise


async def update_task(r, task_id, status, role, note=None):
    if status not in VALID_STATUS:
        raise ValueError(f"invalid status: {status}")
    key = TASK_PREFIX + str(task_id)
    mapping = {"status": status}
    if note is not None:
        mapping["note"] = note
    await r.hset(key, mapping=mapping)
    return {"ok": True, "task": await _get_task(r, task_id)}


async def list_tasks(r, status=None, assignee_role=None):
    ids = sorted(int(x) for x in await r.smembers(TASK_SET))
    tasks = []
    for tid in ids:
        t = await _get_task(r, tid)
        if t is None:
            continue
        if status and t["status"] != status:
            continue
        if assignee_role and t["assignee_role"] != assignee_role:
            continue
        tasks.append(t)
    return {"tasks": tasks}
  • Step 4: 테스트 통과 확인

Run: .venv\Scripts\python -m pytest tests/test_tasks.py -v Expected: PASS (5 tests).

  • Step 5: Commit
git add co-gahusb/app/store.py co-gahusb/tests/test_tasks.py
git commit -m "feat(co-gahusb): 작업 보드 (create/claim/update/list, TDD)"

Task 5: team_log 활동 피드 (TDD)

Files:

  • Modify: co-gahusb/app/store.py (team_log 추가)

  • Create: co-gahusb/tests/test_teamlog.py

  • Step 1: 실패하는 테스트 작성

co-gahusb/tests/test_teamlog.py:

# co-gahusb/tests/test_teamlog.py
from app import store


async def test_log_event_and_read(r):
    await store.log_event(r, "message", "Producer→BE: hi")
    await store.log_event(r, "lock", "BE acquired nas-deploy")
    res = await store.read_team_log(r)
    msgs = [e["text"] for e in res["events"]]
    assert msgs == ["Producer→BE: hi", "BE acquired nas-deploy"]


async def test_team_log_after_id(r):
    e1 = (await store.log_event(r, "message", "a"))["event_id"]
    await store.log_event(r, "message", "b")
    res = await store.read_team_log(r, after_id=e1)
    assert [e["text"] for e in res["events"]] == ["b"]


async def test_team_log_capped(r):
    for i in range(10):
        await store.log_event(r, "message", f"m{i}")
    # MAXLEN 캡(작은 값으로 검증)
    res = await store.read_team_log(r, limit=3)
    assert len(res["events"]) == 3
    assert res["events"][-1]["text"] == "m9"
  • Step 2: 테스트 실패 확인

Run: .venv\Scripts\python -m pytest tests/test_teamlog.py -v Expected: FAIL — log_event 없음.

  • Step 3: store.py에 team_log 추가

co-gahusb/app/store.py 끝에 추가:

from app.config import TEAM_LOG_MAXLEN

LOG_SEQ = "co:logseq"
LOG_LIST = "co:log"             # list of json events (capped)
LOG_PREFIX = "co:logitem:"


async def log_event(r, kind, text):
    eid = await r.incr(LOG_SEQ)
    item = {"id": eid, "kind": kind, "text": text, "ts": _now_iso()}
    await r.set(LOG_PREFIX + str(eid), json.dumps(item))
    await r.rpush(LOG_LIST, eid)
    await r.ltrim(LOG_LIST, -TEAM_LOG_MAXLEN, -1)
    return {"event_id": eid}


async def read_team_log(r, after_id=0, limit=100):
    ids = [int(x) for x in await r.lrange(LOG_LIST, 0, -1)]
    ids = [i for i in ids if i > int(after_id)]
    ids = ids[-limit:]
    events = []
    for eid in ids:
        raw = await r.get(LOG_PREFIX + str(eid))
        if raw:
            events.append(json.loads(raw))
    cursor = ids[-1] if ids else int(after_id)
    return {"events": events, "cursor": cursor}
  • Step 4: 테스트 통과 확인

Run: .venv\Scripts\python -m pytest tests/ -v Expected: PASS (전체 테스트, team_log 3개 포함).

  • Step 5: Commit
git add co-gahusb/app/store.py co-gahusb/tests/test_teamlog.py
git commit -m "feat(co-gahusb): team_log 활동 피드 (capped, TDD)"

Task 6: MCP 서버 (FastMCP 툴 + Bearer 인증 + health + ASGI)

Files:

  • Create: co-gahusb/app/server.py, co-gahusb/tests/test_server.py

통합 주의(검증 필수): mcp SDK 버전마다 streamable-http ASGI 노출 API가 다를 수 있다(mcp.streamable_http_app() 및 lifespan 처리). 아래 코드는 표준 형태이며, 구현 시 설치된 SDK에서 mcp.streamable_http_app 존재를 확인하고, 없으면 python -c "import mcp.server.fastmcp as m; print(dir(m.FastMCP))"로 정확한 메서드명을 찾아 맞춘다. Step 4 스모크 테스트로 실제 기동을 확인한다. 툴 로직 자체는 store/locks 호출이라 안정적이다.

  • Step 1: 실패하는 서버 스모크 테스트 작성

co-gahusb/tests/test_server.py:

# co-gahusb/tests/test_server.py
import os
os.environ["CO_BUS_KEY"] = "test-key"

from starlette.testclient import TestClient
from app.server import app


def test_health_open_without_auth():
    client = TestClient(app)
    res = client.get("/health")
    assert res.status_code == 200
    assert res.json()["status"] == "ok"


def test_mcp_requires_bearer():
    client = TestClient(app)
    # 인증 없이 MCP 엔드포인트 접근 → 401
    res = client.post("/mcp", json={})
    assert res.status_code == 401


def test_mcp_wrong_key_rejected():
    client = TestClient(app)
    res = client.post("/mcp", json={}, headers={"Authorization": "Bearer wrong"})
    assert res.status_code == 401
  • Step 2: 테스트 실패 확인

Run: .venv\Scripts\python -m pytest tests/test_server.py -v Expected: FAIL — app.server 없음.

  • Step 3: server.py 구현

co-gahusb/app/server.py:

# co-gahusb/app/server.py
import logging

import redis.asyncio as aioredis
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route

from app import config, locks, store

log = logging.getLogger("co-gahusb")
_auth_failed_logged = False

# 단일 공유 Redis 풀
_redis = aioredis.from_url(config.REDIS_URL, decode_responses=True)

mcp = FastMCP("co-gahusb")


# ---- 메시지 ----
@mcp.tool()
async def post_message(from_role: str, to_role: str, body: str, thread_id: str = "") -> dict:
    """다른 역할의 우편함에 메시지를 보낸다."""
    res = await store.post_message(_redis, from_role, to_role, body, thread_id or None)
    await store.log_event(_redis, "message", f"{from_role}{to_role}: {body[:60]}")
    return res


@mcp.tool()
async def read_inbox(role: str, after_id: int = 0, mark_read: bool = False) -> dict:
    """내 역할 우편함을 커서 기반으로 읽는다."""
    return await store.read_inbox(_redis, role, after_id, mark_read)


# ---- 작업 ----
@mcp.tool()
async def create_task(title: str, assignee_role: str, created_by: str, detail: str = "") -> dict:
    """작업을 만들어 특정 역할에 배정한다."""
    res = await store.create_task(_redis, title, assignee_role, created_by, detail or None)
    await store.log_event(_redis, "task", f"{created_by} created '{title}' → {assignee_role}")
    return res


@mcp.tool()
async def claim_task(task_id: int, role: str) -> dict:
    """open 작업을 점유(in_progress)한다. 이미 점유면 거부."""
    res = await store.claim_task(_redis, task_id, role)
    if res.get("ok"):
        await store.log_event(_redis, "task", f"{role} claimed task#{task_id}")
    return res


@mcp.tool()
async def update_task(task_id: int, status: str, role: str, note: str = "") -> dict:
    """작업 상태를 갱신한다 (open/in_progress/blocked/done)."""
    res = await store.update_task(_redis, task_id, status, role, note or None)
    await store.log_event(_redis, "task", f"{role} set task#{task_id}{status}")
    return res


@mcp.tool()
async def list_tasks(status: str = "", assignee_role: str = "") -> dict:
    """작업 목록을 조회한다(상태/담당 필터)."""
    return await store.list_tasks(_redis, status or None, assignee_role or None)


# ---- 락 ----
@mcp.tool()
async def acquire_lock(resource: str, role: str, ttl_sec: int = config.DEFAULT_LOCK_TTL) -> dict:
    """공유 리소스 변경 전 어드바이저리 락을 획득한다. 점유 중이면 acquired=false."""
    res = await locks.acquire_lock(_redis, resource, role, ttl_sec)
    if res.get("acquired"):
        await store.log_event(_redis, "lock", f"{role} acquired {resource}")
    return res


@mcp.tool()
async def release_lock(resource: str, role: str) -> dict:
    """소유한 락을 해제한다."""
    res = await locks.release_lock(_redis, resource, role)
    if res.get("released"):
        await store.log_event(_redis, "lock", f"{role} released {resource}")
    return res


@mcp.tool()
async def heartbeat_lock(resource: str, role: str, ttl_sec: int = config.DEFAULT_LOCK_TTL) -> dict:
    """긴 작업 중 락 TTL을 갱신한다(소유자만)."""
    return await locks.heartbeat_lock(_redis, resource, role, ttl_sec)


@mcp.tool()
async def list_locks() -> dict:
    """현재 점유 중인 모든 락을 조회한다."""
    return await locks.list_locks(_redis)


# ---- 가시성 ----
@mcp.tool()
async def team_log(after_id: int = 0) -> dict:
    """팀 전체 최근 활동 피드(메시지·작업·락)를 조회한다."""
    return await store.read_team_log(_redis, after_id)


# ---- Bearer 인증 미들웨어 ----
class BearerAuth(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        global _auth_failed_logged
        if request.url.path.startswith("/health"):
            return await call_next(request)
        expected = f"Bearer {config.CO_BUS_KEY}"
        if not config.CO_BUS_KEY or request.headers.get("authorization") != expected:
            if not _auth_failed_logged:
                log.error("co-gahusb 인증 실패 (이후 동일 로그 생략)")
                _auth_failed_logged = True
            return JSONResponse({"error": "unauthorized"}, status_code=401)
        return await call_next(request)


async def _health(request):
    return JSONResponse({"status": "ok"})


# streamable-http ASGI sub-app (/mcp 엔드포인트 제공)
_mcp_app = mcp.streamable_http_app()

app = Starlette(
    routes=[Route("/health", _health), Mount("/", app=_mcp_app)],
    middleware=[Middleware(BearerAuth)],
    lifespan=_mcp_app.lifespan,
)
  • Step 4: 테스트 통과 + 로컬 스모크 확인

Run: .venv\Scripts\python -m pytest tests/test_server.py -v Expected: PASS (3 tests).

만약 mcp.streamable_http_app / .lifespan 관련 AttributeError 발생 시: .venv\Scripts\python -c "from mcp.server.fastmcp import FastMCP; print([a for a in dir(FastMCP) if 'app' in a.lower() or 'http' in a.lower()])"로 정확한 메서드명을 찾아 교체하고, lifespan은 sub-app의 실제 속성명에 맞춘다(예: _mcp_app.router.lifespan_context). 통과할 때까지 수정.

전체 스위트도 확인: .venv\Scripts\python -m pytest tests/ -v → 전부 PASS.

  • Step 5: Commit
git add co-gahusb/app/server.py co-gahusb/tests/test_server.py
git commit -m "feat(co-gahusb): FastMCP 서버 (12 툴 + Bearer 인증 + health)"

Task 7: docker-compose 서비스 등재

Files:

  • Modify: web-backend/docker-compose.yml

  • Step 1: co-gahusb 서비스 블록 추가

docker-compose.ymlagent-office: 서비스 블록 바로 위(또는 redis 서비스 인접)에 추가. (들여쓰기 2칸, 기존 서비스 스타일과 동일):

  co-gahusb:
    build:
      context: ./co-gahusb
    container_name: co-gahusb
    restart: unless-stopped
    ports:
      - "18920:8000"
    environment:
      - TZ=${TZ:-Asia/Seoul}
      - REDIS_URL=${REDIS_URL:-redis://redis:6379}
      - CO_BUS_KEY=${CO_BUS_KEY:-}
    depends_on:
      - redis
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 60s
      timeout: 5s
      retries: 3
  • Step 2: compose 문법 검증 (로컬, docker 미실행)

Run: .venv\Scripts\python -c "import yaml; yaml.safe_load(open('docker-compose.yml', encoding='utf-8')); print('compose OK')" (작업 디렉토리 web-backend 루트에서. PyYAML 없으면 pip install pyyaml 후 실행.) Expected: compose OK (들여쓰기/문법 오류 없음).

  • Step 3: Commit
git add docker-compose.yml
git commit -m "feat(co-gahusb): docker-compose 서비스 등재 (18920, depends_on redis)"

Task 8: nginx public 라우팅 등재

Files:

  • Modify: web-backend 내 nginx 설정 (location /api/... 들이 있는 .conf)

  • Step 1: nginx conf 파일 찾기

Run: git -C . grep -l "location /api/agent-office/" (web-backend 루트에서) → 해당 .conf 파일 경로 확인.

  • Step 2: /api/co/ location 추가

찾은 conf에서 location /api/agent-office/ { ... } 블록을 찾아 그 형식을 그대로 따라, 같은 server 블록 안에 추가. agent-office 블록이 다음과 같다면:

        location /api/agent-office/ {
            proxy_pass http://agent-office:8000/api/agent-office/;
            ...
        }

co-gahusb는 컨테이너 내부 경로가 /mcp이고 외부는 /api/co/mcp이므로 접두 stripping 형태로 추가(proxy_pass에 trailing slash):

        location /api/co/ {
            proxy_pass http://co-gahusb:8000/;
            proxy_http_version 1.1;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header Authorization $http_authorization;
            proxy_set_header Connection "";
            proxy_buffering off;
            proxy_read_timeout 3600s;
        }

(streamable-http는 long-lived 응답이 가능하므로 proxy_buffering off + 긴 proxy_read_timeout. Authorization forward 필수. 실제 파일의 기존 공통 proxy_set_header 패턴이 있으면 그걸 따르고 위 3개(Authorization/Connection/buffering)만 보강.)

  • Step 3: nginx 문법 셀프 점검

블록 괄호 균형과 location /api/co/가 server 블록 내부인지 육안 확인. (실제 nginx -t는 NAS 배포 후 컨테이너에서.)

  • Step 4: Commit
git add <nginx conf 경로>
git commit -m "feat(co-gahusb): nginx public /api/co/ 라우팅 (Authorization forward, no-buffer)"

Task 9: deploy 스크립트 SERVICES 화이트리스트

Files:

  • Modify: web-backend deploy 스크립트(SERVICES 목록 보유 파일)

  • Step 1: SERVICES 목록 파일 찾기

Run: git -C . grep -rl "agent-office" -- '*.sh' (web-backend 루트) → deploy 스크립트에서 SERVICES/서비스 배열에 agent-office가 등장하는 파일·라인 확인.

  • Step 2: co-gahusb 등재

해당 SERVICES 배열/리스트에서 agent-office가 있는 줄과 동일 형식으로 co-gahusb를 추가. 예) SERVICES=(... agent-office co-gahusb ...) 또는 한 줄에 하나씩이면 agent-office 다음 줄에 co-gahusb 추가.

  • Step 3: rsync/마운트 화이트리스트 확인

같은 스크립트에서 코드 동기화 대상 디렉토리 화이트리스트가 있으면(reference_deploy_nas_services_whitelist) co-gahusb도 포함되는지 확인하고, ${RUNTIME_PATH} 절대경로 동기화 대상이라면 추가. (co-gahusb는 영속 볼륨 없음 → 코드 디렉토리 동기화만.)

  • Step 4: Commit
git add <deploy 스크립트 경로>
git commit -m "feat(co-gahusb): deploy SERVICES 화이트리스트 등재"

Task 10: 클라이언트 배선 (.mcp.json + CLAUDE.md 역할 블록)

Files:

  • Create/Modify: web-backend/.mcp.json, web-ui/.mcp.json
  • Modify: web-backend/CLAUDE.md, web-ui/CLAUDE.md
  • Create: web-backend/co-gahusb/CLIENT_SETUP.md (web-ai 절차 포함)

이 Task는 web-backend·web-ui 두 repo를 건드린다. 각 파일은 해당 repo 경로에서 커밋(feedback_commit_repo). .mcp.json엔 실제 키 금지 — ${CO_BUS_KEY} placeholder.

  • Step 1: web-backend/.mcp.json 작성

web-backend/.mcp.json (없으면 생성, 있으면 mcpServers에 병합):

{
  "mcpServers": {
    "co-gahusb": {
      "type": "http",
      "url": "https://gahusb.synology.me/api/co/mcp",
      "headers": { "Authorization": "Bearer ${CO_BUS_KEY}" }
    }
  }
}
  • Step 2: web-ui/.mcp.json 작성

web-ui/.mcp.json — Step 1과 동일 내용:

{
  "mcpServers": {
    "co-gahusb": {
      "type": "http",
      "url": "https://gahusb.synology.me/api/co/mcp",
      "headers": { "Authorization": "Bearer ${CO_BUS_KEY}" }
    }
  }
}
  • Step 3: 역할 블록 — web-backend/CLAUDE.md

web-backend/CLAUDE.md 맨 끝에 추가:


---

## 협업 팀 버스 (co-gahusb) — 이 세션의 역할: **BE**

이 세션은 백엔드(BE) 역할이다. co-gahusb MCP 툴로 다른 세션(FE/AI/Producer)과 협업한다.
- **소유권**: 이 세션은 `web-backend` repo만 쓴다(FE=web-ui, AI=web-ai).
- **공유 리소스 변경 전 반드시 `acquire_lock(resource, "BE")`**: 대상 = `nas-deploy`, `stock-db-schema`, `lotto-db-schema`, `memory-mirror`, `nginx-conf`, `compose`. 점유 중이면 대기, 긴 작업은 `heartbeat_lock`, 끝나면 `release_lock`.
- **모든 툴 호출에 `role="BE"`** (또는 `from_role`/`created_by`에 BE).
- **수신**: `/loop`로 주기적으로 `read_inbox("BE", after_id=<last>)` + `list_tasks(assignee_role="BE")` 확인.
-`CO_BUS_KEY`는 환경변수로 주입(커밋 금지).
  • Step 4: 역할 블록 — web-ui/CLAUDE.md

web-ui/CLAUDE.md 맨 끝에 추가(BE→FE, repo만 교체):


---

## 협업 팀 버스 (co-gahusb) — 이 세션의 역할: **FE**

이 세션은 프론트엔드(FE) 역할이다. co-gahusb MCP 툴로 다른 세션(BE/AI/Producer)과 협업한다.
- **소유권**: 이 세션은 `web-ui` repo만 쓴다(BE=web-backend, AI=web-ai).
- **공유 리소스 변경 전 반드시 `acquire_lock(resource, "FE")`**: 대상 = `nas-deploy`, `stock-db-schema`, `lotto-db-schema`, `memory-mirror`, `nginx-conf`, `compose`. 점유 중이면 대기, 긴 작업은 `heartbeat_lock`, 끝나면 `release_lock`.
- **모든 툴 호출에 `role="FE"`** (또는 `from_role`/`created_by`에 FE).
- **수신**: `/loop`로 주기적으로 `read_inbox("FE", after_id=<last>)` + `list_tasks(assignee_role="FE")` 확인.
-`CO_BUS_KEY`는 환경변수로 주입(커밋 금지).
  • Step 5: web-ai 절차 문서 작성

web-backend/co-gahusb/CLIENT_SETUP.md:

# co-gahusb 클라이언트 설정

## 공통
1. `CO_BUS_KEY` 환경변수를 각 머신에 설정(서버 `.env`의 값과 동일).
2. 해당 repo 루트 `.mcp.json`에 co-gahusb HTTP MCP 등록(이 repo의 예시 참고).
3. CLAUDE.md 역할 블록의 `/loop` 폴링 규약을 따른다.

## web-ai (다른 머신)
web-ai 머신의 repo 루트에 아래 `.mcp.json` 생성, 역할 = **AI**:
```json
{ "mcpServers": { "co-gahusb": {
  "type": "http",
  "url": "https://gahusb.synology.me/api/co/mcp",
  "headers": { "Authorization": "Bearer ${CO_BUS_KEY}" } } } }

web-ai CLAUDE.md에 역할 블록 추가(role="AI", 소유권=web-ai repo, 동일 락 규약).

Producer (오케스트레이터 세션)

별도 repo 없이 조율 담당. team_log()로 전체 활동 감시, create_task로 분배, acquire_lock로 교차 작업 직렬화.


- [ ] **Step 6: 커밋 (repo별로 분리)**

web-backend에서:

git add web-backend/.mcp.json web-backend/CLAUDE.md co-gahusb/CLIENT_SETUP.md git commit -m "feat(co-gahusb): BE 클라이언트 배선 (.mcp.json + 역할 블록 + 셋업 문서)"

web-ui에서(별도 repo, cd C:\Users\jaeoh\Desktop\workspace\web-ui):

git add .mcp.json CLAUDE.md git commit -m "feat(co-gahusb): FE 클라이언트 배선 (.mcp.json + 역할 블록)"

(web-ui는 현재 main 브랜치 — 커밋 전 `git checkout -b feat/co-gahusb-client` 권장.)

---

## Task 11: 배포 + 스모크 테스트

**Files:** 없음 (배포·검증)

> Docker는 NAS 전용. 로컬 docker 금지. 배포는 Gitea push → webhook 자동 배포.

- [ ] **Step 1: 서버 .env에 키 추가 (NAS, 커밋 금지)**

NAS의 web-backend `.env`에 `CO_BUS_KEY=<강한 랜덤 키>` 추가가 필요함을 사용자에게 안내(이 값은 각 클라이언트 머신 env에도 동일하게 설정). **이 단계는 사용자 수동** — 키 생성/배치 후 진행.

- [ ] **Step 2: 배포 푸시**

web-backend feature 브랜치를 main에 머지 후 push (또는 배포 정책에 따라):

git checkout main && git merge --no-ff feat/co-gahusb-team-bus git push

(Gitea 자격증명 실패 시 사용자 수동 push — feedback_nas_deploy_paths.)

- [ ] **Step 3: NAS 헬스 스모크**

배포 반영 후(webhook 완료 대기):

curl -s https://gahusb.synology.me/api/co/health

Expected: `{"status":"ok"}`

- [ ] **Step 4: 인증 스모크**

curl -s -o /dev/null -w "%{http_code}" -X POST https://gahusb.synology.me/api/co/mcp

Expected: `401` (키 없음).

- [ ] **Step 5: 락 경합 end-to-end (MCP 클라이언트로)**

이 세션(또는 Producer)에서 co-gahusb MCP가 연결되면:
- `acquire_lock("nas-deploy", "BE")` → `{acquired:true}`
- 다시 `acquire_lock("nas-deploy", "FE")` → `{acquired:false, held_by:"BE"}`
- `release_lock("nas-deploy", "BE")` → `{released:true}`
Expected: 위 순서대로 동작 → 동시쓰기 분리 검증 완료.

- [ ] **Step 6: 마무리**

finishing-a-development-branch로 web-backend 브랜치 정리. 메모리에 co-gahusb 운영 노트 기록.

---

## Self-Review 체크리스트 (작성자 검증 완료)

- **Spec coverage:** 호스팅/포트(T1,T7) ✓ HTTP MCP+Bearer(T6,T8) ✓ Redis 백엔드(T2-5) ✓ 락=소유권+어드바이저리(T2 락 + T10 CLAUDE.md 규약) ✓ 메시지/작업/team_log(T3,4,5) ✓ 인프라 6위치(T7 compose, T8 nginx, T9 deploy, T1/T6 .env, frontend depends_on 불요 명시) ✓ 클라이언트 배선+역할+/loop(T10) ✓ 테스트(T2-6) ✓ 배포/스모크(T11) ✓.
- **Placeholder scan:** 모든 코드 step에 실제 코드/명령/기대출력. T6의 SDK 버전 분기는 "검증 후 맞춤" 명시적 절차(placeholder 아님). T8/T9는 실제 파일 grep으로 위치 특정하는 탐색 step(상수 경로가 repo마다 달라 grep으로 지정).
- **Type consistency:** store/locks 함수 시그니처가 test와 server.py 호출에서 일치(`acquire_lock(r,resource,role,ttl_sec)`, `post_message(r,from_role,to_role,body,thread_id)`, `claim_task(r,task_id,role)` 등). 반환 dict 키(`acquired`/`held_by`/`released`/`renewed`/`ok`/`task`/`message_id`/`cursor`/`events`) 전 Task 일관.
- **Known caveat:** T6 FastMCP ASGI/lifespan API는 설치 SDK 버전 의존 → Step 4 스모크로 강제 검증. store/locks(핵심 동시성)는 fakeredis로 결정적 테스트라 안정.
- **동시성 안전:** 락 acquire=SET NX EX 원자적, release/heartbeat=WATCH/MULTI, claim_task=WATCH/MULTI → 다중 세션 경합에도 정확. SQLite multi-writer 회피.