"""텔레그램 자연어 대화 핸들러 — Claude + 프롬프트 캐싱. 구조: - system prompt(정적) + 최근 대화 이력 + 마지막 user turn - system과 history 끝 블록에 cache_control=ephemeral 적용 → 5분 TTL 프롬프트 캐시 - 평가를 위해 토큰·캐시·latency를 DB에 기록 """ import asyncio import time from typing import Optional import httpx from ..config import ( ANTHROPIC_API_KEY, CONVERSATION_MODEL, CONVERSATION_HISTORY_LIMIT, CONVERSATION_RATE_PER_MIN, TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID, ) from ..db import ( save_conversation_message, get_conversation_history, count_recent_user_messages, ) API_URL = "https://api.anthropic.com/v1/messages" SYSTEM_PROMPT = """당신은 'gahusb' 개인 웹 플랫폼의 AI 비서입니다. 텔레그램을 통해 CEO(주인)와 그의 가족과 대화합니다. 역할과 성격: - 따뜻하지만 간결합니다. 텔레그램에서 읽기 쉽게 2~5문장 위주로 답합니다. - 농담과 위트를 섞되 공손하게. 이모지는 상황에 맞게 1~2개만. - 모르는 것은 솔직히 모른다고 하고, 추측은 명시합니다. 플랫폼 컨텍스트(대답에 자연스럽게 참고): - 주식 에이전트: 뉴스 요약·시장 브리핑·포트폴리오 관리 - 음악 에이전트: AI 음악 생성(Suno/MusicGen) - 블로그 에이전트: 키워드 리서치·포스트 생성·품질 리뷰 - 청약 에이전트: 부동산 청약 공고 수집·매칭 - 명령은 `/help`, `/agents`, `/status`, `/stock.brief` 같은 슬래시 형식이 있습니다. 사용자가 요청을 설명만 하면 해당 명령을 안내해 주세요. 응답 규칙: - 장문 설명 금지. 스크롤을 넘기지 않을 분량. - 에이전트 실행을 부탁받으면 지금 이 채널은 '대화'만 가능함을 알리고, 정확한 슬래시 명령을 한 줄로 제시하세요. - HTML·마크다운 태그 없이 평문으로 답합니다.""" _rate_lock = asyncio.Lock() def is_whitelisted(chat_id: str) -> bool: allowed = {str(x) for x in (TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID) if x} return str(chat_id) in allowed async def _check_rate_limit(chat_id: str) -> bool: async with _rate_lock: count = count_recent_user_messages(chat_id, seconds=60) return count < CONVERSATION_RATE_PER_MIN async def _call_claude(messages: list) -> dict: """Anthropic Messages API 호출 (prompt caching beta).""" headers = { "x-api-key": ANTHROPIC_API_KEY, "anthropic-version": "2023-06-01", "anthropic-beta": "prompt-caching-2024-07-31", "content-type": "application/json", } # system: cache_control 적용하여 정적 프롬프트 캐싱 system_blocks = [ { "type": "text", "text": SYSTEM_PROMPT, "cache_control": {"type": "ephemeral"}, } ] payload = { "model": CONVERSATION_MODEL, "max_tokens": 1024, "system": system_blocks, "messages": messages, } async with httpx.AsyncClient(timeout=60) as client: r = await client.post(API_URL, headers=headers, json=payload) r.raise_for_status() return r.json() def _build_messages(history: list, user_text: str) -> list: """history: [{role, content(str)}, ...]. 가장 오래된 턴을 제외한 나머지 히스토리 끝 블록에 cache_control을 추가하여 누적 이력을 캐시한다.""" msgs: list = [] for h in history: msgs.append({"role": h["role"], "content": [{"type": "text", "text": h["content"]}]}) # 히스토리 마지막 블록에 cache_control → 이전 대화를 캐시 if msgs: last = msgs[-1]["content"][-1] last["cache_control"] = {"type": "ephemeral"} msgs.append({"role": "user", "content": [{"type": "text", "text": user_text}]}) return msgs async def respond_to_message(chat_id: str, user_text: str) -> Optional[str]: """자연어 메시지에 응답. 실패 시 사용자에게 돌려줄 문자열 반환(또는 None = 무시).""" if not ANTHROPIC_API_KEY: return None # 기능 비활성 if not is_whitelisted(chat_id): return None # 모르는 사용자 무시 if not await _check_rate_limit(chat_id): return "⏳ 잠시만요, 너무 빠릅니다. 분당 몇 번만 대화해 주세요." history = get_conversation_history(chat_id, limit=CONVERSATION_HISTORY_LIMIT) messages = _build_messages(history, user_text) started = time.monotonic() try: resp = await _call_claude(messages) except httpx.HTTPStatusError as e: body = e.response.text[:200] if e.response is not None else "" return f"⚠️ Claude 호출 실패: {e.response.status_code} {body}" except Exception as e: return f"⚠️ 응답 생성 중 오류: {type(e).__name__}" latency_ms = int((time.monotonic() - started) * 1000) try: reply = "".join( blk.get("text", "") for blk in resp.get("content", []) if blk.get("type") == "text" ).strip() except Exception: reply = "" if not reply: reply = "(빈 응답)" usage = resp.get("usage", {}) or {} t_in = int(usage.get("input_tokens", 0) or 0) t_out = int(usage.get("output_tokens", 0) or 0) c_read = int(usage.get("cache_read_input_tokens", 0) or 0) c_write = int(usage.get("cache_creation_input_tokens", 0) or 0) # 기록: user 먼저, assistant 나중 (순서 보존) save_conversation_message(chat_id, "user", user_text) save_conversation_message( chat_id, "assistant", reply, model=CONVERSATION_MODEL, tokens_input=t_in, tokens_output=t_out, cache_read=c_read, cache_write=c_write, latency_ms=latency_ms, ) return reply