feat(ai_news): pipeline uses articles_source (replaces Naver scraper)

2026-05-14 02:09:41 +09:00
parent ec0ccf649e
commit bbe5221e57
2 changed files with 97 additions and 71 deletions
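For orientation before the diffs, a minimal sketch of invoking the reworked pipeline entry point. The `ai_news` package path, the database filename, and the row factory are assumptions; the positional arguments, keyword names, and defaults come from the diff below (an ANTHROPIC_API_KEY is still required in the environment).

```python
import asyncio
import datetime as dt
import sqlite3

from ai_news import pipeline  # assumed package path (commit scope is "ai_news")

conn = sqlite3.connect("market.db")  # hypothetical database file
conn.row_factory = sqlite3.Row

summary = asyncio.run(
    pipeline.refresh_daily(
        conn,
        dt.date.today(),
        top_n=100,              # DEFAULT_TOP_N
        concurrency=10,         # DEFAULT_CONCURRENCY
        max_news_per_ticker=5,  # renamed from news_per_ticker
        window_days=1,          # new: lookback window for articles_source
    )
)
print(summary["updated"], summary["mapping"])
```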

View File

@@ -8,19 +8,17 @@ import logging
 import os
 import sqlite3
 import time
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List
 
-import httpx
-
-from . import scraper as _scraper
+from . import scraper as _scraper  # legacy, kept for backward import
 from . import analyzer as _analyzer
+from . import articles_source  # new
 
 log = logging.getLogger(__name__)
 
 DEFAULT_TOP_N = 100
 DEFAULT_CONCURRENCY = 10
 DEFAULT_NEWS_PER_TICKER = 5
-DEFAULT_RATE_LIMIT_SEC = 0.2
 
 
 def _top_market_cap_tickers(conn: sqlite3.Connection, n: int) -> List[str]:
@@ -33,10 +31,6 @@ def _top_market_cap_tickers(conn: sqlite3.Connection, n: int) -> List[str]:
     return [r[0] for r in rows]
 
 
-def _make_http():
-    return httpx.AsyncClient(timeout=10.0, headers=_scraper.NAVER_HEADERS)
-
-
 def _make_llm():
     """Anthropic AsyncClient — ANTHROPIC_API_KEY must be set in the environment."""
     from anthropic import AsyncAnthropic
@@ -44,47 +38,40 @@ def _make_llm():
 async def _process_one(
-    ticker: str, name: str, sem: asyncio.Semaphore,
-    http_client, llm, news_per_ticker: int, rate_limit_sec: float, model: str,
+    ticker: str, name: str, articles: List[Dict[str, Any]],
+    sem: asyncio.Semaphore, llm, model: str,
 ) -> Dict[str, Any]:
     async with sem:
-        if rate_limit_sec > 0:
-            await asyncio.sleep(rate_limit_sec)
-        news = await _scraper.fetch_news(http_client, ticker, n=news_per_ticker)
-        if not news:
-            return {
-                "ticker": ticker, "score_raw": 0.0, "reason": "no news",
-                "news_count": 0, "tokens_input": 0, "tokens_output": 0,
-                "model": model,
-            }
         return await _analyzer.score_sentiment(
-            llm, ticker, news, name=name, model=model,
+            llm, ticker, articles, name=name, model=model,
         )
 
 
 def _upsert_news_sentiment(
-    conn: sqlite3.Connection, asof: dt.date, rows: List[Dict[str, Any]]
+    conn: sqlite3.Connection, asof: dt.date,
+    rows: List[Dict[str, Any]], *, source: str = "articles",
 ) -> None:
     iso = asof.isoformat()
     data = [
         (
             r["ticker"], iso, r["score_raw"], r["reason"], r["news_count"],
-            r["tokens_input"], r["tokens_output"], r["model"],
+            r["tokens_input"], r["tokens_output"], r["model"], source,
         )
         for r in rows
     ]
     conn.executemany(
         """INSERT INTO news_sentiment
            (ticker, date, score_raw, reason, news_count,
-            tokens_input, tokens_output, model)
-           VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+            tokens_input, tokens_output, model, source)
+           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(ticker, date) DO UPDATE SET
              score_raw=excluded.score_raw,
              reason=excluded.reason,
              news_count=excluded.news_count,
              tokens_input=excluded.tokens_input,
              tokens_output=excluded.tokens_output,
-             model=excluded.model
+             model=excluded.model,
+             source=excluded.source
         """,
         data,
     )
@@ -97,11 +84,10 @@ async def refresh_daily(
     *,
     top_n: int = DEFAULT_TOP_N,
     concurrency: int = DEFAULT_CONCURRENCY,
-    news_per_ticker: int = DEFAULT_NEWS_PER_TICKER,
-    rate_limit_sec: float = DEFAULT_RATE_LIMIT_SEC,
+    max_news_per_ticker: int = DEFAULT_NEWS_PER_TICKER,
+    window_days: int = 1,
     model: str = _analyzer.DEFAULT_MODEL,
 ) -> Dict[str, Any]:
-    """Returns summary dict with top_pos/top_neg/token totals/failures."""
     started = time.time()
     tickers = _top_market_cap_tickers(conn, n=top_n)
     name_map = {
@@ -111,16 +97,20 @@ async def refresh_daily(
         ).fetchall()
     } if tickers else {}
 
-    sem = asyncio.Semaphore(concurrency)
-
-    async with _make_http() as http_client, _make_llm() as llm:
-        tasks = [
-            _process_one(
-                t, name_map.get(t, t), sem, http_client, llm,
-                news_per_ticker, rate_limit_sec, model,
-            )
-            for t in tickers
-        ]
+    articles_by_ticker, mapping_stats = articles_source.gather_articles_for_tickers(
+        conn, tickers, asof,
+        window_days=window_days,
+        max_per_ticker=max_news_per_ticker,
+    )
+
+    sem = asyncio.Semaphore(concurrency)
+
+    async with _make_llm() as llm:
+        tasks = []
+        for t in tickers:
+            arts = articles_by_ticker.get(t, [])
+            if not arts:
+                continue  # no mapped articles: skip, no score row generated
+            tasks.append(_process_one(t, name_map.get(t, t), arts, sem, llm, model))
         raw_results = await asyncio.gather(*tasks, return_exceptions=True)
 
     successes: List[Dict[str, Any]] = []
@@ -132,7 +122,7 @@ async def refresh_daily(
             successes.append(r)
 
     if successes:
-        _upsert_news_sentiment(conn, asof, successes)
+        _upsert_news_sentiment(conn, asof, successes, source="articles")
 
     top_pos = sorted(successes, key=lambda r: -r["score_raw"])[:5]
     top_neg = sorted(successes, key=lambda r: r["score_raw"])[:5]
@@ -147,4 +137,5 @@ async def refresh_daily(
         "top_pos": top_pos,
         "top_neg": top_neg,
         "model": model,
+        "mapping": mapping_stats,
     }
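The call into articles_source.gather_articles_for_tickers above implies a narrow contract. Below is a sketch of that interface as the pipeline appears to assume it, reconstructed from this call site and the test mocks in the next file; the signature shape and article-dict keys are inferred, and the body is a placeholder rather than the real implementation.

```python
import datetime as dt
import sqlite3
from typing import Any, Dict, List, Tuple


def gather_articles_for_tickers(
    conn: sqlite3.Connection,
    tickers: List[str],
    asof: dt.date,
    *,
    window_days: int = 1,
    max_per_ticker: int = 5,
) -> Tuple[Dict[str, List[Dict[str, Any]]], Dict[str, int]]:
    """Map stored articles to tickers for the lookback window ending at asof.

    Returns ({ticker: [{"title", "summary", "press", "pub_date"}, ...]}, mapping_stats),
    where mapping_stats carries counters such as total_articles / matched_pairs / hit_tickers.
    """
    # Placeholder body: every ticker gets an empty list and zeroed stats.
    articles_by_ticker: Dict[str, List[Dict[str, Any]]] = {t: [] for t in tickers}
    stats = {"total_articles": 0, "matched_pairs": 0, "hit_tickers": 0}
    # The real implementation would query pre-collected articles, match them to
    # tickers, and cap each list at max_per_ticker.
    return articles_by_ticker, stats
```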

View File

@@ -26,12 +26,15 @@ def conn():
 @pytest.mark.asyncio
 async def test_refresh_daily_happy_path(conn):
-    """3-ticker mini integration — scraper/analyzer mocked per ticker."""
+    """3-ticker mini integration — articles_source mock + analyzer mock."""
     asof = dt.date(2026, 5, 13)
-    fake_news = [{"title": "헤드라인"}]
 
-    async def fake_fetch(client, ticker, n):
-        return fake_news
+    fake_articles_by_ticker = {
+        "005930": [{"title": "삼성 뉴스", "summary": "", "press": "", "pub_date": ""}],
+        "000660": [{"title": "SK 뉴스", "summary": "", "press": "", "pub_date": ""}],
+        "373220": [{"title": "LG 뉴스", "summary": "", "press": "", "pub_date": ""}],
+    }
+    fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
 
     scores_by_ticker = {
         "005930": 7.5, "000660": 4.0, "373220": -6.0,
@@ -43,43 +46,40 @@ async def test_refresh_daily_happy_path(conn):
         "tokens_input": 100, "tokens_output": 20, "model": model,
     }
 
-    with patch.object(pipeline, "_scraper") as ms, \
+    with patch.object(pipeline, "articles_source") as mas, \
          patch.object(pipeline, "_analyzer") as ma, \
-         patch.object(pipeline, "_make_llm") as ml, \
-         patch.object(pipeline, "_make_http") as mh:
-        ms.fetch_news = fake_fetch
+         patch.object(pipeline, "_make_llm") as ml:
+        mas.gather_articles_for_tickers = MagicMock(
+            return_value=(fake_articles_by_ticker, fake_stats)
+        )
         ma.score_sentiment = fake_score
         ml.return_value.__aenter__.return_value = AsyncMock()
         ml.return_value.__aexit__.return_value = None
-        mh.return_value.__aenter__.return_value = AsyncMock()
-        mh.return_value.__aexit__.return_value = None
-        result = await pipeline.refresh_daily(conn, asof, concurrency=3, rate_limit_sec=0)
+        result = await pipeline.refresh_daily(conn, asof, concurrency=3)
 
     assert result["asof"] == "2026-05-13"
     assert result["updated"] == 3
     assert result["failures"] == []
-    assert len(result["top_pos"]) == 3
-    assert result["top_pos"][0]["ticker"] == "005930"  # highest score
-    assert result["top_neg"][0]["ticker"] == "373220"  # lowest score
-    assert result["tokens_input"] == 300
-    assert result["tokens_output"] == 60
+    assert result["top_pos"][0]["ticker"] == "005930"
+    assert result["top_neg"][0]["ticker"] == "373220"
+    assert result["mapping"] == fake_stats
 
-    # verify the DB upsert
-    rows = conn.execute("SELECT ticker, score_raw FROM news_sentiment WHERE date=?",
-                        ("2026-05-13",)).fetchall()
+    rows = conn.execute("SELECT ticker, score_raw, source FROM news_sentiment "
+                        "WHERE date=?", ("2026-05-13",)).fetchall()
     assert len(rows) == 3
-    by_ticker = {r["ticker"]: r["score_raw"] for r in rows}
-    assert by_ticker["005930"] == 7.5
-    assert by_ticker["373220"] == -6.0
+    assert all(r["source"] == "articles" for r in rows)
 
 
 @pytest.mark.asyncio
 async def test_refresh_daily_failures_isolated(conn):
-    """Even if one ticker raises, the remaining tickers are still processed."""
     asof = dt.date(2026, 5, 13)
 
-    async def fake_fetch(client, ticker, n):
-        return [{"title": "h"}]
+    fake_articles_by_ticker = {
+        "005930": [{"title": "h", "summary": "", "press": "", "pub_date": ""}],
+        "000660": [{"title": "h", "summary": "", "press": "", "pub_date": ""}],
+        "373220": [{"title": "h", "summary": "", "press": "", "pub_date": ""}],
+    }
+    fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
 
     async def fake_score(llm, ticker, news, *, name=None, model="m"):
         if ticker == "000660":
@@ -89,22 +89,57 @@ async def test_refresh_daily_failures_isolated(conn):
         "tokens_input": 100, "tokens_output": 20, "model": model,
     }
 
-    with patch.object(pipeline, "_scraper") as ms, \
+    with patch.object(pipeline, "articles_source") as mas, \
          patch.object(pipeline, "_analyzer") as ma, \
-         patch.object(pipeline, "_make_llm") as ml, \
-         patch.object(pipeline, "_make_http") as mh:
-        ms.fetch_news = fake_fetch
+         patch.object(pipeline, "_make_llm") as ml:
+        mas.gather_articles_for_tickers = MagicMock(
+            return_value=(fake_articles_by_ticker, fake_stats)
+        )
        ma.score_sentiment = fake_score
        ml.return_value.__aenter__.return_value = AsyncMock()
        ml.return_value.__aexit__.return_value = None
-        mh.return_value.__aenter__.return_value = AsyncMock()
-        mh.return_value.__aexit__.return_value = None
-        result = await pipeline.refresh_daily(conn, asof, concurrency=3, rate_limit_sec=0)
+        result = await pipeline.refresh_daily(conn, asof, concurrency=3)
 
     assert result["updated"] == 2
     assert len(result["failures"]) == 1
+
+
+@pytest.mark.asyncio
+async def test_refresh_daily_no_match_ticker_skipped(conn):
+    """Tickers with zero mapped articles skip the LLM call and get no news_sentiment row."""
+    asof = dt.date(2026, 5, 13)
+
+    fake_articles_by_ticker = {
+        "005930": [{"title": "삼성", "summary": "", "press": "", "pub_date": ""}],
+        "000660": [],  # no mapped articles
+        "373220": [],  # no mapped articles
+    }
+    fake_stats = {"total_articles": 1, "matched_pairs": 1, "hit_tickers": 1}
+
+    async def fake_score(llm, ticker, news, *, name=None, model="m"):
+        return {
+            "ticker": ticker, "score_raw": 5.0, "reason": "r",
+            "news_count": 1, "tokens_input": 100, "tokens_output": 20,
+            "model": model,
+        }
+
+    with patch.object(pipeline, "articles_source") as mas, \
+         patch.object(pipeline, "_analyzer") as ma, \
+         patch.object(pipeline, "_make_llm") as ml:
+        mas.gather_articles_for_tickers = MagicMock(
+            return_value=(fake_articles_by_ticker, fake_stats)
+        )
+        ma.score_sentiment = fake_score
+        ml.return_value.__aenter__.return_value = AsyncMock()
+        ml.return_value.__aexit__.return_value = None
+        result = await pipeline.refresh_daily(conn, asof, concurrency=3)
+
+    assert result["updated"] == 1
+    rows = conn.execute("SELECT ticker FROM news_sentiment "
+                        "WHERE date=?", ("2026-05-13",)).fetchall()
+    assert {r["ticker"] for r in rows} == {"005930"}
 
 
 def test_top_market_cap_tickers(conn):
     out = pipeline._top_market_cap_tickers(conn, n=2)
     assert out == ["005930", "000660"]
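For readers tracing the assertions against news_sentiment, a hedged sketch of the table shape the conn fixture would need after this commit. The column names, the new source column, and the (ticker, date) conflict target come from the upsert in the pipeline diff; the column types and the helper name are guesses, not copied from the project's schema.

```python
import sqlite3


def make_test_conn() -> sqlite3.Connection:
    # In-memory connection with the minimal news_sentiment shape the tests rely on.
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row  # the tests index rows by column name
    conn.execute(
        """CREATE TABLE IF NOT EXISTS news_sentiment (
               ticker        TEXT NOT NULL,
               date          TEXT NOT NULL,
               score_raw     REAL,
               reason        TEXT,
               news_count    INTEGER,
               tokens_input  INTEGER,
               tokens_output INTEGER,
               model         TEXT,
               source        TEXT,
               PRIMARY KEY (ticker, date)
           )"""
    )
    return conn
```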