"""Curator pipeline — fetch candidates → call Claude → validate → save."""
import json
import time
from typing import Any, Dict

import httpx

from ..config import ANTHROPIC_API_KEY, LOTTO_CURATOR_MODEL
from .. import service_proxy
from .prompt import SYSTEM_PROMPT, build_user_message
from .schema import validate_response


API_URL = "https://api.anthropic.com/v1/messages"


class CuratorError(Exception):
    """Raised when the curator pipeline cannot produce a valid briefing."""


def _strip_code_fence(text: str) -> str:
    """Return *text* with a wrapping Markdown code fence removed, if any.

    Handles both ``` and ```json fences. Unlike ``text.strip("`")``, this
    only removes the fence markers themselves, so backticks that are part
    of the JSON payload are left intact.
    """
    text = text.strip()
    if text.startswith("```"):
        text = text.removeprefix("```")
        # The opening fence may carry a language tag, e.g. ```json.
        if text.startswith("json"):
            text = text[len("json"):]
        text = text.removesuffix("```")
    return text.strip()


async def _call_claude(user_text: str, feedback: str = "") -> tuple[dict, dict]:
    """Call the Anthropic Messages API once and parse the JSON reply.

    Args:
        user_text: The user message body (candidate list + context).
        feedback: When non-empty, a rejection reason from a previous
            attempt; it is prepended so the model can correct its schema.

    Returns:
        A ``(parsed, usage)`` tuple where ``parsed`` is the JSON object the
        model produced and ``usage`` holds token counts plus wall-clock
        latency in milliseconds.

    Raises:
        CuratorError: If no API key is configured.
        httpx.HTTPStatusError: On a non-2xx API response.
        json.JSONDecodeError: If the model reply is not valid JSON.
    """
    if not ANTHROPIC_API_KEY:
        raise CuratorError("ANTHROPIC_API_KEY missing")
    headers = {
        "x-api-key": ANTHROPIC_API_KEY,
        "anthropic-version": "2023-06-01",
        # Opt in to prompt caching so the (large, static) system prompt is
        # cached server-side via the cache_control block below.
        "anthropic-beta": "prompt-caching-2024-07-31",
        "content-type": "application/json",
    }
    system_blocks = [{
        "type": "text",
        "text": SYSTEM_PROMPT,
        "cache_control": {"type": "ephemeral"},
    }]
    if feedback:
        user_text = f"이전 응답이 다음 이유로 거절됨: {feedback}\n올바른 스키마로 다시 응답.\n\n{user_text}"
    payload = {
        "model": LOTTO_CURATOR_MODEL,
        "max_tokens": 4096,
        "system": system_blocks,
        "messages": [{"role": "user", "content": [{"type": "text", "text": user_text}]}],
    }
    started = time.monotonic()
    async with httpx.AsyncClient(timeout=120) as client:
        r = await client.post(API_URL, headers=headers, json=payload)
        r.raise_for_status()
        resp = r.json()
    latency_ms = int((time.monotonic() - started) * 1000)

    # Concatenate all text content blocks; the model may emit several.
    text = "".join(
        b.get("text", "") for b in resp.get("content", []) if b.get("type") == "text"
    ).strip()
    parsed = json.loads(_strip_code_fence(text))

    usage = resp.get("usage", {}) or {}
    return parsed, {
        "input": int(usage.get("input_tokens", 0) or 0),
        "output": int(usage.get("output_tokens", 0) or 0),
        "cache_read": int(usage.get("cache_read_input_tokens", 0) or 0),
        "cache_write": int(usage.get("cache_creation_input_tokens", 0) or 0),
        "latency_ms": latency_ms,
    }


async def curate_weekly(source: str = "auto") -> Dict[str, Any]:
    """Run the full weekly curation pipeline and persist the briefing.

    Fetches candidate picks and context from the lotto service, asks Claude
    to curate them (retrying once with feedback on any failure), validates
    the reply against the response schema, saves the briefing, and returns
    a small summary dict.

    Args:
        source: Tag recorded with the briefing (e.g. "auto" or "manual").

    Returns:
        ``{"ok", "draw_no", "confidence", "tokens"}`` summary.

    Raises:
        CuratorError: If both attempts fail to produce a schema-valid reply.
    """
    cand_resp = await service_proxy.lotto_candidates(n=20)
    draw_no = cand_resp["draw_no"]
    candidates = cand_resp["candidates"]
    context = await service_proxy.lotto_context()

    user_text = build_user_message(draw_no, candidates, {
        "hot_numbers": context.get("hot_numbers", []),
        "cold_numbers": context.get("cold_numbers", []),
        "last_draw_summary": context.get("last_draw_summary", ""),
        "my_recent_performance": context.get("my_recent_performance", []),
    })

    # The validator checks that every pick is one of these candidate sets.
    candidate_numbers = [c["numbers"] for c in candidates]

    usage_total = {"input": 0, "output": 0, "cache_read": 0, "cache_write": 0, "latency_ms": 0}
    last_error = None
    validated = None

    # At most two attempts: the second carries the first failure's reason
    # back to the model as corrective feedback. The broad Exception catch is
    # deliberate — any failure (HTTP, JSON decode, schema) should trigger
    # exactly one retry rather than abort the weekly run.
    for attempt in (0, 1):
        try:
            raw, usage = await _call_claude(user_text, feedback=last_error or "")
            for k in usage_total:
                usage_total[k] += usage[k]
            validated = validate_response(raw, candidate_numbers)
            break
        except Exception as e:
            last_error = f"{type(e).__name__}: {e}"

    if validated is None:
        raise CuratorError(f"schema validation failed after retry: {last_error}")

    payload = {
        "draw_no": draw_no,
        "picks": [p.model_dump() for p in validated.picks],
        "narrative": validated.narrative.model_dump(),
        "confidence": validated.confidence,
        "model": LOTTO_CURATOR_MODEL,
        "tokens_input": usage_total["input"],
        "tokens_output": usage_total["output"],
        "cache_read": usage_total["cache_read"],
        "cache_write": usage_total["cache_write"],
        "latency_ms": usage_total["latency_ms"],
        "source": source,
    }
    await service_proxy.lotto_save_briefing(payload)
    return {
        "ok": True,
        "draw_no": draw_no,
        "confidence": validated.confidence,
        "tokens": {"input": usage_total["input"], "output": usage_total["output"]},
    }