# co-gahusb/app/server.py
"""HTTP entry point for the co-gahusb MCP server.

Exposes message, task, lock and team-log tools through FastMCP and wraps the
resulting ASGI app in a Starlette application that enforces static Bearer
authentication on every route except the health check.
"""
import hmac
import logging

import redis.asyncio as aioredis
from mcp.server.fastmcp import FastMCP
from mcp.server.transport_security import TransportSecuritySettings
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route

from app import config, locks, store

log = logging.getLogger("co-gahusb")

# Set once the first authentication failure has been logged, so repeated
# failures do not flood the log.
_auth_failed_logged = False

# Shared async Redis client handed to every store/locks call below.
_redis = aioredis.from_url(config.REDIS_URL, decode_responses=True)

# DNS-rebinding protection is disabled on purpose: the real security boundary
# is Bearer authentication at the nginx front end (401 before MCP is reached).
# With a remote-HTTPS + static-key model a Host allow-list adds ~no security
# value and would break again whenever the domain changes.
mcp = FastMCP(
    "co-gahusb",
    transport_security=TransportSecuritySettings(enable_dns_rebinding_protection=False),
)

# NOTE: the docstrings of the @mcp.tool() functions below are kept verbatim.
# FastMCP publishes a tool's docstring as its description, so that text is
# runtime output rather than a plain comment. English summaries are given as
# `#` comments above each tool instead.


# ---- Messages ----
# Post a message to another role's inbox and record it in the team log.
@mcp.tool()
async def post_message(from_role: str, to_role: str, body: str, thread_id: str = "") -> dict:
    """다른 역할의 우편함에 메시지를 보낸다."""
    # An empty thread_id is passed to the store as None.
    res = await store.post_message(_redis, from_role, to_role, body, thread_id or None)
    # Only the first 60 characters of the body go into the team log line.
    await store.log_event(_redis, "message", f"{from_role}→{to_role}: {body[:60]}")
    return res


# Read a role's inbox using a cursor (after_id); optionally mark as read.
@mcp.tool()
async def read_inbox(role: str, after_id: int = 0, mark_read: bool = False) -> dict:
    """내 역할 우편함을 커서 기반으로 읽는다."""
    return await store.read_inbox(_redis, role, after_id, mark_read)


# ---- Tasks ----
# Create a task assigned to a role and record it in the team log.
@mcp.tool()
async def create_task(title: str, assignee_role: str, created_by: str, detail: str = "") -> dict:
    """작업을 만들어 특정 역할에 배정한다."""
    # An empty detail is passed to the store as None.
    res = await store.create_task(_redis, title, assignee_role, created_by, detail or None)
    await store.log_event(_redis, "task", f"{created_by} created '{title}' → {assignee_role}")
    return res


# Claim an open task; the team log is written only when the store reports ok.
@mcp.tool()
async def claim_task(task_id: int, role: str) -> dict:
    """open 작업을 점유(in_progress)한다. 이미 점유면 거부."""
    res = await store.claim_task(_redis, task_id, role)
    if res.get("ok"):
        await store.log_event(_redis, "task", f"{role} claimed task#{task_id}")
    return res


# Update a task's status and record the change in the team log.
@mcp.tool()
async def update_task(task_id: int, status: str, role: str, note: str = "") -> dict:
    """작업 상태를 갱신한다 (open/in_progress/blocked/done)."""
    res = await store.update_task(_redis, task_id, status, role, note or None)
    # NOTE(review): unlike claim_task, the event is logged without checking the
    # result. Confirm whether store.update_task can report a failure in `res`;
    # if so this should probably be guarded the same way.
    await store.log_event(_redis, "task", f"{role} set task#{task_id} → {status}")
    return res


# List tasks, optionally filtered by status and/or assignee role.
@mcp.tool()
async def list_tasks(status: str = "", assignee_role: str = "") -> dict:
    """작업 목록을 조회한다(상태/담당 필터)."""
    # Empty filter strings are passed to the store as None.
    return await store.list_tasks(_redis, status or None, assignee_role or None)


