feat: Agent Office — AI 에이전트 가상 오피스 #2

Merged
gahusb merged 15 commits from feat/agent-office into main 2026-04-11 13:35:25 +09:00
21 changed files with 4578 additions and 0 deletions

View File

@@ -58,6 +58,7 @@ Synology NAS 기반의 개인 웹 플랫폼 백엔드 모노레포.
| `music-lab` | 18600 | AI 음악 생성·라이브러리 관리 API | | `music-lab` | 18600 | AI 음악 생성·라이브러리 관리 API |
| `blog-lab` | 18700 | 블로그 마케팅 수익화 API | | `blog-lab` | 18700 | 블로그 마케팅 수익화 API |
| `realestate-lab` | 18800 | 부동산 청약 자동 수집·매칭 API | | `realestate-lab` | 18800 | 부동산 청약 자동 수집·매칭 API |
| `agent-office` | 18900 | AI 에이전트 오피스 (실시간 WebSocket + 텔레그램 연동) |
| `travel-proxy` | 19000 | 여행 사진 API + 썸네일 생성 | | `travel-proxy` | 19000 | 여행 사진 API + 썸네일 생성 |
| `lotto-frontend` (nginx) | 8080 | 정적 SPA 서빙 + API 리버스 프록시 | | `lotto-frontend` (nginx) | 8080 | 정적 SPA 서빙 + API 리버스 프록시 |
| `webpage-deployer` | 19010 | Gitea Webhook 수신 → 자동 배포 | | `webpage-deployer` | 19010 | Gitea Webhook 수신 → 자동 배포 |
@@ -76,6 +77,7 @@ Synology NAS 기반의 개인 웹 플랫폼 백엔드 모노레포.
| `/api/music/` | `music-lab:8000` | AI 음악 생성·라이브러리 API | | `/api/music/` | `music-lab:8000` | AI 음악 생성·라이브러리 API |
| `/api/blog-marketing/` | `blog-lab:8000` | 블로그 마케팅 수익화 API | | `/api/blog-marketing/` | `blog-lab:8000` | 블로그 마케팅 수익화 API |
| `/api/realestate/` | `realestate-lab:8000` | 부동산 청약 API | | `/api/realestate/` | `realestate-lab:8000` | 부동산 청약 API |
| `/api/agent-office/` | `agent-office:8000` | AI 에이전트 오피스 API + WebSocket |
| `/webhook`, `/webhook/` | `deployer:9000` | Gitea Webhook | | `/webhook`, `/webhook/` | `deployer:9000` | Gitea Webhook |
| `/media/music/` | `/data/music/` (파일 직접 서빙) | 생성된 오디오 파일 | | `/media/music/` | `/data/music/` (파일 직접 서빙) | 생성된 오디오 파일 |
| `/media/travel/.thumb/` | `/data/thumbs/` (파일 직접 서빙) | 썸네일 캐시 | | `/media/travel/.thumb/` | `/data/thumbs/` (파일 직접 서빙) | 썸네일 캐시 |
@@ -407,6 +409,44 @@ docker compose up -d
- `NAVER_CLIENT_SECRET`: 네이버 검색 API 시크릿 - `NAVER_CLIENT_SECRET`: 네이버 검색 API 시크릿
- `BLOG_DATA_PATH`: SQLite DB 저장 경로 (기본 `./data/blog`) - `BLOG_DATA_PATH`: SQLite DB 저장 경로 (기본 `./data/blog`)
### agent-office (agent-office/)
- AI 에이전트 가상 오피스 — 2D 픽셀아트 사무실에서 에이전트가 실제 작업 수행
- stock-lab/music-lab 기존 API를 서비스 프록시로 호출 (직접 DB 접근 없음)
- 실시간 상태 동기화: WebSocket (`/api/agent-office/ws`)
- 텔레그램 봇: 양방향 알림 + 승인 (인라인 키보드)
- DB: `/app/data/agent_office.db` (agent_config, agent_tasks, agent_logs, telegram_state 테이블)
- 파일 구조: `main.py`, `db.py`, `config.py`, `models.py`, `websocket_manager.py`, `service_proxy.py`, `telegram_bot.py`, `scheduler.py`, `agents/base.py`, `agents/stock.py`, `agents/music.py`
**에이전트 FSM 상태**: idle → working → waiting (승인 대기) → reporting → break (휴식)
**환경변수**
- `STOCK_LAB_URL`: stock-lab 내부 URL (기본 `http://stock-lab:8000`)
- `MUSIC_LAB_URL`: music-lab 내부 URL (기본 `http://music-lab:8000`)
- `TELEGRAM_BOT_TOKEN`: 텔레그램 봇 토큰 (미설정 시 알림 비활성화)
- `TELEGRAM_CHAT_ID`: 텔레그램 채팅 ID
- `TELEGRAM_WEBHOOK_URL`: 텔레그램 Webhook URL
**스케줄러 job**
- 08:00 매일 — 주식 뉴스 요약 (`stock_news_job`)
- 60초 간격 — 유휴 에이전트 휴식 체크 (`idle_check_job`)
**agent-office API 목록**
| 메서드 | 경로 | 설명 |
|--------|------|------|
| WS | `/api/agent-office/ws` | WebSocket (init, agent_state, task_complete, command_result) |
| GET | `/api/agent-office/agents` | 에이전트 목록 |
| GET | `/api/agent-office/agents/{id}` | 에이전트 상세 (설정 + 상태) |
| PUT | `/api/agent-office/agents/{id}` | 에이전트 설정 수정 |
| GET | `/api/agent-office/agents/{id}/tasks` | 에이전트 작업 이력 |
| GET | `/api/agent-office/agents/{id}/logs` | 에이전트 로그 |
| GET | `/api/agent-office/tasks/pending` | 승인 대기 작업 목록 |
| GET | `/api/agent-office/tasks/{id}` | 작업 상세 |
| POST | `/api/agent-office/command` | 에이전트에 명령 전송 |
| POST | `/api/agent-office/approve` | 작업 승인/거부 |
| POST | `/api/agent-office/telegram/webhook` | 텔레그램 Webhook 수신 |
| GET | `/api/agent-office/states` | 전체 에이전트 상태 조회 |
### deployer (deployer/) ### deployer (deployer/)
- Webhook 검증: `X-Gitea-Signature` (HMAC SHA256, `compare_digest` 사용) - Webhook 검증: `X-Gitea-Signature` (HMAC SHA256, `compare_digest` 사용)
- `WEBHOOK_SECRET` 환경변수로 시크릿 관리 - `WEBHOOK_SECRET` 환경변수로 시크릿 관리

10
agent-office/Dockerfile Normal file
View File

@@ -0,0 +1,10 @@
FROM python:3.12-alpine
ENV PYTHONUNBUFFERED=1
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

View File

@@ -0,0 +1 @@
# agent-office/app/__init__.py

View File

@@ -0,0 +1,17 @@
from .stock import StockAgent
from .music import MusicAgent
AGENT_REGISTRY = {}
def init_agents():
AGENT_REGISTRY["stock"] = StockAgent()
AGENT_REGISTRY["music"] = MusicAgent()
def get_agent(agent_id: str):
return AGENT_REGISTRY.get(agent_id)
def get_all_agent_states() -> list:
return [
{"agent_id": aid, "state": agent.state, "detail": agent.state_detail}
for aid, agent in AGENT_REGISTRY.items()
]

View File

