feat(tarot-lab): pipeline.py 이관 + 6 테스트 통과
This commit is contained in:
139
tarot-lab/app/pipeline.py
Normal file
139
tarot-lab/app/pipeline.py
Normal file
@@ -0,0 +1,139 @@
|
||||
"""Tarot 파이프라인 — Claude Sonnet 호출 + 파싱 폴백 + reroll 1회."""
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from typing import Any, Dict
|
||||
|
||||
import httpx
|
||||
|
||||
from .config import (
|
||||
ANTHROPIC_API_KEY,
|
||||
TAROT_MODEL,
|
||||
TAROT_COST_INPUT_PER_M,
|
||||
TAROT_COST_OUTPUT_PER_M,
|
||||
TAROT_TIMEOUT_SEC,
|
||||
)
|
||||
|
||||
|
||||
logger = logging.getLogger("tarot-lab.pipeline")
|
||||
from .models import TarotInterpretRequest
|
||||
from .prompt import SYSTEM_PROMPT, build_user_message
|
||||
from .schema import validate_interpretation
|
||||
|
||||
|
||||
API_URL = "https://api.anthropic.com/v1/messages"
|
||||
|
||||
|
||||
class TarotError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def calc_cost(tokens_in: int, tokens_out: int) -> float:
|
||||
return (
|
||||
tokens_in / 1_000_000 * TAROT_COST_INPUT_PER_M
|
||||
+ tokens_out / 1_000_000 * TAROT_COST_OUTPUT_PER_M
|
||||
)
|
||||
|
||||
|
||||
def _strip_codeblock(text: str) -> str:
|
||||
t = text.strip()
|
||||
if t.startswith("```"):
|
||||
t = t.strip("`")
|
||||
if t.startswith("json"):
|
||||
t = t[4:]
|
||||
t = t.strip()
|
||||
return t
|
||||
|
||||
|
||||
def _extract_json(raw: str) -> dict:
|
||||
cleaned = _strip_codeblock(raw)
|
||||
try:
|
||||
return json.loads(cleaned)
|
||||
except json.JSONDecodeError:
|
||||
start, end = cleaned.find("{"), cleaned.rfind("}")
|
||||
if start >= 0 and end > start:
|
||||
try:
|
||||
return json.loads(cleaned[start : end + 1])
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
raise
|
||||
|
||||
|
||||
async def _call_claude(user_text: str, feedback: str = "") -> tuple[dict, dict, str]:
|
||||
if not ANTHROPIC_API_KEY:
|
||||
raise TarotError("ANTHROPIC_API_KEY missing")
|
||||
if feedback:
|
||||
user_text = f"이전 응답이 다음 이유로 거절됨: {feedback}\n올바른 스키마(시스템 지침)로 다시 응답.\n\n{user_text}"
|
||||
payload = {
|
||||
"model": TAROT_MODEL,
|
||||
"max_tokens": 1400, # 응답 시간 단축 — 3-card spread evidence·interactions 포함 충분
|
||||
"system": [{"type": "text", "text": SYSTEM_PROMPT,
|
||||
"cache_control": {"type": "ephemeral"}}],
|
||||
"messages": [{"role": "user", "content": [{"type": "text", "text": user_text}]}],
|
||||
}
|
||||
headers = {
|
||||
"x-api-key": ANTHROPIC_API_KEY,
|
||||
"anthropic-version": "2023-06-01",
|
||||
"anthropic-beta": "prompt-caching-2024-07-31",
|
||||
"content-type": "application/json",
|
||||
}
|
||||
started = time.monotonic()
|
||||
async with httpx.AsyncClient(timeout=TAROT_TIMEOUT_SEC) as client:
|
||||
r = await client.post(API_URL, headers=headers, json=payload)
|
||||
r.raise_for_status()
|
||||
resp = r.json()
|
||||
latency_ms = int((time.monotonic() - started) * 1000)
|
||||
raw_text = "".join(
|
||||
b.get("text", "") for b in resp.get("content", []) if b.get("type") == "text"
|
||||
)
|
||||
usage = resp.get("usage", {}) or {}
|
||||
tokens_in = int(usage.get("input_tokens", 0) or 0)
|
||||
tokens_out = int(usage.get("output_tokens", 0) or 0)
|
||||
logger.info("tarot claude call: latency=%dms, in=%d, out=%d", latency_ms, tokens_in, tokens_out)
|
||||
parsed = _extract_json(raw_text)
|
||||
meta = {
|
||||
"tokens_in": tokens_in,
|
||||
"tokens_out": tokens_out,
|
||||
"latency_ms": latency_ms,
|
||||
}
|
||||
return parsed, meta, raw_text
|
||||
|
||||
|
||||
async def interpret(req: TarotInterpretRequest) -> Dict[str, Any]:
|
||||
user_text = build_user_message(
|
||||
question=req.question or "",
|
||||
category=req.category or "",
|
||||
spread_type=req.spread_type,
|
||||
cards_reference=req.cards_reference,
|
||||
context_meta=req.context_meta or {},
|
||||
spread_count=len(req.cards),
|
||||
)
|
||||
|
||||
total_in, total_out, total_latency = 0, 0, 0
|
||||
last_error = ""
|
||||
for attempt in range(2):
|
||||
try:
|
||||
parsed, meta, _raw = await _call_claude(user_text, feedback=last_error)
|
||||
except httpx.HTTPError as e:
|
||||
raise TarotError(f"Claude HTTP error: {e}") from e
|
||||
except json.JSONDecodeError as e:
|
||||
last_error = f"JSON 파싱 실패: {e}"
|
||||
continue
|
||||
total_in += meta["tokens_in"]
|
||||
total_out += meta["tokens_out"]
|
||||
total_latency += meta["latency_ms"]
|
||||
|
||||
ok, err = validate_interpretation(parsed, req.spread_type)
|
||||
if ok:
|
||||
return {
|
||||
"interpretation_json": parsed,
|
||||
"model": TAROT_MODEL,
|
||||
"tokens_in": total_in,
|
||||
"tokens_out": total_out,
|
||||
"cost_usd": calc_cost(total_in, total_out),
|
||||
"latency_ms": total_latency,
|
||||
"reroll_count": attempt,
|
||||
}
|
||||
last_error = err
|
||||
|
||||
raise TarotError(f"검증 실패 (reroll 2회): {last_error}")
|
||||
114
tarot-lab/tests/test_pipeline.py
Normal file
114
tarot-lab/tests/test_pipeline.py
Normal file
@@ -0,0 +1,114 @@
|
||||
import json
|
||||
import pytest
|
||||
import respx
|
||||
import httpx
|
||||
|
||||
from app import pipeline
|
||||
from app.models import TarotInterpretRequest, TarotCardDraw
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _patch_key(monkeypatch):
|
||||
monkeypatch.setattr(pipeline, "ANTHROPIC_API_KEY", "test-key")
|
||||
|
||||
|
||||
def _req():
|
||||
return TarotInterpretRequest(
|
||||
spread_type="three_card",
|
||||
category="연애",
|
||||
question="Q",
|
||||
cards=[
|
||||
TarotCardDraw(position="과거", card_id="the-fool", reversed=False),
|
||||
TarotCardDraw(position="현재", card_id="the-magician", reversed=False),
|
||||
TarotCardDraw(position="미래", card_id="the-empress", reversed=True),
|
||||
],
|
||||
cards_reference="...",
|
||||
)
|
||||
|
||||
|
||||
def _valid_response_json():
|
||||
return {
|
||||
"summary": "흐름이 있음",
|
||||
"cards": [
|
||||
{"position": "과거", "card": "the-fool", "reversed": False,
|
||||
"interpretation": "...", "advice": "...",
|
||||
"evidence": {"card_meaning_used": "...", "position_logic": "...", "category_lens": "..."}},
|
||||
{"position": "현재", "card": "the-magician", "reversed": False,
|
||||
"interpretation": "...", "advice": "...",
|
||||
"evidence": {"card_meaning_used": "...", "position_logic": "...", "category_lens": "..."}},
|
||||
{"position": "미래", "card": "the-empress", "reversed": True,
|
||||
"interpretation": "...", "advice": "...",
|
||||
"evidence": {"card_meaning_used": "...", "position_logic": "...", "category_lens": "..."}},
|
||||
],
|
||||
"interactions": [{"type": "synergy", "between": ["the-fool", "the-magician"], "explanation": "..."}],
|
||||
"advice": "...",
|
||||
"warning": None,
|
||||
"confidence": "medium",
|
||||
}
|
||||
|
||||
|
||||
def _claude_envelope(text: str, in_tok=100, out_tok=200):
|
||||
return {
|
||||
"content": [{"type": "text", "text": text}],
|
||||
"usage": {"input_tokens": in_tok, "output_tokens": out_tok},
|
||||
}
|
||||
|
||||
|
||||
@respx.mock
|
||||
async def test_interpret_success():
|
||||
respx.post("https://api.anthropic.com/v1/messages").mock(
|
||||
return_value=httpx.Response(200, json=_claude_envelope(json.dumps(_valid_response_json())))
|
||||
)
|
||||
result = await pipeline.interpret(_req())
|
||||
assert result["reroll_count"] == 0
|
||||
assert result["model"] == pipeline.TAROT_MODEL
|
||||
assert result["tokens_in"] == 100
|
||||
assert result["cost_usd"] > 0
|
||||
|
||||
|
||||
@respx.mock
|
||||
async def test_interpret_codeblock_stripped():
|
||||
text = "```json\n" + json.dumps(_valid_response_json()) + "\n```"
|
||||
respx.post("https://api.anthropic.com/v1/messages").mock(
|
||||
return_value=httpx.Response(200, json=_claude_envelope(text))
|
||||
)
|
||||
result = await pipeline.interpret(_req())
|
||||
assert "interpretation_json" in result
|
||||
|
||||
|
||||
@respx.mock
|
||||
async def test_interpret_reroll_then_success():
|
||||
valid = json.dumps(_valid_response_json())
|
||||
invalid = json.dumps({"summary": "...", "cards": [], "interactions": [], "advice": "", "confidence": "medium"})
|
||||
respx.post("https://api.anthropic.com/v1/messages").mock(
|
||||
side_effect=[
|
||||
httpx.Response(200, json=_claude_envelope(invalid)),
|
||||
httpx.Response(200, json=_claude_envelope(valid)),
|
||||
]
|
||||
)
|
||||
result = await pipeline.interpret(_req())
|
||||
assert result["reroll_count"] == 1
|
||||
|
||||
|
||||
@respx.mock
|
||||
async def test_interpret_reroll_fail_raises():
|
||||
invalid = json.dumps({"summary": "...", "cards": [], "interactions": [], "advice": "", "confidence": "medium"})
|
||||
respx.post("https://api.anthropic.com/v1/messages").mock(
|
||||
return_value=httpx.Response(200, json=_claude_envelope(invalid))
|
||||
)
|
||||
with pytest.raises(pipeline.TarotError):
|
||||
await pipeline.interpret(_req())
|
||||
|
||||
|
||||
@respx.mock
|
||||
async def test_interpret_http_error():
|
||||
respx.post("https://api.anthropic.com/v1/messages").mock(
|
||||
return_value=httpx.Response(500, text="boom")
|
||||
)
|
||||
with pytest.raises(pipeline.TarotError):
|
||||
await pipeline.interpret(_req())
|
||||
|
||||
|
||||
def test_calc_cost():
|
||||
cost = pipeline.calc_cost(1_000_000, 1_000_000)
|
||||
assert cost == pipeline.TAROT_COST_INPUT_PER_M + pipeline.TAROT_COST_OUTPUT_PER_M
|
||||
Reference in New Issue
Block a user