From e115eee159e7aae994756319170d3cc43c53a730 Mon Sep 17 00:00:00 2001 From: gahusb Date: Fri, 12 Jun 2026 07:25:47 +0900 Subject: [PATCH] =?UTF-8?q?feat(co-gahusb):=20FastMCP=20=EC=84=9C=EB=B2=84?= =?UTF-8?q?=20(12=20=ED=88=B4=20+=20Bearer=20=EC=9D=B8=EC=A6=9D=20+=20heal?= =?UTF-8?q?th)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- co-gahusb/app/server.py | 132 +++++++++++++++++++++++++++++++++ co-gahusb/tests/test_server.py | 25 +++++++ 2 files changed, 157 insertions(+) create mode 100644 co-gahusb/app/server.py create mode 100644 co-gahusb/tests/test_server.py diff --git a/co-gahusb/app/server.py b/co-gahusb/app/server.py new file mode 100644 index 0000000..3a95187 --- /dev/null +++ b/co-gahusb/app/server.py @@ -0,0 +1,132 @@ +# co-gahusb/app/server.py +import logging + +import redis.asyncio as aioredis +from mcp.server.fastmcp import FastMCP +from starlette.applications import Starlette +from starlette.middleware import Middleware +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.responses import JSONResponse +from starlette.routing import Mount, Route + +from app import config, locks, store + +log = logging.getLogger("co-gahusb") +_auth_failed_logged = False + +_redis = aioredis.from_url(config.REDIS_URL, decode_responses=True) + +mcp = FastMCP("co-gahusb") + + +# ---- 메시지 ---- +@mcp.tool() +async def post_message(from_role: str, to_role: str, body: str, thread_id: str = "") -> dict: + """다른 역할의 우편함에 메시지를 보낸다.""" + res = await store.post_message(_redis, from_role, to_role, body, thread_id or None) + await store.log_event(_redis, "message", f"{from_role}→{to_role}: {body[:60]}") + return res + + +@mcp.tool() +async def read_inbox(role: str, after_id: int = 0, mark_read: bool = False) -> dict: + """내 역할 우편함을 커서 기반으로 읽는다.""" + return await store.read_inbox(_redis, role, after_id, mark_read) + + +# ---- 작업 ---- +@mcp.tool() +async def create_task(title: str, assignee_role: str, created_by: str, detail: str = "") -> dict: + """작업을 만들어 특정 역할에 배정한다.""" + res = await store.create_task(_redis, title, assignee_role, created_by, detail or None) + await store.log_event(_redis, "task", f"{created_by} created '{title}' → {assignee_role}") + return res + + +@mcp.tool() +async def claim_task(task_id: int, role: str) -> dict: + """open 작업을 점유(in_progress)한다. 이미 점유면 거부.""" + res = await store.claim_task(_redis, task_id, role) + if res.get("ok"): + await store.log_event(_redis, "task", f"{role} claimed task#{task_id}") + return res + + +@mcp.tool() +async def update_task(task_id: int, status: str, role: str, note: str = "") -> dict: + """작업 상태를 갱신한다 (open/in_progress/blocked/done).""" + res = await store.update_task(_redis, task_id, status, role, note or None) + await store.log_event(_redis, "task", f"{role} set task#{task_id} → {status}") + return res + + +@mcp.tool() +async def list_tasks(status: str = "", assignee_role: str = "") -> dict: + """작업 목록을 조회한다(상태/담당 필터).""" + return await store.list_tasks(_redis, status or None, assignee_role or None) + + +# ---- 락 ---- +@mcp.tool() +async def acquire_lock(resource: str, role: str, ttl_sec: int = config.DEFAULT_LOCK_TTL) -> dict: + """공유 리소스 변경 전 어드바이저리 락을 획득한다. 점유 중이면 acquired=false.""" + res = await locks.acquire_lock(_redis, resource, role, ttl_sec) + if res.get("acquired"): + await store.log_event(_redis, "lock", f"{role} acquired {resource}") + return res + + +@mcp.tool() +async def release_lock(resource: str, role: str) -> dict: + """소유한 락을 해제한다.""" + res = await locks.release_lock(_redis, resource, role) + if res.get("released"): + await store.log_event(_redis, "lock", f"{role} released {resource}") + return res + + +@mcp.tool() +async def heartbeat_lock(resource: str, role: str, ttl_sec: int = config.DEFAULT_LOCK_TTL) -> dict: + """긴 작업 중 락 TTL을 갱신한다(소유자만).""" + return await locks.heartbeat_lock(_redis, resource, role, ttl_sec) + + +@mcp.tool() +async def list_locks() -> dict: + """현재 점유 중인 모든 락을 조회한다.""" + return await locks.list_locks(_redis) + + +# ---- 가시성 ---- +@mcp.tool() +async def team_log(after_id: int = 0) -> dict: + """팀 전체 최근 활동 피드(메시지·작업·락)를 조회한다.""" + return await store.read_team_log(_redis, after_id) + + +# ---- Bearer 인증 미들웨어 ---- +class BearerAuth(BaseHTTPMiddleware): + async def dispatch(self, request, call_next): + global _auth_failed_logged + if request.url.path.startswith("/health"): + return await call_next(request) + expected = f"Bearer {config.CO_BUS_KEY}" + if not config.CO_BUS_KEY or request.headers.get("authorization") != expected: + if not _auth_failed_logged: + log.error("co-gahusb 인증 실패 (이후 동일 로그 생략)") + _auth_failed_logged = True + return JSONResponse({"error": "unauthorized"}, status_code=401) + return await call_next(request) + + +async def _health(request): + return JSONResponse({"status": "ok"}) + + +_mcp_app = mcp.streamable_http_app() + +app = Starlette( + routes=[Route("/health", _health), Mount("/", app=_mcp_app)], + middleware=[Middleware(BearerAuth)], + lifespan=_mcp_app.router.lifespan_context, +) diff --git a/co-gahusb/tests/test_server.py b/co-gahusb/tests/test_server.py new file mode 100644 index 0000000..2bbdb1f --- /dev/null +++ b/co-gahusb/tests/test_server.py @@ -0,0 +1,25 @@ +# co-gahusb/tests/test_server.py +import os +os.environ["CO_BUS_KEY"] = "test-key" + +from starlette.testclient import TestClient +from app.server import app + + +def test_health_open_without_auth(): + client = TestClient(app) + res = client.get("/health") + assert res.status_code == 200 + assert res.json()["status"] == "ok" + + +def test_mcp_requires_bearer(): + client = TestClient(app) + res = client.post("/mcp", json={}) + assert res.status_code == 401 + + +def test_mcp_wrong_key_rejected(): + client = TestClient(app) + res = client.post("/mcp", json={}, headers={"Authorization": "Bearer wrong"}) + assert res.status_code == 401