Files
web-page-backend/agent-office/app/tarot/pipeline.py
2026-05-24 00:18:42 +09:00

133 lines
4.2 KiB
Python

"""Tarot 파이프라인 — Claude Sonnet 호출 + 파싱 폴백 + reroll 1회."""
import json
import time
from typing import Any, Dict
import httpx
from ..config import (
ANTHROPIC_API_KEY,
TAROT_MODEL,
TAROT_COST_INPUT_PER_M,
TAROT_COST_OUTPUT_PER_M,
TAROT_TIMEOUT_SEC,
)
from ..models import TarotInterpretRequest
from .prompt import SYSTEM_PROMPT, build_user_message
from .schema import validate_interpretation
API_URL = "https://api.anthropic.com/v1/messages"
class TarotError(Exception):
pass
def calc_cost(tokens_in: int, tokens_out: int) -> float:
return (
tokens_in / 1_000_000 * TAROT_COST_INPUT_PER_M
+ tokens_out / 1_000_000 * TAROT_COST_OUTPUT_PER_M
)
def _strip_codeblock(text: str) -> str:
t = text.strip()
if t.startswith("```"):
t = t.strip("`")
if t.startswith("json"):
t = t[4:]
t = t.strip()
return t
def _extract_json(raw: str) -> dict:
cleaned = _strip_codeblock(raw)
try:
return json.loads(cleaned)
except json.JSONDecodeError:
start, end = cleaned.find("{"), cleaned.rfind("}")
if start >= 0 and end > start:
try:
return json.loads(cleaned[start : end + 1])
except json.JSONDecodeError:
pass
raise
async def _call_claude(user_text: str, feedback: str = "") -> tuple[dict, dict, str]:
if not ANTHROPIC_API_KEY:
raise TarotError("ANTHROPIC_API_KEY missing")
if feedback:
user_text = f"이전 응답이 다음 이유로 거절됨: {feedback}\n올바른 스키마(시스템 지침)로 다시 응답.\n\n{user_text}"
payload = {
"model": TAROT_MODEL,
"max_tokens": 2048,
"system": [{"type": "text", "text": SYSTEM_PROMPT,
"cache_control": {"type": "ephemeral"}}],
"messages": [{"role": "user", "content": [{"type": "text", "text": user_text}]}],
}
headers = {
"x-api-key": ANTHROPIC_API_KEY,
"anthropic-version": "2023-06-01",
"anthropic-beta": "prompt-caching-2024-07-31",
"content-type": "application/json",
}
started = time.monotonic()
async with httpx.AsyncClient(timeout=TAROT_TIMEOUT_SEC) as client:
r = await client.post(API_URL, headers=headers, json=payload)
r.raise_for_status()
resp = r.json()
latency_ms = int((time.monotonic() - started) * 1000)
raw_text = "".join(
b.get("text", "") for b in resp.get("content", []) if b.get("type") == "text"
)
parsed = _extract_json(raw_text)
usage = resp.get("usage", {}) or {}
meta = {
"tokens_in": int(usage.get("input_tokens", 0) or 0),
"tokens_out": int(usage.get("output_tokens", 0) or 0),
"latency_ms": latency_ms,
}
return parsed, meta, raw_text
async def interpret(req: TarotInterpretRequest) -> Dict[str, Any]:
user_text = build_user_message(
question=req.question or "",
category=req.category or "",
spread_type=req.spread_type,
cards_reference=req.cards_reference,
context_meta=req.context_meta or {},
spread_count=len(req.cards),
)
total_in, total_out, total_latency = 0, 0, 0
last_error = ""
for attempt in range(2):
try:
parsed, meta, _raw = await _call_claude(user_text, feedback=last_error)
except httpx.HTTPError as e:
raise TarotError(f"Claude HTTP error: {e}") from e
except json.JSONDecodeError as e:
last_error = f"JSON 파싱 실패: {e}"
continue
total_in += meta["tokens_in"]
total_out += meta["tokens_out"]
total_latency += meta["latency_ms"]
ok, err = validate_interpretation(parsed, req.spread_type)
if ok:
return {
"interpretation_json": parsed,
"model": TAROT_MODEL,
"tokens_in": total_in,
"tokens_out": total_out,
"cost_usd": calc_cost(total_in, total_out),
"latency_ms": total_latency,
"reroll_count": attempt,
}
last_error = err
raise TarotError(f"검증 실패 (reroll 2회): {last_error}")