Files
web-page-backend/agent-office/app/main.py
gahusb 6f8b199548 feat: Agent Office — AI 에이전트 가상 오피스 (#2)
## Summary
- 2D 픽셀아트 가상 오피스에서 AI 에이전트(Stock, Music)가 실제 작업 수행
- FastAPI + WebSocket 실시간 상태 동기화 + 텔레그램 봇 양방향 알림/승인
- BaseAgent FSM (idle/working/waiting/reporting/break), 서비스 프록시 패턴
- Docker Compose 서비스 (port 18900) + Nginx WebSocket 프록시

## Changes (13 commits)
- Backend scaffold: config, db, models, Dockerfile
- WebSocket manager + Service proxy
- BaseAgent FSM + StockAgent + MusicAgent
- Telegram bot + Scheduler
- FastAPI main (REST + WS endpoints)
- Infrastructure: docker-compose + nginx
- Code review fixes: HTTPException, async polling, input validation

Reviewed-on: #2
2026-04-11 13:35:24 +09:00

153 lines
5.0 KiB
Python

import os
import json
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from .config import CORS_ALLOW_ORIGINS
from .db import init_db, get_all_agents, get_agent_config, update_agent_config, get_agent_tasks, get_pending_approvals, get_task, get_logs
from .models import CommandRequest, ApprovalRequest, AgentConfigUpdate
from .websocket_manager import ws_manager
from .agents import init_agents, get_agent, get_all_agent_states, AGENT_REGISTRY
from .scheduler import init_scheduler
from . import telegram_bot
app = FastAPI()
_cors_origins = CORS_ALLOW_ORIGINS.split(",")
app.add_middleware(
CORSMiddleware,
allow_origins=[o.strip() for o in _cors_origins],
allow_credentials=False,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["Content-Type"],
)
@app.on_event("startup")
async def on_startup():
init_db()
os.makedirs("/app/data", exist_ok=True)
init_agents()
for agent in AGENT_REGISTRY.values():
agent.set_ws_manager(ws_manager)
init_scheduler()
@app.get("/health")
def health():
return {"ok": True}
# --- WebSocket ---
@app.websocket("/api/agent-office/ws")
async def websocket_endpoint(ws: WebSocket):
await ws_manager.connect(ws)
try:
await ws.send_text(json.dumps({
"type": "init",
"agents": get_all_agent_states(),
"pending": [t["id"] for t in get_pending_approvals()],
}, ensure_ascii=False))
while True:
data = await ws.receive_text()
try:
msg = json.loads(data)
except json.JSONDecodeError:
continue
await _handle_ws_message(msg)
except WebSocketDisconnect:
pass
finally:
await ws_manager.disconnect(ws)
async def _handle_ws_message(msg: dict):
msg_type = msg.get("type")
agent_id = msg.get("agent")
agent = get_agent(agent_id) if agent_id else None
if msg_type == "command" and agent:
action = msg.get("action", "")
params = msg.get("params", {})
result = await agent.on_command(action, params)
await ws_manager.broadcast({"type": "command_result", "agent": agent_id, "result": result})
elif msg_type == "approval" and agent:
task_id = msg.get("task_id")
approved = msg.get("approved", False)
if task_id:
await agent.on_approval(task_id, approved)
elif msg_type == "query" and agent:
status = await agent.get_status()
await ws_manager.broadcast({"type": "agent_status", "agent": agent_id, "status": status})
# --- REST Endpoints ---
@app.get("/api/agent-office/agents")
def list_agents():
return {"agents": get_all_agents()}
@app.get("/api/agent-office/agents/{agent_id}")
def agent_detail(agent_id: str):
config = get_agent_config(agent_id)
if not config:
raise HTTPException(status_code=404, detail="Agent not found")
agent = get_agent(agent_id)
state_info = {"state": agent.state, "detail": agent.state_detail} if agent else {}
return {**config, **state_info}
@app.put("/api/agent-office/agents/{agent_id}")
def update_agent(agent_id: str, body: AgentConfigUpdate):
update_agent_config(agent_id, enabled=body.enabled,
schedule_config=body.schedule_config,
custom_config=body.custom_config)
return {"ok": True}
@app.get("/api/agent-office/agents/{agent_id}/tasks")
def agent_tasks(agent_id: str, limit: int = 20):
return {"tasks": get_agent_tasks(agent_id, limit)}
@app.get("/api/agent-office/agents/{agent_id}/logs")
def agent_logs(agent_id: str, limit: int = 50):
return {"logs": get_logs(agent_id, limit)}
@app.get("/api/agent-office/tasks/pending")
def pending_tasks():
return {"tasks": get_pending_approvals()}
@app.get("/api/agent-office/tasks/{task_id}")
def task_detail(task_id: str):
task = get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
@app.post("/api/agent-office/command")
async def send_command(body: CommandRequest):
agent = get_agent(body.agent)
if not agent:
return {"error": f"Agent '{body.agent}' not found"}
result = await agent.on_command(body.action, body.params or {})
return result
@app.post("/api/agent-office/approve")
async def approve(body: ApprovalRequest):
agent = get_agent(body.agent)
if not agent:
return {"error": f"Agent '{body.agent}' not found"}
await agent.on_approval(body.task_id, body.approved, body.feedback or "")
return {"ok": True}
# --- Telegram Webhook ---
@app.post("/api/agent-office/telegram/webhook")
async def telegram_webhook(data: dict):
result = await telegram_bot.handle_webhook(data)
if result:
agent = get_agent(result["agent_id"])
if agent:
await agent.on_approval(result["task_id"], result["approved"])
return {"ok": True}
@app.get("/api/agent-office/states")
def all_states():
return {"agents": get_all_agent_states()}