@@ -0,0 +1,72 @@
import asyncio
import random
import time
from typing import Optional
from ..config import IDLE_BREAK_THRESHOLD, BREAK_DURATION_MIN, BREAK_DURATION_MAX
from ..db import add_log
VALID_STATES = ("idle", "working", "waiting", "reporting", "break")
class BaseAgent:
agent_id: str = ""
display_name: str = ""
state: str = "idle"
state_detail: str = ""
_idle_since: float = 0.0
_break_until: float = 0.0
_ws_manager = None
def __init__(self):
self._idle_since = time.time()
def set_ws_manager(self, manager):
self._ws_manager = manager
async def transition(self, new_state: str, detail: str = "", task_id: str = None) -> None:
if new_state not in VALID_STATES:
return
old = self.state
self.state = new_state
self.state_detail = detail
if new_state == "idle":
self._idle_since = time.time()
elif new_state == "break":
duration = random.randint(BREAK_DURATION_MIN, BREAK_DURATION_MAX)
self._break_until = time.time() + duration
add_log(self.agent_id, f"State: {old} -> {new_state} ({detail})")
if self._ws_manager:
await self._ws_manager.send_agent_state(self.agent_id, new_state, detail, task_id)
if new_state == "break":
await self._ws_manager.send_agent_move(self.agent_id, "break_room")
elif old == "break" and new_state == "idle":
await self._ws_manager.send_agent_move(self.agent_id, "desk")
async def check_idle_break(self) -> None:
now = time.time()
if self.state == "idle" and (now - self._idle_since) > IDLE_BREAK_THRESHOLD:
if random.random() < 0.5:
break_type = random.choice(["커피 타임", "잠깐 산책", "졸고 있음"])
await self.transition("break", break_type)
elif self.state == "break" and now > self._break_until:
await self.transition("idle", "휴식 완료")
async def on_schedule(self) -> None:
raise NotImplementedError
async def on_command(self, command: str, params: dict) -> dict:
raise NotImplementedError
async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
raise NotImplementedError
async def get_status(self) -> dict:
return {
"agent_id": self.agent_id,
"display_name": self.display_name,
"state": self.state,
"detail": self.state_detail,
}

View File

@@ -0,0 +1,124 @@
import asyncio
from .base import BaseAgent
from ..db import create_task, update_task_status, approve_task, reject_task, add_log
from .. import service_proxy
from .. import telegram_bot
class MusicAgent(BaseAgent):
agent_id = "music"
display_name = "음악 프로듀서"
async def on_schedule(self) -> None:
pass
async def on_command(self, command: str, params: dict) -> dict:
if command == "compose":
prompt = params.get("prompt", "")
style = params.get("style", "")
model = params.get("model", "V4")
instrumental = params.get("instrumental", False)
if not prompt:
return {"ok": False, "message": "프롬프트를 입력해주세요"}
task_id = create_task(self.agent_id, "compose", {
"prompt": prompt, "style": style,
"model": model, "instrumental": instrumental,
}, requires_approval=True)
await self.transition("waiting", "프롬프트 승인 대기", task_id)
detail = f"프롬프트: {prompt}"
if style:
detail += f"\n스타일: {style}"
detail += f"\n모델: {model}"
await telegram_bot.send_approval_request(
self.agent_id, task_id,
"🎵 [음악 에이전트] 작곡 요청", detail,
)
return {"ok": True, "task_id": task_id, "message": "승인 대기 중"}
if command == "credits":
credits = await service_proxy.get_music_credits()
return {"ok": True, "credits": credits}
return {"ok": False, "message": f"Unknown command: {command}"}
async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
if not approved:
reject_task(task_id)
await self.transition("idle", "작곡 거절됨")
await telegram_bot.send_task_result(
self.agent_id, "🎵 [음악 에이전트] 작곡 취소",
"사용자가 거절했습니다.",
)
return
from ..db import get_task
task = get_task(task_id)
if not task:
return
approve_task(task_id, via="telegram")
await self.transition("working", "작곡 중...", task_id)
asyncio.create_task(self._poll_composition(task_id, task))
async def _poll_composition(self, task_id: str, task: dict) -> None:
try:
input_data = task["input_data"]
payload = {
"provider": "suno",
"model": input_data.get("model", "V4"),
"prompt": input_data.get("prompt", ""),
"style": input_data.get("style", ""),
"instrumental": input_data.get("instrumental", False),
"custom_mode": True,
}
result = await service_proxy.generate_music(payload)
music_task_id = result.get("task_id")
if not music_task_id:
raise Exception("music-lab did not return task_id")
for _ in range(60):
await asyncio.sleep(5)
status = await service_proxy.get_music_status(music_task_id)
state = status.get("status", "")
if state == "succeeded":
tracks = status.get("tracks", [])
update_task_status(task_id, "succeeded", {
"music_task_id": music_task_id,
"tracks": tracks,
})
await self.transition("reporting", "작곡 완료!")
track_info = ""
for t in tracks:
title = t.get("title", "Untitled")
url = t.get("audio_url", "")
track_info += f"🎶 {title}\n{url}\n"
await telegram_bot.send_task_result(
self.agent_id, "🎵 [음악 에이전트] 작곡 완료",
track_info or "트랙 생성 완료",
)
await self.transition("idle", "작곡 완료")
return
if state == "failed":
raise Exception(status.get("message", "Generation failed"))
raise Exception("Timeout: 5분 초과")
except Exception as e:
add_log(self.agent_id, f"Compose failed: {e}", "error", task_id)
update_task_status(task_id, "failed", {"error": str(e)})
await self.transition("idle", f"오류: {e}")
await telegram_bot.send_task_result(
self.agent_id, "🎵 [음악 에이전트] 작곡 실패",
f"오류: {e}",
)

View File

@@ -0,0 +1,99 @@
import asyncio
from typing import Optional
from .base import BaseAgent
from ..db import create_task, update_task_status, get_agent_config, add_log
from .. import service_proxy
class StockAgent(BaseAgent):
agent_id = "stock"
display_name = "주식 트레이더"
async def on_schedule(self) -> None:
if self.state not in ("idle", "break"):
return
task_id = create_task(self.agent_id, "news_summary", {"limit": 15})
await self.transition("working", "뉴스 수집 중...", task_id)
try:
news = await service_proxy.fetch_stock_news(limit=15)
indices = await service_proxy.fetch_stock_indices()
summary = self._format_news_summary(news, indices)
update_task_status(task_id, "succeeded", {
"summary": summary,
"news_count": len(news) if isinstance(news, list) else 0,
})
await self.transition("reporting", "뉴스 요약 전송 중...")
from ..telegram_bot import send_stock_summary
await send_stock_summary(summary)
await self.transition("idle", "뉴스 요약 완료")
except Exception as e:
add_log(self.agent_id, f"News summary failed: {e}", "error", task_id)
update_task_status(task_id, "failed", {"error": str(e)})
await self.transition("idle", f"오류: {e}")
async def on_command(self, command: str, params: dict) -> dict:
if command == "fetch_news":
await self.on_schedule()
return {"ok": True, "message": "뉴스 수집 시작"}
if command == "add_alert":
symbol = params.get("symbol")
target_price = params.get("target_price")
if not symbol or target_price is None:
return {"ok": False, "message": "symbol과 target_price는 필수입니다"}
config = get_agent_config(self.agent_id)
alerts = config["custom_config"].get("alerts", [])
alerts.append({
"symbol": symbol,
"name": params.get("name", symbol),
"target_price": target_price,
"direction": params.get("direction", "above"),
})
from ..db import update_agent_config
update_agent_config(self.agent_id, custom_config={**config["custom_config"], "alerts": alerts})
return {"ok": True, "message": f"알람 추가: {params['symbol']}"}
if command == "list_alerts":
config = get_agent_config(self.agent_id)
alerts = config["custom_config"].get("alerts", [])
return {"ok": True, "alerts": alerts}
return {"ok": False, "message": f"Unknown command: {command}"}
async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
pass
def _format_news_summary(self, news, indices) -> str:
lines = ["📈 [주식 에이전트] 아침 뉴스 요약", "" * 20]
if isinstance(news, list):
for item in news[:10]:
title = item.get("title", "")
if title:
lines.append(f"{title}")
elif isinstance(news, dict) and "articles" in news:
for item in news["articles"][:10]:
title = item.get("title", "")
if title:
lines.append(f"{title}")
if indices:
lines.append("")
lines.append("📊 주요 지수")
if isinstance(indices, dict):
for key, val in indices.items():
if isinstance(val, dict):
name = val.get("name", key)
price = val.get("price", "")
change = val.get("change", "")
lines.append(f"{name}: {price} ({change})")
return "\n".join(lines)

