fix(ai_news): set weight=0 and add Spearman IC validation harness

검증 전 gradient 차단 + IC 측정 인프라.

- schema.py: DEFAULT_WEIGHTS["ai_news"] 0.8 → 0.0
  + 1회성 migration: 기존 운영 row 의 0.8 값 자동 reset
  (사용자가 명시 조정한 다른 값은 그대로 유지)
- ai_news/validation.py: compute_ic() — 일자별 score_raw × forward
  return Spearman 상관, ic_mean/ic_std/ic_per_day 반환, verdict 분류
  (skip/weak/strong)
- router.py: GET /api/stock/screener/ai-news/ic?days=30&horizon=1
- 단위 테스트 5개: empty DB, strong +IC, random ≈0 IC, min_news_count
  필터, horizon=5

배경: adversarial review 결과 — ai_news 가중치 0.8 이 검증 없이 출시됨.
4주+ 데이터 누적 후 IC > 0.05 확인 전까지 데이터 수집은 계속하되
가중합 영향만 차단. 운영 DB row 의 0.8 → 0.0 자동 reset 도 같은 의도.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-14 01:06:02 +09:00
parent 06162b1e6e
commit 943f676414
4 changed files with 271 additions and 1 deletions

View File

@@ -0,0 +1,125 @@
"""AI news sentiment validation — Spearman IC vs forward returns.
핵심 metric: 일자별 score_raw 와 다음 N일 forward return 의 Spearman 상관.
4주+ 누적 후 IC mean > 0.05 면 weight 활성화 가치 있음.
"""
from __future__ import annotations
import datetime as dt
import sqlite3
from typing import Any, Dict, List, Optional
import pandas as pd
def _spearman(a: pd.Series, b: pd.Series) -> Optional[float]:
"""Spearman rank correlation. None if insufficient/degenerate data."""
if len(a) < 5 or len(b) < 5:
return None
if a.std(ddof=0) == 0 or b.std(ddof=0) == 0:
return None
return float(a.rank().corr(b.rank()))
def compute_ic(
conn: sqlite3.Connection,
*,
days: int = 30,
horizon: int = 1,
min_news_count: int = 1,
asof_today: Optional[dt.date] = None,
) -> Dict[str, Any]:
"""Compute daily Spearman IC of ai_news.score_raw vs forward return.
Returns:
{
"horizon_days": int,
"min_news_count": int,
"window_days": int,
"ic_count": int, # 유효 일수
"ic_mean": float | None,
"ic_std": float | None,
"ic_per_day": [{"date": "YYYY-MM-DD", "ic": float, "n": int}, ...],
"verdict": "skip" | "weak" | "strong",
}
verdict:
- skip: ic_count < 10
- weak: ic_mean in [-0.05, 0.05]
- strong: |ic_mean| > 0.05
"""
asof_today = asof_today or dt.date.today()
cutoff = (asof_today - dt.timedelta(days=days)).isoformat()
sentiment = pd.read_sql_query(
"SELECT ticker, date, score_raw, news_count "
"FROM news_sentiment WHERE date >= ? AND news_count >= ? ORDER BY date",
conn, params=(cutoff, min_news_count),
)
if sentiment.empty:
return _empty_result(days, horizon, min_news_count)
# forward return 조회: 각 (ticker, date) 에 대해 close[date+horizon] / close[date] - 1
prices = pd.read_sql_query(
"SELECT ticker, date, close FROM krx_daily_prices "
"WHERE date >= ? ORDER BY ticker, date",
conn, params=(cutoff,),
)
if prices.empty:
return _empty_result(days, horizon, min_news_count)
prices = prices.sort_values(["ticker", "date"])
prices["fwd_close"] = prices.groupby("ticker", group_keys=False)["close"].shift(-horizon)
prices["fwd_ret"] = prices["fwd_close"] / prices["close"] - 1.0
merged = sentiment.merge(
prices[["ticker", "date", "fwd_ret"]], on=["ticker", "date"], how="inner"
)
merged = merged.dropna(subset=["fwd_ret"])
if merged.empty:
return _empty_result(days, horizon, min_news_count)
ic_rows: List[Dict[str, Any]] = []
for date, grp in merged.groupby("date"):
ic = _spearman(grp["score_raw"], grp["fwd_ret"])
if ic is not None:
ic_rows.append({"date": date, "ic": ic, "n": int(len(grp))})
if not ic_rows:
return _empty_result(days, horizon, min_news_count)
ic_series = pd.Series([r["ic"] for r in ic_rows], dtype=float)
ic_mean = float(ic_series.mean())
ic_std = float(ic_series.std(ddof=0)) if len(ic_series) > 1 else 0.0
if len(ic_rows) < 10:
verdict = "skip"
elif abs(ic_mean) > 0.05:
verdict = "strong"
else:
verdict = "weak"
return {
"horizon_days": horizon,
"min_news_count": min_news_count,
"window_days": days,
"ic_count": len(ic_rows),
"ic_mean": round(ic_mean, 4),
"ic_std": round(ic_std, 4),
"ic_per_day": ic_rows,
"verdict": verdict,
}
def _empty_result(days: int, horizon: int, min_news_count: int) -> Dict[str, Any]:
return {
"horizon_days": horizon,
"min_news_count": min_news_count,
"window_days": days,
"ic_count": 0,
"ic_mean": None,
"ic_std": None,
"ic_per_day": [],
"verdict": "skip",
}

