Files
web-page-backend/agent-office/app/main.py
gahusb 1d4354e402 feat(agent-office): YouTubeResearchAgent + 스케줄러 + /youtube/research API
- db.py: youtube_research_jobs 테이블 추가 + CRUD 3종 (add/update/get_latest)
- agents/youtube.py: YouTubeResearchAgent 신규 구현 (on_schedule/on_command/on_approval/_run_research/send_weekly_report)
- agents/__init__.py: YouTubeResearchAgent 등록
- scheduler.py: youtube_research(매일 09:00) + youtube_weekly_report(월 08:00) cron 추가
- main.py: POST /api/agent-office/youtube/research + GET /api/agent-office/youtube/research/status 엔드포인트 추가

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 12:13:12 +09:00

228 lines
7.5 KiB
Python

import os
import json
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from .config import CORS_ALLOW_ORIGINS
from .db import init_db, get_all_agents, get_agent_config, update_agent_config, get_agent_tasks, get_pending_approvals, get_task, get_logs, get_activity_feed, get_latest_youtube_research_job
from .models import CommandRequest, ApprovalRequest, AgentConfigUpdate
from .websocket_manager import ws_manager
from .agents import init_agents, get_agent, get_all_agent_states, AGENT_REGISTRY
from .scheduler import init_scheduler
from . import telegram_bot
app = FastAPI()
_cors_origins = CORS_ALLOW_ORIGINS.split(",")
app.add_middleware(
CORSMiddleware,
allow_origins=[o.strip() for o in _cors_origins],
allow_credentials=False,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["Content-Type"],
)
@app.on_event("startup")
async def on_startup():
init_db()
os.makedirs("/app/data", exist_ok=True)
init_agents()
for agent in AGENT_REGISTRY.values():
agent.set_ws_manager(ws_manager)
init_scheduler()
@app.get("/health")
def health():
return {"ok": True}
# --- WebSocket ---
@app.websocket("/api/agent-office/ws")
async def websocket_endpoint(ws: WebSocket):
await ws_manager.connect(ws)
try:
await ws.send_text(json.dumps({
"type": "init",
"agents": get_all_agent_states(),
"pending": [t["id"] for t in get_pending_approvals()],
}, ensure_ascii=False))
while True:
data = await ws.receive_text()
try:
msg = json.loads(data)
except json.JSONDecodeError:
continue
await _handle_ws_message(msg)
except WebSocketDisconnect:
pass
finally:
await ws_manager.disconnect(ws)
async def _handle_ws_message(msg: dict):
msg_type = msg.get("type")
agent_id = msg.get("agent")
agent = get_agent(agent_id) if agent_id else None
if msg_type == "command" and agent:
action = msg.get("action", "")
params = msg.get("params", {})
result = await agent.on_command(action, params)
await ws_manager.broadcast({"type": "command_result", "agent": agent_id, "result": result})
elif msg_type == "approval" and agent:
task_id = msg.get("task_id")
approved = msg.get("approved", False)
if task_id:
await agent.on_approval(task_id, approved)
elif msg_type == "query" and agent:
status = await agent.get_status()
await ws_manager.broadcast({"type": "agent_status", "agent": agent_id, "status": status})
# --- REST Endpoints ---
@app.get("/api/agent-office/agents")
def list_agents():
return {"agents": get_all_agents()}
@app.get("/api/agent-office/agents/{agent_id}")
def agent_detail(agent_id: str):
config = get_agent_config(agent_id)
if not config:
raise HTTPException(status_code=404, detail="Agent not found")
agent = get_agent(agent_id)
state_info = {"state": agent.state, "detail": agent.state_detail} if agent else {}
return {**config, **state_info}
@app.put("/api/agent-office/agents/{agent_id}")
def update_agent(agent_id: str, body: AgentConfigUpdate):
update_agent_config(agent_id, enabled=body.enabled,
schedule_config=body.schedule_config,
custom_config=body.custom_config)
return {"ok": True}
@app.get("/api/agent-office/agents/{agent_id}/tasks")
def agent_tasks(agent_id: str, limit: int = 20):
return {"tasks": get_agent_tasks(agent_id, limit)}
@app.get("/api/agent-office/agents/{agent_id}/logs")
def agent_logs(agent_id: str, limit: int = 50):
return {"logs": get_logs(agent_id, limit)}
@app.get("/api/agent-office/tasks/pending")
def pending_tasks():
return {"tasks": get_pending_approvals()}
@app.get("/api/agent-office/tasks/{task_id}")
def task_detail(task_id: str):
task = get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
@app.post("/api/agent-office/command")
async def send_command(body: CommandRequest):
agent = get_agent(body.agent)
if not agent:
return {"error": f"Agent '{body.agent}' not found"}
result = await agent.on_command(body.action, body.params or {})
return result
@app.post("/api/agent-office/approve")
async def approve(body: ApprovalRequest):
agent = get_agent(body.agent)
if not agent:
return {"error": f"Agent '{body.agent}' not found"}
await agent.on_approval(body.task_id, body.approved, body.feedback or "")
return {"ok": True}
# --- Telegram Webhook ---
async def _agent_dispatcher(agent_id: str, command: str, params: dict) -> dict:
"""텔레그램 라우터가 호출하는 에이전트 디스패처."""
# 전역 상태 조회
if agent_id == "__global__" and command == "status":
result = {}
for aid, agent in AGENT_REGISTRY.items():
result[aid] = {"state": agent.state, "detail": agent.state_detail}
return result
agent = AGENT_REGISTRY.get(agent_id)
if agent is None:
return {"ok": False, "message": f"Unknown agent: {agent_id}"}
return await agent.on_command(command, params or {})
@app.post("/api/agent-office/telegram/webhook")
async def telegram_webhook(data: dict):
result = await telegram_bot.handle_webhook(data, agent_dispatcher=_agent_dispatcher)
# callback_query (승인/거절) → 기존 승인 흐름
if result and "approved" in result:
agent = get_agent(result["agent_id"])
if agent:
await agent.on_approval(result["task_id"], result["approved"])
return {"ok": True}
@app.get("/api/agent-office/states")
def all_states():
return {"agents": get_all_agent_states()}
@app.get("/api/agent-office/agents/{agent_id}/token-usage")
def agent_token_usage(agent_id: str, days: int = 1):
from .db import get_token_usage_stats
return get_token_usage_stats(agent_id, days)
@app.get("/api/agent-office/conversation/stats")
def conversation_stats(days: int = 7):
from .db import get_conversation_stats
return get_conversation_stats(days)
@app.get("/api/agent-office/activity")
def activity_feed(limit: int = 50, offset: int = 0):
return get_activity_feed(limit, offset)
# --- Realestate Agent Push Endpoint ---
from pydantic import BaseModel
from typing import List, Dict, Any
class RealestateNotifyBody(BaseModel):
matches: List[Dict[str, Any]]
@app.post("/api/agent-office/realestate/notify")
async def realestate_notify(body: RealestateNotifyBody):
agent = get_agent("realestate")
if agent is None:
from fastapi import HTTPException
raise HTTPException(status_code=503, detail="RealestateAgent not initialized")
return await agent.on_new_matches(body.matches)
# --- YouTube Research Agent Endpoints ---
class YouTubeResearchBody(BaseModel):
countries: List[str] = []
@app.post("/api/agent-office/youtube/research")
async def trigger_youtube_research(body: YouTubeResearchBody = None):
agent = AGENT_REGISTRY.get("youtube")
if not agent:
raise HTTPException(status_code=503, detail="YouTubeResearchAgent 없음")
params = {}
if body and body.countries:
params["countries"] = body.countries
result = await agent.on_command("research", params)
return result
@app.get("/api/agent-office/youtube/research/status")
def youtube_research_status():
job = get_latest_youtube_research_job()
if not job:
return {"status": "never_run"}
return job