View File

@@ -0,0 +1,23 @@
import os
# Service URLs (Docker internal network)
STOCK_LAB_URL = os.getenv("STOCK_LAB_URL", "http://localhost:18500")
MUSIC_LAB_URL = os.getenv("MUSIC_LAB_URL", "http://localhost:18600")
# Telegram
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "")
TELEGRAM_WEBHOOK_URL = os.getenv("TELEGRAM_WEBHOOK_URL", "")
# Database
DB_PATH = os.getenv("AGENT_OFFICE_DB_PATH", "/app/data/agent_office.db")
# CORS
CORS_ALLOW_ORIGINS = os.getenv(
"CORS_ALLOW_ORIGINS", "http://localhost:3007,http://localhost:8080"
)
# Idle break threshold (seconds)
IDLE_BREAK_THRESHOLD = int(os.getenv("IDLE_BREAK_THRESHOLD", "300")) # 5 min
BREAK_DURATION_MIN = int(os.getenv("BREAK_DURATION_MIN", "60")) # 1 min
BREAK_DURATION_MAX = int(os.getenv("BREAK_DURATION_MAX", "180")) # 3 min

261
agent-office/app/db.py Normal file
View File

@@ -0,0 +1,261 @@
import os
import json
import sqlite3
import uuid
from typing import Any, Dict, List, Optional
from .config import DB_PATH
def _conn() -> sqlite3.Connection:
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH, timeout=10)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
return conn
def init_db() -> None:
with _conn() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS agent_config (
agent_id TEXT PRIMARY KEY,
display_name TEXT NOT NULL,
enabled INTEGER NOT NULL DEFAULT 1,
schedule_config TEXT NOT NULL DEFAULT '{}',
custom_config TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS agent_tasks (
id TEXT PRIMARY KEY,
agent_id TEXT NOT NULL,
task_type TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
input_data TEXT NOT NULL DEFAULT '{}',
result_data TEXT,
requires_approval INTEGER NOT NULL DEFAULT 0,
approved_at TEXT,
approved_via TEXT,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
completed_at TEXT
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_tasks_agent
ON agent_tasks(agent_id, created_at DESC)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS agent_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_id TEXT NOT NULL,
task_id TEXT,
level TEXT NOT NULL DEFAULT 'info',
message TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS telegram_state (
callback_id TEXT PRIMARY KEY,
task_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
action TEXT,
responded INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
# Seed default agent configs
for agent_id, name in [("stock", "주식 트레이더"), ("music", "음악 프로듀서")]:
conn.execute(
"INSERT OR IGNORE INTO agent_config(agent_id, display_name) VALUES(?,?)",
(agent_id, name),
)
# --- agent_config CRUD ---
def get_all_agents() -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute("SELECT * FROM agent_config ORDER BY agent_id").fetchall()
return [_config_to_dict(r) for r in rows]
def get_agent_config(agent_id: str) -> Optional[Dict[str, Any]]:
with _conn() as conn:
r = conn.execute("SELECT * FROM agent_config WHERE agent_id=?", (agent_id,)).fetchone()
return _config_to_dict(r) if r else None
def update_agent_config(agent_id: str, **kwargs) -> None:
sets, vals = [], []
for k in ("enabled", "schedule_config", "custom_config"):
if k in kwargs and kwargs[k] is not None:
if k in ("schedule_config", "custom_config"):
sets.append(f"{k}=?")
vals.append(json.dumps(kwargs[k]))
else:
sets.append(f"{k}=?")
vals.append(kwargs[k])
if not sets:
return
sets.append("updated_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')")
vals.append(agent_id)
with _conn() as conn:
conn.execute(f"UPDATE agent_config SET {','.join(sets)} WHERE agent_id=?", vals)
def _config_to_dict(r) -> Dict[str, Any]:
return {
"agent_id": r["agent_id"],
"display_name": r["display_name"],
"enabled": bool(r["enabled"]),
"schedule_config": json.loads(r["schedule_config"]),
"custom_config": json.loads(r["custom_config"]),
"created_at": r["created_at"],
"updated_at": r["updated_at"],
}
# --- agent_tasks CRUD ---
def create_task(agent_id: str, task_type: str, input_data: dict, requires_approval: bool = False) -> str:
task_id = str(uuid.uuid4())
status = "pending" if requires_approval else "working"
with _conn() as conn:
conn.execute(
"INSERT INTO agent_tasks(id,agent_id,task_type,status,input_data,requires_approval) VALUES(?,?,?,?,?,?)",
(task_id, agent_id, task_type, status, json.dumps(input_data), int(requires_approval)),
)
return task_id
def update_task_status(task_id: str, status: str, result_data: dict = None) -> None:
with _conn() as conn:
if result_data is not None:
conn.execute(
"UPDATE agent_tasks SET status=?, result_data=?, completed_at=strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id=?",
(status, json.dumps(result_data), task_id),
)
else:
conn.execute("UPDATE agent_tasks SET status=? WHERE id=?", (status, task_id))
def approve_task(task_id: str, via: str = "web") -> None:
with _conn() as conn:
conn.execute(
"UPDATE agent_tasks SET status='approved', approved_at=strftime('%Y-%m-%dT%H:%M:%fZ','now'), approved_via=? WHERE id=?",
(via, task_id),
)
def reject_task(task_id: str) -> None:
with _conn() as conn:
conn.execute(
"UPDATE agent_tasks SET status='rejected', completed_at=strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id=?",
(task_id,),
)
def get_task(task_id: str) -> Optional[Dict[str, Any]]:
with _conn() as conn:
r = conn.execute("SELECT * FROM agent_tasks WHERE id=?", (task_id,)).fetchone()
return _task_to_dict(r) if r else None
def get_agent_tasks(agent_id: str, limit: int = 20) -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM agent_tasks WHERE agent_id=? ORDER BY created_at DESC LIMIT ?",
(agent_id, limit),
).fetchall()
return [_task_to_dict(r) for r in rows]
def get_pending_approvals() -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM agent_tasks WHERE status='pending' AND requires_approval=1 ORDER BY created_at DESC"
).fetchall()
return [_task_to_dict(r) for r in rows]
def _task_to_dict(r) -> Dict[str, Any]:
return {
"id": r["id"],
"agent_id": r["agent_id"],
"task_type": r["task_type"],
"status": r["status"],
"input_data": json.loads(r["input_data"]) if r["input_data"] else {},
"result_data": json.loads(r["result_data"]) if r["result_data"] else None,
"requires_approval": bool(r["requires_approval"]),
"approved_at": r["approved_at"],
"approved_via": r["approved_via"],
"created_at": r["created_at"],
"completed_at": r["completed_at"],
}
# --- agent_logs ---
def add_log(agent_id: str, message: str, level: str = "info", task_id: str = None) -> None:
with _conn() as conn:
conn.execute(
"INSERT INTO agent_logs(agent_id,task_id,level,message) VALUES(?,?,?,?)",
(agent_id, task_id, level, message),
)
def get_logs(agent_id: str, limit: int = 50) -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM agent_logs WHERE agent_id=? ORDER BY created_at DESC LIMIT ?",
(agent_id, limit),
).fetchall()
return [
{
"id": r["id"],
"agent_id": r["agent_id"],
"task_id": r["task_id"],
"level": r["level"],
"message": r["message"],
"created_at": r["created_at"],
}
for r in rows
]
# --- telegram_state ---
def save_telegram_callback(callback_id: str, task_id: str, agent_id: str) -> None:
with _conn() as conn:
conn.execute(
"INSERT OR REPLACE INTO telegram_state(callback_id,task_id,agent_id) VALUES(?,?,?)",
(callback_id, task_id, agent_id),
)
def get_telegram_callback(callback_id: str) -> Optional[Dict[str, Any]]:
with _conn() as conn:
r = conn.execute(
"SELECT * FROM telegram_state WHERE callback_id=? AND responded=0",
(callback_id,),
).fetchone()
if not r:
return None
return {
"callback_id": r["callback_id"],
"task_id": r["task_id"],
"agent_id": r["agent_id"],
"responded": bool(r["responded"]),
}
def mark_telegram_responded(callback_id: str, action: str) -> None:
with _conn() as conn:
conn.execute(
"UPDATE telegram_state SET responded=1, action=? WHERE callback_id=?",
(action, callback_id),
)

