diff --git a/CLAUDE.md b/CLAUDE.md index 5497d74..98574cc 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -425,6 +425,18 @@ docker compose up -d - `TELEGRAM_BOT_TOKEN`: 텔레그램 봇 토큰 (미설정 시 알림 비활성화) - `TELEGRAM_CHAT_ID`: 텔레그램 채팅 ID - `TELEGRAM_WEBHOOK_URL`: 텔레그램 Webhook URL +- `TELEGRAM_WIFE_CHAT_ID`: 아내 chat.id (브리핑 공유 + 대화 허용) +- `ANTHROPIC_API_KEY`: 자연어 대화용 Claude API 키 (미설정 시 대화 비활성) +- `CONVERSATION_MODEL`: 대화 모델 (기본 `claude-haiku-4-5-20251001`) +- `CONVERSATION_HISTORY_LIMIT`: 이력 주입 수 (기본 20) +- `CONVERSATION_RATE_PER_MIN`: 채팅당 분당 최대 메시지 (기본 6) + +**텔레그램 자연어 대화 (옵션 B)** +- 슬래시 명령이 아닌 일반 문장을 보내면 Claude Haiku 4.5가 응답 +- 프롬프트 캐싱: `system` 블록 + 히스토리 마지막 블록에 `cache_control: ephemeral` → 5분 TTL +- 허용 chat_id 화이트리스트: `TELEGRAM_CHAT_ID`, `TELEGRAM_WIFE_CHAT_ID` +- 평가 지표: `conversation_messages` 테이블에 tokens / cache_read / cache_write / latency 기록 +- 조회: `GET /api/agent-office/conversation/stats?days=7` **스케줄러 job** - 07:30 매일 — 주식 뉴스 요약 (`stock_news_job`) @@ -446,6 +458,7 @@ docker compose up -d | POST | `/api/agent-office/approve` | 작업 승인/거부 | | POST | `/api/agent-office/telegram/webhook` | 텔레그램 Webhook 수신 | | GET | `/api/agent-office/states` | 전체 에이전트 상태 조회 | +| GET | `/api/agent-office/conversation/stats` | 텔레그램 자연어 대화 토큰·캐시 통계 (`days` 필터) | ### deployer (deployer/) - Webhook 검증: `X-Gitea-Signature` (HMAC SHA256, `compare_digest` 사용) diff --git a/agent-office/app/config.py b/agent-office/app/config.py index ddede7a..3ee6b7f 100644 --- a/agent-office/app/config.py +++ b/agent-office/app/config.py @@ -12,6 +12,12 @@ TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "") TELEGRAM_WEBHOOK_URL = os.getenv("TELEGRAM_WEBHOOK_URL", "") TELEGRAM_WIFE_CHAT_ID = os.getenv("TELEGRAM_WIFE_CHAT_ID", "") +# Anthropic (conversational) +ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") +CONVERSATION_MODEL = os.getenv("CONVERSATION_MODEL", "claude-haiku-4-5-20251001") +CONVERSATION_HISTORY_LIMIT = int(os.getenv("CONVERSATION_HISTORY_LIMIT", "20")) +CONVERSATION_RATE_PER_MIN = int(os.getenv("CONVERSATION_RATE_PER_MIN", "6")) + # Database DB_PATH = os.getenv("AGENT_OFFICE_DB_PATH", "/app/data/agent_office.db") diff --git a/agent-office/app/db.py b/agent-office/app/db.py index 4731479..857c76b 100644 --- a/agent-office/app/db.py +++ b/agent-office/app/db.py @@ -67,6 +67,25 @@ def init_db() -> None: created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS conversation_messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + chat_id TEXT NOT NULL, + role TEXT NOT NULL, + content TEXT NOT NULL, + model TEXT, + tokens_input INTEGER DEFAULT 0, + tokens_output INTEGER DEFAULT 0, + cache_read INTEGER DEFAULT 0, + cache_write INTEGER DEFAULT 0, + latency_ms INTEGER DEFAULT 0, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) + ) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_conv_chat + ON conversation_messages(chat_id, created_at DESC) + """) # Seed default agent configs for agent_id, name in [ ("stock", "주식 트레이더"), @@ -319,6 +338,109 @@ def get_token_usage_stats(agent_id: str, days: int = 1) -> dict: } +def save_conversation_message( + chat_id: str, + role: str, + content: str, + model: Optional[str] = None, + tokens_input: int = 0, + tokens_output: int = 0, + cache_read: int = 0, + cache_write: int = 0, + latency_ms: int = 0, +) -> None: + with _conn() as conn: + conn.execute( + """ + INSERT INTO conversation_messages + (chat_id, role, content, model, tokens_input, tokens_output, + cache_read, cache_write, latency_ms) + VALUES (?,?,?,?,?,?,?,?,?) + """, + (str(chat_id), role, content, model, tokens_input, tokens_output, + cache_read, cache_write, latency_ms), + ) + + +def get_conversation_history(chat_id: str, limit: int = 20) -> List[Dict[str, Any]]: + """최근 N개를 시간순(오래된 → 최신)으로 반환.""" + with _conn() as conn: + rows = conn.execute( + """ + SELECT role, content FROM conversation_messages + WHERE chat_id=? ORDER BY id DESC LIMIT ? + """, + (str(chat_id), limit), + ).fetchall() + return [{"role": r["role"], "content": r["content"]} for r in reversed(rows)] + + +def count_recent_user_messages(chat_id: str, seconds: int = 60) -> int: + with _conn() as conn: + r = conn.execute( + """ + SELECT COUNT(*) AS c FROM conversation_messages + WHERE chat_id=? AND role='user' + AND created_at >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?) + """, + (str(chat_id), f"-{int(seconds)} seconds"), + ).fetchone() + return r["c"] if r else 0 + + +def get_conversation_stats(days: int = 7) -> Dict[str, Any]: + with _conn() as conn: + rows = conn.execute( + """ + SELECT chat_id, + COUNT(*) AS msg_count, + SUM(tokens_input) AS in_tokens, + SUM(tokens_output) AS out_tokens, + SUM(cache_read) AS cache_read, + SUM(cache_write) AS cache_write, + AVG(latency_ms) AS avg_latency + FROM conversation_messages + WHERE role='assistant' + AND created_at >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?) + GROUP BY chat_id + """, + (f"-{int(days)} days",), + ).fetchall() + + by_chat = [] + tot_in = tot_out = tot_r = tot_w = tot_msgs = 0 + for r in rows: + ci = int(r["in_tokens"] or 0) + co = int(r["out_tokens"] or 0) + cr = int(r["cache_read"] or 0) + cw = int(r["cache_write"] or 0) + mc = int(r["msg_count"] or 0) + hit_rate = (cr / (cr + cw)) if (cr + cw) > 0 else 0.0 + by_chat.append({ + "chat_id": r["chat_id"], + "message_count": mc, + "tokens_input": ci, + "tokens_output": co, + "cache_read": cr, + "cache_write": cw, + "cache_hit_rate": round(hit_rate, 3), + "avg_latency_ms": round(float(r["avg_latency"] or 0), 1), + }) + tot_in += ci; tot_out += co; tot_r += cr; tot_w += cw; tot_msgs += mc + + overall_hit = (tot_r / (tot_r + tot_w)) if (tot_r + tot_w) > 0 else 0.0 + return { + "days": days, + "total_messages": tot_msgs, + "tokens_input": tot_in, + "tokens_output": tot_out, + "cache_read": tot_r, + "cache_write": tot_w, + "cache_hit_rate": round(overall_hit, 3), + "by_chat": by_chat, + } + + def get_activity_feed(limit: int = 50, offset: int = 0) -> dict: with _conn() as conn: total_row = conn.execute(""" diff --git a/agent-office/app/main.py b/agent-office/app/main.py index 875357b..1912708 100644 --- a/agent-office/app/main.py +++ b/agent-office/app/main.py @@ -172,6 +172,11 @@ def agent_token_usage(agent_id: str, days: int = 1): from .db import get_token_usage_stats return get_token_usage_stats(agent_id, days) +@app.get("/api/agent-office/conversation/stats") +def conversation_stats(days: int = 7): + from .db import get_conversation_stats + return get_conversation_stats(days) + @app.get("/api/agent-office/activity") def activity_feed(limit: int = 50, offset: int = 0): return get_activity_feed(limit, offset) diff --git a/agent-office/app/telegram/conversational.py b/agent-office/app/telegram/conversational.py new file mode 100644 index 0000000..2fa3615 --- /dev/null +++ b/agent-office/app/telegram/conversational.py @@ -0,0 +1,154 @@ +"""텔레그램 자연어 대화 핸들러 — Claude + 프롬프트 캐싱. + +구조: +- system prompt(정적) + 최근 대화 이력 + 마지막 user turn +- system과 history 끝 블록에 cache_control=ephemeral 적용 → 5분 TTL 프롬프트 캐시 +- 평가를 위해 토큰·캐시·latency를 DB에 기록 +""" +import asyncio +import time +from typing import Optional + +import httpx + +from ..config import ( + ANTHROPIC_API_KEY, + CONVERSATION_MODEL, + CONVERSATION_HISTORY_LIMIT, + CONVERSATION_RATE_PER_MIN, + TELEGRAM_CHAT_ID, + TELEGRAM_WIFE_CHAT_ID, +) +from ..db import ( + save_conversation_message, + get_conversation_history, + count_recent_user_messages, +) + +API_URL = "https://api.anthropic.com/v1/messages" + +SYSTEM_PROMPT = """당신은 'gahusb' 개인 웹 플랫폼의 AI 비서입니다. 텔레그램을 통해 CEO(주인)와 그의 가족과 대화합니다. + +역할과 성격: +- 따뜻하지만 간결합니다. 텔레그램에서 읽기 쉽게 2~5문장 위주로 답합니다. +- 농담과 위트를 섞되 공손하게. 이모지는 상황에 맞게 1~2개만. +- 모르는 것은 솔직히 모른다고 하고, 추측은 명시합니다. + +플랫폼 컨텍스트(대답에 자연스럽게 참고): +- 주식 에이전트: 뉴스 요약·시장 브리핑·포트폴리오 관리 +- 음악 에이전트: AI 음악 생성(Suno/MusicGen) +- 블로그 에이전트: 키워드 리서치·포스트 생성·품질 리뷰 +- 청약 에이전트: 부동산 청약 공고 수집·매칭 +- 명령은 `/help`, `/agents`, `/status`, `/stock.brief` 같은 슬래시 형식이 있습니다. 사용자가 요청을 설명만 하면 해당 명령을 안내해 주세요. + +응답 규칙: +- 장문 설명 금지. 스크롤을 넘기지 않을 분량. +- 에이전트 실행을 부탁받으면 지금 이 채널은 '대화'만 가능함을 알리고, 정확한 슬래시 명령을 한 줄로 제시하세요. +- HTML·마크다운 태그 없이 평문으로 답합니다.""" + + +_rate_lock = asyncio.Lock() + + +def is_whitelisted(chat_id: str) -> bool: + allowed = {str(x) for x in (TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID) if x} + return str(chat_id) in allowed + + +async def _check_rate_limit(chat_id: str) -> bool: + async with _rate_lock: + count = count_recent_user_messages(chat_id, seconds=60) + return count < CONVERSATION_RATE_PER_MIN + + +async def _call_claude(messages: list) -> dict: + """Anthropic Messages API 호출 (prompt caching beta).""" + headers = { + "x-api-key": ANTHROPIC_API_KEY, + "anthropic-version": "2023-06-01", + "anthropic-beta": "prompt-caching-2024-07-31", + "content-type": "application/json", + } + # system: cache_control 적용하여 정적 프롬프트 캐싱 + system_blocks = [ + { + "type": "text", + "text": SYSTEM_PROMPT, + "cache_control": {"type": "ephemeral"}, + } + ] + payload = { + "model": CONVERSATION_MODEL, + "max_tokens": 1024, + "system": system_blocks, + "messages": messages, + } + async with httpx.AsyncClient(timeout=60) as client: + r = await client.post(API_URL, headers=headers, json=payload) + r.raise_for_status() + return r.json() + + +def _build_messages(history: list, user_text: str) -> list: + """history: [{role, content(str)}, ...]. 가장 오래된 턴을 제외한 나머지 히스토리 끝 블록에 + cache_control을 추가하여 누적 이력을 캐시한다.""" + msgs: list = [] + for h in history: + msgs.append({"role": h["role"], "content": [{"type": "text", "text": h["content"]}]}) + # 히스토리 마지막 블록에 cache_control → 이전 대화를 캐시 + if msgs: + last = msgs[-1]["content"][-1] + last["cache_control"] = {"type": "ephemeral"} + msgs.append({"role": "user", "content": [{"type": "text", "text": user_text}]}) + return msgs + + +async def respond_to_message(chat_id: str, user_text: str) -> Optional[str]: + """자연어 메시지에 응답. 실패 시 사용자에게 돌려줄 문자열 반환(또는 None = 무시).""" + if not ANTHROPIC_API_KEY: + return None # 기능 비활성 + + if not is_whitelisted(chat_id): + return None # 모르는 사용자 무시 + + if not await _check_rate_limit(chat_id): + return "⏳ 잠시만요, 너무 빠릅니다. 분당 몇 번만 대화해 주세요." + + history = get_conversation_history(chat_id, limit=CONVERSATION_HISTORY_LIMIT) + messages = _build_messages(history, user_text) + + started = time.monotonic() + try: + resp = await _call_claude(messages) + except httpx.HTTPStatusError as e: + body = e.response.text[:200] if e.response is not None else "" + return f"⚠️ Claude 호출 실패: {e.response.status_code} {body}" + except Exception as e: + return f"⚠️ 응답 생성 중 오류: {type(e).__name__}" + latency_ms = int((time.monotonic() - started) * 1000) + + try: + reply = "".join( + blk.get("text", "") for blk in resp.get("content", []) if blk.get("type") == "text" + ).strip() + except Exception: + reply = "" + if not reply: + reply = "(빈 응답)" + + usage = resp.get("usage", {}) or {} + t_in = int(usage.get("input_tokens", 0) or 0) + t_out = int(usage.get("output_tokens", 0) or 0) + c_read = int(usage.get("cache_read_input_tokens", 0) or 0) + c_write = int(usage.get("cache_creation_input_tokens", 0) or 0) + + # 기록: user 먼저, assistant 나중 (순서 보존) + save_conversation_message(chat_id, "user", user_text) + save_conversation_message( + chat_id, "assistant", reply, + model=CONVERSATION_MODEL, + tokens_input=t_in, tokens_output=t_out, + cache_read=c_read, cache_write=c_write, + latency_ms=latency_ms, + ) + return reply diff --git a/agent-office/app/telegram/webhook.py b/agent-office/app/telegram/webhook.py index a65f992..c174a71 100644 --- a/agent-office/app/telegram/webhook.py +++ b/agent-office/app/telegram/webhook.py @@ -69,6 +69,16 @@ async def _handle_message(message: dict, agent_dispatcher) -> Optional[dict]: text = message.get("text", "") parsed = parse_command(text) if not parsed: + # 슬래시 명령이 아니면 자연어 대화로 라우팅 + chat_id = str(message.get("chat", {}).get("id", "")) + if not chat_id: + return None + from .conversational import respond_to_message + reply = await respond_to_message(chat_id, text) + if reply: + import html as _html + await send_raw(_html.escape(reply), chat_id=chat_id) + return {"handled": "chat"} return None agent_id, command, args = parsed