Files
web-page-backend/stock-lab/app/screener/ai_news/pipeline.py

142 lines
4.3 KiB
Python

"""ai_news refresh pipeline — 시총 상위 N종목 병렬 처리."""
from __future__ import annotations
import asyncio
import datetime as dt
import logging
import os
import sqlite3
import time
from typing import Any, Dict, List
from . import scraper as _scraper # legacy, kept for backward import
from . import analyzer as _analyzer
from . import articles_source # 신규
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Default for refresh_daily(top_n=...): how many tickers are taken from the
# market-cap ranking.
DEFAULT_TOP_N = 100
# Default for refresh_daily(concurrency=...): size of the semaphore that
# bounds how many scoring calls run at the same time.
DEFAULT_CONCURRENCY = 10
# Default for refresh_daily(max_news_per_ticker=...): forwarded as
# max_per_ticker when articles are gathered for each ticker.
DEFAULT_NEWS_PER_TICKER = 5
def _top_market_cap_tickers(conn: sqlite3.Connection, n: int) -> List[str]:
    """Return up to ``n`` tickers from ``krx_master``, largest market cap first.

    Rows with a NULL ``market_cap`` are skipped, as are rows whose
    ``is_preferred`` or ``is_spac`` flag is not 0.
    """
    query = (
        "SELECT ticker FROM krx_master "
        "WHERE market_cap IS NOT NULL AND is_preferred=0 AND is_spac=0 "
        "ORDER BY market_cap DESC LIMIT ?"
    )
    cursor = conn.execute(query, (n,))
    # Each result row carries a single column: the ticker.
    return [row[0] for row in cursor]
def _make_llm():
    """Build an Anthropic ``AsyncAnthropic`` client.

    The API key is read from the ``ANTHROPIC_API_KEY`` environment variable,
    which must be set; a missing variable raises ``KeyError``.
    """
    # The SDK is imported at call time rather than at module import.
    from anthropic import AsyncAnthropic

    api_key = os.environ["ANTHROPIC_API_KEY"]
    return AsyncAnthropic(api_key=api_key)
async def _process_one(
    ticker: str,
    name: str,
    articles: List[Dict[str, Any]],
    sem: asyncio.Semaphore,
    llm,
    model: str,
) -> Dict[str, Any]:
    """Score one ticker's articles while holding a slot of ``sem``.

    Awaits ``_analyzer.score_sentiment`` with the client, the ticker, its
    articles, the display name and the model, and returns that call's result
    unchanged.
    """
    # The semaphore caps how many scoring calls are in flight at once.
    async with sem:
        result = await _analyzer.score_sentiment(
            llm, ticker, articles, name=name, model=model,
        )
    return result
def _upsert_news_sentiment(
    conn: sqlite3.Connection, asof: dt.date,
    rows: List[Dict[str, Any]], *, source: str = "articles",
) -> None:
    """Insert or update one ``news_sentiment`` row per result, then commit.

    Args:
        conn: Open SQLite connection holding the ``news_sentiment`` table.
        asof: Date the scores belong to; stored in ISO format.
        rows: Result dicts; each must provide ``ticker``, ``score_raw``,
            ``reason``, ``news_count``, ``tokens_input``, ``tokens_output``
            and ``model``.
        source: Value written to the ``source`` column of every row.

    An existing row with the same ``(ticker, date)`` has all of its other
    columns overwritten.
    """
    date_iso = asof.isoformat()
    # Per-row keys, in the column order used by the INSERT below.
    value_keys = (
        "score_raw", "reason", "news_count",
        "tokens_input", "tokens_output", "model",
    )
    # Materialised up front so a malformed row fails before anything is written.
    params = [
        (row["ticker"], date_iso, *(row[key] for key in value_keys), source)
        for row in rows
    ]
    upsert_sql = """INSERT INTO news_sentiment
        (ticker, date, score_raw, reason, news_count,
        tokens_input, tokens_output, model, source)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(ticker, date) DO UPDATE SET
        score_raw=excluded.score_raw,
        reason=excluded.reason,
        news_count=excluded.news_count,
        tokens_input=excluded.tokens_input,
        tokens_output=excluded.tokens_output,
        model=excluded.model,
        source=excluded.source
        """
    conn.executemany(upsert_sql, params)
    conn.commit()
