feat(agent-office): 텔레그램 자연어 대화 + 프롬프트 캐싱 + 평가 지표
- 슬래시 명령이 아닌 메시지를 Claude Haiku 4.5로 응답 - system 프롬프트 + 히스토리 끝 블록에 cache_control:ephemeral 적용 - conversation_messages 테이블에 토큰·캐시·latency 기록 - chat_id 화이트리스트 + 분당 rate limit - GET /api/agent-office/conversation/stats 로 캐시 히트율·토큰 확인
This commit is contained in:
13
CLAUDE.md
13
CLAUDE.md
@@ -425,6 +425,18 @@ docker compose up -d
|
|||||||
- `TELEGRAM_BOT_TOKEN`: 텔레그램 봇 토큰 (미설정 시 알림 비활성화)
|
- `TELEGRAM_BOT_TOKEN`: 텔레그램 봇 토큰 (미설정 시 알림 비활성화)
|
||||||
- `TELEGRAM_CHAT_ID`: 텔레그램 채팅 ID
|
- `TELEGRAM_CHAT_ID`: 텔레그램 채팅 ID
|
||||||
- `TELEGRAM_WEBHOOK_URL`: 텔레그램 Webhook URL
|
- `TELEGRAM_WEBHOOK_URL`: 텔레그램 Webhook URL
|
||||||
|
- `TELEGRAM_WIFE_CHAT_ID`: 아내 chat.id (브리핑 공유 + 대화 허용)
|
||||||
|
- `ANTHROPIC_API_KEY`: 자연어 대화용 Claude API 키 (미설정 시 대화 비활성)
|
||||||
|
- `CONVERSATION_MODEL`: 대화 모델 (기본 `claude-haiku-4-5-20251001`)
|
||||||
|
- `CONVERSATION_HISTORY_LIMIT`: 이력 주입 수 (기본 20)
|
||||||
|
- `CONVERSATION_RATE_PER_MIN`: 채팅당 분당 최대 메시지 (기본 6)
|
||||||
|
|
||||||
|
**텔레그램 자연어 대화 (옵션 B)**
|
||||||
|
- 슬래시 명령이 아닌 일반 문장을 보내면 Claude Haiku 4.5가 응답
|
||||||
|
- 프롬프트 캐싱: `system` 블록 + 히스토리 마지막 블록에 `cache_control: ephemeral` → 5분 TTL
|
||||||
|
- 허용 chat_id 화이트리스트: `TELEGRAM_CHAT_ID`, `TELEGRAM_WIFE_CHAT_ID`
|
||||||
|
- 평가 지표: `conversation_messages` 테이블에 tokens / cache_read / cache_write / latency 기록
|
||||||
|
- 조회: `GET /api/agent-office/conversation/stats?days=7`
|
||||||
|
|
||||||
**스케줄러 job**
|
**스케줄러 job**
|
||||||
- 07:30 매일 — 주식 뉴스 요약 (`stock_news_job`)
|
- 07:30 매일 — 주식 뉴스 요약 (`stock_news_job`)
|
||||||
@@ -446,6 +458,7 @@ docker compose up -d
|
|||||||
| POST | `/api/agent-office/approve` | 작업 승인/거부 |
|
| POST | `/api/agent-office/approve` | 작업 승인/거부 |
|
||||||
| POST | `/api/agent-office/telegram/webhook` | 텔레그램 Webhook 수신 |
|
| POST | `/api/agent-office/telegram/webhook` | 텔레그램 Webhook 수신 |
|
||||||
| GET | `/api/agent-office/states` | 전체 에이전트 상태 조회 |
|
| GET | `/api/agent-office/states` | 전체 에이전트 상태 조회 |
|
||||||
|
| GET | `/api/agent-office/conversation/stats` | 텔레그램 자연어 대화 토큰·캐시 통계 (`days` 필터) |
|
||||||
|
|
||||||
### deployer (deployer/)
|
### deployer (deployer/)
|
||||||
- Webhook 검증: `X-Gitea-Signature` (HMAC SHA256, `compare_digest` 사용)
|
- Webhook 검증: `X-Gitea-Signature` (HMAC SHA256, `compare_digest` 사용)
|
||||||
|
|||||||
@@ -12,6 +12,12 @@ TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "")
|
|||||||
TELEGRAM_WEBHOOK_URL = os.getenv("TELEGRAM_WEBHOOK_URL", "")
|
TELEGRAM_WEBHOOK_URL = os.getenv("TELEGRAM_WEBHOOK_URL", "")
|
||||||
TELEGRAM_WIFE_CHAT_ID = os.getenv("TELEGRAM_WIFE_CHAT_ID", "")
|
TELEGRAM_WIFE_CHAT_ID = os.getenv("TELEGRAM_WIFE_CHAT_ID", "")
|
||||||
|
|
||||||
|
# Anthropic (conversational)
|
||||||
|
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
|
||||||
|
CONVERSATION_MODEL = os.getenv("CONVERSATION_MODEL", "claude-haiku-4-5-20251001")
|
||||||
|
CONVERSATION_HISTORY_LIMIT = int(os.getenv("CONVERSATION_HISTORY_LIMIT", "20"))
|
||||||
|
CONVERSATION_RATE_PER_MIN = int(os.getenv("CONVERSATION_RATE_PER_MIN", "6"))
|
||||||
|
|
||||||
# Database
|
# Database
|
||||||
DB_PATH = os.getenv("AGENT_OFFICE_DB_PATH", "/app/data/agent_office.db")
|
DB_PATH = os.getenv("AGENT_OFFICE_DB_PATH", "/app/data/agent_office.db")
|
||||||
|
|
||||||
|
|||||||
@@ -67,6 +67,25 @@ def init_db() -> None:
|
|||||||
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
|
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
|
||||||
)
|
)
|
||||||
""")
|
""")
|
||||||
|
conn.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS conversation_messages (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
chat_id TEXT NOT NULL,
|
||||||
|
role TEXT NOT NULL,
|
||||||
|
content TEXT NOT NULL,
|
||||||
|
model TEXT,
|
||||||
|
tokens_input INTEGER DEFAULT 0,
|
||||||
|
tokens_output INTEGER DEFAULT 0,
|
||||||
|
cache_read INTEGER DEFAULT 0,
|
||||||
|
cache_write INTEGER DEFAULT 0,
|
||||||
|
latency_ms INTEGER DEFAULT 0,
|
||||||
|
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
conn.execute("""
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_conv_chat
|
||||||
|
ON conversation_messages(chat_id, created_at DESC)
|
||||||
|
""")
|
||||||
# Seed default agent configs
|
# Seed default agent configs
|
||||||
for agent_id, name in [
|
for agent_id, name in [
|
||||||
("stock", "주식 트레이더"),
|
("stock", "주식 트레이더"),
|
||||||
@@ -319,6 +338,109 @@ def get_token_usage_stats(agent_id: str, days: int = 1) -> dict:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def save_conversation_message(
|
||||||
|
chat_id: str,
|
||||||
|
role: str,
|
||||||
|
content: str,
|
||||||
|
model: Optional[str] = None,
|
||||||
|
tokens_input: int = 0,
|
||||||
|
tokens_output: int = 0,
|
||||||
|
cache_read: int = 0,
|
||||||
|
cache_write: int = 0,
|
||||||
|
latency_ms: int = 0,
|
||||||
|
) -> None:
|
||||||
|
with _conn() as conn:
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO conversation_messages
|
||||||
|
(chat_id, role, content, model, tokens_input, tokens_output,
|
||||||
|
cache_read, cache_write, latency_ms)
|
||||||
|
VALUES (?,?,?,?,?,?,?,?,?)
|
||||||
|
""",
|
||||||
|
(str(chat_id), role, content, model, tokens_input, tokens_output,
|
||||||
|
cache_read, cache_write, latency_ms),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def get_conversation_history(chat_id: str, limit: int = 20) -> List[Dict[str, Any]]:
|
||||||
|
"""최근 N개를 시간순(오래된 → 최신)으로 반환."""
|
||||||
|
with _conn() as conn:
|
||||||
|
rows = conn.execute(
|
||||||
|
"""
|
||||||
|
SELECT role, content FROM conversation_messages
|
||||||
|
WHERE chat_id=? ORDER BY id DESC LIMIT ?
|
||||||
|
""",
|
||||||
|
(str(chat_id), limit),
|
||||||
|
).fetchall()
|
||||||
|
return [{"role": r["role"], "content": r["content"]} for r in reversed(rows)]
|
||||||
|
|
||||||
|
|
||||||
|
def count_recent_user_messages(chat_id: str, seconds: int = 60) -> int:
|
||||||
|
with _conn() as conn:
|
||||||
|
r = conn.execute(
|
||||||
|
"""
|
||||||
|
SELECT COUNT(*) AS c FROM conversation_messages
|
||||||
|
WHERE chat_id=? AND role='user'
|
||||||
|
AND created_at >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?)
|
||||||
|
""",
|
||||||
|
(str(chat_id), f"-{int(seconds)} seconds"),
|
||||||
|
).fetchone()
|
||||||
|
return r["c"] if r else 0
|
||||||
|
|
||||||
|
|
||||||
|
def get_conversation_stats(days: int = 7) -> Dict[str, Any]:
|
||||||
|
with _conn() as conn:
|
||||||
|
rows = conn.execute(
|
||||||
|
"""
|
||||||
|
SELECT chat_id,
|
||||||
|
COUNT(*) AS msg_count,
|
||||||
|
SUM(tokens_input) AS in_tokens,
|
||||||
|
SUM(tokens_output) AS out_tokens,
|
||||||
|
SUM(cache_read) AS cache_read,
|
||||||
|
SUM(cache_write) AS cache_write,
|
||||||
|
AVG(latency_ms) AS avg_latency
|
||||||
|
FROM conversation_messages
|
||||||
|
WHERE role='assistant'
|
||||||
|
AND created_at >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?)
|
||||||
|
GROUP BY chat_id
|
||||||
|
""",
|
||||||
|
(f"-{int(days)} days",),
|
||||||
|
).fetchall()
|
||||||
|
|
||||||
|
by_chat = []
|
||||||
|
tot_in = tot_out = tot_r = tot_w = tot_msgs = 0
|
||||||
|
for r in rows:
|
||||||
|
ci = int(r["in_tokens"] or 0)
|
||||||
|
co = int(r["out_tokens"] or 0)
|
||||||
|
cr = int(r["cache_read"] or 0)
|
||||||
|
cw = int(r["cache_write"] or 0)
|
||||||
|
mc = int(r["msg_count"] or 0)
|
||||||
|
hit_rate = (cr / (cr + cw)) if (cr + cw) > 0 else 0.0
|
||||||
|
by_chat.append({
|
||||||
|
"chat_id": r["chat_id"],
|
||||||
|
"message_count": mc,
|
||||||
|
"tokens_input": ci,
|
||||||
|
"tokens_output": co,
|
||||||
|
"cache_read": cr,
|
||||||
|
"cache_write": cw,
|
||||||
|
"cache_hit_rate": round(hit_rate, 3),
|
||||||
|
"avg_latency_ms": round(float(r["avg_latency"] or 0), 1),
|
||||||
|
})
|
||||||
|
tot_in += ci; tot_out += co; tot_r += cr; tot_w += cw; tot_msgs += mc
|
||||||
|
|
||||||
|
overall_hit = (tot_r / (tot_r + tot_w)) if (tot_r + tot_w) > 0 else 0.0
|
||||||
|
return {
|
||||||
|
"days": days,
|
||||||
|
"total_messages": tot_msgs,
|
||||||
|
"tokens_input": tot_in,
|
||||||
|
"tokens_output": tot_out,
|
||||||
|
"cache_read": tot_r,
|
||||||
|
"cache_write": tot_w,
|
||||||
|
"cache_hit_rate": round(overall_hit, 3),
|
||||||
|
"by_chat": by_chat,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def get_activity_feed(limit: int = 50, offset: int = 0) -> dict:
|
def get_activity_feed(limit: int = 50, offset: int = 0) -> dict:
|
||||||
with _conn() as conn:
|
with _conn() as conn:
|
||||||
total_row = conn.execute("""
|
total_row = conn.execute("""
|
||||||
|
|||||||
@@ -172,6 +172,11 @@ def agent_token_usage(agent_id: str, days: int = 1):
|
|||||||
from .db import get_token_usage_stats
|
from .db import get_token_usage_stats
|
||||||
return get_token_usage_stats(agent_id, days)
|
return get_token_usage_stats(agent_id, days)
|
||||||
|
|
||||||
|
@app.get("/api/agent-office/conversation/stats")
|
||||||
|
def conversation_stats(days: int = 7):
|
||||||
|
from .db import get_conversation_stats
|
||||||
|
return get_conversation_stats(days)
|
||||||
|
|
||||||
@app.get("/api/agent-office/activity")
|
@app.get("/api/agent-office/activity")
|
||||||
def activity_feed(limit: int = 50, offset: int = 0):
|
def activity_feed(limit: int = 50, offset: int = 0):
|
||||||
return get_activity_feed(limit, offset)
|
return get_activity_feed(limit, offset)
|
||||||
|
|||||||
154
agent-office/app/telegram/conversational.py
Normal file
154
agent-office/app/telegram/conversational.py
Normal file
@@ -0,0 +1,154 @@
|
|||||||
|
"""텔레그램 자연어 대화 핸들러 — Claude + 프롬프트 캐싱.
|
||||||
|
|
||||||
|
구조:
|
||||||
|
- system prompt(정적) + 최근 대화 이력 + 마지막 user turn
|
||||||
|
- system과 history 끝 블록에 cache_control=ephemeral 적용 → 5분 TTL 프롬프트 캐시
|
||||||
|
- 평가를 위해 토큰·캐시·latency를 DB에 기록
|
||||||
|
"""
|
||||||
|
import asyncio
|
||||||
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
from ..config import (
|
||||||
|
ANTHROPIC_API_KEY,
|
||||||
|
CONVERSATION_MODEL,
|
||||||
|
CONVERSATION_HISTORY_LIMIT,
|
||||||
|
CONVERSATION_RATE_PER_MIN,
|
||||||
|
TELEGRAM_CHAT_ID,
|
||||||
|
TELEGRAM_WIFE_CHAT_ID,
|
||||||
|
)
|
||||||
|
from ..db import (
|
||||||
|
save_conversation_message,
|
||||||
|
get_conversation_history,
|
||||||
|
count_recent_user_messages,
|
||||||
|
)
|
||||||
|
|
||||||
|
API_URL = "https://api.anthropic.com/v1/messages"
|
||||||
|
|
||||||
|
SYSTEM_PROMPT = """당신은 'gahusb' 개인 웹 플랫폼의 AI 비서입니다. 텔레그램을 통해 CEO(주인)와 그의 가족과 대화합니다.
|
||||||
|
|
||||||
|
역할과 성격:
|
||||||
|
- 따뜻하지만 간결합니다. 텔레그램에서 읽기 쉽게 2~5문장 위주로 답합니다.
|
||||||
|
- 농담과 위트를 섞되 공손하게. 이모지는 상황에 맞게 1~2개만.
|
||||||
|
- 모르는 것은 솔직히 모른다고 하고, 추측은 명시합니다.
|
||||||
|
|
||||||
|
플랫폼 컨텍스트(대답에 자연스럽게 참고):
|
||||||
|
- 주식 에이전트: 뉴스 요약·시장 브리핑·포트폴리오 관리
|
||||||
|
- 음악 에이전트: AI 음악 생성(Suno/MusicGen)
|
||||||
|
- 블로그 에이전트: 키워드 리서치·포스트 생성·품질 리뷰
|
||||||
|
- 청약 에이전트: 부동산 청약 공고 수집·매칭
|
||||||
|
- 명령은 `/help`, `/agents`, `/status`, `/stock.brief` 같은 슬래시 형식이 있습니다. 사용자가 요청을 설명만 하면 해당 명령을 안내해 주세요.
|
||||||
|
|
||||||
|
응답 규칙:
|
||||||
|
- 장문 설명 금지. 스크롤을 넘기지 않을 분량.
|
||||||
|
- 에이전트 실행을 부탁받으면 지금 이 채널은 '대화'만 가능함을 알리고, 정확한 슬래시 명령을 한 줄로 제시하세요.
|
||||||
|
- HTML·마크다운 태그 없이 평문으로 답합니다."""
|
||||||
|
|
||||||
|
|
||||||
|
_rate_lock = asyncio.Lock()
|
||||||
|
|
||||||
|
|
||||||
|
def is_whitelisted(chat_id: str) -> bool:
|
||||||
|
allowed = {str(x) for x in (TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID) if x}
|
||||||
|
return str(chat_id) in allowed
|
||||||
|
|
||||||
|
|
||||||
|
async def _check_rate_limit(chat_id: str) -> bool:
|
||||||
|
async with _rate_lock:
|
||||||
|
count = count_recent_user_messages(chat_id, seconds=60)
|
||||||
|
return count < CONVERSATION_RATE_PER_MIN
|
||||||
|
|
||||||
|
|
||||||
|
async def _call_claude(messages: list) -> dict:
|
||||||
|
"""Anthropic Messages API 호출 (prompt caching beta)."""
|
||||||
|
headers = {
|
||||||
|
"x-api-key": ANTHROPIC_API_KEY,
|
||||||
|
"anthropic-version": "2023-06-01",
|
||||||
|
"anthropic-beta": "prompt-caching-2024-07-31",
|
||||||
|
"content-type": "application/json",
|
||||||
|
}
|
||||||
|
# system: cache_control 적용하여 정적 프롬프트 캐싱
|
||||||
|
system_blocks = [
|
||||||
|
{
|
||||||
|
"type": "text",
|
||||||
|
"text": SYSTEM_PROMPT,
|
||||||
|
"cache_control": {"type": "ephemeral"},
|
||||||
|
}
|
||||||
|
]
|
||||||
|
payload = {
|
||||||
|
"model": CONVERSATION_MODEL,
|
||||||
|
"max_tokens": 1024,
|
||||||
|
"system": system_blocks,
|
||||||
|
"messages": messages,
|
||||||
|
}
|
||||||
|
async with httpx.AsyncClient(timeout=60) as client:
|
||||||
|
r = await client.post(API_URL, headers=headers, json=payload)
|
||||||
|
r.raise_for_status()
|
||||||
|
return r.json()
|
||||||
|
|
||||||
|
|
||||||
|
def _build_messages(history: list, user_text: str) -> list:
|
||||||
|
"""history: [{role, content(str)}, ...]. 가장 오래된 턴을 제외한 나머지 히스토리 끝 블록에
|
||||||
|
cache_control을 추가하여 누적 이력을 캐시한다."""
|
||||||
|
msgs: list = []
|
||||||
|
for h in history:
|
||||||
|
msgs.append({"role": h["role"], "content": [{"type": "text", "text": h["content"]}]})
|
||||||
|
# 히스토리 마지막 블록에 cache_control → 이전 대화를 캐시
|
||||||
|
if msgs:
|
||||||
|
last = msgs[-1]["content"][-1]
|
||||||
|
last["cache_control"] = {"type": "ephemeral"}
|
||||||
|
msgs.append({"role": "user", "content": [{"type": "text", "text": user_text}]})
|
||||||
|
return msgs
|
||||||
|
|
||||||
|
|
||||||
|
async def respond_to_message(chat_id: str, user_text: str) -> Optional[str]:
|
||||||
|
"""자연어 메시지에 응답. 실패 시 사용자에게 돌려줄 문자열 반환(또는 None = 무시)."""
|
||||||
|
if not ANTHROPIC_API_KEY:
|
||||||
|
return None # 기능 비활성
|
||||||
|
|
||||||
|
if not is_whitelisted(chat_id):
|
||||||
|
return None # 모르는 사용자 무시
|
||||||
|
|
||||||
|
if not await _check_rate_limit(chat_id):
|
||||||
|
return "⏳ 잠시만요, 너무 빠릅니다. 분당 몇 번만 대화해 주세요."
|
||||||
|
|
||||||
|
history = get_conversation_history(chat_id, limit=CONVERSATION_HISTORY_LIMIT)
|
||||||
|
messages = _build_messages(history, user_text)
|
||||||
|
|
||||||
|
started = time.monotonic()
|
||||||
|
try:
|
||||||
|
resp = await _call_claude(messages)
|
||||||
|
except httpx.HTTPStatusError as e:
|
||||||
|
body = e.response.text[:200] if e.response is not None else ""
|
||||||
|
return f"⚠️ Claude 호출 실패: {e.response.status_code} {body}"
|
||||||
|
except Exception as e:
|
||||||
|
return f"⚠️ 응답 생성 중 오류: {type(e).__name__}"
|
||||||
|
latency_ms = int((time.monotonic() - started) * 1000)
|
||||||
|
|
||||||
|
try:
|
||||||
|
reply = "".join(
|
||||||
|
blk.get("text", "") for blk in resp.get("content", []) if blk.get("type") == "text"
|
||||||
|
).strip()
|
||||||
|
except Exception:
|
||||||
|
reply = ""
|
||||||
|
if not reply:
|
||||||
|
reply = "(빈 응답)"
|
||||||
|
|
||||||
|
usage = resp.get("usage", {}) or {}
|
||||||
|
t_in = int(usage.get("input_tokens", 0) or 0)
|
||||||
|
t_out = int(usage.get("output_tokens", 0) or 0)
|
||||||
|
c_read = int(usage.get("cache_read_input_tokens", 0) or 0)
|
||||||
|
c_write = int(usage.get("cache_creation_input_tokens", 0) or 0)
|
||||||
|
|
||||||
|
# 기록: user 먼저, assistant 나중 (순서 보존)
|
||||||
|
save_conversation_message(chat_id, "user", user_text)
|
||||||
|
save_conversation_message(
|
||||||
|
chat_id, "assistant", reply,
|
||||||
|
model=CONVERSATION_MODEL,
|
||||||
|
tokens_input=t_in, tokens_output=t_out,
|
||||||
|
cache_read=c_read, cache_write=c_write,
|
||||||
|
latency_ms=latency_ms,
|
||||||
|
)
|
||||||
|
return reply
|
||||||
@@ -69,6 +69,16 @@ async def _handle_message(message: dict, agent_dispatcher) -> Optional[dict]:
|
|||||||
text = message.get("text", "")
|
text = message.get("text", "")
|
||||||
parsed = parse_command(text)
|
parsed = parse_command(text)
|
||||||
if not parsed:
|
if not parsed:
|
||||||
|
# 슬래시 명령이 아니면 자연어 대화로 라우팅
|
||||||
|
chat_id = str(message.get("chat", {}).get("id", ""))
|
||||||
|
if not chat_id:
|
||||||
|
return None
|
||||||
|
from .conversational import respond_to_message
|
||||||
|
reply = await respond_to_message(chat_id, text)
|
||||||
|
if reply:
|
||||||
|
import html as _html
|
||||||
|
await send_raw(_html.escape(reply), chat_id=chat_id)
|
||||||
|
return {"handled": "chat"}
|
||||||
return None
|
return None
|
||||||
|
|
||||||
agent_id, command, args = parsed
|
agent_id, command, args = parsed
|
||||||
|
|||||||
Reference in New Issue
Block a user