"""Screener engine — ScreenContext (Phase 0) + Screener/combine (Phase 2)."""
from __future__ import annotations

import datetime as dt
import sqlite3
from dataclasses import dataclass, replace

import pandas as pd


@dataclass(frozen=True)
class ScreenContext:
    """Read-only data container shared for the duration of one run."""

    master: pd.DataFrame  # index=ticker
    prices: pd.DataFrame  # cols: ticker,date,open,high,low,close,volume,value
    flow: pd.DataFrame  # cols: ticker,date,foreign_net,institution_net
    kospi: pd.Series  # index=date(str), name="kospi"
    asof: dt.date
    news_sentiment: "pd.DataFrame | None" = None

    @classmethod
    def load(
        cls,
        conn: sqlite3.Connection,
        asof: dt.date,
        lookback_days: int = 252 * 2,
    ) -> "ScreenContext":
        """Build a context from the database for the window ending at *asof*.

        Args:
            conn: Open SQLite connection holding the ``krx_master``,
                ``krx_daily_prices``, ``krx_flow`` and ``news_sentiment``
                tables.
            asof: Last date (inclusive) of the price/flow window; also the
                exact date used to select news sentiment rows.
            lookback_days: Window length. The query reaches back
                ``int(lookback_days * 1.5)`` calendar days from *asof*.

        Returns:
            A new ``ScreenContext`` populated from the queries below.
        """
        # NOTE(review): presumably lookback_days counts trading days and the
        # 1.5 factor widens it to calendar days — confirm.
        cutoff = (asof - dt.timedelta(days=int(lookback_days * 1.5))).isoformat()
        asof_iso = asof.isoformat()

        master = pd.read_sql_query(
            "SELECT * FROM krx_master",
            conn,
            index_col="ticker",
        )
        prices = pd.read_sql_query(
            "SELECT ticker,date,open,high,low,close,volume,value "
            "FROM krx_daily_prices WHERE date BETWEEN ? AND ? ORDER BY date",
            conn,
            params=(cutoff, asof_iso),
        )
        flow = pd.read_sql_query(
            "SELECT ticker,date,foreign_net,institution_net "
            "FROM krx_flow WHERE date BETWEEN ? AND ? ORDER BY date",
            conn,
            params=(cutoff, asof_iso),
        )
        news_sentiment = pd.read_sql_query(
            "SELECT ticker, score_raw, news_count FROM news_sentiment WHERE date = ?",
            conn,
            params=(asof_iso,),
        )

        # KOSPI index: the MVP uses the close of 005930 (Samsung Electronics)
        # as a market proxy. A later slice will cache ^KS11 separately.
        kospi = pd.Series(dtype=float, name="kospi")
        if "005930" in master.index and not prices.empty:
            sub = prices[prices["ticker"] == "005930"].set_index("date")["close"]
            kospi = sub.copy()
            kospi.name = "kospi"

        return cls(
            master=master,
            prices=prices,
            flow=flow,
            kospi=kospi,
            asof=asof,
            news_sentiment=news_sentiment,
        )

    def restrict(self, tickers) -> "ScreenContext":
        """Return a copy whose master/prices/flow only cover *tickers*.

        ``kospi``, ``asof`` and ``news_sentiment`` are carried over unchanged.
        """
        tickers = pd.Index(tickers)
        return replace(
            self,
            master=self.master.loc[self.master.index.intersection(tickers)],
            prices=self.prices[self.prices["ticker"].isin(tickers)],
            flow=self.flow[self.flow["ticker"].isin(tickers)],
        )

    def _latest(self, column: str) -> pd.Series:
        """Return, per ticker, the last value of *column* in date order."""
        if self.prices.empty:
            return pd.Series(dtype=float)
        return self.prices.sort_values("date").groupby("ticker")[column].last()

    def latest_close(self) -> pd.Series:
        """Return the most recent close per ticker (empty Series if no prices)."""
        return self._latest("close")

    def latest_high(self) -> pd.Series:
        """Return the most recent high per ticker (empty Series if no prices)."""
        return self._latest("high")


# ---- combine + Screener (Phase 2) ----
# NOTE(review): this import presumably sits below ScreenContext on purpose
# (e.g. to avoid a circular import) — confirm before moving it to the top.
from . import position_sizer as _ps


def combine(scores: dict, weights: dict) -> pd.Series:
    """Return the weighted average of the score nodes, per ticker.

    Only nodes with a positive weight that are also present in *scores*
    take part. Missing (NaN) scores count as 0 before weighting.

    Args:
        scores: Node name → score Series.
        weights: Node name → weight.

    Returns:
        Series of ``sum(score * weight) / sum(weight)`` over the active nodes.

    Raises:
        ValueError: If no node is active (all weights are 0, or no positively
            weighted node has scores).
    """
    active = {k: w for k, w in weights.items() if w > 0 and k in scores}
    if not active:
        raise ValueError("no active score nodes (all weights = 0)")
    df = pd.DataFrame({k: scores[k] for k in active})
    w = pd.Series(active)
    weighted = (df.fillna(0).multiply(w, axis=1)).sum(axis=1) / w.sum()
    return weighted