View File

@@ -280,6 +280,7 @@ def list_runs(limit: int = 30):
from .ai_news import pipeline as _ai_pipeline from .ai_news import pipeline as _ai_pipeline
from .ai_news import telegram as _ai_telegram from .ai_news import telegram as _ai_telegram
from .ai_news import validation as _ai_validation
@router.post("/snapshot/refresh-news-sentiment") @router.post("/snapshot/refresh-news-sentiment")
@@ -312,6 +313,23 @@ async def post_refresh_news_sentiment(asof: Optional[str] = None):
return summary return summary
# ---------- /ai-news/ic ----------
@router.get("/ai-news/ic")
def get_ai_news_ic(days: int = 30, horizon: int = 1, min_news_count: int = 1):
"""ai_news.score_raw 의 forward return IC (Spearman) 계산.
verdict:
- skip: ic_count < 10 (데이터 부족)
- weak: |ic_mean| <= 0.05
- strong: |ic_mean| > 0.05 (gradient 활성화 가치 있음)
"""
with _conn() as c:
return _ai_validation.compute_ic(
c, days=days, horizon=horizon, min_news_count=min_news_count,
)
@router.get("/runs/{run_id}") @router.get("/runs/{run_id}")
def get_run(run_id: int): def get_run(run_id: int):
with _conn() as c: with _conn() as c:

View File

