diff --git a/CLAUDE.md b/CLAUDE.md index 8d3758e..88e573d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -58,6 +58,7 @@ Synology NAS 기반의 개인 웹 플랫폼 백엔드 모노레포. | `music-lab` | 18600 | AI 음악 생성·라이브러리 관리 API | | `blog-lab` | 18700 | 블로그 마케팅 수익화 API | | `realestate-lab` | 18800 | 부동산 청약 자동 수집·매칭 API | +| `agent-office` | 18900 | AI 에이전트 오피스 (실시간 WebSocket + 텔레그램 연동) | | `travel-proxy` | 19000 | 여행 사진 API + 썸네일 생성 | | `lotto-frontend` (nginx) | 8080 | 정적 SPA 서빙 + API 리버스 프록시 | | `webpage-deployer` | 19010 | Gitea Webhook 수신 → 자동 배포 | @@ -76,6 +77,7 @@ Synology NAS 기반의 개인 웹 플랫폼 백엔드 모노레포. | `/api/music/` | `music-lab:8000` | AI 음악 생성·라이브러리 API | | `/api/blog-marketing/` | `blog-lab:8000` | 블로그 마케팅 수익화 API | | `/api/realestate/` | `realestate-lab:8000` | 부동산 청약 API | +| `/api/agent-office/` | `agent-office:8000` | AI 에이전트 오피스 API + WebSocket | | `/webhook`, `/webhook/` | `deployer:9000` | Gitea Webhook | | `/media/music/` | `/data/music/` (파일 직접 서빙) | 생성된 오디오 파일 | | `/media/travel/.thumb/` | `/data/thumbs/` (파일 직접 서빙) | 썸네일 캐시 | @@ -407,6 +409,44 @@ docker compose up -d - `NAVER_CLIENT_SECRET`: 네이버 검색 API 시크릿 - `BLOG_DATA_PATH`: SQLite DB 저장 경로 (기본 `./data/blog`) +### agent-office (agent-office/) +- AI 에이전트 가상 오피스 — 2D 픽셀아트 사무실에서 에이전트가 실제 작업 수행 +- stock-lab/music-lab 기존 API를 서비스 프록시로 호출 (직접 DB 접근 없음) +- 실시간 상태 동기화: WebSocket (`/api/agent-office/ws`) +- 텔레그램 봇: 양방향 알림 + 승인 (인라인 키보드) +- DB: `/app/data/agent_office.db` (agent_config, agent_tasks, agent_logs, telegram_state 테이블) +- 파일 구조: `main.py`, `db.py`, `config.py`, `models.py`, `websocket_manager.py`, `service_proxy.py`, `telegram_bot.py`, `scheduler.py`, `agents/base.py`, `agents/stock.py`, `agents/music.py` + +**에이전트 FSM 상태**: idle → working → waiting (승인 대기) → reporting → break (휴식) + +**환경변수** +- `STOCK_LAB_URL`: stock-lab 내부 URL (기본 `http://stock-lab:8000`) +- `MUSIC_LAB_URL`: music-lab 내부 URL (기본 `http://music-lab:8000`) +- `TELEGRAM_BOT_TOKEN`: 텔레그램 봇 토큰 (미설정 시 알림 비활성화) +- `TELEGRAM_CHAT_ID`: 텔레그램 채팅 ID +- `TELEGRAM_WEBHOOK_URL`: 텔레그램 Webhook URL + +**스케줄러 job** +- 08:00 매일 — 주식 뉴스 요약 (`stock_news_job`) +- 60초 간격 — 유휴 에이전트 휴식 체크 (`idle_check_job`) + +**agent-office API 목록** + +| 메서드 | 경로 | 설명 | +|--------|------|------| +| WS | `/api/agent-office/ws` | WebSocket (init, agent_state, task_complete, command_result) | +| GET | `/api/agent-office/agents` | 에이전트 목록 | +| GET | `/api/agent-office/agents/{id}` | 에이전트 상세 (설정 + 상태) | +| PUT | `/api/agent-office/agents/{id}` | 에이전트 설정 수정 | +| GET | `/api/agent-office/agents/{id}/tasks` | 에이전트 작업 이력 | +| GET | `/api/agent-office/agents/{id}/logs` | 에이전트 로그 | +| GET | `/api/agent-office/tasks/pending` | 승인 대기 작업 목록 | +| GET | `/api/agent-office/tasks/{id}` | 작업 상세 | +| POST | `/api/agent-office/command` | 에이전트에 명령 전송 | +| POST | `/api/agent-office/approve` | 작업 승인/거부 | +| POST | `/api/agent-office/telegram/webhook` | 텔레그램 Webhook 수신 | +| GET | `/api/agent-office/states` | 전체 에이전트 상태 조회 | + ### deployer (deployer/) - Webhook 검증: `X-Gitea-Signature` (HMAC SHA256, `compare_digest` 사용) - `WEBHOOK_SECRET` 환경변수로 시크릿 관리 diff --git a/agent-office/Dockerfile b/agent-office/Dockerfile new file mode 100644 index 0000000..c05ee7c --- /dev/null +++ b/agent-office/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.12-alpine +ENV PYTHONUNBUFFERED=1 + +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/agent-office/app/__init__.py b/agent-office/app/__init__.py new file mode 100644 index 0000000..35502e0 --- /dev/null +++ b/agent-office/app/__init__.py @@ -0,0 +1 @@ +# agent-office/app/__init__.py diff --git a/agent-office/app/agents/__init__.py b/agent-office/app/agents/__init__.py new file mode 100644 index 0000000..f941a04 --- /dev/null +++ b/agent-office/app/agents/__init__.py @@ -0,0 +1,17 @@ +from .stock import StockAgent +from .music import MusicAgent + +AGENT_REGISTRY = {} + +def init_agents(): + AGENT_REGISTRY["stock"] = StockAgent() + AGENT_REGISTRY["music"] = MusicAgent() + +def get_agent(agent_id: str): + return AGENT_REGISTRY.get(agent_id) + +def get_all_agent_states() -> list: + return [ + {"agent_id": aid, "state": agent.state, "detail": agent.state_detail} + for aid, agent in AGENT_REGISTRY.items() + ] diff --git a/agent-office/app/agents/base.py b/agent-office/app/agents/base.py new file mode 100644 index 0000000..ef9653d --- /dev/null +++ b/agent-office/app/agents/base.py @@ -0,0 +1,72 @@ +import asyncio +import random +import time +from typing import Optional + +from ..config import IDLE_BREAK_THRESHOLD, BREAK_DURATION_MIN, BREAK_DURATION_MAX +from ..db import add_log + +VALID_STATES = ("idle", "working", "waiting", "reporting", "break") + +class BaseAgent: + agent_id: str = "" + display_name: str = "" + state: str = "idle" + state_detail: str = "" + _idle_since: float = 0.0 + _break_until: float = 0.0 + _ws_manager = None + + def __init__(self): + self._idle_since = time.time() + + def set_ws_manager(self, manager): + self._ws_manager = manager + + async def transition(self, new_state: str, detail: str = "", task_id: str = None) -> None: + if new_state not in VALID_STATES: + return + old = self.state + self.state = new_state + self.state_detail = detail + + if new_state == "idle": + self._idle_since = time.time() + elif new_state == "break": + duration = random.randint(BREAK_DURATION_MIN, BREAK_DURATION_MAX) + self._break_until = time.time() + duration + + add_log(self.agent_id, f"State: {old} -> {new_state} ({detail})") + + if self._ws_manager: + await self._ws_manager.send_agent_state(self.agent_id, new_state, detail, task_id) + if new_state == "break": + await self._ws_manager.send_agent_move(self.agent_id, "break_room") + elif old == "break" and new_state == "idle": + await self._ws_manager.send_agent_move(self.agent_id, "desk") + + async def check_idle_break(self) -> None: + now = time.time() + if self.state == "idle" and (now - self._idle_since) > IDLE_BREAK_THRESHOLD: + if random.random() < 0.5: + break_type = random.choice(["커피 타임", "잠깐 산책", "졸고 있음"]) + await self.transition("break", break_type) + elif self.state == "break" and now > self._break_until: + await self.transition("idle", "휴식 완료") + + async def on_schedule(self) -> None: + raise NotImplementedError + + async def on_command(self, command: str, params: dict) -> dict: + raise NotImplementedError + + async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None: + raise NotImplementedError + + async def get_status(self) -> dict: + return { + "agent_id": self.agent_id, + "display_name": self.display_name, + "state": self.state, + "detail": self.state_detail, + } diff --git a/agent-office/app/agents/music.py b/agent-office/app/agents/music.py new file mode 100644 index 0000000..9e05809 --- /dev/null +++ b/agent-office/app/agents/music.py @@ -0,0 +1,124 @@ +import asyncio +from .base import BaseAgent +from ..db import create_task, update_task_status, approve_task, reject_task, add_log +from .. import service_proxy +from .. import telegram_bot + +class MusicAgent(BaseAgent): + agent_id = "music" + display_name = "음악 프로듀서" + + async def on_schedule(self) -> None: + pass + + async def on_command(self, command: str, params: dict) -> dict: + if command == "compose": + prompt = params.get("prompt", "") + style = params.get("style", "") + model = params.get("model", "V4") + instrumental = params.get("instrumental", False) + + if not prompt: + return {"ok": False, "message": "프롬프트를 입력해주세요"} + + task_id = create_task(self.agent_id, "compose", { + "prompt": prompt, "style": style, + "model": model, "instrumental": instrumental, + }, requires_approval=True) + + await self.transition("waiting", "프롬프트 승인 대기", task_id) + + detail = f"프롬프트: {prompt}" + if style: + detail += f"\n스타일: {style}" + detail += f"\n모델: {model}" + + await telegram_bot.send_approval_request( + self.agent_id, task_id, + "🎵 [음악 에이전트] 작곡 요청", detail, + ) + + return {"ok": True, "task_id": task_id, "message": "승인 대기 중"} + + if command == "credits": + credits = await service_proxy.get_music_credits() + return {"ok": True, "credits": credits} + + return {"ok": False, "message": f"Unknown command: {command}"} + + async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None: + if not approved: + reject_task(task_id) + await self.transition("idle", "작곡 거절됨") + await telegram_bot.send_task_result( + self.agent_id, "🎵 [음악 에이전트] 작곡 취소", + "사용자가 거절했습니다.", + ) + return + + from ..db import get_task + task = get_task(task_id) + if not task: + return + + approve_task(task_id, via="telegram") + await self.transition("working", "작곡 중...", task_id) + asyncio.create_task(self._poll_composition(task_id, task)) + + async def _poll_composition(self, task_id: str, task: dict) -> None: + try: + input_data = task["input_data"] + payload = { + "provider": "suno", + "model": input_data.get("model", "V4"), + "prompt": input_data.get("prompt", ""), + "style": input_data.get("style", ""), + "instrumental": input_data.get("instrumental", False), + "custom_mode": True, + } + + result = await service_proxy.generate_music(payload) + music_task_id = result.get("task_id") + + if not music_task_id: + raise Exception("music-lab did not return task_id") + + for _ in range(60): + await asyncio.sleep(5) + status = await service_proxy.get_music_status(music_task_id) + state = status.get("status", "") + + if state == "succeeded": + tracks = status.get("tracks", []) + update_task_status(task_id, "succeeded", { + "music_task_id": music_task_id, + "tracks": tracks, + }) + await self.transition("reporting", "작곡 완료!") + + track_info = "" + for t in tracks: + title = t.get("title", "Untitled") + url = t.get("audio_url", "") + track_info += f"🎶 {title}\n{url}\n" + + await telegram_bot.send_task_result( + self.agent_id, "🎵 [음악 에이전트] 작곡 완료", + track_info or "트랙 생성 완료", + ) + await self.transition("idle", "작곡 완료") + return + + if state == "failed": + raise Exception(status.get("message", "Generation failed")) + + raise Exception("Timeout: 5분 초과") + + except Exception as e: + add_log(self.agent_id, f"Compose failed: {e}", "error", task_id) + update_task_status(task_id, "failed", {"error": str(e)}) + await self.transition("idle", f"오류: {e}") + await telegram_bot.send_task_result( + self.agent_id, "🎵 [음악 에이전트] 작곡 실패", + f"오류: {e}", + ) diff --git a/agent-office/app/agents/stock.py b/agent-office/app/agents/stock.py new file mode 100644 index 0000000..effabb0 --- /dev/null +++ b/agent-office/app/agents/stock.py @@ -0,0 +1,99 @@ +import asyncio +from typing import Optional + +from .base import BaseAgent +from ..db import create_task, update_task_status, get_agent_config, add_log +from .. import service_proxy + +class StockAgent(BaseAgent): + agent_id = "stock" + display_name = "주식 트레이더" + + async def on_schedule(self) -> None: + if self.state not in ("idle", "break"): + return + + task_id = create_task(self.agent_id, "news_summary", {"limit": 15}) + await self.transition("working", "뉴스 수집 중...", task_id) + + try: + news = await service_proxy.fetch_stock_news(limit=15) + indices = await service_proxy.fetch_stock_indices() + + summary = self._format_news_summary(news, indices) + + update_task_status(task_id, "succeeded", { + "summary": summary, + "news_count": len(news) if isinstance(news, list) else 0, + }) + + await self.transition("reporting", "뉴스 요약 전송 중...") + + from ..telegram_bot import send_stock_summary + await send_stock_summary(summary) + + await self.transition("idle", "뉴스 요약 완료") + + except Exception as e: + add_log(self.agent_id, f"News summary failed: {e}", "error", task_id) + update_task_status(task_id, "failed", {"error": str(e)}) + await self.transition("idle", f"오류: {e}") + + async def on_command(self, command: str, params: dict) -> dict: + if command == "fetch_news": + await self.on_schedule() + return {"ok": True, "message": "뉴스 수집 시작"} + + if command == "add_alert": + symbol = params.get("symbol") + target_price = params.get("target_price") + if not symbol or target_price is None: + return {"ok": False, "message": "symbol과 target_price는 필수입니다"} + config = get_agent_config(self.agent_id) + alerts = config["custom_config"].get("alerts", []) + alerts.append({ + "symbol": symbol, + "name": params.get("name", symbol), + "target_price": target_price, + "direction": params.get("direction", "above"), + }) + from ..db import update_agent_config + update_agent_config(self.agent_id, custom_config={**config["custom_config"], "alerts": alerts}) + return {"ok": True, "message": f"알람 추가: {params['symbol']}"} + + if command == "list_alerts": + config = get_agent_config(self.agent_id) + alerts = config["custom_config"].get("alerts", []) + return {"ok": True, "alerts": alerts} + + return {"ok": False, "message": f"Unknown command: {command}"} + + async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None: + pass + + def _format_news_summary(self, news, indices) -> str: + lines = ["📈 [주식 에이전트] 아침 뉴스 요약", "━" * 20] + + if isinstance(news, list): + for item in news[:10]: + title = item.get("title", "") + if title: + lines.append(f"• {title}") + elif isinstance(news, dict) and "articles" in news: + for item in news["articles"][:10]: + title = item.get("title", "") + if title: + lines.append(f"• {title}") + + if indices: + lines.append("") + lines.append("📊 주요 지수") + if isinstance(indices, dict): + for key, val in indices.items(): + if isinstance(val, dict): + name = val.get("name", key) + price = val.get("price", "") + change = val.get("change", "") + lines.append(f"{name}: {price} ({change})") + + return "\n".join(lines) diff --git a/agent-office/app/config.py b/agent-office/app/config.py new file mode 100644 index 0000000..66caf61 --- /dev/null +++ b/agent-office/app/config.py @@ -0,0 +1,23 @@ +import os + +# Service URLs (Docker internal network) +STOCK_LAB_URL = os.getenv("STOCK_LAB_URL", "http://localhost:18500") +MUSIC_LAB_URL = os.getenv("MUSIC_LAB_URL", "http://localhost:18600") + +# Telegram +TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "") +TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "") +TELEGRAM_WEBHOOK_URL = os.getenv("TELEGRAM_WEBHOOK_URL", "") + +# Database +DB_PATH = os.getenv("AGENT_OFFICE_DB_PATH", "/app/data/agent_office.db") + +# CORS +CORS_ALLOW_ORIGINS = os.getenv( + "CORS_ALLOW_ORIGINS", "http://localhost:3007,http://localhost:8080" +) + +# Idle break threshold (seconds) +IDLE_BREAK_THRESHOLD = int(os.getenv("IDLE_BREAK_THRESHOLD", "300")) # 5 min +BREAK_DURATION_MIN = int(os.getenv("BREAK_DURATION_MIN", "60")) # 1 min +BREAK_DURATION_MAX = int(os.getenv("BREAK_DURATION_MAX", "180")) # 3 min diff --git a/agent-office/app/db.py b/agent-office/app/db.py new file mode 100644 index 0000000..cc5a7e1 --- /dev/null +++ b/agent-office/app/db.py @@ -0,0 +1,261 @@ +import os +import json +import sqlite3 +import uuid +from typing import Any, Dict, List, Optional + +from .config import DB_PATH + + +def _conn() -> sqlite3.Connection: + os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) + conn = sqlite3.connect(DB_PATH, timeout=10) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + return conn + + +def init_db() -> None: + with _conn() as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS agent_config ( + agent_id TEXT PRIMARY KEY, + display_name TEXT NOT NULL, + enabled INTEGER NOT NULL DEFAULT 1, + schedule_config TEXT NOT NULL DEFAULT '{}', + custom_config TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), + updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) + ) + """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS agent_tasks ( + id TEXT PRIMARY KEY, + agent_id TEXT NOT NULL, + task_type TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + input_data TEXT NOT NULL DEFAULT '{}', + result_data TEXT, + requires_approval INTEGER NOT NULL DEFAULT 0, + approved_at TEXT, + approved_via TEXT, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), + completed_at TEXT + ) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_tasks_agent + ON agent_tasks(agent_id, created_at DESC) + """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS agent_logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + agent_id TEXT NOT NULL, + task_id TEXT, + level TEXT NOT NULL DEFAULT 'info', + message TEXT NOT NULL, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) + ) + """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS telegram_state ( + callback_id TEXT PRIMARY KEY, + task_id TEXT NOT NULL, + agent_id TEXT NOT NULL, + action TEXT, + responded INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) + ) + """) + # Seed default agent configs + for agent_id, name in [("stock", "주식 트레이더"), ("music", "음악 프로듀서")]: + conn.execute( + "INSERT OR IGNORE INTO agent_config(agent_id, display_name) VALUES(?,?)", + (agent_id, name), + ) + + +# --- agent_config CRUD --- + +def get_all_agents() -> List[Dict[str, Any]]: + with _conn() as conn: + rows = conn.execute("SELECT * FROM agent_config ORDER BY agent_id").fetchall() + return [_config_to_dict(r) for r in rows] + + +def get_agent_config(agent_id: str) -> Optional[Dict[str, Any]]: + with _conn() as conn: + r = conn.execute("SELECT * FROM agent_config WHERE agent_id=?", (agent_id,)).fetchone() + return _config_to_dict(r) if r else None + + +def update_agent_config(agent_id: str, **kwargs) -> None: + sets, vals = [], [] + for k in ("enabled", "schedule_config", "custom_config"): + if k in kwargs and kwargs[k] is not None: + if k in ("schedule_config", "custom_config"): + sets.append(f"{k}=?") + vals.append(json.dumps(kwargs[k])) + else: + sets.append(f"{k}=?") + vals.append(kwargs[k]) + if not sets: + return + sets.append("updated_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')") + vals.append(agent_id) + with _conn() as conn: + conn.execute(f"UPDATE agent_config SET {','.join(sets)} WHERE agent_id=?", vals) + + +def _config_to_dict(r) -> Dict[str, Any]: + return { + "agent_id": r["agent_id"], + "display_name": r["display_name"], + "enabled": bool(r["enabled"]), + "schedule_config": json.loads(r["schedule_config"]), + "custom_config": json.loads(r["custom_config"]), + "created_at": r["created_at"], + "updated_at": r["updated_at"], + } + + +# --- agent_tasks CRUD --- + +def create_task(agent_id: str, task_type: str, input_data: dict, requires_approval: bool = False) -> str: + task_id = str(uuid.uuid4()) + status = "pending" if requires_approval else "working" + with _conn() as conn: + conn.execute( + "INSERT INTO agent_tasks(id,agent_id,task_type,status,input_data,requires_approval) VALUES(?,?,?,?,?,?)", + (task_id, agent_id, task_type, status, json.dumps(input_data), int(requires_approval)), + ) + return task_id + + +def update_task_status(task_id: str, status: str, result_data: dict = None) -> None: + with _conn() as conn: + if result_data is not None: + conn.execute( + "UPDATE agent_tasks SET status=?, result_data=?, completed_at=strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id=?", + (status, json.dumps(result_data), task_id), + ) + else: + conn.execute("UPDATE agent_tasks SET status=? WHERE id=?", (status, task_id)) + + +def approve_task(task_id: str, via: str = "web") -> None: + with _conn() as conn: + conn.execute( + "UPDATE agent_tasks SET status='approved', approved_at=strftime('%Y-%m-%dT%H:%M:%fZ','now'), approved_via=? WHERE id=?", + (via, task_id), + ) + + +def reject_task(task_id: str) -> None: + with _conn() as conn: + conn.execute( + "UPDATE agent_tasks SET status='rejected', completed_at=strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id=?", + (task_id,), + ) + + +def get_task(task_id: str) -> Optional[Dict[str, Any]]: + with _conn() as conn: + r = conn.execute("SELECT * FROM agent_tasks WHERE id=?", (task_id,)).fetchone() + return _task_to_dict(r) if r else None + + +def get_agent_tasks(agent_id: str, limit: int = 20) -> List[Dict[str, Any]]: + with _conn() as conn: + rows = conn.execute( + "SELECT * FROM agent_tasks WHERE agent_id=? ORDER BY created_at DESC LIMIT ?", + (agent_id, limit), + ).fetchall() + return [_task_to_dict(r) for r in rows] + + +def get_pending_approvals() -> List[Dict[str, Any]]: + with _conn() as conn: + rows = conn.execute( + "SELECT * FROM agent_tasks WHERE status='pending' AND requires_approval=1 ORDER BY created_at DESC" + ).fetchall() + return [_task_to_dict(r) for r in rows] + + +def _task_to_dict(r) -> Dict[str, Any]: + return { + "id": r["id"], + "agent_id": r["agent_id"], + "task_type": r["task_type"], + "status": r["status"], + "input_data": json.loads(r["input_data"]) if r["input_data"] else {}, + "result_data": json.loads(r["result_data"]) if r["result_data"] else None, + "requires_approval": bool(r["requires_approval"]), + "approved_at": r["approved_at"], + "approved_via": r["approved_via"], + "created_at": r["created_at"], + "completed_at": r["completed_at"], + } + + +# --- agent_logs --- + +def add_log(agent_id: str, message: str, level: str = "info", task_id: str = None) -> None: + with _conn() as conn: + conn.execute( + "INSERT INTO agent_logs(agent_id,task_id,level,message) VALUES(?,?,?,?)", + (agent_id, task_id, level, message), + ) + + +def get_logs(agent_id: str, limit: int = 50) -> List[Dict[str, Any]]: + with _conn() as conn: + rows = conn.execute( + "SELECT * FROM agent_logs WHERE agent_id=? ORDER BY created_at DESC LIMIT ?", + (agent_id, limit), + ).fetchall() + return [ + { + "id": r["id"], + "agent_id": r["agent_id"], + "task_id": r["task_id"], + "level": r["level"], + "message": r["message"], + "created_at": r["created_at"], + } + for r in rows + ] + + +# --- telegram_state --- + +def save_telegram_callback(callback_id: str, task_id: str, agent_id: str) -> None: + with _conn() as conn: + conn.execute( + "INSERT OR REPLACE INTO telegram_state(callback_id,task_id,agent_id) VALUES(?,?,?)", + (callback_id, task_id, agent_id), + ) + + +def get_telegram_callback(callback_id: str) -> Optional[Dict[str, Any]]: + with _conn() as conn: + r = conn.execute( + "SELECT * FROM telegram_state WHERE callback_id=? AND responded=0", + (callback_id,), + ).fetchone() + if not r: + return None + return { + "callback_id": r["callback_id"], + "task_id": r["task_id"], + "agent_id": r["agent_id"], + "responded": bool(r["responded"]), + } + + +def mark_telegram_responded(callback_id: str, action: str) -> None: + with _conn() as conn: + conn.execute( + "UPDATE telegram_state SET responded=1, action=? WHERE callback_id=?", + (action, callback_id), + ) diff --git a/agent-office/app/main.py b/agent-office/app/main.py new file mode 100644 index 0000000..1928a71 --- /dev/null +++ b/agent-office/app/main.py @@ -0,0 +1,152 @@ +import os +import json +from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect +from fastapi.middleware.cors import CORSMiddleware + +from .config import CORS_ALLOW_ORIGINS +from .db import init_db, get_all_agents, get_agent_config, update_agent_config, get_agent_tasks, get_pending_approvals, get_task, get_logs +from .models import CommandRequest, ApprovalRequest, AgentConfigUpdate +from .websocket_manager import ws_manager +from .agents import init_agents, get_agent, get_all_agent_states, AGENT_REGISTRY +from .scheduler import init_scheduler +from . import telegram_bot + +app = FastAPI() + +_cors_origins = CORS_ALLOW_ORIGINS.split(",") +app.add_middleware( + CORSMiddleware, + allow_origins=[o.strip() for o in _cors_origins], + allow_credentials=False, + allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"], + allow_headers=["Content-Type"], +) + +@app.on_event("startup") +async def on_startup(): + init_db() + os.makedirs("/app/data", exist_ok=True) + init_agents() + for agent in AGENT_REGISTRY.values(): + agent.set_ws_manager(ws_manager) + init_scheduler() + +@app.get("/health") +def health(): + return {"ok": True} + +# --- WebSocket --- + +@app.websocket("/api/agent-office/ws") +async def websocket_endpoint(ws: WebSocket): + await ws_manager.connect(ws) + try: + await ws.send_text(json.dumps({ + "type": "init", + "agents": get_all_agent_states(), + "pending": [t["id"] for t in get_pending_approvals()], + }, ensure_ascii=False)) + while True: + data = await ws.receive_text() + try: + msg = json.loads(data) + except json.JSONDecodeError: + continue + await _handle_ws_message(msg) + except WebSocketDisconnect: + pass + finally: + await ws_manager.disconnect(ws) + +async def _handle_ws_message(msg: dict): + msg_type = msg.get("type") + agent_id = msg.get("agent") + agent = get_agent(agent_id) if agent_id else None + + if msg_type == "command" and agent: + action = msg.get("action", "") + params = msg.get("params", {}) + result = await agent.on_command(action, params) + await ws_manager.broadcast({"type": "command_result", "agent": agent_id, "result": result}) + + elif msg_type == "approval" and agent: + task_id = msg.get("task_id") + approved = msg.get("approved", False) + if task_id: + await agent.on_approval(task_id, approved) + + elif msg_type == "query" and agent: + status = await agent.get_status() + await ws_manager.broadcast({"type": "agent_status", "agent": agent_id, "status": status}) + +# --- REST Endpoints --- + +@app.get("/api/agent-office/agents") +def list_agents(): + return {"agents": get_all_agents()} + +@app.get("/api/agent-office/agents/{agent_id}") +def agent_detail(agent_id: str): + config = get_agent_config(agent_id) + if not config: + raise HTTPException(status_code=404, detail="Agent not found") + agent = get_agent(agent_id) + state_info = {"state": agent.state, "detail": agent.state_detail} if agent else {} + return {**config, **state_info} + +@app.put("/api/agent-office/agents/{agent_id}") +def update_agent(agent_id: str, body: AgentConfigUpdate): + update_agent_config(agent_id, enabled=body.enabled, + schedule_config=body.schedule_config, + custom_config=body.custom_config) + return {"ok": True} + +@app.get("/api/agent-office/agents/{agent_id}/tasks") +def agent_tasks(agent_id: str, limit: int = 20): + return {"tasks": get_agent_tasks(agent_id, limit)} + +@app.get("/api/agent-office/agents/{agent_id}/logs") +def agent_logs(agent_id: str, limit: int = 50): + return {"logs": get_logs(agent_id, limit)} + +@app.get("/api/agent-office/tasks/pending") +def pending_tasks(): + return {"tasks": get_pending_approvals()} + +@app.get("/api/agent-office/tasks/{task_id}") +def task_detail(task_id: str): + task = get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail="Task not found") + return task + +@app.post("/api/agent-office/command") +async def send_command(body: CommandRequest): + agent = get_agent(body.agent) + if not agent: + return {"error": f"Agent '{body.agent}' not found"} + result = await agent.on_command(body.action, body.params or {}) + return result + +@app.post("/api/agent-office/approve") +async def approve(body: ApprovalRequest): + agent = get_agent(body.agent) + if not agent: + return {"error": f"Agent '{body.agent}' not found"} + await agent.on_approval(body.task_id, body.approved, body.feedback or "") + return {"ok": True} + +# --- Telegram Webhook --- + +@app.post("/api/agent-office/telegram/webhook") +async def telegram_webhook(data: dict): + result = await telegram_bot.handle_webhook(data) + if result: + agent = get_agent(result["agent_id"]) + if agent: + await agent.on_approval(result["task_id"], result["approved"]) + return {"ok": True} + +@app.get("/api/agent-office/states") +def all_states(): + return {"agents": get_all_agent_states()} diff --git a/agent-office/app/models.py b/agent-office/app/models.py new file mode 100644 index 0000000..891c19f --- /dev/null +++ b/agent-office/app/models.py @@ -0,0 +1,35 @@ +from pydantic import BaseModel +from typing import Optional + + +class CommandRequest(BaseModel): + agent: str + action: str + params: Optional[dict] = None + + +class ApprovalRequest(BaseModel): + agent: str + task_id: str + approved: bool + feedback: Optional[str] = None + + +class AgentConfigUpdate(BaseModel): + enabled: Optional[bool] = None + schedule_config: Optional[dict] = None + custom_config: Optional[dict] = None + + +class PriceAlertConfig(BaseModel): + symbol: str + name: str + target_price: float + direction: str # "above" or "below" + + +class ComposeCommand(BaseModel): + prompt: str + style: Optional[str] = None + model: Optional[str] = "V4" + instrumental: Optional[bool] = False diff --git a/agent-office/app/scheduler.py b/agent-office/app/scheduler.py new file mode 100644 index 0000000..c5eda69 --- /dev/null +++ b/agent-office/app/scheduler.py @@ -0,0 +1,20 @@ +import asyncio +from apscheduler.schedulers.asyncio import AsyncIOScheduler + +from .agents import AGENT_REGISTRY + +scheduler = AsyncIOScheduler(timezone="Asia/Seoul") + +async def _check_idle_breaks(): + for agent in AGENT_REGISTRY.values(): + await agent.check_idle_break() + +async def _run_stock_schedule(): + agent = AGENT_REGISTRY.get("stock") + if agent: + await agent.on_schedule() + +def init_scheduler(): + scheduler.add_job(_run_stock_schedule, "cron", hour=8, minute=0, id="stock_news") + scheduler.add_job(_check_idle_breaks, "interval", seconds=60, id="idle_check") + scheduler.start() diff --git a/agent-office/app/service_proxy.py b/agent-office/app/service_proxy.py new file mode 100644 index 0000000..ae6d3b6 --- /dev/null +++ b/agent-office/app/service_proxy.py @@ -0,0 +1,34 @@ +import httpx +from typing import Any, Dict, List, Optional + +from .config import STOCK_LAB_URL, MUSIC_LAB_URL + +_client = httpx.AsyncClient(timeout=30.0) + +async def fetch_stock_news(limit: int = 10, category: str = None) -> List[Dict[str, Any]]: + params = {"limit": limit} + if category: + params["category"] = category + resp = await _client.get(f"{STOCK_LAB_URL}/api/stock/news", params=params) + resp.raise_for_status() + return resp.json() + +async def fetch_stock_indices() -> Dict[str, Any]: + resp = await _client.get(f"{STOCK_LAB_URL}/api/stock/indices") + resp.raise_for_status() + return resp.json() + +async def generate_music(payload: dict) -> Dict[str, Any]: + resp = await _client.post(f"{MUSIC_LAB_URL}/api/music/generate", json=payload) + resp.raise_for_status() + return resp.json() + +async def get_music_status(task_id: str) -> Dict[str, Any]: + resp = await _client.get(f"{MUSIC_LAB_URL}/api/music/status/{task_id}") + resp.raise_for_status() + return resp.json() + +async def get_music_credits() -> Dict[str, Any]: + resp = await _client.get(f"{MUSIC_LAB_URL}/api/music/credits") + resp.raise_for_status() + return resp.json() diff --git a/agent-office/app/telegram_bot.py b/agent-office/app/telegram_bot.py new file mode 100644 index 0000000..fb34670 --- /dev/null +++ b/agent-office/app/telegram_bot.py @@ -0,0 +1,82 @@ +import json +import uuid +import httpx +from typing import Optional + +from .config import TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID, TELEGRAM_WEBHOOK_URL +from .db import save_telegram_callback, get_telegram_callback, mark_telegram_responded + +_BASE = "https://api.telegram.org/bot" + +def _enabled() -> bool: + return bool(TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID) + +async def _api(method: str, payload: dict) -> dict: + if not _enabled(): + return {"ok": False, "description": "Telegram not configured"} + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.post(f"{_BASE}{TELEGRAM_BOT_TOKEN}/{method}", json=payload) + return resp.json() + +async def send_message(text: str, reply_markup: dict = None) -> dict: + payload = { + "chat_id": TELEGRAM_CHAT_ID, + "text": text, + "parse_mode": "HTML", + } + if reply_markup: + payload["reply_markup"] = reply_markup + return await _api("sendMessage", payload) + +async def send_stock_summary(summary: str) -> dict: + return await send_message(summary) + +async def send_approval_request(agent_id: str, task_id: str, title: str, detail: str) -> dict: + approve_id = f"approve_{uuid.uuid4().hex[:8]}" + reject_id = f"reject_{uuid.uuid4().hex[:8]}" + + save_telegram_callback(approve_id, task_id, agent_id) + save_telegram_callback(reject_id, task_id, agent_id) + + text = f"{title}\n{'━' * 20}\n{detail}" + reply_markup = { + "inline_keyboard": [[ + {"text": "✅ 승인", "callback_data": approve_id}, + {"text": "❌ 거절", "callback_data": reject_id}, + ]] + } + return await send_message(text, reply_markup) + +async def send_task_result(agent_id: str, title: str, result: str) -> dict: + text = f"{title}\n{'━' * 20}\n{result}" + return await send_message(text) + +async def handle_webhook(data: dict) -> Optional[dict]: + callback_query = data.get("callback_query") + if not callback_query: + return None + + callback_id = callback_query.get("data", "") + cb = get_telegram_callback(callback_id) + if not cb: + return None + + action = "approve" if callback_id.startswith("approve_") else "reject" + mark_telegram_responded(callback_id, action) + + await _api("answerCallbackQuery", { + "callback_query_id": callback_query["id"], + "text": "승인됨 ✅" if action == "approve" else "거절됨 ❌", + }) + + return { + "task_id": cb["task_id"], + "agent_id": cb["agent_id"], + "action": action, + "approved": action == "approve", + } + +async def setup_webhook() -> dict: + if not _enabled() or not TELEGRAM_WEBHOOK_URL: + return {"ok": False, "description": "Webhook URL not configured"} + return await _api("setWebhook", {"url": TELEGRAM_WEBHOOK_URL}) diff --git a/agent-office/app/test_db.py b/agent-office/app/test_db.py new file mode 100644 index 0000000..267763b --- /dev/null +++ b/agent-office/app/test_db.py @@ -0,0 +1,110 @@ +import os +import sys +import tempfile + +# Override DB_PATH before importing db +_tmp = tempfile.mktemp(suffix=".db") +os.environ["AGENT_OFFICE_DB_PATH"] = _tmp + +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) +from app.db import ( + init_db, get_all_agents, get_agent_config, update_agent_config, + create_task, update_task_status, approve_task, get_task, get_agent_tasks, + get_pending_approvals, add_log, get_logs, + save_telegram_callback, get_telegram_callback, mark_telegram_responded, +) + + +def test_init_and_seed(): + init_db() + agents = get_all_agents() + assert len(agents) == 2, f"Expected 2 agents, got {len(agents)}" + ids = {a["agent_id"] for a in agents} + assert ids == {"stock", "music"}, f"Unexpected agent ids: {ids}" + print(" [PASS] test_init_and_seed") + + +def test_agent_config_update(): + init_db() + update_agent_config("stock", custom_config={"watch": ["AAPL"]}) + cfg = get_agent_config("stock") + assert cfg["custom_config"] == {"watch": ["AAPL"]}, f"Unexpected config: {cfg['custom_config']}" + print(" [PASS] test_agent_config_update") + + +def test_task_lifecycle(): + init_db() + # Create task with approval + tid = create_task("music", "compose", {"prompt": "test"}, requires_approval=True) + task = get_task(tid) + assert task["status"] == "pending", f"Expected pending, got {task['status']}" + assert task["requires_approval"] is True + + # Approve + approve_task(tid, via="telegram") + task = get_task(tid) + assert task["status"] == "approved", f"Expected approved, got {task['status']}" + assert task["approved_via"] == "telegram" + + # Complete + update_task_status(tid, "succeeded", {"url": "/media/music/test.mp3"}) + task = get_task(tid) + assert task["status"] == "succeeded", f"Expected succeeded, got {task['status']}" + assert task["result_data"]["url"] == "/media/music/test.mp3" + print(" [PASS] test_task_lifecycle") + + +def test_task_no_approval(): + init_db() + tid = create_task("stock", "news_summary", {"limit": 10}) + task = get_task(tid) + assert task["status"] == "working", f"Expected working, got {task['status']}" + print(" [PASS] test_task_no_approval") + + +def test_pending_approvals(): + init_db() + create_task("music", "compose", {"prompt": "a"}, requires_approval=True) + create_task("music", "compose", {"prompt": "b"}, requires_approval=True) + create_task("stock", "news_summary", {}) + pending = get_pending_approvals() + assert len(pending) == 2, f"Expected 2 pending, got {len(pending)}" + print(" [PASS] test_pending_approvals") + + +def test_logs(): + init_db() + add_log("stock", "News fetched", "info", "task-1") + add_log("stock", "API error", "error") + logs = get_logs("stock") + assert len(logs) == 2, f"Expected 2 logs, got {len(logs)}" + assert logs[0]["level"] == "error", f"Expected error first (DESC), got {logs[0]['level']}" + print(" [PASS] test_logs") + + +def test_telegram_state(): + init_db() + save_telegram_callback("cb-1", "task-1", "music") + cb = get_telegram_callback("cb-1") + assert cb["task_id"] == "task-1" + mark_telegram_responded("cb-1", "approve") + cb = get_telegram_callback("cb-1") + assert cb is None, f"Expected None after responded=1, got {cb}" + print(" [PASS] test_telegram_state") + + +if __name__ == "__main__": + test_init_and_seed() + test_agent_config_update() + test_task_lifecycle() + test_task_no_approval() + test_pending_approvals() + test_logs() + test_telegram_state() + print("All DB tests passed!") + # Cleanup temp DB (best-effort; WAL mode may keep files open on Windows) + for ext in ("", "-wal", "-shm"): + try: + os.unlink(_tmp + ext) + except OSError: + pass diff --git a/agent-office/app/websocket_manager.py b/agent-office/app/websocket_manager.py new file mode 100644 index 0000000..b8d9103 --- /dev/null +++ b/agent-office/app/websocket_manager.py @@ -0,0 +1,46 @@ +import asyncio +import json +from typing import Any, Dict, Set +from fastapi import WebSocket + +class WebSocketManager: + def __init__(self): + self._connections: Set[WebSocket] = set() + self._lock = asyncio.Lock() + + async def connect(self, ws: WebSocket) -> None: + await ws.accept() + async with self._lock: + self._connections.add(ws) + + async def disconnect(self, ws: WebSocket) -> None: + async with self._lock: + self._connections.discard(ws) + + async def broadcast(self, message: Dict[str, Any]) -> None: + payload = json.dumps(message, ensure_ascii=False) + async with self._lock: + dead = set() + for ws in self._connections: + try: + await ws.send_text(payload) + except Exception: + dead.add(ws) + self._connections -= dead + + async def send_agent_state(self, agent_id: str, state: str, detail: str = "", task_id: str = None) -> None: + msg = {"type": "agent_state", "agent": agent_id, "state": state, "detail": detail} + if task_id: + msg["task_id"] = task_id + await self.broadcast(msg) + + async def send_task_complete(self, agent_id: str, task_id: str, result: dict) -> None: + await self.broadcast({ + "type": "task_complete", "agent": agent_id, + "task_id": task_id, "result": result, + }) + + async def send_agent_move(self, agent_id: str, target: str) -> None: + await self.broadcast({"type": "agent_move", "agent": agent_id, "target": target}) + +ws_manager = WebSocketManager() diff --git a/agent-office/requirements.txt b/agent-office/requirements.txt new file mode 100644 index 0000000..0aed057 --- /dev/null +++ b/agent-office/requirements.txt @@ -0,0 +1,5 @@ +fastapi==0.115.6 +uvicorn[standard]==0.30.6 +apscheduler==3.10.4 +websockets>=12.0 +httpx>=0.27 diff --git a/docker-compose.yml b/docker-compose.yml index 55c47a3..69c821a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -108,6 +108,32 @@ services: timeout: 5s retries: 3 + agent-office: + build: + context: ./agent-office + container_name: agent-office + restart: unless-stopped + ports: + - "18900:8000" + environment: + - TZ=${TZ:-Asia/Seoul} + - CORS_ALLOW_ORIGINS=${CORS_ALLOW_ORIGINS:-http://localhost:3007,http://localhost:8080} + - STOCK_LAB_URL=http://stock-lab:8000 + - MUSIC_LAB_URL=http://music-lab:8000 + - TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN:-} + - TELEGRAM_CHAT_ID=${TELEGRAM_CHAT_ID:-} + - TELEGRAM_WEBHOOK_URL=${TELEGRAM_WEBHOOK_URL:-} + volumes: + - ${RUNTIME_PATH:-.}/data/agent-office:/app/data + depends_on: + - stock-lab + - music-lab + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"] + interval: 30s + timeout: 5s + retries: 3 + travel-proxy: build: ./travel-proxy container_name: travel-proxy diff --git a/nginx/default.conf b/nginx/default.conf index 8123e20..cfcb846 100644 --- a/nginx/default.conf +++ b/nginx/default.conf @@ -139,6 +139,22 @@ server { } + # agent-office API + WebSocket + location /api/agent-office/ { + resolver 127.0.0.11 valid=10s; + set $agent_office_backend agent-office:8000; + + proxy_http_version 1.1; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_read_timeout 86400s; + proxy_pass http://$agent_office_backend$request_uri; + } + # API 프록시 (여기가 포인트: /api/ 중복 제거) location /api/ { proxy_http_version 1.1;