async def refresh_daily(
    conn: sqlite3.Connection,
    asof: dt.date,
    *,
    top_n: int = DEFAULT_TOP_N,
    concurrency: int = DEFAULT_CONCURRENCY,
    max_news_per_ticker: int = DEFAULT_NEWS_PER_TICKER,
    window_days: int = 1,
    model: str = _analyzer.DEFAULT_MODEL,
) -> Dict[str, Any]:
    """Score news sentiment for the top market-cap tickers and store it.

    Steps: pick the ``top_n`` tickers, look up their names, gather articles
    for them, score every ticker that has at least one article (bounded by
    ``concurrency``), upsert the successful scores for ``asof`` and return a
    summary.

    Args:
        conn: Open SQLite connection used for both reads and the upsert.
        asof: Date the scores are stored under.
        top_n: Number of tickers taken from the market-cap ranking.
        concurrency: Maximum number of scoring calls running at once.
        max_news_per_ticker: Forwarded as ``max_per_ticker`` to
            ``articles_source.gather_articles_for_tickers``.
        window_days: Forwarded unchanged to the article gathering call.
        model: Model identifier forwarded to the scorer and echoed back in
            the summary.

    Returns:
        Dict with keys ``asof``, ``updated``, ``failures`` (``repr`` of each
        exception raised while scoring), ``duration_sec``, ``tokens_input``,
        ``tokens_output``, ``top_pos``, ``top_neg``, ``model`` and
        ``mapping`` (the stats object returned by the article gathering
        call).
    """
    # Monotonic clock: the reported duration must not be skewed by
    # wall-clock adjustments that happen while the run is in progress.
    started = time.monotonic()
    tickers = _top_market_cap_tickers(conn, n=top_n)

    # An "IN ()" clause is invalid SQL, so skip the lookup for an empty list.
    name_map: Dict[str, Any] = {}
    if tickers:
        placeholders = ",".join("?" * len(tickers))
        name_map = {
            r[0]: r[1] for r in conn.execute(
                f"SELECT ticker, name FROM krx_master WHERE ticker IN "
                f"({placeholders})", tickers,
            ).fetchall()
        }

    articles_by_ticker, mapping_stats = articles_source.gather_articles_for_tickers(
        conn, tickers, asof,
        window_days=window_days,
        max_per_ticker=max_news_per_ticker,
    )

    sem = asyncio.Semaphore(concurrency)
    # Kept parallel to ``tasks`` so each result can be traced to its ticker.
    scored_tickers: List[str] = []
    tasks = []
    # The gather must stay inside the client context: the scoring calls use
    # the client until they complete.
    async with _make_llm() as llm:
        for t in tickers:
            arts = articles_by_ticker.get(t, [])
            if not arts:
                continue  # no mapped articles — no score is produced
            scored_tickers.append(t)
            # Fall back to the ticker itself when no name was found.
            tasks.append(_process_one(t, name_map.get(t, t), arts, sem, llm, model))
        raw_results = await asyncio.gather(*tasks, return_exceptions=True)

    successes: List[Dict[str, Any]] = []
    failures: List[str] = []
    for t, r in zip(scored_tickers, raw_results):
        if isinstance(r, BaseException):
            # ``failures`` keeps its original shape (bare reprs); the ticker
            # is recorded in the log so the failure can be traced.
            log.warning("ai_news: scoring failed for %s: %r", t, r)
            failures.append(repr(r))
        elif isinstance(r, dict):
            successes.append(r)
        else:
            # Previously dropped without a trace; still excluded, but visible.
            log.warning(
                "ai_news: unexpected result type for %s: %s",
                t, type(r).__name__,
            )

    if successes:
        _upsert_news_sentiment(conn, asof, successes, source="articles")

    top_pos = sorted(successes, key=lambda r: -r["score_raw"])[:5]
    top_neg = sorted(successes, key=lambda r: r["score_raw"])[:5]
    return {
        "asof": asof.isoformat(),
        "updated": len(successes),
        "failures": failures,
        "duration_sec": round(time.monotonic() - started, 2),
        "tokens_input": sum(r["tokens_input"] for r in successes),
        "tokens_output": sum(r["tokens_output"] for r in successes),
        "top_pos": top_pos,
        "top_neg": top_neg,
        "model": model,
        "mapping": mapping_stats,
    }