- YoutubePublisherAgent: 음악 파이프라인의 *_pending 상태를 폴링하여 텔레그램 단일 채널로 단계별 검토 요청 발송, reply 수신 시 의도 분류 후 music-lab에 feedback POST - service_proxy: pipeline list/get/feedback/telegram-msg/lookup-by-msg 헬퍼 5종 추가 (MUSIC_LAB_URL 사용) - scheduler: 30초 interval로 poll_state_changes 실행 - telegram webhook: reply_to_message 가 파이프라인 메시지면 youtube_publisher 로 라우팅 (슬래시 명령보다 우선) - 테스트 4종 추가 (4 PASS)
183 lines
6.6 KiB
Python
183 lines
6.6 KiB
Python
"""텔레그램 자연어 대화 핸들러 — Claude + 프롬프트 캐싱.
|
|
|
|
구조:
|
|
- system prompt(정적) + 최근 대화 이력 + 마지막 user turn
|
|
- system과 history 끝 블록에 cache_control=ephemeral 적용 → 5분 TTL 프롬프트 캐시
|
|
- 평가를 위해 토큰·캐시·latency를 DB에 기록
|
|
"""
|
|
import asyncio
|
|
import time
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
|
|
from ..config import (
|
|
ANTHROPIC_API_KEY,
|
|
CONVERSATION_MODEL,
|
|
CONVERSATION_HISTORY_LIMIT,
|
|
CONVERSATION_RATE_PER_MIN,
|
|
TELEGRAM_CHAT_ID,
|
|
TELEGRAM_WIFE_CHAT_ID,
|
|
)
|
|
from ..db import (
|
|
save_conversation_message,
|
|
get_conversation_history,
|
|
count_recent_user_messages,
|
|
)
|
|
|
|
API_URL = "https://api.anthropic.com/v1/messages"
|
|
|
|
SYSTEM_PROMPT = """당신은 'gahusb' 개인 웹 플랫폼의 AI 비서입니다. 텔레그램을 통해 CEO(주인)와 그의 가족과 대화합니다.
|
|
|
|
역할과 성격:
|
|
- 따뜻하지만 간결합니다. 텔레그램에서 읽기 쉽게 2~5문장 위주로 답합니다.
|
|
- 농담과 위트를 섞되 공손하게. 이모지는 상황에 맞게 1~2개만.
|
|
- 모르는 것은 솔직히 모른다고 하고, 추측은 명시합니다.
|
|
|
|
플랫폼 컨텍스트(대답에 자연스럽게 참고):
|
|
- 주식 에이전트: 뉴스 요약·시장 브리핑·포트폴리오 관리
|
|
- 음악 에이전트: AI 음악 생성(Suno/MusicGen)
|
|
- 블로그 에이전트: 키워드 리서치·포스트 생성·품질 리뷰
|
|
- 청약 에이전트: 부동산 청약 공고 수집·매칭
|
|
- 명령은 `/help`, `/agents`, `/status`, `/stock.brief` 같은 슬래시 형식이 있습니다. 사용자가 요청을 설명만 하면 해당 명령을 안내해 주세요.
|
|
|
|
응답 규칙:
|
|
- 장문 설명 금지. 스크롤을 넘기지 않을 분량.
|
|
- 에이전트 실행을 부탁받으면 지금 이 채널은 '대화'만 가능함을 알리고, 정확한 슬래시 명령을 한 줄로 제시하세요.
|
|
- HTML·마크다운 태그 없이 평문으로 답합니다."""
|
|
|
|
|
|
_rate_lock = asyncio.Lock()
|
|
|
|
|
|
def is_whitelisted(chat_id: str) -> bool:
|
|
allowed = {str(x) for x in (TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID) if x}
|
|
return str(chat_id) in allowed
|
|
|
|
|
|
async def _check_rate_limit(chat_id: str) -> bool:
|
|
async with _rate_lock:
|
|
count = count_recent_user_messages(chat_id, seconds=60)
|
|
return count < CONVERSATION_RATE_PER_MIN
|
|
|
|
|
|
async def _call_claude(messages: list) -> dict:
|
|
"""Anthropic Messages API 호출 (prompt caching beta)."""
|
|
headers = {
|
|
"x-api-key": ANTHROPIC_API_KEY,
|
|
"anthropic-version": "2023-06-01",
|
|
"anthropic-beta": "prompt-caching-2024-07-31",
|
|
"content-type": "application/json",
|
|
}
|
|
# system: cache_control 적용하여 정적 프롬프트 캐싱
|
|
system_blocks = [
|
|
{
|
|
"type": "text",
|
|
"text": SYSTEM_PROMPT,
|
|
"cache_control": {"type": "ephemeral"},
|
|
}
|
|
]
|
|
payload = {
|
|
"model": CONVERSATION_MODEL,
|
|
"max_tokens": 1024,
|
|
"system": system_blocks,
|
|
"messages": messages,
|
|
}
|
|
async with httpx.AsyncClient(timeout=60) as client:
|
|
r = await client.post(API_URL, headers=headers, json=payload)
|
|
r.raise_for_status()
|
|
return r.json()
|
|
|
|
|
|
def _build_messages(history: list, user_text: str) -> list:
|
|
"""history: [{role, content(str)}, ...]. 가장 오래된 턴을 제외한 나머지 히스토리 끝 블록에
|
|
cache_control을 추가하여 누적 이력을 캐시한다."""
|
|
msgs: list = []
|
|
for h in history:
|
|
msgs.append({"role": h["role"], "content": [{"type": "text", "text": h["content"]}]})
|
|
# 히스토리 마지막 블록에 cache_control → 이전 대화를 캐시
|
|
if msgs:
|
|
last = msgs[-1]["content"][-1]
|
|
last["cache_control"] = {"type": "ephemeral"}
|
|
msgs.append({"role": "user", "content": [{"type": "text", "text": user_text}]})
|
|
return msgs
|
|
|
|
|
|
async def maybe_route_to_pipeline(message: dict) -> bool:
|
|
"""파이프라인 텔레그램 메시지에 대한 reply 인 경우 youtube_publisher 로 라우팅.
|
|
|
|
Returns True if message was routed (caller should stop further processing).
|
|
"""
|
|
reply_to = message.get("reply_to_message") or {}
|
|
msg_id = reply_to.get("message_id")
|
|
if not msg_id:
|
|
return False
|
|
from .. import service_proxy
|
|
try:
|
|
link = await service_proxy.lookup_pipeline_by_msg(msg_id)
|
|
except Exception:
|
|
return False
|
|
if not link:
|
|
return False
|
|
from ..agents import AGENT_REGISTRY
|
|
agent = AGENT_REGISTRY.get("youtube_publisher")
|
|
if not agent:
|
|
return False
|
|
pipeline_id = link.get("pipeline_id")
|
|
step = link.get("step")
|
|
if pipeline_id is None or not step:
|
|
return False
|
|
await agent.on_telegram_reply(pipeline_id, step, message.get("text", ""))
|
|
return True
|
|
|
|
|
|
async def respond_to_message(chat_id: str, user_text: str) -> Optional[str]:
|
|
"""자연어 메시지에 응답. 실패 시 사용자에게 돌려줄 문자열 반환(또는 None = 무시)."""
|
|
if not ANTHROPIC_API_KEY:
|
|
return None # 기능 비활성
|
|
|
|
if not is_whitelisted(chat_id):
|
|
return None # 모르는 사용자 무시
|
|
|
|
if not await _check_rate_limit(chat_id):
|
|
return "⏳ 잠시만요, 너무 빠릅니다. 분당 몇 번만 대화해 주세요."
|
|
|
|
history = get_conversation_history(chat_id, limit=CONVERSATION_HISTORY_LIMIT)
|
|
messages = _build_messages(history, user_text)
|
|
|
|
started = time.monotonic()
|
|
try:
|
|
resp = await _call_claude(messages)
|
|
except httpx.HTTPStatusError as e:
|
|
body = e.response.text[:200] if e.response is not None else ""
|
|
return f"⚠️ Claude 호출 실패: {e.response.status_code} {body}"
|
|
except Exception as e:
|
|
return f"⚠️ 응답 생성 중 오류: {type(e).__name__}"
|
|
latency_ms = int((time.monotonic() - started) * 1000)
|
|
|
|
try:
|
|
reply = "".join(
|
|
blk.get("text", "") for blk in resp.get("content", []) if blk.get("type") == "text"
|
|
).strip()
|
|
except Exception:
|
|
reply = ""
|
|
if not reply:
|
|
reply = "(빈 응답)"
|
|
|
|
usage = resp.get("usage", {}) or {}
|
|
t_in = int(usage.get("input_tokens", 0) or 0)
|
|
t_out = int(usage.get("output_tokens", 0) or 0)
|
|
c_read = int(usage.get("cache_read_input_tokens", 0) or 0)
|
|
c_write = int(usage.get("cache_creation_input_tokens", 0) or 0)
|
|
|
|
# 기록: user 먼저, assistant 나중 (순서 보존)
|
|
save_conversation_message(chat_id, "user", user_text)
|
|
save_conversation_message(
|
|
chat_id, "assistant", reply,
|
|
model=CONVERSATION_MODEL,
|
|
tokens_input=t_in, tokens_output=t_out,
|
|
cache_read=c_read, cache_write=c_write,
|
|
latency_ms=latency_ms,
|
|
)
|
|
return reply
|