"""ai_news refresh pipeline — 시총 상위 N종목 병렬 처리.""" from __future__ import annotations import asyncio import datetime as dt import logging import os import sqlite3 import time from typing import Any, Dict, List from . import scraper as _scraper # legacy, kept for backward import from . import analyzer as _analyzer from . import articles_source # 신규 log = logging.getLogger(__name__) DEFAULT_TOP_N = 100 DEFAULT_CONCURRENCY = 10 DEFAULT_NEWS_PER_TICKER = 5 def _top_market_cap_tickers(conn: sqlite3.Connection, n: int) -> List[str]: rows = conn.execute( "SELECT ticker FROM krx_master " "WHERE market_cap IS NOT NULL AND is_preferred=0 AND is_spac=0 " "ORDER BY market_cap DESC LIMIT ?", (n,), ).fetchall() return [r[0] for r in rows] def _make_llm(): """Anthropic AsyncClient — env에 ANTHROPIC_API_KEY 필수.""" from anthropic import AsyncAnthropic return AsyncAnthropic(api_key=os.environ["ANTHROPIC_API_KEY"]) async def _process_one( ticker: str, name: str, articles: List[Dict[str, Any]], sem: asyncio.Semaphore, llm, model: str, ) -> Dict[str, Any]: async with sem: return await _analyzer.score_sentiment( llm, ticker, articles, name=name, model=model, ) def _upsert_news_sentiment( conn: sqlite3.Connection, asof: dt.date, rows: List[Dict[str, Any]], *, source: str = "articles", ) -> None: iso = asof.isoformat() data = [ ( r["ticker"], iso, r["score_raw"], r["reason"], r["news_count"], r["tokens_input"], r["tokens_output"], r["model"], source, ) for r in rows ] conn.executemany( """INSERT INTO news_sentiment (ticker, date, score_raw, reason, news_count, tokens_input, tokens_output, model, source) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(ticker, date) DO UPDATE SET score_raw=excluded.score_raw, reason=excluded.reason, news_count=excluded.news_count, tokens_input=excluded.tokens_input, tokens_output=excluded.tokens_output, model=excluded.model, source=excluded.source """, data, ) conn.commit() async def refresh_daily( conn: sqlite3.Connection, asof: dt.date, *, top_n: int = DEFAULT_TOP_N, concurrency: int = DEFAULT_CONCURRENCY, max_news_per_ticker: int = DEFAULT_NEWS_PER_TICKER, window_days: int = 1, model: str = _analyzer.DEFAULT_MODEL, ) -> Dict[str, Any]: started = time.time() tickers = _top_market_cap_tickers(conn, n=top_n) name_map = { r[0]: r[1] for r in conn.execute( f"SELECT ticker, name FROM krx_master WHERE ticker IN " f"({','.join('?' 

async def refresh_daily(
    conn: sqlite3.Connection,
    asof: dt.date,
    *,
    top_n: int = DEFAULT_TOP_N,
    concurrency: int = DEFAULT_CONCURRENCY,
    max_news_per_ticker: int = DEFAULT_NEWS_PER_TICKER,
    window_days: int = 1,
    model: str = _analyzer.DEFAULT_MODEL,
) -> Dict[str, Any]:
    """Gather articles for the top tickers, score them concurrently, upsert scores."""
    started = time.time()
    tickers = _top_market_cap_tickers(conn, n=top_n)
    name_map = (
        {
            r[0]: r[1]
            for r in conn.execute(
                "SELECT ticker, name FROM krx_master WHERE ticker IN "
                f"({','.join('?' * len(tickers))})",
                tickers,
            ).fetchall()
        }
        if tickers
        else {}
    )
    articles_by_ticker, mapping_stats = articles_source.gather_articles_for_tickers(
        conn,
        tickers,
        asof,
        window_days=window_days,
        max_per_ticker=max_news_per_ticker,
    )
    sem = asyncio.Semaphore(concurrency)
    async with _make_llm() as llm:
        tasks = []
        for t in tickers:
            arts = articles_by_ticker.get(t, [])
            if not arts:
                continue  # no mapped articles: no score row is produced
            tasks.append(_process_one(t, name_map.get(t, t), arts, sem, llm, model))
        raw_results = await asyncio.gather(*tasks, return_exceptions=True)

    successes: List[Dict[str, Any]] = []
    failures: List[str] = []
    for r in raw_results:
        if isinstance(r, BaseException):
            failures.append(repr(r))
        elif isinstance(r, dict):
            successes.append(r)

    if successes:
        _upsert_news_sentiment(conn, asof, successes, source="articles")

    top_pos = sorted(successes, key=lambda r: -r["score_raw"])[:5]
    top_neg = sorted(successes, key=lambda r: r["score_raw"])[:5]
    return {
        "asof": asof.isoformat(),
        "updated": len(successes),
        "failures": failures,
        "duration_sec": round(time.time() - started, 2),
        "tokens_input": sum(r["tokens_input"] for r in successes),
        "tokens_output": sum(r["tokens_output"] for r in successes),
        "top_pos": top_pos,
        "top_neg": top_neg,
        "model": model,
        "mapping": mapping_stats,
    }
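

if __name__ == "__main__":  # pragma: no cover
    # Minimal manual smoke run; a sketch, not part of the pipeline API.
    # Assumptions: "krx.db" is a placeholder path to a SQLite file containing
    # the krx_master table (and a news_sentiment table as sketched above),
    # and ANTHROPIC_API_KEY is set in the environment for _make_llm().
    logging.basicConfig(level=logging.INFO)
    _conn = sqlite3.connect("krx.db")
    try:
        _summary = asyncio.run(refresh_daily(_conn, dt.date.today()))
        log.info(
            "updated=%d failures=%d duration=%.2fs",
            _summary["updated"],
            len(_summary["failures"]),
            _summary["duration_sec"],
        )
    finally:
        _conn.close()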