152
agent-office/app/main.py Normal file
View File

@@ -0,0 +1,152 @@
import os
import json
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from .config import CORS_ALLOW_ORIGINS
from .db import init_db, get_all_agents, get_agent_config, update_agent_config, get_agent_tasks, get_pending_approvals, get_task, get_logs
from .models import CommandRequest, ApprovalRequest, AgentConfigUpdate
from .websocket_manager import ws_manager
from .agents import init_agents, get_agent, get_all_agent_states, AGENT_REGISTRY
from .scheduler import init_scheduler
from . import telegram_bot
app = FastAPI()
_cors_origins = CORS_ALLOW_ORIGINS.split(",")
app.add_middleware(
CORSMiddleware,
allow_origins=[o.strip() for o in _cors_origins],
allow_credentials=False,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["Content-Type"],
)
@app.on_event("startup")
async def on_startup():
init_db()
os.makedirs("/app/data", exist_ok=True)
init_agents()
for agent in AGENT_REGISTRY.values():
agent.set_ws_manager(ws_manager)
init_scheduler()
@app.get("/health")
def health():
return {"ok": True}
# --- WebSocket ---
@app.websocket("/api/agent-office/ws")
async def websocket_endpoint(ws: WebSocket):
await ws_manager.connect(ws)
try:
await ws.send_text(json.dumps({
"type": "init",
"agents": get_all_agent_states(),
"pending": [t["id"] for t in get_pending_approvals()],
}, ensure_ascii=False))
while True:
data = await ws.receive_text()
try:
msg = json.loads(data)
except json.JSONDecodeError:
continue
await _handle_ws_message(msg)
except WebSocketDisconnect:
pass
finally:
await ws_manager.disconnect(ws)
async def _handle_ws_message(msg: dict):
msg_type = msg.get("type")
agent_id = msg.get("agent")
agent = get_agent(agent_id) if agent_id else None
if msg_type == "command" and agent:
action = msg.get("action", "")
params = msg.get("params", {})
result = await agent.on_command(action, params)
await ws_manager.broadcast({"type": "command_result", "agent": agent_id, "result": result})
elif msg_type == "approval" and agent:
task_id = msg.get("task_id")
approved = msg.get("approved", False)
if task_id:
await agent.on_approval(task_id, approved)
elif msg_type == "query" and agent:
status = await agent.get_status()
await ws_manager.broadcast({"type": "agent_status", "agent": agent_id, "status": status})
# --- REST Endpoints ---
@app.get("/api/agent-office/agents")
def list_agents():
return {"agents": get_all_agents()}
@app.get("/api/agent-office/agents/{agent_id}")
def agent_detail(agent_id: str):
config = get_agent_config(agent_id)
if not config:
raise HTTPException(status_code=404, detail="Agent not found")
agent = get_agent(agent_id)
state_info = {"state": agent.state, "detail": agent.state_detail} if agent else {}
return {**config, **state_info}
@app.put("/api/agent-office/agents/{agent_id}")
def update_agent(agent_id: str, body: AgentConfigUpdate):
update_agent_config(agent_id, enabled=body.enabled,
schedule_config=body.schedule_config,
custom_config=body.custom_config)
return {"ok": True}
@app.get("/api/agent-office/agents/{agent_id}/tasks")
def agent_tasks(agent_id: str, limit: int = 20):
return {"tasks": get_agent_tasks(agent_id, limit)}
@app.get("/api/agent-office/agents/{agent_id}/logs")
def agent_logs(agent_id: str, limit: int = 50):
return {"logs": get_logs(agent_id, limit)}
@app.get("/api/agent-office/tasks/pending")
def pending_tasks():
return {"tasks": get_pending_approvals()}
@app.get("/api/agent-office/tasks/{task_id}")
def task_detail(task_id: str):
task = get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
@app.post("/api/agent-office/command")
async def send_command(body: CommandRequest):
agent = get_agent(body.agent)
if not agent:
return {"error": f"Agent '{body.agent}' not found"}
result = await agent.on_command(body.action, body.params or {})
return result
@app.post("/api/agent-office/approve")
async def approve(body: ApprovalRequest):
agent = get_agent(body.agent)
if not agent:
return {"error": f"Agent '{body.agent}' not found"}
await agent.on_approval(body.task_id, body.approved, body.feedback or "")
return {"ok": True}
# --- Telegram Webhook ---
@app.post("/api/agent-office/telegram/webhook")
async def telegram_webhook(data: dict):
result = await telegram_bot.handle_webhook(data)
if result:
agent = get_agent(result["agent_id"])
if agent:
await agent.on_approval(result["task_id"], result["approved"])
return {"ok": True}
@app.get("/api/agent-office/states")
def all_states():
return {"agents": get_all_agent_states()}

View File

@@ -0,0 +1,35 @@
from pydantic import BaseModel
from typing import Optional
class CommandRequest(BaseModel):
agent: str
action: str
params: Optional[dict] = None
class ApprovalRequest(BaseModel):
agent: str
task_id: str
approved: bool
feedback: Optional[str] = None
class AgentConfigUpdate(BaseModel):
enabled: Optional[bool] = None
schedule_config: Optional[dict] = None
custom_config: Optional[dict] = None
class PriceAlertConfig(BaseModel):
symbol: str
name: str
target_price: float
direction: str # "above" or "below"
class ComposeCommand(BaseModel):
prompt: str
style: Optional[str] = None
model: Optional[str] = "V4"
instrumental: Optional[bool] = False

View File