# ---- Locks ----
# Acquire an advisory lock on a shared resource; logged only when acquired.
@mcp.tool()
async def acquire_lock(resource: str, role: str, ttl_sec: int = config.DEFAULT_LOCK_TTL) -> dict:
    """공유 리소스 변경 전 어드바이저리 락을 획득한다. 점유 중이면 acquired=false."""
    res = await locks.acquire_lock(_redis, resource, role, ttl_sec)
    if res.get("acquired"):
        await store.log_event(_redis, "lock", f"{role} acquired {resource}")
    return res


# Release a lock; logged only when the release succeeded.
@mcp.tool()
async def release_lock(resource: str, role: str) -> dict:
    """소유한 락을 해제한다."""
    res = await locks.release_lock(_redis, resource, role)
    if res.get("released"):
        await store.log_event(_redis, "lock", f"{role} released {resource}")
    return res


# Refresh a lock's TTL during long-running work.
@mcp.tool()
async def heartbeat_lock(resource: str, role: str, ttl_sec: int = config.DEFAULT_LOCK_TTL) -> dict:
    """긴 작업 중 락 TTL을 갱신한다(소유자만)."""
    return await locks.heartbeat_lock(_redis, resource, role, ttl_sec)


# List the locks returned by locks.list_locks.
@mcp.tool()
async def list_locks() -> dict:
    """현재 점유 중인 모든 락을 조회한다."""
    return await locks.list_locks(_redis)


# ---- Visibility ----
# Read the team-wide activity feed after the given cursor.
@mcp.tool()
async def team_log(after_id: int = 0) -> dict:
    """팀 전체 최근 활동 피드(메시지·작업·락)를 조회한다."""
    return await store.read_team_log(_redis, after_id)


# ---- Bearer authentication middleware ----
def _bearer_matches(header_value: "str | None", key: "str | None") -> bool:
    """Return True when *header_value* is exactly ``Bearer <key>``.

    A missing/empty key or a missing header never matches. The comparison
    uses ``hmac.compare_digest`` so the time taken does not depend on how
    many leading characters of the presented token are correct.
    """
    if not key or header_value is None:
        return False
    expected = f"Bearer {key}"
    # Compare as bytes: compare_digest rejects non-ASCII str arguments, and a
    # client-supplied header is not guaranteed to be ASCII.
    return hmac.compare_digest(header_value.encode("utf-8"), expected.encode("utf-8"))


class BearerAuth(BaseHTTPMiddleware):
    """Reject requests that do not carry the configured static Bearer key."""

    async def dispatch(self, request, call_next):
        """Pass health checks through; require ``Authorization: Bearer <key>`` otherwise.

        Returns a 401 JSON response when the key is not configured or the
        header does not match; otherwise forwards to the wrapped application.
        """
        global _auth_failed_logged

        # Only the health route itself is exempt. A bare prefix test would also
        # exempt unrelated paths such as "/healthXYZ" that reach the MCP mount.
        if request.url.path in ("/health", "/health/"):
            return await call_next(request)

        if not _bearer_matches(request.headers.get("authorization"), config.CO_BUS_KEY):
            # Log only the first failure to avoid flooding the log.
            if not _auth_failed_logged:
                log.error("co-gahusb 인증 실패 (이후 동일 로그 생략)")
                _auth_failed_logged = True
            return JSONResponse({"error": "unauthorized"}, status_code=401)

        return await call_next(request)


async def _health(request):
    """Unauthenticated liveness endpoint; always answers ``{"status": "ok"}``."""
    return JSONResponse({"status": "ok"})


_mcp_app = mcp.streamable_http_app()

# /health is answered locally; everything else is delegated to the MCP app.
# The MCP app's lifespan context is reused as the outer application's lifespan.
app = Starlette(
    routes=[Route("/health", _health), Mount("/", app=_mcp_app)],
    middleware=[Middleware(BearerAuth)],
    lifespan=_mcp_app.router.lifespan_context,
)