From bbe5221e5711e24b558471571c75fc5653212331 Mon Sep 17 00:00:00 2001 From: gahusb Date: Thu, 14 May 2026 02:09:41 +0900 Subject: [PATCH] feat(ai_news): pipeline uses articles_source (replaces Naver scraper) --- stock-lab/app/screener/ai_news/pipeline.py | 69 +++++++-------- stock-lab/tests/test_ai_news_pipeline.py | 99 +++++++++++++++------- 2 files changed, 97 insertions(+), 71 deletions(-) diff --git a/stock-lab/app/screener/ai_news/pipeline.py b/stock-lab/app/screener/ai_news/pipeline.py index 7ab639e..76c6267 100644 --- a/stock-lab/app/screener/ai_news/pipeline.py +++ b/stock-lab/app/screener/ai_news/pipeline.py @@ -8,19 +8,17 @@ import logging import os import sqlite3 import time -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List -import httpx - -from . import scraper as _scraper +from . import scraper as _scraper # legacy, kept for backward import from . import analyzer as _analyzer +from . import articles_source # 신규 log = logging.getLogger(__name__) DEFAULT_TOP_N = 100 DEFAULT_CONCURRENCY = 10 DEFAULT_NEWS_PER_TICKER = 5 -DEFAULT_RATE_LIMIT_SEC = 0.2 def _top_market_cap_tickers(conn: sqlite3.Connection, n: int) -> List[str]: @@ -33,10 +31,6 @@ def _top_market_cap_tickers(conn: sqlite3.Connection, n: int) -> List[str]: return [r[0] for r in rows] -def _make_http(): - return httpx.AsyncClient(timeout=10.0, headers=_scraper.NAVER_HEADERS) - - def _make_llm(): """Anthropic AsyncClient — env에 ANTHROPIC_API_KEY 필수.""" from anthropic import AsyncAnthropic @@ -44,47 +38,40 @@ def _make_llm(): async def _process_one( - ticker: str, name: str, sem: asyncio.Semaphore, - http_client, llm, news_per_ticker: int, rate_limit_sec: float, model: str, + ticker: str, name: str, articles: List[Dict[str, Any]], + sem: asyncio.Semaphore, llm, model: str, ) -> Dict[str, Any]: async with sem: - if rate_limit_sec > 0: - await asyncio.sleep(rate_limit_sec) - news = await _scraper.fetch_news(http_client, ticker, n=news_per_ticker) - if not news: - return { - "ticker": ticker, "score_raw": 0.0, "reason": "no news", - "news_count": 0, "tokens_input": 0, "tokens_output": 0, - "model": model, - } return await _analyzer.score_sentiment( - llm, ticker, news, name=name, model=model, + llm, ticker, articles, name=name, model=model, ) def _upsert_news_sentiment( - conn: sqlite3.Connection, asof: dt.date, rows: List[Dict[str, Any]] + conn: sqlite3.Connection, asof: dt.date, + rows: List[Dict[str, Any]], *, source: str = "articles", ) -> None: iso = asof.isoformat() data = [ ( r["ticker"], iso, r["score_raw"], r["reason"], r["news_count"], - r["tokens_input"], r["tokens_output"], r["model"], + r["tokens_input"], r["tokens_output"], r["model"], source, ) for r in rows ] conn.executemany( """INSERT INTO news_sentiment (ticker, date, score_raw, reason, news_count, - tokens_input, tokens_output, model) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) + tokens_input, tokens_output, model, source) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(ticker, date) DO UPDATE SET score_raw=excluded.score_raw, reason=excluded.reason, news_count=excluded.news_count, tokens_input=excluded.tokens_input, tokens_output=excluded.tokens_output, - model=excluded.model + model=excluded.model, + source=excluded.source """, data, ) @@ -97,11 +84,10 @@ async def refresh_daily( *, top_n: int = DEFAULT_TOP_N, concurrency: int = DEFAULT_CONCURRENCY, - news_per_ticker: int = DEFAULT_NEWS_PER_TICKER, - rate_limit_sec: float = DEFAULT_RATE_LIMIT_SEC, + max_news_per_ticker: int = DEFAULT_NEWS_PER_TICKER, + window_days: int = 1, model: str = _analyzer.DEFAULT_MODEL, ) -> Dict[str, Any]: - """Returns summary dict with top_pos/top_neg/token totals/failures.""" started = time.time() tickers = _top_market_cap_tickers(conn, n=top_n) name_map = { @@ -111,16 +97,20 @@ async def refresh_daily( ).fetchall() } if tickers else {} - sem = asyncio.Semaphore(concurrency) + articles_by_ticker, mapping_stats = articles_source.gather_articles_for_tickers( + conn, tickers, asof, + window_days=window_days, + max_per_ticker=max_news_per_ticker, + ) - async with _make_http() as http_client, _make_llm() as llm: - tasks = [ - _process_one( - t, name_map.get(t, t), sem, http_client, llm, - news_per_ticker, rate_limit_sec, model, - ) - for t in tickers - ] + sem = asyncio.Semaphore(concurrency) + async with _make_llm() as llm: + tasks = [] + for t in tickers: + arts = articles_by_ticker.get(t, []) + if not arts: + continue # 매핑 0 — score 미생성 + tasks.append(_process_one(t, name_map.get(t, t), arts, sem, llm, model)) raw_results = await asyncio.gather(*tasks, return_exceptions=True) successes: List[Dict[str, Any]] = [] @@ -132,7 +122,7 @@ async def refresh_daily( successes.append(r) if successes: - _upsert_news_sentiment(conn, asof, successes) + _upsert_news_sentiment(conn, asof, successes, source="articles") top_pos = sorted(successes, key=lambda r: -r["score_raw"])[:5] top_neg = sorted(successes, key=lambda r: r["score_raw"])[:5] @@ -147,4 +137,5 @@ async def refresh_daily( "top_pos": top_pos, "top_neg": top_neg, "model": model, + "mapping": mapping_stats, } diff --git a/stock-lab/tests/test_ai_news_pipeline.py b/stock-lab/tests/test_ai_news_pipeline.py index 7994047..1ef9935 100644 --- a/stock-lab/tests/test_ai_news_pipeline.py +++ b/stock-lab/tests/test_ai_news_pipeline.py @@ -26,12 +26,15 @@ def conn(): @pytest.mark.asyncio async def test_refresh_daily_happy_path(conn): - """3종목 mini integration — 각 종목별로 scraper/analyzer mock.""" + """3종목 mini integration — articles_source mock + analyzer mock.""" asof = dt.date(2026, 5, 13) - fake_news = [{"title": "헤드라인"}] - async def fake_fetch(client, ticker, n): - return fake_news + fake_articles_by_ticker = { + "005930": [{"title": "삼성 뉴스", "summary": "", "press": "", "pub_date": ""}], + "000660": [{"title": "SK 뉴스", "summary": "", "press": "", "pub_date": ""}], + "373220": [{"title": "LG 뉴스", "summary": "", "press": "", "pub_date": ""}], + } + fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3} scores_by_ticker = { "005930": 7.5, "000660": 4.0, "373220": -6.0, @@ -43,43 +46,40 @@ async def test_refresh_daily_happy_path(conn): "tokens_input": 100, "tokens_output": 20, "model": model, } - with patch.object(pipeline, "_scraper") as ms, \ + with patch.object(pipeline, "articles_source") as mas, \ patch.object(pipeline, "_analyzer") as ma, \ - patch.object(pipeline, "_make_llm") as ml, \ - patch.object(pipeline, "_make_http") as mh: - ms.fetch_news = fake_fetch + patch.object(pipeline, "_make_llm") as ml: + mas.gather_articles_for_tickers = MagicMock( + return_value=(fake_articles_by_ticker, fake_stats) + ) ma.score_sentiment = fake_score ml.return_value.__aenter__.return_value = AsyncMock() ml.return_value.__aexit__.return_value = None - mh.return_value.__aenter__.return_value = AsyncMock() - mh.return_value.__aexit__.return_value = None - result = await pipeline.refresh_daily(conn, asof, concurrency=3, rate_limit_sec=0) + result = await pipeline.refresh_daily(conn, asof, concurrency=3) assert result["asof"] == "2026-05-13" assert result["updated"] == 3 assert result["failures"] == [] - assert len(result["top_pos"]) == 3 - assert result["top_pos"][0]["ticker"] == "005930" # 가장 큰 점수 - assert result["top_neg"][0]["ticker"] == "373220" # 가장 작은 점수 - assert result["tokens_input"] == 300 - assert result["tokens_output"] == 60 + assert result["top_pos"][0]["ticker"] == "005930" + assert result["top_neg"][0]["ticker"] == "373220" + assert result["mapping"] == fake_stats - # DB upsert 확인 - rows = conn.execute("SELECT ticker, score_raw FROM news_sentiment WHERE date=?", - ("2026-05-13",)).fetchall() + rows = conn.execute("SELECT ticker, score_raw, source FROM news_sentiment " + "WHERE date=?", ("2026-05-13",)).fetchall() assert len(rows) == 3 - by_ticker = {r["ticker"]: r["score_raw"] for r in rows} - assert by_ticker["005930"] == 7.5 - assert by_ticker["373220"] == -6.0 + assert all(r["source"] == "articles" for r in rows) @pytest.mark.asyncio async def test_refresh_daily_failures_isolated(conn): - """한 종목이 예외 던져도 나머지 종목은 정상 처리.""" asof = dt.date(2026, 5, 13) - async def fake_fetch(client, ticker, n): - return [{"title": "h"}] + fake_articles_by_ticker = { + "005930": [{"title": "h", "summary": "", "press": "", "pub_date": ""}], + "000660": [{"title": "h", "summary": "", "press": "", "pub_date": ""}], + "373220": [{"title": "h", "summary": "", "press": "", "pub_date": ""}], + } + fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3} async def fake_score(llm, ticker, news, *, name=None, model="m"): if ticker == "000660": @@ -89,22 +89,57 @@ async def test_refresh_daily_failures_isolated(conn): "tokens_input": 100, "tokens_output": 20, "model": model, } - with patch.object(pipeline, "_scraper") as ms, \ + with patch.object(pipeline, "articles_source") as mas, \ patch.object(pipeline, "_analyzer") as ma, \ - patch.object(pipeline, "_make_llm") as ml, \ - patch.object(pipeline, "_make_http") as mh: - ms.fetch_news = fake_fetch + patch.object(pipeline, "_make_llm") as ml: + mas.gather_articles_for_tickers = MagicMock( + return_value=(fake_articles_by_ticker, fake_stats) + ) ma.score_sentiment = fake_score ml.return_value.__aenter__.return_value = AsyncMock() ml.return_value.__aexit__.return_value = None - mh.return_value.__aenter__.return_value = AsyncMock() - mh.return_value.__aexit__.return_value = None - result = await pipeline.refresh_daily(conn, asof, concurrency=3, rate_limit_sec=0) + result = await pipeline.refresh_daily(conn, asof, concurrency=3) assert result["updated"] == 2 assert len(result["failures"]) == 1 +@pytest.mark.asyncio +async def test_refresh_daily_no_match_ticker_skipped(conn): + """매핑 0인 ticker 는 LLM 호출 skip + news_sentiment 행 미생성.""" + asof = dt.date(2026, 5, 13) + + fake_articles_by_ticker = { + "005930": [{"title": "삼성", "summary": "", "press": "", "pub_date": ""}], + "000660": [], # 매핑 없음 + "373220": [], # 매핑 없음 + } + fake_stats = {"total_articles": 1, "matched_pairs": 1, "hit_tickers": 1} + + async def fake_score(llm, ticker, news, *, name=None, model="m"): + return { + "ticker": ticker, "score_raw": 5.0, "reason": "r", + "news_count": 1, "tokens_input": 100, "tokens_output": 20, + "model": model, + } + + with patch.object(pipeline, "articles_source") as mas, \ + patch.object(pipeline, "_analyzer") as ma, \ + patch.object(pipeline, "_make_llm") as ml: + mas.gather_articles_for_tickers = MagicMock( + return_value=(fake_articles_by_ticker, fake_stats) + ) + ma.score_sentiment = fake_score + ml.return_value.__aenter__.return_value = AsyncMock() + ml.return_value.__aexit__.return_value = None + result = await pipeline.refresh_daily(conn, asof, concurrency=3) + + assert result["updated"] == 1 + rows = conn.execute("SELECT ticker FROM news_sentiment " + "WHERE date=?", ("2026-05-13",)).fetchall() + assert {r["ticker"] for r in rows} == {"005930"} + + def test_top_market_cap_tickers(conn): out = pipeline._top_market_cap_tickers(conn, n=2) assert out == ["005930", "000660"]