@@ -0,0 +1,20 @@
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from .agents import AGENT_REGISTRY
scheduler = AsyncIOScheduler(timezone="Asia/Seoul")
async def _check_idle_breaks():
for agent in AGENT_REGISTRY.values():
await agent.check_idle_break()
async def _run_stock_schedule():
agent = AGENT_REGISTRY.get("stock")
if agent:
await agent.on_schedule()
def init_scheduler():
scheduler.add_job(_run_stock_schedule, "cron", hour=8, minute=0, id="stock_news")
scheduler.add_job(_check_idle_breaks, "interval", seconds=60, id="idle_check")
scheduler.start()

View File

@@ -0,0 +1,34 @@
import httpx
from typing import Any, Dict, List, Optional
from .config import STOCK_LAB_URL, MUSIC_LAB_URL
_client = httpx.AsyncClient(timeout=30.0)
async def fetch_stock_news(limit: int = 10, category: str = None) -> List[Dict[str, Any]]:
params = {"limit": limit}
if category:
params["category"] = category
resp = await _client.get(f"{STOCK_LAB_URL}/api/stock/news", params=params)
resp.raise_for_status()
return resp.json()
async def fetch_stock_indices() -> Dict[str, Any]:
resp = await _client.get(f"{STOCK_LAB_URL}/api/stock/indices")
resp.raise_for_status()
return resp.json()
async def generate_music(payload: dict) -> Dict[str, Any]:
resp = await _client.post(f"{MUSIC_LAB_URL}/api/music/generate", json=payload)
resp.raise_for_status()
return resp.json()
async def get_music_status(task_id: str) -> Dict[str, Any]:
resp = await _client.get(f"{MUSIC_LAB_URL}/api/music/status/{task_id}")
resp.raise_for_status()
return resp.json()
async def get_music_credits() -> Dict[str, Any]:
resp = await _client.get(f"{MUSIC_LAB_URL}/api/music/credits")
resp.raise_for_status()
return resp.json()

View File

@@ -0,0 +1,82 @@
import json
import uuid
import httpx
from typing import Optional
from .config import TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID, TELEGRAM_WEBHOOK_URL
from .db import save_telegram_callback, get_telegram_callback, mark_telegram_responded
_BASE = "https://api.telegram.org/bot"
def _enabled() -> bool:
return bool(TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID)
async def _api(method: str, payload: dict) -> dict:
if not _enabled():
return {"ok": False, "description": "Telegram not configured"}
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.post(f"{_BASE}{TELEGRAM_BOT_TOKEN}/{method}", json=payload)
return resp.json()
async def send_message(text: str, reply_markup: dict = None) -> dict:
payload = {
"chat_id": TELEGRAM_CHAT_ID,
"text": text,
"parse_mode": "HTML",
}
if reply_markup:
payload["reply_markup"] = reply_markup
return await _api("sendMessage", payload)
async def send_stock_summary(summary: str) -> dict:
return await send_message(summary)
async def send_approval_request(agent_id: str, task_id: str, title: str, detail: str) -> dict:
approve_id = f"approve_{uuid.uuid4().hex[:8]}"
reject_id = f"reject_{uuid.uuid4().hex[:8]}"
save_telegram_callback(approve_id, task_id, agent_id)
save_telegram_callback(reject_id, task_id, agent_id)
text = f"{title}\n{'' * 20}\n{detail}"
reply_markup = {
"inline_keyboard": [[
{"text": "✅ 승인", "callback_data": approve_id},
{"text": "❌ 거절", "callback_data": reject_id},
]]
}
return await send_message(text, reply_markup)
async def send_task_result(agent_id: str, title: str, result: str) -> dict:
text = f"{title}\n{'' * 20}\n{result}"
return await send_message(text)
async def handle_webhook(data: dict) -> Optional[dict]:
callback_query = data.get("callback_query")
if not callback_query:
return None
callback_id = callback_query.get("data", "")
cb = get_telegram_callback(callback_id)
if not cb:
return None
action = "approve" if callback_id.startswith("approve_") else "reject"
mark_telegram_responded(callback_id, action)
await _api("answerCallbackQuery", {
"callback_query_id": callback_query["id"],
"text": "승인됨 ✅" if action == "approve" else "거절됨 ❌",
})
return {
"task_id": cb["task_id"],
"agent_id": cb["agent_id"],
"action": action,
"approved": action == "approve",
}
async def setup_webhook() -> dict:
if not _enabled() or not TELEGRAM_WEBHOOK_URL:
return {"ok": False, "description": "Webhook URL not configured"}
return await _api("setWebhook", {"url": TELEGRAM_WEBHOOK_URL})

110
agent-office/app/test_db.py Normal file
View File

@@ -0,0 +1,110 @@
import os
import sys
import tempfile
# Override DB_PATH before importing db
_tmp = tempfile.mktemp(suffix=".db")
os.environ["AGENT_OFFICE_DB_PATH"] = _tmp
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from app.db import (
init_db, get_all_agents, get_agent_config, update_agent_config,
create_task, update_task_status, approve_task, get_task, get_agent_tasks,
get_pending_approvals, add_log, get_logs,
save_telegram_callback, get_telegram_callback, mark_telegram_responded,
)
def test_init_and_seed():
init_db()
agents = get_all_agents()
assert len(agents) == 2, f"Expected 2 agents, got {len(agents)}"
ids = {a["agent_id"] for a in agents}
assert ids == {"stock", "music"}, f"Unexpected agent ids: {ids}"
print(" [PASS] test_init_and_seed")
def test_agent_config_update():
init_db()
update_agent_config("stock", custom_config={"watch": ["AAPL"]})
cfg = get_agent_config("stock")
assert cfg["custom_config"] == {"watch": ["AAPL"]}, f"Unexpected config: {cfg['custom_config']}"
print(" [PASS] test_agent_config_update")
def test_task_lifecycle():
init_db()
# Create task with approval
tid = create_task("music", "compose", {"prompt": "test"}, requires_approval=True)
task = get_task(tid)
assert task["status"] == "pending", f"Expected pending, got {task['status']}"
assert task["requires_approval"] is True
# Approve
approve_task(tid, via="telegram")
task = get_task(tid)
assert task["status"] == "approved", f"Expected approved, got {task['status']}"
assert task["approved_via"] == "telegram"
# Complete
update_task_status(tid, "succeeded", {"url": "/media/music/test.mp3"})
task = get_task(tid)
assert task["status"] == "succeeded", f"Expected succeeded, got {task['status']}"
assert task["result_data"]["url"] == "/media/music/test.mp3"
print(" [PASS] test_task_lifecycle")
def test_task_no_approval():
init_db()
tid = create_task("stock", "news_summary", {"limit": 10})
task = get_task(tid)
assert task["status"] == "working", f"Expected working, got {task['status']}"
print(" [PASS] test_task_no_approval")
def test_pending_approvals():
init_db()
create_task("music", "compose", {"prompt": "a"}, requires_approval=True)
create_task("music", "compose", {"prompt": "b"}, requires_approval=True)
create_task("stock", "news_summary", {})
pending = get_pending_approvals()
assert len(pending) == 2, f"Expected 2 pending, got {len(pending)}"
print(" [PASS] test_pending_approvals")
def test_logs():
init_db()
add_log("stock", "News fetched", "info", "task-1")
add_log("stock", "API error", "error")
logs = get_logs("stock")
assert len(logs) == 2, f"Expected 2 logs, got {len(logs)}"
assert logs[0]["level"] == "error", f"Expected error first (DESC), got {logs[0]['level']}"
print(" [PASS] test_logs")
def test_telegram_state():
init_db()
save_telegram_callback("cb-1", "task-1", "music")
cb = get_telegram_callback("cb-1")
assert cb["task_id"] == "task-1"
mark_telegram_responded("cb-1", "approve")
cb = get_telegram_callback("cb-1")
assert cb is None, f"Expected None after responded=1, got {cb}"
print(" [PASS] test_telegram_state")
if __name__ == "__main__":
test_init_and_seed()
test_agent_config_update()
test_task_lifecycle()
test_task_no_approval()
test_pending_approvals()
test_logs()
test_telegram_state()
print("All DB tests passed!")
# Cleanup temp DB (best-effort; WAL mode may keep files open on Windows)
for ext in ("", "-wal", "-shm"):
try:
os.unlink(_tmp + ext)
except OSError:
pass

