3-card spread 해석 응답이 1400 토큰 한계에서 잘려 JSON "Unterminated string" 파싱 실패가 reroll 2회 모두 발생하던 버그 수정. - max_tokens 1400 → 2800 (saju-lab 2400 기준 + interactions 마진) - stop_reason == "max_tokens" 검사 → 신규 TarotTruncated 예외로 truncation 명시화 - reroll feedback에 "각 카드 1~2문장으로 축약" 안내 추가 → 모델이 다음 응답 길이 조절 - truncation 시나리오 테스트 2개 추가 (1차 잘림→성공, 2회 모두 잘림→TarotError) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
165 lines
5.6 KiB
Python
165 lines
5.6 KiB
Python
"""Tarot 파이프라인 — Claude Sonnet 호출 + 파싱 폴백 + reroll 1회."""
|
||
import json
|
||
import logging
|
||
import time
|
||
from typing import Any, Dict
|
||
|
||
import httpx
|
||
|
||
from .config import (
|
||
ANTHROPIC_API_KEY,
|
||
TAROT_MODEL,
|
||
TAROT_COST_INPUT_PER_M,
|
||
TAROT_COST_OUTPUT_PER_M,
|
||
TAROT_TIMEOUT_SEC,
|
||
)
|
||
|
||
|
||
logger = logging.getLogger("tarot-lab.pipeline")
|
||
from .models import TarotInterpretRequest
|
||
from .prompt import SYSTEM_PROMPT, build_user_message
|
||
from .schema import validate_interpretation
|
||
|
||
|
||
API_URL = "https://api.anthropic.com/v1/messages"
|
||
|
||
|
||
class TarotError(Exception):
|
||
pass
|
||
|
||
|
||
class TarotTruncated(Exception):
|
||
"""Claude 응답이 max_tokens에서 잘림 — reroll feedback에 길이 단축 요청 필요."""
|
||
|
||
def __init__(self, raw_text: str, limit: int):
|
||
self.raw_text = raw_text
|
||
self.limit = limit
|
||
super().__init__(f"Claude 응답이 max_tokens={limit}에서 잘림")
|
||
|
||
|
||
def calc_cost(tokens_in: int, tokens_out: int) -> float:
|
||
return (
|
||
tokens_in / 1_000_000 * TAROT_COST_INPUT_PER_M
|
||
+ tokens_out / 1_000_000 * TAROT_COST_OUTPUT_PER_M
|
||
)
|
||
|
||
|
||
def _strip_codeblock(text: str) -> str:
|
||
t = text.strip()
|
||
if t.startswith("```"):
|
||
t = t.strip("`")
|
||
if t.startswith("json"):
|
||
t = t[4:]
|
||
t = t.strip()
|
||
return t
|
||
|
||
|
||
def _extract_json(raw: str) -> dict:
|
||
cleaned = _strip_codeblock(raw)
|
||
try:
|
||
return json.loads(cleaned)
|
||
except json.JSONDecodeError:
|
||
start, end = cleaned.find("{"), cleaned.rfind("}")
|
||
if start >= 0 and end > start:
|
||
try:
|
||
return json.loads(cleaned[start : end + 1])
|
||
except json.JSONDecodeError:
|
||
pass
|
||
raise
|
||
|
||
|
||
async def _call_claude(user_text: str, feedback: str = "") -> tuple[dict, dict, str]:
|
||
if not ANTHROPIC_API_KEY:
|
||
raise TarotError("ANTHROPIC_API_KEY missing")
|
||
if feedback:
|
||
user_text = f"이전 응답이 다음 이유로 거절됨: {feedback}\n올바른 스키마(시스템 지침)로 다시 응답.\n\n{user_text}"
|
||
max_tokens = 2800 # 3-card spread JSON(summary + cards×3 evidence + interactions + advice) 안전 마진
|
||
payload = {
|
||
"model": TAROT_MODEL,
|
||
"max_tokens": max_tokens,
|
||
"system": [{"type": "text", "text": SYSTEM_PROMPT,
|
||
"cache_control": {"type": "ephemeral"}}],
|
||
"messages": [{"role": "user", "content": [{"type": "text", "text": user_text}]}],
|
||
}
|
||
headers = {
|
||
"x-api-key": ANTHROPIC_API_KEY,
|
||
"anthropic-version": "2023-06-01",
|
||
"anthropic-beta": "prompt-caching-2024-07-31",
|
||
"content-type": "application/json",
|
||
}
|
||
started = time.monotonic()
|
||
async with httpx.AsyncClient(timeout=TAROT_TIMEOUT_SEC) as client:
|
||
r = await client.post(API_URL, headers=headers, json=payload)
|
||
r.raise_for_status()
|
||
resp = r.json()
|
||
latency_ms = int((time.monotonic() - started) * 1000)
|
||
raw_text = "".join(
|
||
b.get("text", "") for b in resp.get("content", []) if b.get("type") == "text"
|
||
)
|
||
usage = resp.get("usage", {}) or {}
|
||
tokens_in = int(usage.get("input_tokens", 0) or 0)
|
||
tokens_out = int(usage.get("output_tokens", 0) or 0)
|
||
stop_reason = resp.get("stop_reason", "")
|
||
logger.info(
|
||
"tarot claude call: latency=%dms, in=%d, out=%d, stop=%s",
|
||
latency_ms, tokens_in, tokens_out, stop_reason,
|
||
)
|
||
if stop_reason == "max_tokens":
|
||
# truncation은 JSON 파싱 시도 전에 단락 — _extract_json은 거의 항상 실패하고
|
||
# 일반 JSONDecodeError로 reroll되면 모델이 길이를 줄일 힌트를 못 받음.
|
||
raise TarotTruncated(raw_text, max_tokens)
|
||
parsed = _extract_json(raw_text)
|
||
meta = {
|
||
"tokens_in": tokens_in,
|
||
"tokens_out": tokens_out,
|
||
"latency_ms": latency_ms,
|
||
}
|
||
return parsed, meta, raw_text
|
||
|
||
|
||
async def interpret(req: TarotInterpretRequest) -> Dict[str, Any]:
|
||
user_text = build_user_message(
|
||
question=req.question or "",
|
||
category=req.category or "",
|
||
spread_type=req.spread_type,
|
||
cards_reference=req.cards_reference,
|
||
context_meta=req.context_meta or {},
|
||
spread_count=len(req.cards),
|
||
)
|
||
|
||
total_in, total_out, total_latency = 0, 0, 0
|
||
last_error = ""
|
||
for attempt in range(2):
|
||
try:
|
||
parsed, meta, _raw = await _call_claude(user_text, feedback=last_error)
|
||
except httpx.HTTPError as e:
|
||
raise TarotError(f"Claude HTTP error: {e}") from e
|
||
except TarotTruncated as e:
|
||
last_error = (
|
||
f"이전 응답이 max_tokens={e.limit}에서 잘렸습니다. "
|
||
"각 카드의 interpretation·evidence·advice를 1~2문장으로 축약해 "
|
||
"전체 JSON 길이를 줄여 다시 응답하세요."
|
||
)
|
||
continue
|
||
except json.JSONDecodeError as e:
|
||
last_error = f"JSON 파싱 실패: {e}. 모든 문자열을 닫고 유효한 JSON으로 재작성."
|
||
continue
|
||
total_in += meta["tokens_in"]
|
||
total_out += meta["tokens_out"]
|
||
total_latency += meta["latency_ms"]
|
||
|
||
ok, err = validate_interpretation(parsed, req.spread_type)
|
||
if ok:
|
||
return {
|
||
"interpretation_json": parsed,
|
||
"model": TAROT_MODEL,
|
||
"tokens_in": total_in,
|
||
"tokens_out": total_out,
|
||
"cost_usd": calc_cost(total_in, total_out),
|
||
"latency_ms": total_latency,
|
||
"reroll_count": attempt,
|
||
}
|
||
last_error = err
|
||
|
||
raise TarotError(f"검증 실패 (reroll 2회): {last_error}")
|