"""큐레이터 파이프라인 — fetch → claude → validate → save.""" import json import time from typing import Any, Dict import httpx from ..config import ANTHROPIC_API_KEY, LOTTO_CURATOR_MODEL from .. import service_proxy from .prompt import SYSTEM_PROMPT, build_user_message from .schema import validate_response from .retrospective import build_retrospective API_URL = "https://api.anthropic.com/v1/messages" class CuratorError(Exception): pass async def _call_claude(user_text: str, feedback: str = "") -> tuple[dict, dict]: if not ANTHROPIC_API_KEY: raise CuratorError("ANTHROPIC_API_KEY missing") headers = { "x-api-key": ANTHROPIC_API_KEY, "anthropic-version": "2023-06-01", "anthropic-beta": "prompt-caching-2024-07-31", "content-type": "application/json", } system_blocks = [{ "type": "text", "text": SYSTEM_PROMPT, "cache_control": {"type": "ephemeral"}, }] if feedback: user_text = f"이전 응답이 다음 이유로 거절됨: {feedback}\n올바른 스키마로 다시 응답.\n\n{user_text}" payload = { "model": LOTTO_CURATOR_MODEL, "max_tokens": 8192, # 4계층 20세트 + narrative + retrospective 수용 "system": system_blocks, "messages": [{"role": "user", "content": [{"type": "text", "text": user_text}]}], } started = time.monotonic() async with httpx.AsyncClient(timeout=180) as client: # 큰 응답 → 시간 여유 r = await client.post(API_URL, headers=headers, json=payload) r.raise_for_status() resp = r.json() latency_ms = int((time.monotonic() - started) * 1000) text = "".join( b.get("text", "") for b in resp.get("content", []) if b.get("type") == "text" ).strip() if text.startswith("```"): text = text.strip("`") if text.startswith("json"): text = text[4:] text = text.strip() parsed = json.loads(text) usage = resp.get("usage", {}) or {} return parsed, { "input": int(usage.get("input_tokens", 0) or 0), "output": int(usage.get("output_tokens", 0) or 0), "cache_read": int(usage.get("cache_read_input_tokens", 0) or 0), "cache_write": int(usage.get("cache_creation_input_tokens", 0) or 0), "latency_ms": latency_ms, } async def curate_weekly(source: str = "auto") -> Dict[str, Any]: cand_resp = await service_proxy.lotto_candidates(n=30) # ← 30 으로 확장 draw_no = cand_resp["draw_no"] candidates = cand_resp["candidates"] context = await service_proxy.lotto_context() retrospective = await build_retrospective(draw_no) user_text = build_user_message(draw_no, candidates, { "hot_numbers": context.get("hot_numbers", []), "cold_numbers": context.get("cold_numbers", []), "last_draw_summary": context.get("last_draw_summary", ""), "my_recent_performance": context.get("my_recent_performance", []), "retrospective": retrospective, }) candidate_numbers = [c["numbers"] for c in candidates] usage_total = {"input": 0, "output": 0, "cache_read": 0, "cache_write": 0, "latency_ms": 0} last_error = None validated = None for attempt in (0, 1): try: raw, usage = await _call_claude(user_text, feedback=last_error or "") for k in usage_total: usage_total[k] += usage[k] validated = validate_response(raw, candidate_numbers) break except Exception as e: last_error = f"{type(e).__name__}: {e}" if validated is None: raise CuratorError(f"schema validation failed after retry: {last_error}") payload = { "draw_no": draw_no, "picks": { "core": [p.model_dump() for p in validated.core_picks], "bonus": [p.model_dump() for p in validated.bonus_picks], "extended": [p.model_dump() for p in validated.extended_picks], "pool": [p.model_dump() for p in validated.pool_picks], }, "narrative": validated.narrative.model_dump(), "tier_rationale": validated.tier_rationale.model_dump(), "confidence": validated.confidence, "model": LOTTO_CURATOR_MODEL, "tokens_input": usage_total["input"], "tokens_output": usage_total["output"], "cache_read": usage_total["cache_read"], "cache_write": usage_total["cache_write"], "latency_ms": usage_total["latency_ms"], "source": source, } await service_proxy.lotto_save_briefing(payload) return { "ok": True, "draw_no": draw_no, "confidence": validated.confidence, "tokens": {"input": usage_total["input"], "output": usage_total["output"]}, "payload": payload, # 텔레그램 알림용 }