View File

@@ -0,0 +1,46 @@
import asyncio
import json
from typing import Any, Dict, Set
from fastapi import WebSocket
class WebSocketManager:
def __init__(self):
self._connections: Set[WebSocket] = set()
self._lock = asyncio.Lock()
async def connect(self, ws: WebSocket) -> None:
await ws.accept()
async with self._lock:
self._connections.add(ws)
async def disconnect(self, ws: WebSocket) -> None:
async with self._lock:
self._connections.discard(ws)
async def broadcast(self, message: Dict[str, Any]) -> None:
payload = json.dumps(message, ensure_ascii=False)
async with self._lock:
dead = set()
for ws in self._connections:
try:
await ws.send_text(payload)
except Exception:
dead.add(ws)
self._connections -= dead
async def send_agent_state(self, agent_id: str, state: str, detail: str = "", task_id: str = None) -> None:
msg = {"type": "agent_state", "agent": agent_id, "state": state, "detail": detail}
if task_id:
msg["task_id"] = task_id
await self.broadcast(msg)
async def send_task_complete(self, agent_id: str, task_id: str, result: dict) -> None:
await self.broadcast({
"type": "task_complete", "agent": agent_id,
"task_id": task_id, "result": result,
})
async def send_agent_move(self, agent_id: str, target: str) -> None:
await self.broadcast({"type": "agent_move", "agent": agent_id, "target": target})
ws_manager = WebSocketManager()

View File

@@ -0,0 +1,5 @@
fastapi==0.115.6
uvicorn[standard]==0.30.6
apscheduler==3.10.4
websockets>=12.0
httpx>=0.27

View File

