7 Commits

12 changed files with 383 additions and 79 deletions

View File

@@ -27,6 +27,25 @@ def _clamp(x: float, lo: float = -10.0, hi: float = 10.0) -> float:
return max(lo, min(hi, x))
def _format_news_block(news: List[Dict[str, Any]]) -> str:
"""news dict 리스트 → prompt 에 들어가는 텍스트 블록.
summary 가 있으면 title 다음 줄에 indent 해서 포함 (최대 200자).
pub_date 가 있으면 title 앞에 표시.
"""
lines: List[str] = []
for n in news:
date = (n.get("pub_date") or "").strip()
title = (n.get("title") or "").strip()
summary = (n.get("summary") or "").strip()
prefix = f"[{date}] " if date else ""
if summary:
lines.append(f"- {prefix}{title}\n {summary[:200]}")
else:
lines.append(f"- {prefix}{title}")
return "\n".join(lines)
async def score_sentiment(
llm,
ticker: str,
@@ -36,7 +55,7 @@ async def score_sentiment(
model: str = DEFAULT_MODEL,
) -> Dict[str, Any]:
"""Returns {ticker, score_raw, reason, news_count, tokens_input, tokens_output, model}."""
news_block = "\n".join(f"- {n['title']}" for n in news)
news_block = _format_news_block(news)
prompt = PROMPT_TEMPLATE.format(
name=name or ticker, ticker=ticker,
n=len(news), news_block=news_block,

View File

@@ -0,0 +1,70 @@
"""기존 articles 테이블에서 종목별 뉴스 매핑."""
from __future__ import annotations
import datetime as dt
import logging
import sqlite3
from typing import Any, Dict, List, Tuple
log = logging.getLogger(__name__)
def gather_articles_for_tickers(
conn: sqlite3.Connection,
tickers: List[str],
asof: dt.date,
*,
window_days: int = 1,
max_per_ticker: int = 5,
) -> Tuple[Dict[str, List[Dict[str, Any]]], Dict[str, int]]:
"""articles 에서 ticker.name substring 매칭으로 종목별 뉴스 dict 반환.
Returns:
(
{ticker: [{"title": str, "summary": str, "press": str, "pub_date": str}, ...]},
{"total_articles": int, "matched_pairs": int, "hit_tickers": int},
)
"""
out: Dict[str, List[Dict[str, Any]]] = {t: [] for t in tickers}
stats = {"total_articles": 0, "matched_pairs": 0, "hit_tickers": 0}
if not tickers:
return out, stats
cutoff = (asof - dt.timedelta(days=window_days)).isoformat()
placeholders = ",".join("?" * len(tickers))
name_rows = conn.execute(
f"SELECT ticker, name FROM krx_master WHERE ticker IN ({placeholders})",
tickers,
).fetchall()
# 2글자 미만 회사명은 false positive 위험으로 제외
name_map = {r[0]: r[1] for r in name_rows if r[1] and len(r[1]) >= 2}
articles = conn.execute(
"SELECT title, summary, press, pub_date, crawled_at "
"FROM articles WHERE crawled_at >= ? ORDER BY crawled_at DESC",
(cutoff,),
).fetchall()
stats["total_articles"] = len(articles)
for a in articles:
title = (a[0] or "").strip()
summary = (a[1] or "").strip()
haystack = title + " " + summary
for ticker, name in name_map.items():
if name not in haystack:
continue
if len(out[ticker]) >= max_per_ticker:
continue
out[ticker].append({
"title": title,
"summary": summary,
"press": a[2] or "",
"pub_date": a[3] or "",
})
stats["matched_pairs"] += 1
stats["hit_tickers"] = sum(1 for arts in out.values() if arts)
return out, stats

View File

@@ -8,19 +8,17 @@ import logging
import os
import sqlite3
import time
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List
import httpx
from . import scraper as _scraper
from . import scraper as _scraper # legacy, kept for backward import
from . import analyzer as _analyzer
from . import articles_source # 신규
log = logging.getLogger(__name__)
DEFAULT_TOP_N = 100
DEFAULT_CONCURRENCY = 10
DEFAULT_NEWS_PER_TICKER = 5
DEFAULT_RATE_LIMIT_SEC = 0.2
def _top_market_cap_tickers(conn: sqlite3.Connection, n: int) -> List[str]:
@@ -33,10 +31,6 @@ def _top_market_cap_tickers(conn: sqlite3.Connection, n: int) -> List[str]:
return [r[0] for r in rows]
def _make_http():
return httpx.AsyncClient(timeout=10.0, headers=_scraper.NAVER_HEADERS)
def _make_llm():
"""Anthropic AsyncClient — env에 ANTHROPIC_API_KEY 필수."""
from anthropic import AsyncAnthropic
@@ -44,47 +38,40 @@ def _make_llm():
async def _process_one(
ticker: str, name: str, sem: asyncio.Semaphore,
http_client, llm, news_per_ticker: int, rate_limit_sec: float, model: str,
ticker: str, name: str, articles: List[Dict[str, Any]],
sem: asyncio.Semaphore, llm, model: str,
) -> Dict[str, Any]:
async with sem:
if rate_limit_sec > 0:
await asyncio.sleep(rate_limit_sec)
news = await _scraper.fetch_news(http_client, ticker, n=news_per_ticker)
if not news:
return {
"ticker": ticker, "score_raw": 0.0, "reason": "no news",
"news_count": 0, "tokens_input": 0, "tokens_output": 0,
"model": model,
}
return await _analyzer.score_sentiment(
llm, ticker, news, name=name, model=model,
llm, ticker, articles, name=name, model=model,
)
def _upsert_news_sentiment(
conn: sqlite3.Connection, asof: dt.date, rows: List[Dict[str, Any]]
conn: sqlite3.Connection, asof: dt.date,
rows: List[Dict[str, Any]], *, source: str = "articles",
) -> None:
iso = asof.isoformat()
data = [
(
r["ticker"], iso, r["score_raw"], r["reason"], r["news_count"],
r["tokens_input"], r["tokens_output"], r["model"],
r["tokens_input"], r["tokens_output"], r["model"], source,
)
for r in rows
]
conn.executemany(
"""INSERT INTO news_sentiment
(ticker, date, score_raw, reason, news_count,
tokens_input, tokens_output, model)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
tokens_input, tokens_output, model, source)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(ticker, date) DO UPDATE SET
score_raw=excluded.score_raw,
reason=excluded.reason,
news_count=excluded.news_count,
tokens_input=excluded.tokens_input,
tokens_output=excluded.tokens_output,
model=excluded.model
model=excluded.model,
source=excluded.source
""",
data,
)
@@ -97,11 +84,10 @@ async def refresh_daily(
*,
top_n: int = DEFAULT_TOP_N,
concurrency: int = DEFAULT_CONCURRENCY,
news_per_ticker: int = DEFAULT_NEWS_PER_TICKER,
rate_limit_sec: float = DEFAULT_RATE_LIMIT_SEC,
max_news_per_ticker: int = DEFAULT_NEWS_PER_TICKER,
window_days: int = 1,
model: str = _analyzer.DEFAULT_MODEL,
) -> Dict[str, Any]:
"""Returns summary dict with top_pos/top_neg/token totals/failures."""
started = time.time()
tickers = _top_market_cap_tickers(conn, n=top_n)
name_map = {
@@ -111,16 +97,20 @@ async def refresh_daily(
).fetchall()
} if tickers else {}
sem = asyncio.Semaphore(concurrency)
articles_by_ticker, mapping_stats = articles_source.gather_articles_for_tickers(
conn, tickers, asof,
window_days=window_days,
max_per_ticker=max_news_per_ticker,
)
async with _make_http() as http_client, _make_llm() as llm:
tasks = [
_process_one(
t, name_map.get(t, t), sem, http_client, llm,
news_per_ticker, rate_limit_sec, model,
)
for t in tickers
]
sem = asyncio.Semaphore(concurrency)
async with _make_llm() as llm:
tasks = []
for t in tickers:
arts = articles_by_ticker.get(t, [])
if not arts:
continue # 매핑 0 — score 미생성
tasks.append(_process_one(t, name_map.get(t, t), arts, sem, llm, model))
raw_results = await asyncio.gather(*tasks, return_exceptions=True)
successes: List[Dict[str, Any]] = []
@@ -132,7 +122,7 @@ async def refresh_daily(
successes.append(r)
if successes:
_upsert_news_sentiment(conn, asof, successes)
_upsert_news_sentiment(conn, asof, successes, source="articles")
top_pos = sorted(successes, key=lambda r: -r["score_raw"])[:5]
top_neg = sorted(successes, key=lambda r: r["score_raw"])[:5]
@@ -147,4 +137,5 @@ async def refresh_daily(
"top_pos": top_pos,
"top_neg": top_neg,
"model": model,
"mapping": mapping_stats,
}

View File

@@ -1,4 +1,11 @@
"""네이버 finance 종목 뉴스 스크래핑."""
"""[DEPRECATED] 네이버 finance 종목 뉴스 스크래핑.
본 모듈은 ai_news Phase 1 (2026-05-14) 에서 더 이상 파이프라인에서 사용되지 않음.
데이터 소스는 stock-lab 의 articles 테이블 (ai_news/articles_source.py) 로 전환됨.
삭제 시점: Phase 2 (DART 도입) 결정 후. IC 검증 4주 누적 후 노드 활성화
여부에 따라 본 모듈을 (a) 완전 삭제 또는 (b) ensemble fallback 으로 재활용.
"""
from __future__ import annotations

View File

@@ -37,6 +37,7 @@ def build_message(
top_neg: List[Dict[str, Any]],
tokens_input: int,
tokens_output: int,
mapping: Dict[str, int] | None = None,
) -> str:
lines: List[str] = [
f"🌅 *AI 뉴스 분석* \\({_escape(asof)} 08:00\\)",
@@ -57,9 +58,16 @@ def build_message(
lines.append(_escape("- (없음)"))
cost = _cost_won(tokens_input, tokens_output)
mapping_part = ""
if mapping:
mapping_part = (
f"매핑 {mapping['hit_tickers']}/100 ticker "
f"\\({mapping['matched_pairs']}쌍 / articles {mapping['total_articles']}\\) · "
)
lines += [
"",
f"_분석: 시총 상위 100종목 · 토큰 {tokens_input:,} in / {tokens_output:,} out · "
f"_분석: 시총 상위 100종목 · {mapping_part}"
f"토큰 {tokens_input:,} in / {tokens_output:,} out · "
f"약 ₩{cost:,}_",
]
return "\n".join(lines)

View File

@@ -309,6 +309,7 @@ async def post_refresh_news_sentiment(asof: Optional[str] = None):
top_pos=summary["top_pos"], top_neg=summary["top_neg"],
tokens_input=summary["tokens_input"],
tokens_output=summary["tokens_output"],
mapping=summary.get("mapping"),
)
return summary

View File

@@ -115,6 +115,22 @@ CREATE TABLE IF NOT EXISTS screener_results (
);
CREATE INDEX IF NOT EXISTS idx_results_run_rank ON screener_results(run_id, rank);
-- articles 테이블 (도메스틱/해외 뉴스 원본).
-- 메인 app.db.init_db() 에서도 생성하지만, 테스트 환경 및 단독 screener 컨텍스트
-- (ai_news.articles_source 등)에서도 참조 가능하도록 idempotent 하게 보장한다.
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
hash TEXT UNIQUE NOT NULL,
category TEXT DEFAULT 'domestic',
title TEXT NOT NULL,
link TEXT,
summary TEXT,
press TEXT,
pub_date TEXT,
crawled_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_articles_crawled ON articles(crawled_at DESC);
CREATE TABLE IF NOT EXISTS news_sentiment (
ticker TEXT NOT NULL,
date TEXT NOT NULL,
@@ -124,6 +140,7 @@ CREATE TABLE IF NOT EXISTS news_sentiment (
tokens_input INTEGER NOT NULL DEFAULT 0,
tokens_output INTEGER NOT NULL DEFAULT 0,
model TEXT NOT NULL DEFAULT 'claude-haiku-4-5-20251001',
source TEXT NOT NULL DEFAULT 'articles',
created_at TEXT NOT NULL DEFAULT (datetime('now','localtime')),
PRIMARY KEY (ticker, date)
);
@@ -158,6 +175,15 @@ def ensure_screener_schema(conn: sqlite3.Connection) -> None:
"UPDATE screener_settings SET weights_json=?, node_params_json=? WHERE id=1",
(json.dumps(w), json.dumps(p)),
)
# news_sentiment.source 컬럼 1회 추가 (기존 운영 환경)
cols = {r[1] for r in conn.execute(
"PRAGMA table_info(news_sentiment)"
).fetchall()}
if "source" not in cols:
conn.execute(
"ALTER TABLE news_sentiment "
"ADD COLUMN source TEXT NOT NULL DEFAULT 'articles'"
)
existing = conn.execute("SELECT id FROM screener_settings WHERE id=1").fetchone()
if existing is None:
now = datetime.now(timezone.utc).isoformat()

View File

@@ -17,7 +17,10 @@ def _mk_llm(content_text: str, in_tokens: int = 100, out_tokens: int = 20):
return llm
NEWS = [{"title": "삼성전자, HBM 양산"}, {"title": "메모리 가격 반등"}]
NEWS = [
{"title": "삼성전자, HBM 양산", "summary": "1분기 영업이익 사상 최대", "pub_date": "2026-05-14"},
{"title": "메모리 가격 반등", "summary": "", "pub_date": "2026-05-14"},
]
@pytest.mark.asyncio
@@ -53,3 +56,15 @@ async def test_score_sentiment_clamps_negative_out_of_range():
llm = _mk_llm(json.dumps({"score": -42.0, "reason": "초악재"}))
out = await analyzer.score_sentiment(llm, "005930", NEWS)
assert out["score_raw"] == -10.0
@pytest.mark.asyncio
async def test_score_sentiment_includes_summary_in_prompt():
"""summary 가 있으면 prompt 에 포함, 없으면 title 만."""
llm = _mk_llm(json.dumps({"score": 5.0, "reason": "ok"}))
await analyzer.score_sentiment(llm, "005930", NEWS, name="삼성전자")
call = llm.messages.create.call_args
user_msg = call.kwargs["messages"][0]["content"]
assert "1분기 영업이익 사상 최대" in user_msg # summary 포함
assert "삼성전자, HBM 양산" in user_msg # title 포함
assert "2026-05-14" in user_msg # pub_date 포함

View File

@@ -0,0 +1,108 @@
import datetime as dt
import sqlite3
import pytest
from app.screener.ai_news import articles_source
from app.screener.schema import ensure_screener_schema
@pytest.fixture
def conn():
c = sqlite3.connect(":memory:")
c.row_factory = sqlite3.Row
ensure_screener_schema(c)
yield c
c.close()
def _seed_master(conn, ticker, name):
conn.execute(
"INSERT INTO krx_master (ticker, name, market, market_cap, updated_at) "
"VALUES (?, ?, 'KOSPI', 1000000000, datetime('now'))",
(ticker, name),
)
def _seed_article(conn, title, summary="", crawled_at="2026-05-14T07:30:00"):
import hashlib
h = hashlib.md5(f"{title}|x".encode()).hexdigest()
conn.execute(
"INSERT INTO articles (hash, title, summary, link, press, pub_date, crawled_at) "
"VALUES (?, ?, ?, '', '', '2026-05-14', ?)",
(h, title, summary, crawled_at),
)
ASOF = dt.date(2026, 5, 14)
def test_single_ticker_match_in_title(conn):
_seed_master(conn, "005930", "삼성전자")
_seed_article(conn, "삼성전자, HBM 양산 가시화")
conn.commit()
out, stats = articles_source.gather_articles_for_tickers(
conn, ["005930"], ASOF, window_days=1, max_per_ticker=5,
)
assert len(out["005930"]) == 1
assert out["005930"][0]["title"] == "삼성전자, HBM 양산 가시화"
assert stats["matched_pairs"] == 1
assert stats["hit_tickers"] == 1
def test_single_ticker_match_in_summary(conn):
_seed_master(conn, "005930", "삼성전자")
_seed_article(conn, "메모리 시장 회복세", summary="삼성전자가 1분기 어닝 서프라이즈")
conn.commit()
out, _ = articles_source.gather_articles_for_tickers(
conn, ["005930"], ASOF, window_days=1, max_per_ticker=5,
)
assert len(out["005930"]) == 1
def test_multi_ticker_match(conn):
_seed_master(conn, "005930", "삼성전자")
_seed_master(conn, "000660", "SK하이닉스")
_seed_article(conn, "삼성전자와 SK하이닉스, 메모리 양산 경쟁")
conn.commit()
out, stats = articles_source.gather_articles_for_tickers(
conn, ["005930", "000660"], ASOF, window_days=1, max_per_ticker=5,
)
assert len(out["005930"]) == 1
assert len(out["000660"]) == 1
assert stats["matched_pairs"] == 2
assert stats["hit_tickers"] == 2
def test_no_match_returns_empty_list(conn):
_seed_master(conn, "005930", "삼성전자")
_seed_article(conn, "엔비디아 실적 발표", summary="AI 칩 수요 견조")
conn.commit()
out, stats = articles_source.gather_articles_for_tickers(
conn, ["005930"], ASOF, window_days=1, max_per_ticker=5,
)
assert out["005930"] == []
assert stats["matched_pairs"] == 0
assert stats["hit_tickers"] == 0
def test_max_per_ticker_caps_results(conn):
_seed_master(conn, "005930", "삼성전자")
for i in range(6):
_seed_article(conn, f"삼성전자 뉴스 #{i}", crawled_at=f"2026-05-14T0{i}:00:00")
conn.commit()
out, _ = articles_source.gather_articles_for_tickers(
conn, ["005930"], ASOF, window_days=1, max_per_ticker=5,
)
assert len(out["005930"]) == 5
def test_window_days_filters_old_articles(conn):
_seed_master(conn, "005930", "삼성전자")
_seed_article(conn, "삼성전자 최신 뉴스", crawled_at="2026-05-14T07:00:00")
_seed_article(conn, "삼성전자 오래된 뉴스", crawled_at="2026-05-01T07:00:00")
conn.commit()
out, _ = articles_source.gather_articles_for_tickers(
conn, ["005930"], ASOF, window_days=1, max_per_ticker=5,
)
assert len(out["005930"]) == 1
assert "최신" in out["005930"][0]["title"]

View File

@@ -26,12 +26,15 @@ def conn():
@pytest.mark.asyncio
async def test_refresh_daily_happy_path(conn):
"""3종목 mini integration — 각 종목별로 scraper/analyzer mock."""
"""3종목 mini integration — articles_source mock + analyzer mock."""
asof = dt.date(2026, 5, 13)
fake_news = [{"title": "헤드라인"}]
async def fake_fetch(client, ticker, n):
return fake_news
fake_articles_by_ticker = {
"005930": [{"title": "삼성 뉴스", "summary": "", "press": "", "pub_date": ""}],
"000660": [{"title": "SK 뉴스", "summary": "", "press": "", "pub_date": ""}],
"373220": [{"title": "LG 뉴스", "summary": "", "press": "", "pub_date": ""}],
}
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
scores_by_ticker = {
"005930": 7.5, "000660": 4.0, "373220": -6.0,
@@ -43,43 +46,40 @@ async def test_refresh_daily_happy_path(conn):
"tokens_input": 100, "tokens_output": 20, "model": model,
}
with patch.object(pipeline, "_scraper") as ms, \
with patch.object(pipeline, "articles_source") as mas, \
patch.object(pipeline, "_analyzer") as ma, \
patch.object(pipeline, "_make_llm") as ml, \
patch.object(pipeline, "_make_http") as mh:
ms.fetch_news = fake_fetch
patch.object(pipeline, "_make_llm") as ml:
mas.gather_articles_for_tickers = MagicMock(
return_value=(fake_articles_by_ticker, fake_stats)
)
ma.score_sentiment = fake_score
ml.return_value.__aenter__.return_value = AsyncMock()
ml.return_value.__aexit__.return_value = None
mh.return_value.__aenter__.return_value = AsyncMock()
mh.return_value.__aexit__.return_value = None
result = await pipeline.refresh_daily(conn, asof, concurrency=3, rate_limit_sec=0)
result = await pipeline.refresh_daily(conn, asof, concurrency=3)
assert result["asof"] == "2026-05-13"
assert result["updated"] == 3
assert result["failures"] == []
assert len(result["top_pos"]) == 3
assert result["top_pos"][0]["ticker"] == "005930" # 가장 큰 점수
assert result["top_neg"][0]["ticker"] == "373220" # 가장 작은 점수
assert result["tokens_input"] == 300
assert result["tokens_output"] == 60
assert result["top_pos"][0]["ticker"] == "005930"
assert result["top_neg"][0]["ticker"] == "373220"
assert result["mapping"] == fake_stats
# DB upsert 확인
rows = conn.execute("SELECT ticker, score_raw FROM news_sentiment WHERE date=?",
("2026-05-13",)).fetchall()
rows = conn.execute("SELECT ticker, score_raw, source FROM news_sentiment "
"WHERE date=?", ("2026-05-13",)).fetchall()
assert len(rows) == 3
by_ticker = {r["ticker"]: r["score_raw"] for r in rows}
assert by_ticker["005930"] == 7.5
assert by_ticker["373220"] == -6.0
assert all(r["source"] == "articles" for r in rows)
@pytest.mark.asyncio
async def test_refresh_daily_failures_isolated(conn):
"""한 종목이 예외 던져도 나머지 종목은 정상 처리."""
asof = dt.date(2026, 5, 13)
async def fake_fetch(client, ticker, n):
return [{"title": "h"}]
fake_articles_by_ticker = {
"005930": [{"title": "h", "summary": "", "press": "", "pub_date": ""}],
"000660": [{"title": "h", "summary": "", "press": "", "pub_date": ""}],
"373220": [{"title": "h", "summary": "", "press": "", "pub_date": ""}],
}
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
async def fake_score(llm, ticker, news, *, name=None, model="m"):
if ticker == "000660":
@@ -89,22 +89,57 @@ async def test_refresh_daily_failures_isolated(conn):
"tokens_input": 100, "tokens_output": 20, "model": model,
}
with patch.object(pipeline, "_scraper") as ms, \
with patch.object(pipeline, "articles_source") as mas, \
patch.object(pipeline, "_analyzer") as ma, \
patch.object(pipeline, "_make_llm") as ml, \
patch.object(pipeline, "_make_http") as mh:
ms.fetch_news = fake_fetch
patch.object(pipeline, "_make_llm") as ml:
mas.gather_articles_for_tickers = MagicMock(
return_value=(fake_articles_by_ticker, fake_stats)
)
ma.score_sentiment = fake_score
ml.return_value.__aenter__.return_value = AsyncMock()
ml.return_value.__aexit__.return_value = None
mh.return_value.__aenter__.return_value = AsyncMock()
mh.return_value.__aexit__.return_value = None
result = await pipeline.refresh_daily(conn, asof, concurrency=3, rate_limit_sec=0)
result = await pipeline.refresh_daily(conn, asof, concurrency=3)
assert result["updated"] == 2
assert len(result["failures"]) == 1
@pytest.mark.asyncio
async def test_refresh_daily_no_match_ticker_skipped(conn):
"""매핑 0인 ticker 는 LLM 호출 skip + news_sentiment 행 미생성."""
asof = dt.date(2026, 5, 13)
fake_articles_by_ticker = {
"005930": [{"title": "삼성", "summary": "", "press": "", "pub_date": ""}],
"000660": [], # 매핑 없음
"373220": [], # 매핑 없음
}
fake_stats = {"total_articles": 1, "matched_pairs": 1, "hit_tickers": 1}
async def fake_score(llm, ticker, news, *, name=None, model="m"):
return {
"ticker": ticker, "score_raw": 5.0, "reason": "r",
"news_count": 1, "tokens_input": 100, "tokens_output": 20,
"model": model,
}
with patch.object(pipeline, "articles_source") as mas, \
patch.object(pipeline, "_analyzer") as ma, \
patch.object(pipeline, "_make_llm") as ml:
mas.gather_articles_for_tickers = MagicMock(
return_value=(fake_articles_by_ticker, fake_stats)
)
ma.score_sentiment = fake_score
ml.return_value.__aenter__.return_value = AsyncMock()
ml.return_value.__aexit__.return_value = None
result = await pipeline.refresh_daily(conn, asof, concurrency=3)
assert result["updated"] == 1
rows = conn.execute("SELECT ticker FROM news_sentiment "
"WHERE date=?", ("2026-05-13",)).fetchall()
assert {r["ticker"] for r in rows} == {"005930"}
def test_top_market_cap_tickers(conn):
out = pipeline._top_market_cap_tickers(conn, n=2)
assert out == ["005930", "000660"]

View File

@@ -20,17 +20,17 @@ def test_refresh_news_sentiment_weekday_invokes_pipeline():
"asof": "2026-05-13", "updated": 3, "failures": [],
"duration_sec": 1.0, "tokens_input": 100, "tokens_output": 20,
"top_pos": [], "top_neg": [], "model": "m",
"mapping": {"total_articles": 5, "matched_pairs": 8, "hit_tickers": 3},
}
with patch("app.screener.router._ai_pipeline") as mp, \
patch("app.screener.router._ai_telegram") as mt:
mp.refresh_daily = AsyncMock(return_value=fake_summary)
mt.build_message = lambda **kw: "BUILT_TEXT"
mt.build_message = lambda **kw: f"TEXT_with_mapping={kw.get('mapping')}"
client = TestClient(app)
resp = client.post(
"/api/stock/screener/snapshot/refresh-news-sentiment?asof=2026-05-13"
)
assert resp.status_code == 200
body = resp.json()
assert body["asof"] == "2026-05-13"
assert body["updated"] == 3
assert body["telegram_text"] == "BUILT_TEXT"
assert body["mapping"]["hit_tickers"] == 3
assert "mapping=" in body["telegram_text"]

View File

@@ -53,3 +53,27 @@ def test_build_message_empty_lists():
# 빈 리스트라도 헤더는 있어야 함
assert "호재 Top" in msg
assert "악재 Top" in msg
def test_build_message_includes_mapping_line():
msg = tg.build_message(
asof="2026-05-14",
top_pos=[_row("005930", 8.5, "HBM 호재")],
top_neg=[],
tokens_input=1000, tokens_output=200,
mapping={"total_articles": 35, "matched_pairs": 50, "hit_tickers": 42},
)
assert "매핑" in msg
assert "42" in msg
assert "50" in msg
assert "35" in msg
def test_build_message_without_mapping_omits_line():
msg = tg.build_message(
asof="2026-05-14",
top_pos=[],
top_neg=[],
tokens_input=1000, tokens_output=200,
)
assert "매핑" not in msg