Files
web-page-backend/agent-office/app/main.py

265 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import json
from typing import Optional
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from .config import CORS_ALLOW_ORIGINS
from .db import init_db, get_all_agents, get_agent_config, update_agent_config, get_agent_tasks, get_pending_approvals, get_task, get_logs, get_activity_feed, get_latest_youtube_research_job
from .models import CommandRequest, ApprovalRequest, AgentConfigUpdate
from .websocket_manager import ws_manager
from .agents import init_agents, get_agent, get_all_agent_states, AGENT_REGISTRY
from .scheduler import init_scheduler
from . import telegram_bot
from .routers import notify as notify_router
app = FastAPI()
app.include_router(notify_router.router)
_cors_origins = CORS_ALLOW_ORIGINS.split(",")
app.add_middleware(
CORSMiddleware,
allow_origins=[o.strip() for o in _cors_origins],
allow_credentials=False,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["Content-Type"],
)
@app.on_event("startup")
async def on_startup():
init_db()
os.makedirs("/app/data", exist_ok=True)
init_agents()
for agent in AGENT_REGISTRY.values():
agent.set_ws_manager(ws_manager)
init_scheduler()
@app.get("/health")
def health():
return {"ok": True}
# --- WebSocket ---
@app.websocket("/api/agent-office/ws")
async def websocket_endpoint(ws: WebSocket):
await ws_manager.connect(ws)
try:
await ws.send_text(json.dumps({
"type": "init",
"agents": get_all_agent_states(),
"pending": [t["id"] for t in get_pending_approvals()],
}, ensure_ascii=False))
while True:
data = await ws.receive_text()
try:
msg = json.loads(data)
except json.JSONDecodeError:
continue
await _handle_ws_message(msg)
except WebSocketDisconnect:
pass
finally:
await ws_manager.disconnect(ws)
async def _handle_ws_message(msg: dict):
msg_type = msg.get("type")
agent_id = msg.get("agent")
agent = get_agent(agent_id) if agent_id else None
if msg_type == "command" and agent:
action = msg.get("action", "")
params = msg.get("params", {})
result = await agent.on_command(action, params)
await ws_manager.broadcast({"type": "command_result", "agent": agent_id, "result": result})
elif msg_type == "approval" and agent:
task_id = msg.get("task_id")
approved = msg.get("approved", False)
if task_id:
await agent.on_approval(task_id, approved)
elif msg_type == "query" and agent:
status = await agent.get_status()
await ws_manager.broadcast({"type": "agent_status", "agent": agent_id, "status": status})
# --- REST Endpoints ---
@app.get("/api/agent-office/agents")
def list_agents():
return {"agents": get_all_agents()}
@app.get("/api/agent-office/agents/{agent_id}")
def agent_detail(agent_id: str):
config = get_agent_config(agent_id)
if not config:
raise HTTPException(status_code=404, detail="Agent not found")
agent = get_agent(agent_id)
state_info = {"state": agent.state, "detail": agent.state_detail} if agent else {}
return {**config, **state_info}
@app.put("/api/agent-office/agents/{agent_id}")
def update_agent(agent_id: str, body: AgentConfigUpdate):
update_agent_config(agent_id, enabled=body.enabled,
schedule_config=body.schedule_config,
custom_config=body.custom_config)
return {"ok": True}
@app.get("/api/agent-office/agents/{agent_id}/tasks")
def agent_tasks(
agent_id: str,
limit: int = 20,
task_type: Optional[str] = None,
days: Optional[int] = None,
):
tasks_list = get_agent_tasks(agent_id, limit=limit, task_type=task_type, days=days)
# Backward compat: 기존 client는 'tasks', 신규 client는 'items' 사용
return {"tasks": tasks_list, "items": tasks_list}
@app.get("/api/agent-office/agents/{agent_id}/logs")
def agent_logs(agent_id: str, limit: int = 50):
return {"logs": get_logs(agent_id, limit)}
@app.get("/api/agent-office/tasks/pending")
def pending_tasks():
return {"tasks": get_pending_approvals()}
@app.get("/api/agent-office/tasks/{task_id}")
def task_detail(task_id: str):
task = get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
@app.post("/api/agent-office/command")
async def send_command(body: CommandRequest):
agent = get_agent(body.agent)
if not agent:
return {"error": f"Agent '{body.agent}' not found"}
result = await agent.on_command(body.action, body.params or {})
return result
@app.post("/api/agent-office/approve")
async def approve(body: ApprovalRequest):
agent = get_agent(body.agent)
if not agent:
return {"error": f"Agent '{body.agent}' not found"}
await agent.on_approval(body.task_id, body.approved, body.feedback or "")
return {"ok": True}
# --- Telegram Webhook ---
async def _agent_dispatcher(agent_id: str, command: str, params: dict) -> dict:
"""텔레그램 라우터가 호출하는 에이전트 디스패처."""
# 전역 상태 조회
if agent_id == "__global__" and command == "status":
result = {}
for aid, agent in AGENT_REGISTRY.items():
result[aid] = {"state": agent.state, "detail": agent.state_detail}
return result
agent = AGENT_REGISTRY.get(agent_id)
if agent is None:
return {"ok": False, "message": f"Unknown agent: {agent_id}"}
return await agent.on_command(command, params or {})
@app.post("/api/agent-office/telegram/webhook")
async def telegram_webhook(data: dict):
result = await telegram_bot.handle_webhook(data, agent_dispatcher=_agent_dispatcher)
# callback_query (승인/거절) → 기존 승인 흐름
if result and "approved" in result:
agent = get_agent(result["agent_id"])
if agent:
await agent.on_approval(result["task_id"], result["approved"])
return {"ok": True}
@app.get("/api/agent-office/states")
def all_states():
return {"agents": get_all_agent_states()}
@app.get("/api/agent-office/agents/{agent_id}/token-usage")
def agent_token_usage(agent_id: str, days: int = 1):
from .db import get_token_usage_stats
return get_token_usage_stats(agent_id, days)
@app.get("/api/agent-office/conversation/stats")
def conversation_stats(days: int = 7):
from .db import get_conversation_stats
return get_conversation_stats(days)
@app.get("/api/agent-office/activity")
def activity_feed(limit: int = 50, offset: int = 0):
return get_activity_feed(limit, offset)
# --- Realestate Agent Push Endpoint ---
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
class RealestateNotifyBody(BaseModel):
matches: List[Dict[str, Any]]
@app.post("/api/agent-office/realestate/notify")
async def realestate_notify(body: RealestateNotifyBody):
agent = get_agent("realestate")
if agent is None:
from fastapi import HTTPException
raise HTTPException(status_code=503, detail="RealestateAgent not initialized")
return await agent.on_new_matches(body.matches)
# --- YouTube Research Agent Endpoints ---
class YouTubeResearchBody(BaseModel):
countries: List[str] = []
@app.post("/api/agent-office/youtube/research")
async def trigger_youtube_research(body: Optional[YouTubeResearchBody] = None):
agent = get_agent("youtube")
if not agent:
raise HTTPException(status_code=503, detail="YouTubeResearchAgent 없음")
params = {}
if body and body.countries:
params["countries"] = body.countries
result = await agent.on_command("research", params)
return result
@app.get("/api/agent-office/youtube/research/status")
def youtube_research_status():
job = get_latest_youtube_research_job()
if not job:
return {"status": "never_run"}
return job
# --- Lotto Signal Endpoints ---
@app.get("/api/agent-office/lotto/signals")
async def list_lotto_signals(days: int = 7):
"""시그널 이력 (모든 fire_level)."""
from .db import get_signals_history
return {"items": get_signals_history(days=days)}
@app.get("/api/agent-office/lotto/baselines")
async def list_lotto_baselines():
"""현재 baseline μ/σ + window 상태."""
from .db import get_all_baselines
return {"items": get_all_baselines()}
@app.post("/api/agent-office/lotto/signal-check")
async def trigger_signal_check(source: str = "light"):
"""수동 트리거 (디버그·테스트용). source ∈ {light, sim, deep}."""
if source not in ("light", "sim", "deep"):
raise HTTPException(status_code=400, detail="source must be light/sim/deep")
agent = AGENT_REGISTRY.get("lotto")
if not agent:
raise HTTPException(status_code=503, detail="lotto agent not registered")
return await agent.run_signal_check(source=source)