import datetime as dt import sqlite3 import pytest from unittest.mock import AsyncMock, MagicMock, patch from app.screener.ai_news import pipeline from app.screener.schema import ensure_screener_schema @pytest.fixture def conn(): c = sqlite3.connect(":memory:") c.row_factory = sqlite3.Row ensure_screener_schema(c) # 시총 상위 3종목 시드 c.execute("INSERT INTO krx_master (ticker, name, market, market_cap, updated_at) " "VALUES (?, ?, 'KOSPI', ?, datetime('now'))", ("005930", "삼성전자", 9_000_000)) c.execute("INSERT INTO krx_master (ticker, name, market, market_cap, updated_at) " "VALUES (?, ?, 'KOSPI', ?, datetime('now'))", ("000660", "SK하이닉스", 8_000_000)) c.execute("INSERT INTO krx_master (ticker, name, market, market_cap, updated_at) " "VALUES (?, ?, 'KOSPI', ?, datetime('now'))", ("373220", "LG에너지솔루션", 7_000_000)) c.commit() yield c c.close() @pytest.mark.asyncio async def test_refresh_daily_happy_path(conn): """3종목 mini integration — articles_source mock + analyzer mock.""" asof = dt.date(2026, 5, 13) fake_articles_by_ticker = { "005930": [{"title": "삼성 뉴스", "summary": "", "press": "", "pub_date": ""}], "000660": [{"title": "SK 뉴스", "summary": "", "press": "", "pub_date": ""}], "373220": [{"title": "LG 뉴스", "summary": "", "press": "", "pub_date": ""}], } fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3} scores_by_ticker = { "005930": 7.5, "000660": 4.0, "373220": -6.0, } async def fake_score(llm, ticker, news, *, name=None, model="m"): return { "ticker": ticker, "score_raw": scores_by_ticker[ticker], "reason": f"r{ticker}", "news_count": 1, "tokens_input": 100, "tokens_output": 20, "model": model, } with patch.object(pipeline, "articles_source") as mas, \ patch.object(pipeline, "_analyzer") as ma, \ patch.object(pipeline, "_make_llm") as ml: mas.gather_articles_for_tickers = MagicMock( return_value=(fake_articles_by_ticker, fake_stats) ) ma.score_sentiment = fake_score ml.return_value.__aenter__.return_value = AsyncMock() ml.return_value.__aexit__.return_value = None result = await pipeline.refresh_daily(conn, asof, concurrency=3) assert result["asof"] == "2026-05-13" assert result["updated"] == 3 assert result["failures"] == [] assert result["top_pos"][0]["ticker"] == "005930" assert result["top_neg"][0]["ticker"] == "373220" assert result["mapping"] == fake_stats rows = conn.execute("SELECT ticker, score_raw, source FROM news_sentiment " "WHERE date=?", ("2026-05-13",)).fetchall() assert len(rows) == 3 assert all(r["source"] == "articles" for r in rows) @pytest.mark.asyncio async def test_refresh_daily_failures_isolated(conn): asof = dt.date(2026, 5, 13) fake_articles_by_ticker = { "005930": [{"title": "h", "summary": "", "press": "", "pub_date": ""}], "000660": [{"title": "h", "summary": "", "press": "", "pub_date": ""}], "373220": [{"title": "h", "summary": "", "press": "", "pub_date": ""}], } fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3} async def fake_score(llm, ticker, news, *, name=None, model="m"): if ticker == "000660": raise RuntimeError("llm exploded") return { "ticker": ticker, "score_raw": 5.0, "reason": "r", "news_count": 1, "tokens_input": 100, "tokens_output": 20, "model": model, } with patch.object(pipeline, "articles_source") as mas, \ patch.object(pipeline, "_analyzer") as ma, \ patch.object(pipeline, "_make_llm") as ml: mas.gather_articles_for_tickers = MagicMock( return_value=(fake_articles_by_ticker, fake_stats) ) ma.score_sentiment = fake_score ml.return_value.__aenter__.return_value = AsyncMock() ml.return_value.__aexit__.return_value = None result = await pipeline.refresh_daily(conn, asof, concurrency=3) assert result["updated"] == 2 assert len(result["failures"]) == 1 @pytest.mark.asyncio async def test_refresh_daily_no_match_ticker_skipped(conn): """매핑 0인 ticker 는 LLM 호출 skip + news_sentiment 행 미생성.""" asof = dt.date(2026, 5, 13) fake_articles_by_ticker = { "005930": [{"title": "삼성", "summary": "", "press": "", "pub_date": ""}], "000660": [], # 매핑 없음 "373220": [], # 매핑 없음 } fake_stats = {"total_articles": 1, "matched_pairs": 1, "hit_tickers": 1} async def fake_score(llm, ticker, news, *, name=None, model="m"): return { "ticker": ticker, "score_raw": 5.0, "reason": "r", "news_count": 1, "tokens_input": 100, "tokens_output": 20, "model": model, } with patch.object(pipeline, "articles_source") as mas, \ patch.object(pipeline, "_analyzer") as ma, \ patch.object(pipeline, "_make_llm") as ml: mas.gather_articles_for_tickers = MagicMock( return_value=(fake_articles_by_ticker, fake_stats) ) ma.score_sentiment = fake_score ml.return_value.__aenter__.return_value = AsyncMock() ml.return_value.__aexit__.return_value = None result = await pipeline.refresh_daily(conn, asof, concurrency=3) assert result["updated"] == 1 rows = conn.execute("SELECT ticker FROM news_sentiment " "WHERE date=?", ("2026-05-13",)).fetchall() assert {r["ticker"] for r in rows} == {"005930"} @pytest.mark.asyncio async def test_refresh_daily_sign_gate_no_positive_in_neg(conn): """전 종목 양수 점수면 top_neg는 비어야 함 (호재 종목이 악재란에 채워지면 안 됨).""" asof = dt.date(2026, 5, 13) fake_articles_by_ticker = { "005930": [{"title": "h", "summary": "", "press": "", "pub_date": ""}], "000660": [{"title": "h", "summary": "", "press": "", "pub_date": ""}], "373220": [{"title": "h", "summary": "", "press": "", "pub_date": ""}], } fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3} scores = {"005930": 6.0, "000660": 2.0, "373220": 0.5} # 모두 양수 async def fake_score(llm, ticker, news, *, name=None, model="m"): return { "ticker": ticker, "score_raw": scores[ticker], "reason": "r", "news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model, } with patch.object(pipeline, "articles_source") as mas, \ patch.object(pipeline, "_analyzer") as ma, \ patch.object(pipeline, "_make_llm") as ml: mas.gather_articles_for_tickers = MagicMock(return_value=(fake_articles_by_ticker, fake_stats)) ma.score_sentiment = fake_score ml.return_value.__aenter__.return_value = AsyncMock() ml.return_value.__aexit__.return_value = None result = await pipeline.refresh_daily(conn, asof, concurrency=3) assert len(result["top_pos"]) == 3 assert result["top_neg"] == [] # 양수 종목이 악재란에 들어가면 안 됨 @pytest.mark.asyncio async def test_refresh_daily_sign_gate_excludes_neutral(conn): """score=0(중립)은 호재·악재 어디에도 포함되지 않음.""" asof = dt.date(2026, 5, 13) fake_articles_by_ticker = { "005930": [{"title": "h", "summary": "", "press": "", "pub_date": ""}], "000660": [{"title": "h", "summary": "", "press": "", "pub_date": ""}], "373220": [{"title": "h", "summary": "", "press": "", "pub_date": ""}], } fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3} scores = {"005930": 3.0, "000660": 0.0, "373220": -3.0} async def fake_score(llm, ticker, news, *, name=None, model="m"): return { "ticker": ticker, "score_raw": scores[ticker], "reason": "r", "news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model, } with patch.object(pipeline, "articles_source") as mas, \ patch.object(pipeline, "_analyzer") as ma, \ patch.object(pipeline, "_make_llm") as ml: mas.gather_articles_for_tickers = MagicMock(return_value=(fake_articles_by_ticker, fake_stats)) ma.score_sentiment = fake_score ml.return_value.__aenter__.return_value = AsyncMock() ml.return_value.__aexit__.return_value = None result = await pipeline.refresh_daily(conn, asof, concurrency=3) pos_tickers = {r["ticker"] for r in result["top_pos"]} neg_tickers = {r["ticker"] for r in result["top_neg"]} assert pos_tickers == {"005930"} assert neg_tickers == {"373220"} assert "000660" not in pos_tickers and "000660" not in neg_tickers def test_top_market_cap_tickers(conn): out = pipeline._top_market_cap_tickers(conn, n=2) assert out == ["005930", "000660"]