@@ -108,6 +108,32 @@ services:
timeout: 5s timeout: 5s
retries: 3 retries: 3
agent-office:
build:
context: ./agent-office
container_name: agent-office
restart: unless-stopped
ports:
- "18900:8000"
environment:
- TZ=${TZ:-Asia/Seoul}
- CORS_ALLOW_ORIGINS=${CORS_ALLOW_ORIGINS:-http://localhost:3007,http://localhost:8080}
- STOCK_LAB_URL=http://stock-lab:8000
- MUSIC_LAB_URL=http://music-lab:8000
- TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN:-}
- TELEGRAM_CHAT_ID=${TELEGRAM_CHAT_ID:-}
- TELEGRAM_WEBHOOK_URL=${TELEGRAM_WEBHOOK_URL:-}
volumes:
- ${RUNTIME_PATH:-.}/data/agent-office:/app/data
depends_on:
- stock-lab
- music-lab
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 30s
timeout: 5s
retries: 3
travel-proxy: travel-proxy:
build: ./travel-proxy build: ./travel-proxy
container_name: travel-proxy container_name: travel-proxy

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,444 @@
# Agent Office - AI 에이전트 사무실 시각화 설계
## 개요
Lab 하위에 2D 픽셀아트 스타일의 가상 사무실을 구현하여, AI 에이전트들이 실시간으로 작업하는 모습을 게임처럼 시각화하고 상호작용하는 페이지.
### 핵심 컨셉
- **게임 같은 사무실**: 2D 픽셀아트 오픈 오피스에 에이전트 캐릭터들이 배치
- **실제 작업 수행**: 에이전트들이 기존 백엔드 서비스 API를 호출하여 실제 결과물 생성
- **직접 지시**: 에이전트 클릭 → 채팅/명령 패널로 지시, 승인 요청 시 알림 표시
- **텔레그램 양방향**: 알림 발송 + 인라인 버튼으로 승인/거절/수정
- **아이들 행동**: 장시간 명령 없으면 휴게실에서 커피, 졸기, 동료 잡담 등
### MVP 범위
- **에이전트 2개**: StockAgent (주식 뉴스/주가 알람), MusicAgent (작곡 파이프라인)
- **사무실**: 단일 오픈 오피스 (향후 방/층 확장 가능)
- **텔레그램**: 양방향 (알림 + 인라인 버튼 승인)
---
## 1. 아키텍처
```
┌─────────────────────────────────────────────────┐
│ Frontend (React) │
│ │
│ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ OfficeCanvas │ │ React Overlay │ │
│ │ (Canvas 2D) │ │ - ChatPanel │ │
│ │ - 타일맵 렌더 │ │ - AgentStatus │ │
│ │ - 스프라이트 │ │ - TaskHistory │ │
│ │ - 클릭 히트맵 │ │ - ApprovalDialog │ │
│ └──────────────┘ └─────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────┐ │
│ │ useAgentManager (상태 + WebSocket) │ │
│ └──────────────────────────────────────────┘ │
└──────────────────┬──────────────────────────────┘
│ WebSocket + REST
┌──────────────────▼──────────────────────────────┐
│ Backend: agent-office (새 서비스, 포트 18900) │
│ │
│ ┌────────────┐ ┌────────────┐ ┌──────────────┐ │
│ │ Scheduler │ │ Agent FSM │ │ Telegram Bot │ │
│ │(APScheduler)│ │ (상태머신) │ │ (양방향) │ │
│ └────────────┘ └────────────┘ └──────────────┘ │
│ │
│ ┌──────────────────────────────────────────┐ │
│ │ Service Proxy (기존 서비스 API 호출) │ │
│ │ stock-lab / music-lab 등 │ │
│ └──────────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘
```
### 핵심 결정
- **agent-office**: 새 백엔드 서비스 (포트 18900). 기존 서비스는 수정하지 않음
- **Service Proxy 패턴**: agent-office가 기존 서비스 API를 HTTP 호출
- **WebSocket**: 에이전트 상태 변화를 실시간 전달
- **Canvas + React 오버레이 하이브리드**: 게임 렌더링은 Canvas, UI 패널은 React DOM
---
## 2. 에이전트 상태 머신 (FSM)
### 상태 전이
```
┌──────┐ 스케줄/지시 ┌──────────┐ 완료 ┌──────────┐
│ idle │ ──────────────→ │ working │ ───────→ │ reporting│
└──┬───┘ └────┬─────┘ └────┬─────┘
│ │ 승인 필요 │
│ 장시간 idle ▼ │ 결과 전달 후
│ ┌───────────┐ │
▼ │ waiting │ │
┌──────┐ │ (승인대기) │ │
│ break│ └───────────┘ │
│ (휴식)│ │
└──┬───┘◄───────────────────────────────────────────┘
│ 새 작업 발생
└──────────→ idle
```
### 상태별 시각화
| 상태 | 캐릭터 행동 | 위치 | 오버레이 |
|------|------------|------|---------|
| `idle` | 모니터 보며 대기 애니메이션 | 자기 데스크 | 없음 |
| `working` | 타이핑 애니메이션, 모니터에 진행 표시 | 자기 데스크 | 작업명 말풍선 |
| `waiting` | 살짝 좌우 흔들림 | 자기 데스크 | `❗` 아이콘 (클릭 유도) |
| `reporting` | 결과물 들고 걸어감 | 데스크 → 회의 테이블 | 결과 요약 말풍선 |
| `break` | 커피 마시기/졸기/산책/잡담 | 휴게실/복도 | `☕`/`💤` 아이콘 |
### 아이들 행동 규칙
- idle 상태 5분 경과 → 50% 확률로 break 전환
- break 지속: 1~3분 랜덤 → idle 복귀
- break 중 에이전트끼리 근처에 있으면 잡담 애니메이션
- 새 작업 발생 시 즉시 break 종료 → idle → working
### 승인 흐름별 분류
| 에이전트 | 자동 실행 | 승인 필요 |
|---------|----------|----------|
| Stock | 뉴스 요약, 주가 알람 | - |
| Music | - | 작곡 (프롬프트 확인 후) |
| Lotto (향후) | 통계 분석, 추천번호 | 구매 관련 |
| Blog (향후) | - | 키워드 제시 후 글 생성 |
| Realestate (향후) | 공고 수집, 매칭 | - |
| Claude AI (향후) | - | 직접 지시 + 승인 |
---
## 3. 사무실 맵 & 렌더링
### 타일맵 구조 (MVP: 단일 오픈 오피스)
```
┌─────────────────────────────────────────┐
│ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │Stock│ │Music│ │Claude│ │ (빈) │ │
│ │Desk │ │Desk │ │Desk │ │향후용│ │
│ └─────┘ └─────┘ └─────┘ └─────┘ │
│ │
│ ┌───────────┐ │
│ │ 회의 테이블 │ │
│ │ (보고구역) │ │
│ └───────────┘ │
│ │
│ ┌──────────┐ ┌─────────────────┐ │
│ │ 휴게실 │ │ CEO 데스크 (나) │ │
│ │ coffee │ │ │ │
│ └──────────┘ └─────────────────┘ │
└─────────────────────────────────────────┘
```
### 렌더링 계층 (아래→위)
1. **바닥 타일**: 카펫, 나무 바닥
2. **가구**: 데스크, 의자, 소파, 화분, 커피머신
3. **캐릭터**: 에이전트 스프라이트 (상태별 애니메이션)
4. **오버레이**: 말풍선, 상태 아이콘, 이름표
### 스프라이트 에셋
- 무료 픽셀아트 에셋팩 활용 (타일셋, 가구)
- 에이전트 캐릭터: 기본 인물 스프라이트 + 액세서리로 구분
- Stock: 넥타이 + 차트 모니터
- Music: 헤드폰 + 음표 이펙트
- Claude: 보라색 톤 + AI 아이콘
- 스프라이트시트: 4방향 × 4프레임 (idle, walk, work, break)
### Canvas 렌더링 엔진
- **게임 루프**: `requestAnimationFrame` 기반, 60fps 타겟
- **카메라**: 고정 뷰 (MVP), 향후 줌/팬 추가 가능
- **클릭 히트맵**: 캐릭터 바운딩 박스 체크 → 클릭 시 React 이벤트 발생
- **이동**: 웨이포인트 기반 lerp (데스크↔회의실↔휴게실)
---
## 4. 백엔드: agent-office 서비스
### 파일 구조
```
agent-office/
├── app/
│ ├── main.py # FastAPI + WebSocket + lifespan
│ ├── db.py # SQLite (agent_tasks, agent_logs, agent_config)
│ ├── config.py # 환경변수, 서비스 URL 설정
│ ├── scheduler.py # APScheduler 스케줄 관리
│ ├── telegram_bot.py # Telegram Bot API 양방향
│ ├── websocket_manager.py # WebSocket 연결 관리 + 브로드캐스트
│ ├── service_proxy.py # 기존 서비스 API 호출 래퍼
│ ├── agents/
│ │ ├── base.py # BaseAgent (FSM, 공통 로직)
│ │ ├── stock.py # StockAgent
│ │ └── music.py # MusicAgent
│ └── models.py # Pydantic 모델
├── Dockerfile
└── requirements.txt
```
### DB 테이블 (agent_office.db)
**agent_config**
| 컬럼 | 타입 | 설명 |
|------|------|------|
| agent_id | TEXT PK | 에이전트 식별자 (stock, music, ...) |
| display_name | TEXT | 표시명 ("주식 트레이더") |
| enabled | BOOLEAN | 활성 상태 |
| schedule_config | TEXT (JSON) | 스케줄 설정 |
| custom_config | TEXT (JSON) | 에이전트별 커스텀 설정 (감시 종목 등) |
| created_at | TEXT | 생성 시각 |
| updated_at | TEXT | 수정 시각 |
**agent_tasks**
| 컬럼 | 타입 | 설명 |
|------|------|------|
| id | TEXT PK (UUID) | 작업 ID |
| agent_id | TEXT FK | 에이전트 |
| task_type | TEXT | 작업 유형 (news_summary, price_alert, compose, ...) |
| status | TEXT | pending / approved / working / succeeded / failed |
| input_data | TEXT (JSON) | 입력 파라미터 |
| result_data | TEXT (JSON) | 결과 데이터 |
| requires_approval | BOOLEAN | 승인 필요 여부 |
| approved_at | TEXT | 승인 시각 |
| approved_via | TEXT | 승인 경로 (web / telegram) |
| created_at | TEXT | 생성 시각 |
| completed_at | TEXT | 완료 시각 |
**agent_logs**
| 컬럼 | 타입 | 설명 |
|------|------|------|
| id | INTEGER PK | 자동 증가 |
| agent_id | TEXT FK | 에이전트 |
| task_id | TEXT FK | 관련 작업 (nullable) |
| level | TEXT | info / warn / error |
| message | TEXT | 로그 메시지 |
| created_at | TEXT | 시각 |
**telegram_state**
| 컬럼 | 타입 | 설명 |
|------|------|------|
| callback_id | TEXT PK | 텔레그램 콜백 ID |
| task_id | TEXT FK | 매핑된 작업 |
| agent_id | TEXT FK | 매핑된 에이전트 |
| action | TEXT | approve / reject / modify |
| responded | BOOLEAN | 응답 완료 여부 |
| created_at | TEXT | 생성 시각 |
### BaseAgent 인터페이스
```python
class BaseAgent:
agent_id: str
state: str # idle, working, waiting, reporting, break
async def on_schedule(self) -> None:
"""스케줄러에 의해 호출. 자동 작업 실행."""
async def on_command(self, command: str, params: dict) -> dict:
"""사용자 직접 지시 처리."""
async def on_approval(self, task_id: str, approved: bool, feedback: str) -> None:
"""승인/거절 콜백."""
async def get_status(self) -> dict:
"""현재 상태 + 최근 작업 요약."""
```
### MVP 에이전트 상세
**StockAgent:**
- 스케줄: 매일 08:00 `on_schedule()``stock-lab GET /api/stock/news` 호출
- AI 요약: 뉴스 데이터를 Ollama(192.168.45.59)로 요약 생성
- 텔레그램 전송: 요약 결과를 포맷팅하여 발송 (자동, 승인 불필요)
- 주가 알람: `agent_config.custom_config`에 감시 종목/조건 저장, 주기적 체크
- 상태 전이: idle → working(뉴스 수집) → reporting(텔레그램 전송) → idle
**MusicAgent:**
- 트리거: 사용자 웹/텔레그램 지시 → `on_command()`
- 프롬프트 확인: 사용자 입력 프롬프트를 텔레그램으로 전송 + 인라인 버튼
- 승인 시: `music-lab POST /api/music/generate` 호출
- 상태 폴링: `music-lab GET /api/music/status/{task_id}` → 완료까지 반복
- 결과 알림: 생성된 음악 URL을 텔레그램 + 웹에 전달
- 상태 전이: idle → waiting(프롬프트 승인 대기) → working(생성 중) → reporting(결과 전달) → idle
---
## 5. 텔레그램 봇
### 구성
- **Telegram Bot API** + **Webhook 수신** (NAS에서)
- agent-office 서비스 내부에 통합 (별도 프로세스 아님)
- Nginx: `/api/agent-office/telegram/webhook``agent-office:8000`
### 환경변수
- `TELEGRAM_BOT_TOKEN`: Bot Father에서 발급
- `TELEGRAM_CHAT_ID`: 사용자 채팅 ID (1:1 봇)
- `TELEGRAM_WEBHOOK_URL`: Webhook 수신 URL (NAS 외부 접근 가능 URL)
### 메시지 포맷
**자동 알림 (뉴스 요약):**
```
📈 [주식 에이전트] 아침 뉴스 요약
━━━━━━━━━━━━━━━━
• 삼성전자: 반도체 수출 호조...
• 코스피: 외인 순매수 전환...
• 미국 CPI 발표 예정...
📊 관심종목 현황
삼성전자 82,500원 (+2.1%)
AAPL $185.20 (+1.2%)
```
**승인 요청 (작곡):**
```
🎵 [음악 에이전트] 작곡 요청
━━━━━━━━━━━━━━━━
프롬프트: "Lo-fi hip hop, rainy day, piano"
스타일: Chill, Ambient
모델: V5.5
[✅ 승인] [❌ 거절] [✏️ 수정]
```
**주가 알람:**
```
🚨 [주식 에이전트] 주가 알림
━━━━━━━━━━━━━━━━
삼성전자 82,500원
조건: 82,000원 이상 → 도달!
현재 등락: +2.1%
```
### 양방향 흐름
1. 에이전트 → `telegram_bot.send_message()` → 텔레그램
2. 사용자 → 인라인 버튼 클릭 or 텍스트 입력
3. 텔레그램 → Webhook POST → `telegram_bot.handle_webhook()`
4. `handle_webhook()``telegram_state` 조회 → 에이전트 `on_approval()` 호출
5. 에이전트 FSM 상태 전이 → WebSocket 브로드캐스트 → 프론트엔드 반영
---
## 6. 프론트엔드 구조
### 파일 구조
```
src/pages/agent-office/
├── AgentOffice.jsx # 메인 페이지 (Canvas + Overlay 컨테이너)
├── AgentOffice.css # 스타일
├── canvas/
│ ├── OfficeRenderer.js # Canvas 렌더링 엔진 (게임루프)
│ ├── SpriteSheet.js # 스프라이트시트 로더 + 프레임 애니메이션
│ ├── TileMap.js # 타일맵 데이터 + 렌더링
│ └── AgentSprite.js # 에이전트 캐릭터 (위치, 상태, 이동, 애니메이션)
├── components/
│ ├── ChatPanel.jsx # 에이전트 채팅/명령 패널
│ ├── AgentBubble.jsx # 말풍선/상태 아이콘 오버레이
│ ├── TaskHistory.jsx # 작업 이력 사이드패널
│ └── ApprovalDialog.jsx # 승인 요청 다이얼로그
├── hooks/
│ ├── useAgentManager.js # WebSocket + 에이전트 상태 관리
│ └── useOfficeCanvas.js # Canvas 초기화 + 이벤트 바인딩
└── assets/
├── tileset.png # 사무실 타일셋 (16x16 or 32x32)
├── agents.png # 에이전트 스프라이트시트
└── office-map.json # 타일맵 데이터
```
### WebSocket 프로토콜
**서버 → 클라이언트:**
```json
{"type": "agent_state", "agent": "stock", "state": "working", "detail": "뉴스 수집 중..."}
{"type": "agent_state", "agent": "music", "state": "waiting", "detail": "프롬프트 승인 대기", "task_id": "abc-123"}
{"type": "task_complete", "agent": "stock", "task_id": "...", "result": {"summary": "..."}}
{"type": "agent_move", "agent": "stock", "target": "break_room"}
```
**클라이언트 → 서버:**
```json
{"type": "command", "agent": "music", "action": "compose", "params": {"prompt": "...", "style": "..."}}
{"type": "approval", "agent": "music", "task_id": "abc-123", "approved": true}
{"type": "query", "agent": "stock", "action": "status"}
```
### ChatPanel 기능
- 에이전트별 채팅 히스토리 표시
- 텍스트 입력 + 빠른 액션 버튼
- 승인 대기 중인 작업 강조 표시
- 최근 작업 결과 인라인 표시
---
## 7. 인프라 변경
### Docker Compose 추가
```yaml
agent-office:
build: ./agent-office
container_name: agent-office
ports:
- "18900:8000"
volumes:
- ${RUNTIME_PATH}/data:/app/data
environment:
- TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN}
- TELEGRAM_CHAT_ID=${TELEGRAM_CHAT_ID}
- TELEGRAM_WEBHOOK_URL=${TELEGRAM_WEBHOOK_URL}
- STOCK_LAB_URL=http://stock-lab:8000
- MUSIC_LAB_URL=http://music-lab:8000
depends_on:
- stock-lab
- music-lab
restart: unless-stopped
```
### Nginx 라우팅 추가
```nginx
location /api/agent-office/ {
proxy_pass http://agent-office:8000;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade"; # WebSocket 지원
}
```
### 라우팅 (React Router)
```javascript
// routes.jsx
{ path: 'agent-office', lazy: () => import('./pages/agent-office/AgentOffice') }
```
Lab 페이지(EffectLab.jsx)의 LAB_ITEMS에 Agent Office 항목 추가.
---
## 8. 향후 확장 (Phase 2+)
| 단계 | 내용 |
|------|------|
| Phase 2 | LottoAgent, BlogAgent, RealestateAgent 추가 |
| Phase 3 | Claude AI Agent (자연어 복합 지시) |
| Phase 4 | 방/층 확장 (부서별 공간 분리) |
| Phase 5 | 에이전트 간 협업 시각화 (회의 테이블에서 토론) |
| Phase 6 | 에이전트 커스텀 (이름, 외형, 성격 설정) |
---
## 9. 기술 스택 요약
| 레이어 | 기술 |
|--------|------|
| 사무실 렌더링 | HTML5 Canvas 2D (커스텀 엔진) |
| 프론트엔드 | React 18 + Vite |
| 실시간 통신 | WebSocket (FastAPI) |
| 백엔드 | FastAPI (Python 3.12) |
| DB | SQLite (agent_office.db) |
| 스케줄러 | APScheduler |
| 메시징 | Telegram Bot API (Webhook) |
| 서비스 연동 | HTTP Proxy (기존 서비스 API 호출) |

View File

@@ -139,6 +139,22 @@ server {
} }
# agent-office API + WebSocket
location /api/agent-office/ {
resolver 127.0.0.11 valid=10s;
set $agent_office_backend agent-office:8000;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_read_timeout 86400s;
proxy_pass http://$agent_office_backend$request_uri;
}
# API 프록시 (여기가 포인트: /api/ 중복 제거) # API 프록시 (여기가 포인트: /api/ 중복 제거)
location /api/ { location /api/ {
proxy_http_version 1.1; proxy_http_version 1.1;