def _int_or_zero(value) -> int:
    """Return *value* truncated to ``int``, or 0 when it is missing.

    NaN is truthy, so ``int(value or 0)`` alone raises ``ValueError`` for a
    NaN cell; the explicit ``pd.isna`` check covers None, NaN and ``pd.NA``.
    """
    if pd.isna(value):
        return 0
    return int(value or 0)


@dataclass
class ScreenerResult:
    """Outcome of one ``Screener.run`` call."""

    asof: dt.date
    survivors_count: int
    scores: dict  # node name → pd.Series
    weights: dict
    ranked: pd.Series  # ticker → total_score (sorted desc, head=top_n)
    rows: list  # list of dicts (for serialization)
    warnings: list


class Screener:
    """Gate the universe, score the survivors, rank them and size positions."""

    def __init__(
        self,
        gate,
        score_nodes,
        weights: dict,
        node_params: dict,
        gate_params: dict,
        top_n: int = 20,
        sizer_params: dict | None = None,
    ):
        """Store the pipeline configuration.

        Args:
            gate: Object exposing ``filter(ctx, gate_params)``.
            score_nodes: Iterable of objects exposing ``name`` and
                ``compute(ctx, params)``.
            weights: Node name → weight; nodes with weight <= 0 are skipped.
            node_params: Node name → parameter dict passed to ``compute``.
            gate_params: Parameters passed to ``gate.filter``.
            top_n: Number of top-ranked tickers to keep.
            sizer_params: Parameters for the position sizer; a falsy value
                selects the defaults below.
        """
        self.gate = gate
        self.score_nodes = score_nodes
        self.weights = weights
        self.node_params = node_params
        self.gate_params = gate_params
        self.top_n = top_n
        self.sizer_params = sizer_params or {
            "atr_window": 14,
            "atr_stop_mult": 2.0,
            "rr_ratio": 2.0,
        }

    def run(self, ctx: ScreenContext) -> ScreenerResult:
        """Run gate → scoring → ranking → sizing on *ctx*.

        Returns:
            A ``ScreenerResult`` with the top ``top_n`` tickers and one
            serializable row per ranked ticker.

        Raises:
            ValueError: If the gate leaves no survivors, or if ``combine``
                finds no active score node.
        """
        warnings: list = []

        survivors = self.gate.filter(ctx, self.gate_params)
        if len(survivors) == 0:
            raise ValueError("no survivors after hygiene gate")
        if len(survivors) < 100:
            warnings.append(f"survivors_count={len(survivors)} < 100 — 백분위 정규화 신뢰도 낮음")

        scoped = ctx.restrict(survivors)

        scores: dict = {}
        for n in self.score_nodes:
            w = self.weights.get(n.name, 0)
            if w <= 0:
                continue
            try:
                scores[n.name] = n.compute(scoped, self.node_params.get(n.name, {}))
            except Exception as e:
                # Deliberate best-effort: a failing node is reported as a
                # warning and contributes zero scores instead of aborting.
                warnings.append(f"node '{n.name}' failed: {e}")
                scores[n.name] = pd.Series(0.0, index=scoped.master.index)

        total = combine(scores, self.weights)
        ranked = total.sort_values(ascending=False).head(self.top_n)

        sizing = _ps.plan_positions(scoped, list(ranked.index), self.sizer_params)
        latest_close = scoped.latest_close()

        rows = []
        for rank_idx, ticker in enumerate(ranked.index, start=1):
            s = sizing.get(ticker, {})
            row = {
                "rank": rank_idx,
                "ticker": ticker,
                "name": str(scoped.master.loc[ticker, "name"]),
                "total_score": float(ranked.loc[ticker]),
                "scores": {k: float(v.get(ticker, 0.0)) for k, v in scores.items()},
                # Missing/NaN close or market cap is reported as 0.
                "close": _int_or_zero(latest_close.get(ticker, 0)),
                "market_cap": _int_or_zero(scoped.master.loc[ticker, "market_cap"]),
                "entry_price": s.get("entry_price"),
                "stop_price": s.get("stop_price"),
                "target_price": s.get("target_price"),
                "atr14": s.get("atr14"),
                "r_pct": s.get("r_pct"),
            }
            rows.append(row)

        return ScreenerResult(
            asof=ctx.asof,
            survivors_count=len(survivors),
            scores=scores,
            weights=self.weights,
            ranked=ranked,
            rows=rows,
            warnings=warnings,
        )