import os import json from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from .config import CORS_ALLOW_ORIGINS from .db import init_db, get_all_agents, get_agent_config, update_agent_config, get_agent_tasks, get_pending_approvals, get_task, get_logs, get_activity_feed from .models import CommandRequest, ApprovalRequest, AgentConfigUpdate from .websocket_manager import ws_manager from .agents import init_agents, get_agent, get_all_agent_states, AGENT_REGISTRY from .scheduler import init_scheduler from . import telegram_bot app = FastAPI() _cors_origins = CORS_ALLOW_ORIGINS.split(",") app.add_middleware( CORSMiddleware, allow_origins=[o.strip() for o in _cors_origins], allow_credentials=False, allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"], allow_headers=["Content-Type"], ) @app.on_event("startup") async def on_startup(): init_db() os.makedirs("/app/data", exist_ok=True) init_agents() for agent in AGENT_REGISTRY.values(): agent.set_ws_manager(ws_manager) init_scheduler() @app.get("/health") def health(): return {"ok": True} # --- WebSocket --- @app.websocket("/api/agent-office/ws") async def websocket_endpoint(ws: WebSocket): await ws_manager.connect(ws) try: await ws.send_text(json.dumps({ "type": "init", "agents": get_all_agent_states(), "pending": [t["id"] for t in get_pending_approvals()], }, ensure_ascii=False)) while True: data = await ws.receive_text() try: msg = json.loads(data) except json.JSONDecodeError: continue await _handle_ws_message(msg) except WebSocketDisconnect: pass finally: await ws_manager.disconnect(ws) async def _handle_ws_message(msg: dict): msg_type = msg.get("type") agent_id = msg.get("agent") agent = get_agent(agent_id) if agent_id else None if msg_type == "command" and agent: action = msg.get("action", "") params = msg.get("params", {}) result = await agent.on_command(action, params) await ws_manager.broadcast({"type": "command_result", "agent": agent_id, "result": result}) elif msg_type == "approval" and agent: task_id = msg.get("task_id") approved = msg.get("approved", False) if task_id: await agent.on_approval(task_id, approved) elif msg_type == "query" and agent: status = await agent.get_status() await ws_manager.broadcast({"type": "agent_status", "agent": agent_id, "status": status}) # --- REST Endpoints --- @app.get("/api/agent-office/agents") def list_agents(): return {"agents": get_all_agents()} @app.get("/api/agent-office/agents/{agent_id}") def agent_detail(agent_id: str): config = get_agent_config(agent_id) if not config: raise HTTPException(status_code=404, detail="Agent not found") agent = get_agent(agent_id) state_info = {"state": agent.state, "detail": agent.state_detail} if agent else {} return {**config, **state_info} @app.put("/api/agent-office/agents/{agent_id}") def update_agent(agent_id: str, body: AgentConfigUpdate): update_agent_config(agent_id, enabled=body.enabled, schedule_config=body.schedule_config, custom_config=body.custom_config) return {"ok": True} @app.get("/api/agent-office/agents/{agent_id}/tasks") def agent_tasks(agent_id: str, limit: int = 20): return {"tasks": get_agent_tasks(agent_id, limit)} @app.get("/api/agent-office/agents/{agent_id}/logs") def agent_logs(agent_id: str, limit: int = 50): return {"logs": get_logs(agent_id, limit)} @app.get("/api/agent-office/tasks/pending") def pending_tasks(): return {"tasks": get_pending_approvals()} @app.get("/api/agent-office/tasks/{task_id}") def task_detail(task_id: str): task = get_task(task_id) if not task: raise HTTPException(status_code=404, detail="Task not found") return task @app.post("/api/agent-office/command") async def send_command(body: CommandRequest): agent = get_agent(body.agent) if not agent: return {"error": f"Agent '{body.agent}' not found"} result = await agent.on_command(body.action, body.params or {}) return result @app.post("/api/agent-office/approve") async def approve(body: ApprovalRequest): agent = get_agent(body.agent) if not agent: return {"error": f"Agent '{body.agent}' not found"} await agent.on_approval(body.task_id, body.approved, body.feedback or "") return {"ok": True} # --- Telegram Webhook --- async def _agent_dispatcher(agent_id: str, command: str, params: dict) -> dict: """텔레그램 라우터가 호출하는 에이전트 디스패처.""" # 전역 상태 조회 if agent_id == "__global__" and command == "status": result = {} for aid, agent in AGENT_REGISTRY.items(): result[aid] = {"state": agent.state, "detail": agent.state_detail} return result agent = AGENT_REGISTRY.get(agent_id) if agent is None: return {"ok": False, "message": f"Unknown agent: {agent_id}"} return await agent.on_command(command, params or {}) @app.post("/api/agent-office/telegram/webhook") async def telegram_webhook(data: dict): result = await telegram_bot.handle_webhook(data, agent_dispatcher=_agent_dispatcher) # callback_query (승인/거절) → 기존 승인 흐름 if result and "approved" in result: agent = get_agent(result["agent_id"]) if agent: await agent.on_approval(result["task_id"], result["approved"]) return {"ok": True} @app.get("/api/agent-office/states") def all_states(): return {"agents": get_all_agent_states()} @app.get("/api/agent-office/agents/{agent_id}/token-usage") def agent_token_usage(agent_id: str, days: int = 1): from .db import get_token_usage_stats return get_token_usage_stats(agent_id, days) @app.get("/api/agent-office/conversation/stats") def conversation_stats(days: int = 7): from .db import get_conversation_stats return get_conversation_stats(days) @app.get("/api/agent-office/activity") def activity_feed(limit: int = 50, offset: int = 0): return get_activity_feed(limit, offset)