@@ -12,7 +12,9 @@ DEFAULT_WEIGHTS = {
"rs_rating": 1.2, "rs_rating": 1.2,
"ma_alignment": 1.0, "ma_alignment": 1.0,
"vcp_lite": 0.8, "vcp_lite": 0.8,
"ai_news": 0.8, # ai_news: 검증 전 gradient 차단 (4주 IC > 0.05 확인 후 활성화).
# 데이터 수집은 계속, 가중합 영향만 0.
"ai_news": 0.0,
} }
DEFAULT_NODE_PARAMS = { DEFAULT_NODE_PARAMS = {
"foreign_buy": {"window_days": 5}, "foreign_buy": {"window_days": 5},
@@ -143,6 +145,11 @@ def ensure_screener_schema(conn: sqlite3.Connection) -> None:
if "ai_news" not in w: if "ai_news" not in w:
w["ai_news"] = DEFAULT_WEIGHTS["ai_news"] w["ai_news"] = DEFAULT_WEIGHTS["ai_news"]
changed = True changed = True
# One-time reset: ai_news default 0.8 → 0.0 (검증 전 gradient 차단).
# 사용자가 명시적으로 0.8 외 값을 설정했다면 영향 없음.
elif w.get("ai_news") == 0.8:
w["ai_news"] = 0.0
changed = True
if "ai_news" not in p: if "ai_news" not in p:
p["ai_news"] = DEFAULT_NODE_PARAMS["ai_news"] p["ai_news"] = DEFAULT_NODE_PARAMS["ai_news"]
changed = True changed = True

View File

@@ -0,0 +1,120 @@
"""Tests for ai_news validation harness (Spearman IC)."""
import datetime as dt
import sqlite3
import pytest
from app.screener.ai_news import validation
from app.screener.schema import ensure_screener_schema
@pytest.fixture
def conn():
c = sqlite3.connect(":memory:")
c.row_factory = sqlite3.Row
ensure_screener_schema(c)
yield c
c.close()
def _seed_sentiment(conn, date, ticker, score, news_count=3):
conn.execute(
"INSERT INTO news_sentiment (ticker, date, score_raw, reason, news_count, "
"tokens_input, tokens_output, model) "
"VALUES (?, ?, ?, 'r', ?, 100, 20, 'm')",
(ticker, date, score, news_count),
)
def _seed_price(conn, ticker, date, close):
conn.execute(
"INSERT INTO krx_daily_prices (ticker, date, close) VALUES (?, ?, ?)",
(ticker, date, close),
)
def test_empty_db_returns_skip(conn):
out = validation.compute_ic(conn, days=30, horizon=1, asof_today=dt.date(2026, 5, 14))
assert out["ic_count"] == 0
assert out["verdict"] == "skip"
assert out["ic_mean"] is None
def test_strong_positive_ic(conn):
"""5종목 × 12일 — 점수가 높을수록 다음날 수익률 높게 시드 → IC ≈ +1.
score 가 변하지 않는 ticker × day-wise close 로 정확한 monotonic 관계 시드.
"""
base_date = dt.date(2026, 5, 1)
# 가격 13일치 시드 (day0..day12). ticker별 base 다르고 (score-기반) day마다 다른 close.
for i, ticker in enumerate(["A", "B", "C", "D", "E"]):
score = i * 2.0 - 4.0 # ticker별 score 고정 (-4, -2, 0, +2, +4)
# day 0 close=100, day n close=100+(score × n)
for day in range(13):
d = (base_date + dt.timedelta(days=day)).isoformat()
_seed_price(conn, ticker, d, 100.0 + score * day)
if day < 12:
_seed_sentiment(conn, d, ticker, score)
conn.commit()
out = validation.compute_ic(conn, days=30, horizon=1, asof_today=dt.date(2026, 5, 14))
assert out["ic_count"] >= 10
assert out["ic_mean"] > 0.5
assert out["verdict"] == "strong"
def test_zero_ic_random_data(conn):
"""점수와 수익률이 무관 → IC ≈ 0."""
import random
random.seed(42)
base_date = dt.date(2026, 5, 1)
for ticker in ["A", "B", "C", "D", "E", "F", "G"]:
for day in range(13):
d = (base_date + dt.timedelta(days=day)).isoformat()
_seed_price(conn, ticker, d, 100.0 + random.uniform(-5, 5))
if day < 12:
_seed_sentiment(conn, d, ticker, random.uniform(-10, 10))
conn.commit()
out = validation.compute_ic(conn, days=30, horizon=1, asof_today=dt.date(2026, 5, 14))
assert out["ic_count"] >= 10
assert abs(out["ic_mean"]) < 0.3 # 약한 신호 — verdict는 weak 가능
assert out["verdict"] in ("weak", "strong") # 시드에 따라 약간 흔들림
def test_min_news_count_filter(conn):
"""news_count < min_news_count 인 row 는 제외."""
_seed_sentiment(conn, "2026-05-13", "A", 5.0, news_count=0)
_seed_sentiment(conn, "2026-05-13", "B", -5.0, news_count=3)
_seed_price(conn, "A", "2026-05-13", 100.0)
_seed_price(conn, "A", "2026-05-14", 105.0)
_seed_price(conn, "B", "2026-05-13", 100.0)
_seed_price(conn, "B", "2026-05-14", 95.0)
conn.commit()
out = validation.compute_ic(
conn, days=30, horizon=1, min_news_count=1,
asof_today=dt.date(2026, 5, 14),
)
# A 가 필터됨 → 1종목만 남으면 Spearman 계산 불가 (< 5) → skip
assert out["ic_count"] == 0
def test_horizon_5_days(conn):
"""horizon=5 면 close[date+5] / close[date] - 1 사용."""
base_date = dt.date(2026, 5, 1)
for day in range(20):
d = (base_date + dt.timedelta(days=day)).isoformat()
for i, ticker in enumerate(["A", "B", "C", "D", "E"]):
_seed_sentiment(conn, d, ticker, i * 2.0 - 4.0)
# 가격: A=오름, B=오름, C=평, D=내림, E=내림
for day in range(25):
d = (base_date + dt.timedelta(days=day)).isoformat()
for i, ticker in enumerate(["A", "B", "C", "D", "E"]):
slope = i - 2 # -2 ~ +2
_seed_price(conn, ticker, d, 100.0 + slope * day)
conn.commit()
out = validation.compute_ic(conn, days=30, horizon=5, asof_today=dt.date(2026, 5, 25))
assert out["horizon_days"] == 5
assert out["ic_count"] > 0