Files
web-page-backend/stock-lab/app/screener/engine.py

168 lines
6.5 KiB
Python

"""Screener engine — ScreenContext (Phase 0) + Screener/combine (Phase 2)."""
from __future__ import annotations
import datetime as dt
import sqlite3
from dataclasses import dataclass, replace
import pandas as pd
@dataclass(frozen=True)
class ScreenContext:
"""1회 실행 동안 공유되는 읽기 전용 데이터 컨테이너."""
master: pd.DataFrame # index=ticker
prices: pd.DataFrame # cols: ticker,date,open,high,low,close,volume,value
flow: pd.DataFrame # cols: ticker,date,foreign_net,institution_net
kospi: pd.Series # index=date(str), name="kospi"
asof: dt.date
news_sentiment: "pd.DataFrame | None" = None
@classmethod
def load(cls, conn: sqlite3.Connection, asof: dt.date,
lookback_days: int = 252 * 2) -> "ScreenContext":
cutoff = (asof - dt.timedelta(days=int(lookback_days * 1.5))).isoformat()
asof_iso = asof.isoformat()
master = pd.read_sql_query(
"SELECT * FROM krx_master",
conn, index_col="ticker",
)
prices = pd.read_sql_query(
"SELECT ticker,date,open,high,low,close,volume,value "
"FROM krx_daily_prices WHERE date BETWEEN ? AND ? ORDER BY date",
conn, params=(cutoff, asof_iso),
)
flow = pd.read_sql_query(
"SELECT ticker,date,foreign_net,institution_net "
"FROM krx_flow WHERE date BETWEEN ? AND ? ORDER BY date",
conn, params=(cutoff, asof_iso),
)
news_sentiment = pd.read_sql_query(
"SELECT ticker, score_raw, news_count FROM news_sentiment WHERE date = ?",
conn, params=(asof_iso,),
)
# KOSPI 지수: MVP에서는 005930(삼성전자) 종가를 시장 대용으로 사용.
# 후속 슬라이스에서 ^KS11 별도 캐시.
kospi = pd.Series(dtype=float, name="kospi")
if "005930" in master.index and not prices.empty:
sub = prices[prices["ticker"] == "005930"].set_index("date")["close"]
kospi = sub.copy()
kospi.name = "kospi"
return cls(master=master, prices=prices, flow=flow, kospi=kospi, asof=asof,
news_sentiment=news_sentiment)
def restrict(self, tickers) -> "ScreenContext":
tickers = pd.Index(tickers)
return replace(
self,
master=self.master.loc[self.master.index.intersection(tickers)],
prices=self.prices[self.prices["ticker"].isin(tickers)],
flow=self.flow[self.flow["ticker"].isin(tickers)],
)
def latest_close(self) -> pd.Series:
if self.prices.empty:
return pd.Series(dtype=float)
return self.prices.sort_values("date").groupby("ticker")["close"].last()
def latest_high(self) -> pd.Series:
if self.prices.empty:
return pd.Series(dtype=float)
return self.prices.sort_values("date").groupby("ticker")["high"].last()
# ---- combine + Screener (Phase 2) ----
from . import position_sizer as _ps
def combine(scores: dict, weights: dict) -> pd.Series:
"""Weighted average across score nodes. ValueError if all weights = 0."""
active = {k: w for k, w in weights.items() if w > 0 and k in scores}
if not active:
raise ValueError("no active score nodes (all weights = 0)")
df = pd.DataFrame({k: scores[k] for k in active})
w = pd.Series(active)
weighted = (df.fillna(0).multiply(w, axis=1)).sum(axis=1) / w.sum()
return weighted
@dataclass
class ScreenerResult:
asof: dt.date
survivors_count: int
scores: dict # node name → pd.Series
weights: dict
ranked: pd.Series # ticker → total_score (sorted desc, head=top_n)
rows: list # list of dicts (for serialization)
warnings: list
class Screener:
def __init__(self, gate, score_nodes, weights: dict, node_params: dict,
gate_params: dict, top_n: int = 20, sizer_params: dict = None):
self.gate = gate
self.score_nodes = score_nodes
self.weights = weights
self.node_params = node_params
self.gate_params = gate_params
self.top_n = top_n
self.sizer_params = sizer_params or {"atr_window": 14, "atr_stop_mult": 2.0, "rr_ratio": 2.0}
def run(self, ctx: ScreenContext) -> ScreenerResult:
warnings: list = []
survivors = self.gate.filter(ctx, self.gate_params)
if len(survivors) == 0:
raise ValueError("no survivors after hygiene gate")
if len(survivors) < 100:
warnings.append(f"survivors_count={len(survivors)} < 100 — 백분위 정규화 신뢰도 낮음")
scoped = ctx.restrict(survivors)
scores: dict = {}
for n in self.score_nodes:
w = self.weights.get(n.name, 0)
if w <= 0:
continue
try:
scores[n.name] = n.compute(scoped, self.node_params.get(n.name, {}))
except Exception as e:
warnings.append(f"node '{n.name}' failed: {e}")
scores[n.name] = pd.Series(0.0, index=scoped.master.index)
total = combine(scores, self.weights)
ranked = total.sort_values(ascending=False).head(self.top_n)
sizing = _ps.plan_positions(scoped, list(ranked.index), self.sizer_params)
latest_close = scoped.latest_close()
rows = []
for rank_idx, ticker in enumerate(ranked.index, start=1):
s = sizing.get(ticker, {})
row = {
"rank": rank_idx,
"ticker": ticker,
"name": str(scoped.master.loc[ticker, "name"]),
"total_score": float(ranked.loc[ticker]),
"scores": {k: float(v.get(ticker, 0.0)) for k, v in scores.items()},
"close": int(latest_close.get(ticker, 0)),
"market_cap": int(scoped.master.loc[ticker, "market_cap"] or 0),
"entry_price": s.get("entry_price"),
"stop_price": s.get("stop_price"),
"target_price": s.get("target_price"),
"atr14": s.get("atr14"),
"r_pct": s.get("r_pct"),
}
rows.append(row)
return ScreenerResult(
asof=ctx.asof, survivors_count=len(survivors),
scores=scores, weights=self.weights,
ranked=ranked, rows=rows, warnings=warnings,
)