From abdfcbb144bcb0557b95f082dafd10be5d608b60 Mon Sep 17 00:00:00 2001 From: gahusb Date: Mon, 25 May 2026 18:30:00 +0900 Subject: [PATCH] =?UTF-8?q?feat(tarot-lab):=20pipeline.py=20=EC=9D=B4?= =?UTF-8?q?=EA=B4=80=20+=206=20=ED=85=8C=EC=8A=A4=ED=8A=B8=20=ED=86=B5?= =?UTF-8?q?=EA=B3=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tarot-lab/app/pipeline.py | 139 +++++++++++++++++++++++++++++++ tarot-lab/tests/test_pipeline.py | 114 +++++++++++++++++++++++++ 2 files changed, 253 insertions(+) create mode 100644 tarot-lab/app/pipeline.py create mode 100644 tarot-lab/tests/test_pipeline.py diff --git a/tarot-lab/app/pipeline.py b/tarot-lab/app/pipeline.py new file mode 100644 index 0000000..d5dc21c --- /dev/null +++ b/tarot-lab/app/pipeline.py @@ -0,0 +1,139 @@ +"""Tarot 파이프라인 — Claude Sonnet 호출 + 파싱 폴백 + reroll 1회.""" +import json +import logging +import time +from typing import Any, Dict + +import httpx + +from .config import ( + ANTHROPIC_API_KEY, + TAROT_MODEL, + TAROT_COST_INPUT_PER_M, + TAROT_COST_OUTPUT_PER_M, + TAROT_TIMEOUT_SEC, +) + + +logger = logging.getLogger("tarot-lab.pipeline") +from .models import TarotInterpretRequest +from .prompt import SYSTEM_PROMPT, build_user_message +from .schema import validate_interpretation + + +API_URL = "https://api.anthropic.com/v1/messages" + + +class TarotError(Exception): + pass + + +def calc_cost(tokens_in: int, tokens_out: int) -> float: + return ( + tokens_in / 1_000_000 * TAROT_COST_INPUT_PER_M + + tokens_out / 1_000_000 * TAROT_COST_OUTPUT_PER_M + ) + + +def _strip_codeblock(text: str) -> str: + t = text.strip() + if t.startswith("```"): + t = t.strip("`") + if t.startswith("json"): + t = t[4:] + t = t.strip() + return t + + +def _extract_json(raw: str) -> dict: + cleaned = _strip_codeblock(raw) + try: + return json.loads(cleaned) + except json.JSONDecodeError: + start, end = cleaned.find("{"), cleaned.rfind("}") + if start >= 0 and end > start: + try: + return json.loads(cleaned[start : end + 1]) + except json.JSONDecodeError: + pass + raise + + +async def _call_claude(user_text: str, feedback: str = "") -> tuple[dict, dict, str]: + if not ANTHROPIC_API_KEY: + raise TarotError("ANTHROPIC_API_KEY missing") + if feedback: + user_text = f"이전 응답이 다음 이유로 거절됨: {feedback}\n올바른 스키마(시스템 지침)로 다시 응답.\n\n{user_text}" + payload = { + "model": TAROT_MODEL, + "max_tokens": 1400, # 응답 시간 단축 — 3-card spread evidence·interactions 포함 충분 + "system": [{"type": "text", "text": SYSTEM_PROMPT, + "cache_control": {"type": "ephemeral"}}], + "messages": [{"role": "user", "content": [{"type": "text", "text": user_text}]}], + } + headers = { + "x-api-key": ANTHROPIC_API_KEY, + "anthropic-version": "2023-06-01", + "anthropic-beta": "prompt-caching-2024-07-31", + "content-type": "application/json", + } + started = time.monotonic() + async with httpx.AsyncClient(timeout=TAROT_TIMEOUT_SEC) as client: + r = await client.post(API_URL, headers=headers, json=payload) + r.raise_for_status() + resp = r.json() + latency_ms = int((time.monotonic() - started) * 1000) + raw_text = "".join( + b.get("text", "") for b in resp.get("content", []) if b.get("type") == "text" + ) + usage = resp.get("usage", {}) or {} + tokens_in = int(usage.get("input_tokens", 0) or 0) + tokens_out = int(usage.get("output_tokens", 0) or 0) + logger.info("tarot claude call: latency=%dms, in=%d, out=%d", latency_ms, tokens_in, tokens_out) + parsed = _extract_json(raw_text) + meta = { + "tokens_in": tokens_in, + "tokens_out": tokens_out, + "latency_ms": latency_ms, + } + return parsed, meta, raw_text + + +async def interpret(req: TarotInterpretRequest) -> Dict[str, Any]: + user_text = build_user_message( + question=req.question or "", + category=req.category or "", + spread_type=req.spread_type, + cards_reference=req.cards_reference, + context_meta=req.context_meta or {}, + spread_count=len(req.cards), + ) + + total_in, total_out, total_latency = 0, 0, 0 + last_error = "" + for attempt in range(2): + try: + parsed, meta, _raw = await _call_claude(user_text, feedback=last_error) + except httpx.HTTPError as e: + raise TarotError(f"Claude HTTP error: {e}") from e + except json.JSONDecodeError as e: + last_error = f"JSON 파싱 실패: {e}" + continue + total_in += meta["tokens_in"] + total_out += meta["tokens_out"] + total_latency += meta["latency_ms"] + + ok, err = validate_interpretation(parsed, req.spread_type) + if ok: + return { + "interpretation_json": parsed, + "model": TAROT_MODEL, + "tokens_in": total_in, + "tokens_out": total_out, + "cost_usd": calc_cost(total_in, total_out), + "latency_ms": total_latency, + "reroll_count": attempt, + } + last_error = err + + raise TarotError(f"검증 실패 (reroll 2회): {last_error}") diff --git a/tarot-lab/tests/test_pipeline.py b/tarot-lab/tests/test_pipeline.py new file mode 100644 index 0000000..49f1983 --- /dev/null +++ b/tarot-lab/tests/test_pipeline.py @@ -0,0 +1,114 @@ +import json +import pytest +import respx +import httpx + +from app import pipeline +from app.models import TarotInterpretRequest, TarotCardDraw + + +@pytest.fixture(autouse=True) +def _patch_key(monkeypatch): + monkeypatch.setattr(pipeline, "ANTHROPIC_API_KEY", "test-key") + + +def _req(): + return TarotInterpretRequest( + spread_type="three_card", + category="연애", + question="Q", + cards=[ + TarotCardDraw(position="과거", card_id="the-fool", reversed=False), + TarotCardDraw(position="현재", card_id="the-magician", reversed=False), + TarotCardDraw(position="미래", card_id="the-empress", reversed=True), + ], + cards_reference="...", + ) + + +def _valid_response_json(): + return { + "summary": "흐름이 있음", + "cards": [ + {"position": "과거", "card": "the-fool", "reversed": False, + "interpretation": "...", "advice": "...", + "evidence": {"card_meaning_used": "...", "position_logic": "...", "category_lens": "..."}}, + {"position": "현재", "card": "the-magician", "reversed": False, + "interpretation": "...", "advice": "...", + "evidence": {"card_meaning_used": "...", "position_logic": "...", "category_lens": "..."}}, + {"position": "미래", "card": "the-empress", "reversed": True, + "interpretation": "...", "advice": "...", + "evidence": {"card_meaning_used": "...", "position_logic": "...", "category_lens": "..."}}, + ], + "interactions": [{"type": "synergy", "between": ["the-fool", "the-magician"], "explanation": "..."}], + "advice": "...", + "warning": None, + "confidence": "medium", + } + + +def _claude_envelope(text: str, in_tok=100, out_tok=200): + return { + "content": [{"type": "text", "text": text}], + "usage": {"input_tokens": in_tok, "output_tokens": out_tok}, + } + + +@respx.mock +async def test_interpret_success(): + respx.post("https://api.anthropic.com/v1/messages").mock( + return_value=httpx.Response(200, json=_claude_envelope(json.dumps(_valid_response_json()))) + ) + result = await pipeline.interpret(_req()) + assert result["reroll_count"] == 0 + assert result["model"] == pipeline.TAROT_MODEL + assert result["tokens_in"] == 100 + assert result["cost_usd"] > 0 + + +@respx.mock +async def test_interpret_codeblock_stripped(): + text = "```json\n" + json.dumps(_valid_response_json()) + "\n```" + respx.post("https://api.anthropic.com/v1/messages").mock( + return_value=httpx.Response(200, json=_claude_envelope(text)) + ) + result = await pipeline.interpret(_req()) + assert "interpretation_json" in result + + +@respx.mock +async def test_interpret_reroll_then_success(): + valid = json.dumps(_valid_response_json()) + invalid = json.dumps({"summary": "...", "cards": [], "interactions": [], "advice": "", "confidence": "medium"}) + respx.post("https://api.anthropic.com/v1/messages").mock( + side_effect=[ + httpx.Response(200, json=_claude_envelope(invalid)), + httpx.Response(200, json=_claude_envelope(valid)), + ] + ) + result = await pipeline.interpret(_req()) + assert result["reroll_count"] == 1 + + +@respx.mock +async def test_interpret_reroll_fail_raises(): + invalid = json.dumps({"summary": "...", "cards": [], "interactions": [], "advice": "", "confidence": "medium"}) + respx.post("https://api.anthropic.com/v1/messages").mock( + return_value=httpx.Response(200, json=_claude_envelope(invalid)) + ) + with pytest.raises(pipeline.TarotError): + await pipeline.interpret(_req()) + + +@respx.mock +async def test_interpret_http_error(): + respx.post("https://api.anthropic.com/v1/messages").mock( + return_value=httpx.Response(500, text="boom") + ) + with pytest.raises(pipeline.TarotError): + await pipeline.interpret(_req()) + + +def test_calc_cost(): + cost = pipeline.calc_cost(1_000_000, 1_000_000) + assert cost == pipeline.TAROT_COST_INPUT_PER_M + pipeline.TAROT_COST_OUTPUT_PER_M