feat(stock-lab): Screener 엔진 + combine + ScreenerResult + 노드 레지스트리

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-12 09:29:10 +09:00
parent 97cb38ca7f
commit 7db9869722
4 changed files with 177 additions and 4 deletions

View File

@@ -3,8 +3,10 @@
See docs/superpowers/specs/2026-05-12-stock-screener-board-design.md See docs/superpowers/specs/2026-05-12-stock-screener-board-design.md
""" """
# Phase 2 완료 후 활성화: from .engine import Screener, ScreenContext, ScreenerResult
# from .engine import Screener, ScreenContext, ScreenerResult from .registry import NODE_REGISTRY, GATE_REGISTRY
# from .registry import NODE_REGISTRY, GATE_REGISTRY
__all__ = [] __all__ = [
"Screener", "ScreenContext", "ScreenerResult",
"NODE_REGISTRY", "GATE_REGISTRY",
]

View File

@@ -67,3 +67,95 @@ class ScreenContext:
if self.prices.empty: if self.prices.empty:
return pd.Series(dtype=float) return pd.Series(dtype=float)
return self.prices.sort_values("date").groupby("ticker")["high"].last() return self.prices.sort_values("date").groupby("ticker")["high"].last()
# ---- combine + Screener (Phase 2) ----
from . import position_sizer as _ps
def combine(scores: dict, weights: dict) -> pd.Series:
"""Weighted average across score nodes. ValueError if all weights = 0."""
active = {k: w for k, w in weights.items() if w > 0 and k in scores}
if not active:
raise ValueError("no active score nodes (all weights = 0)")
df = pd.DataFrame({k: scores[k] for k in active})
w = pd.Series(active)
weighted = (df.fillna(0).multiply(w, axis=1)).sum(axis=1) / w.sum()
return weighted
@dataclass
class ScreenerResult:
asof: dt.date
survivors_count: int
scores: dict # node name → pd.Series
weights: dict
ranked: pd.Series # ticker → total_score (sorted desc, head=top_n)
rows: list # list of dicts (for serialization)
warnings: list
class Screener:
def __init__(self, gate, score_nodes, weights: dict, node_params: dict,
gate_params: dict, top_n: int = 20, sizer_params: dict = None):
self.gate = gate
self.score_nodes = score_nodes
self.weights = weights
self.node_params = node_params
self.gate_params = gate_params
self.top_n = top_n
self.sizer_params = sizer_params or {"atr_window": 14, "atr_stop_mult": 2.0, "rr_ratio": 2.0}
def run(self, ctx: ScreenContext) -> ScreenerResult:
warnings: list = []
survivors = self.gate.filter(ctx, self.gate_params)
if len(survivors) == 0:
raise ValueError("no survivors after hygiene gate")
if len(survivors) < 100:
warnings.append(f"survivors_count={len(survivors)} < 100 — 백분위 정규화 신뢰도 낮음")
scoped = ctx.restrict(survivors)
scores: dict = {}
for n in self.score_nodes:
w = self.weights.get(n.name, 0)
if w <= 0:
continue
try:
scores[n.name] = n.compute(scoped, self.node_params.get(n.name, {}))
except Exception as e:
warnings.append(f"node '{n.name}' failed: {e}")
scores[n.name] = pd.Series(0.0, index=scoped.master.index)
total = combine(scores, self.weights)
ranked = total.sort_values(ascending=False).head(self.top_n)
sizing = _ps.plan_positions(scoped, list(ranked.index), self.sizer_params)
latest_close = scoped.latest_close()
rows = []
for rank_idx, ticker in enumerate(ranked.index, start=1):
s = sizing.get(ticker, {})
row = {
"rank": rank_idx,
"ticker": ticker,
"name": str(scoped.master.loc[ticker, "name"]),
"total_score": float(ranked.loc[ticker]),
"scores": {k: float(v.get(ticker, 0.0)) for k, v in scores.items()},
"close": int(latest_close.get(ticker, 0)),
"market_cap": int(scoped.master.loc[ticker, "market_cap"] or 0),
"entry_price": s.get("entry_price"),
"stop_price": s.get("stop_price"),
"target_price": s.get("target_price"),
"atr14": s.get("atr14"),
"r_pct": s.get("r_pct"),
}
rows.append(row)
return ScreenerResult(
asof=ctx.asof, survivors_count=len(survivors),
scores=scores, weights=self.weights,
ranked=ranked, rows=rows, warnings=warnings,
)

View File

@@ -0,0 +1,24 @@
"""Registry of node classes (single source of truth for /nodes endpoint)."""
from .nodes.hygiene import HygieneGate
from .nodes.foreign_buy import ForeignBuy
from .nodes.volume_surge import VolumeSurge
from .nodes.momentum import Momentum20
from .nodes.high52w import High52WProximity
from .nodes.rs_rating import RsRating
from .nodes.ma_alignment import MaAlignment
from .nodes.vcp_lite import VcpLite
NODE_REGISTRY: dict = {
"foreign_buy": ForeignBuy,
"volume_surge": VolumeSurge,
"momentum": Momentum20,
"high52w": High52WProximity,
"rs_rating": RsRating,
"ma_alignment": MaAlignment,
"vcp_lite": VcpLite,
}
GATE_REGISTRY: dict = {
"hygiene": HygieneGate,
}

View File

@@ -0,0 +1,55 @@
import datetime as dt
import pandas as pd
import pytest
from app.screener.engine import ScreenContext, Screener, combine
from app.screener.nodes.hygiene import HygieneGate
from app.screener.nodes.foreign_buy import ForeignBuy
from app.screener.nodes.momentum import Momentum20
from app.screener._test_fixtures import make_master, make_prices, make_flow, make_kospi
def _ctx(master, prices, flow):
return ScreenContext(master=master, prices=prices, flow=flow,
kospi=make_kospi(days=260),
asof=dt.date(2026, 5, 12))
def test_combine_weighted_average():
scores = {
"foreign_buy": pd.Series({"A": 80, "B": 20}),
"momentum": pd.Series({"A": 60, "B": 40}),
}
weights = {"foreign_buy": 2.0, "momentum": 1.0}
out = combine(scores, weights)
# A: (80*2 + 60*1)/3 = 73.33
assert abs(out["A"] - 73.333) < 0.1
assert abs(out["B"] - 26.666) < 0.1
def test_combine_all_zero_weight_raises():
scores = {"foreign_buy": pd.Series({"A": 80})}
with pytest.raises(ValueError, match="no active"):
combine(scores, {"foreign_buy": 0})
def test_screener_run_end_to_end():
asof = dt.date(2026, 5, 12)
master = make_master(["GOOD", "SMALL"],
market_caps={"GOOD": 200_000_000_000, "SMALL": 1_000_000_000})
prices = make_prices(["GOOD", "SMALL"], days=260, asof=asof, trend_pct=0.1)
flow = make_flow(["GOOD", "SMALL"], days=260, asof=asof,
foreign_per_day={"GOOD": 100_000_000, "SMALL": 0})
ctx = _ctx(master, prices, flow)
screener = Screener(
gate=HygieneGate(),
score_nodes=[ForeignBuy(), Momentum20()],
weights={"foreign_buy": 1.0, "momentum": 1.0},
node_params={"foreign_buy": {"window_days": 5}, "momentum": {"window_days": 20}},
gate_params={**HygieneGate.default_params, "min_listed_days": 0},
top_n=10,
)
result = screener.run(ctx)
assert result.survivors_count == 1 # SMALL은 게이트 탈락
assert result.ranked.index[0] == "GOOD"