"""LLM 기반 뉴스 요약 모듈. LLM_PROVIDER 환경변수로 provider 전환: - claude (기본): Anthropic Messages API (claude-haiku-4-5) - ollama: Windows AI 서버의 Ollama (qwen3:14b 등) `summarize_news(articles)` 시그니처는 provider와 무관하게 동일하며, 실패 시 `LLMError`(구 `OllamaError` alias)를 raise 한다. """ import os import logging import time from typing import List, Dict, Any import httpx logger = logging.getLogger("stock-lab.ai_summarizer") LLM_PROVIDER = os.getenv("LLM_PROVIDER", "claude").lower().strip() # Ollama OLLAMA_URL = os.getenv("OLLAMA_URL", "http://192.168.45.59:11435") OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3:14b") # Anthropic (Claude) ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") ANTHROPIC_MODEL = os.getenv("ANTHROPIC_MODEL", "claude-haiku-4-5-20251001") ANTHROPIC_URL = "https://api.anthropic.com/v1/messages" ANTHROPIC_VERSION = "2023-06-01" _PROMPT_TEMPLATE = """당신은 한국 주식 시장 애널리스트입니다. 아래 뉴스 목록을 읽고 투자자 관점에서 한국어로 간결하게 요약하세요. 반드시 아래 형식을 그대로 지켜서 출력하세요. 다른 설명이나 서두, `` 같은 태그는 절대 출력하지 마세요. 📌 시장 흐름 (2줄 요약) 🔥 주목 이슈 • (이슈 1) • (이슈 2) • (이슈 3) 💡 투자 관점 (1줄 인사이트) === 뉴스 목록 === {news_block} """ class LLMError(RuntimeError): """LLM provider 호출 실패.""" # 하위 호환 alias (main.py 등 기존 import 유지) OllamaError = LLMError def _build_news_block(articles: List[Dict[str, Any]]) -> str: lines = [] for i, art in enumerate(articles, start=1): title = (art.get("title") or "").strip() content = (art.get("content") or art.get("summary") or "").strip() if content: lines.append(f"{i}. {title} — {content}") else: lines.append(f"{i}. {title}") return "\n".join(lines) if lines else "(뉴스 없음)" async def _summarize_with_ollama(prompt: str) -> Dict[str, Any]: url = f"{OLLAMA_URL.rstrip('/')}/api/generate" payload = {"model": OLLAMA_MODEL, "prompt": prompt, "stream": False} started = time.monotonic() try: async with httpx.AsyncClient(timeout=180.0) as client: resp = await client.post(url, json=payload) except httpx.HTTPError as e: err_type = type(e).__name__ err_msg = str(e) or "(no message)" logger.error(f"Ollama 연결 실패 ({url}): [{err_type}] {err_msg}") raise LLMError(f"Ollama 연결 실패: [{err_type}] {err_msg}") from e if resp.status_code != 200: logger.error(f"Ollama 응답 오류 {resp.status_code}: {resp.text[:200]}") raise LLMError(f"Ollama HTTP {resp.status_code}: {resp.text[:200]}") try: data = resp.json() except ValueError as e: raise LLMError(f"Ollama 응답 JSON 파싱 실패: {e}") from e summary = (data.get("response") or "").strip() prompt_tokens = int(data.get("prompt_eval_count") or 0) completion_tokens = int(data.get("eval_count") or 0) total_duration_ns = int(data.get("total_duration") or 0) if total_duration_ns > 0: duration_ms = total_duration_ns // 1_000_000 else: duration_ms = int((time.monotonic() - started) * 1000) return { "summary": summary, "tokens": { "prompt": prompt_tokens, "completion": completion_tokens, "total": prompt_tokens + completion_tokens, }, "model": data.get("model") or OLLAMA_MODEL, "duration_ms": duration_ms, } async def _summarize_with_claude(prompt: str) -> Dict[str, Any]: if not ANTHROPIC_API_KEY: raise LLMError("ANTHROPIC_API_KEY 미설정 — Claude provider 사용 불가") headers = { "x-api-key": ANTHROPIC_API_KEY, "anthropic-version": ANTHROPIC_VERSION, "content-type": "application/json", } payload = { "model": ANTHROPIC_MODEL, "max_tokens": 1024, "messages": [{"role": "user", "content": prompt}], } started = time.monotonic() try: async with httpx.AsyncClient(timeout=60.0) as client: resp = await client.post(ANTHROPIC_URL, headers=headers, json=payload) except httpx.HTTPError as e: err_type = type(e).__name__ err_msg = str(e) or "(no message)" logger.error(f"Anthropic 연결 실패: [{err_type}] {err_msg}") raise LLMError(f"Anthropic 연결 실패: [{err_type}] {err_msg}") from e if resp.status_code != 200: logger.error(f"Anthropic 응답 오류 {resp.status_code}: {resp.text[:300]}") raise LLMError(f"Anthropic HTTP {resp.status_code}: {resp.text[:200]}") try: data = resp.json() except ValueError as e: raise LLMError(f"Anthropic 응답 JSON 파싱 실패: {e}") from e # content: [{"type": "text", "text": "..."}] blocks = data.get("content") or [] summary = "".join(b.get("text", "") for b in blocks if b.get("type") == "text").strip() usage = data.get("usage") or {} prompt_tokens = int(usage.get("input_tokens") or 0) completion_tokens = int(usage.get("output_tokens") or 0) duration_ms = int((time.monotonic() - started) * 1000) return { "summary": summary, "tokens": { "prompt": prompt_tokens, "completion": completion_tokens, "total": prompt_tokens + completion_tokens, }, "model": data.get("model") or ANTHROPIC_MODEL, "duration_ms": duration_ms, } async def summarize_news(articles: List[Dict[str, Any]]) -> Dict[str, Any]: """뉴스 리스트를 LLM으로 요약. provider는 LLM_PROVIDER 환경변수로 선택. Returns: {"summary": str, "tokens": {...}, "model": str, "duration_ms": int} Raises: LLMError: provider 호출 실패 시. """ prompt = _PROMPT_TEMPLATE.format(news_block=_build_news_block(articles)) if LLM_PROVIDER == "ollama": return await _summarize_with_ollama(prompt) if LLM_PROVIDER == "claude": return await _summarize_with_claude(prompt) raise LLMError(f"지원하지 않는 LLM_PROVIDER: {LLM_PROVIDER}")