feat(stock-lab): ScreenContext.load/restrict + 합성 픽스쳐

This commit is contained in:
2026-05-12 07:49:15 +09:00
parent 6c5481971b
commit e508b7dc35
3 changed files with 206 additions and 0 deletions

View File

@@ -0,0 +1,76 @@
"""Synthetic fixtures for screener tests — no DB / no FDR / no naver."""
import datetime as dt
import pandas as pd
def make_master(tickers: list[str], market_caps: dict | None = None,
preferred: set | None = None, managed: set | None = None) -> pd.DataFrame:
market_caps = market_caps or {t: 100_000_000_000 for t in tickers}
preferred = preferred or set()
managed = managed or set()
return pd.DataFrame([
{
"ticker": t,
"name": f"테스트{t}",
"market": "KOSPI",
"market_cap": market_caps.get(t),
"is_managed": int(t in managed),
"is_preferred": int(t in preferred),
"is_spac": 0,
"listed_date": None,
}
for t in tickers
]).set_index("ticker")
def make_prices(tickers: list[str], days: int = 260, start_close: int = 50000,
trend_pct: float = 0.0,
asof: dt.date = dt.date(2026, 5, 12)) -> pd.DataFrame:
"""trend_pct: 일별 종가 등락률(%). 양수면 상승 추세."""
rows = []
for t in tickers:
close = start_close
for i in range(days):
day_idx = days - 1 - i # asof가 마지막
date = asof - dt.timedelta(days=day_idx)
high = int(close * 1.012)
low = int(close * 0.988)
rows.append({
"ticker": t, "date": date.isoformat(),
"open": close, "high": high, "low": low, "close": close,
"volume": 1_000_000, "value": close * 1_000_000,
})
close = int(close * (1 + trend_pct / 100))
return pd.DataFrame(rows)
def make_flow(tickers: list[str], days: int = 260,
foreign_per_day: dict | None = None,
asof: dt.date = dt.date(2026, 5, 12)) -> pd.DataFrame:
foreign_per_day = foreign_per_day or {t: 0 for t in tickers}
rows = []
for t in tickers:
for i in range(days):
day_idx = days - 1 - i
date = asof - dt.timedelta(days=day_idx)
rows.append({
"ticker": t, "date": date.isoformat(),
"foreign_net": foreign_per_day.get(t, 0),
"institution_net": 0,
})
return pd.DataFrame(rows)
def make_kospi(days: int = 260, start: int = 2500, trend_pct: float = 0.0,
asof: dt.date = dt.date(2026, 5, 12)) -> pd.Series:
values = []
dates = []
v = start
for i in range(days):
day_idx = days - 1 - i
d = asof - dt.timedelta(days=day_idx)
dates.append(d.isoformat())
values.append(v)
v = v * (1 + trend_pct / 100)
return pd.Series(values, index=dates, name="kospi")

View File

@@ -0,0 +1,69 @@
"""Screener engine — ScreenContext (Phase 0) + Screener/combine (Phase 2)."""
from __future__ import annotations
import datetime as dt
import sqlite3
from dataclasses import dataclass, replace
import pandas as pd
@dataclass(frozen=True)
class ScreenContext:
"""1회 실행 동안 공유되는 읽기 전용 데이터 컨테이너."""
master: pd.DataFrame # index=ticker
prices: pd.DataFrame # cols: ticker,date,open,high,low,close,volume,value
flow: pd.DataFrame # cols: ticker,date,foreign_net,institution_net
kospi: pd.Series # index=date(str), name="kospi"
asof: dt.date
@classmethod
def load(cls, conn: sqlite3.Connection, asof: dt.date,
lookback_days: int = 252 * 2) -> "ScreenContext":
cutoff = (asof - dt.timedelta(days=int(lookback_days * 1.5))).isoformat()
asof_iso = asof.isoformat()
master = pd.read_sql_query(
"SELECT * FROM krx_master",
conn, index_col="ticker",
)
prices = pd.read_sql_query(
"SELECT ticker,date,open,high,low,close,volume,value "
"FROM krx_daily_prices WHERE date BETWEEN ? AND ? ORDER BY date",
conn, params=(cutoff, asof_iso),
)
flow = pd.read_sql_query(
"SELECT ticker,date,foreign_net,institution_net "
"FROM krx_flow WHERE date BETWEEN ? AND ? ORDER BY date",
conn, params=(cutoff, asof_iso),
)
# KOSPI 지수: MVP에서는 005930(삼성전자) 종가를 시장 대용으로 사용.
# 후속 슬라이스에서 ^KS11 별도 캐시.
kospi = pd.Series(dtype=float, name="kospi")
if "005930" in master.index and not prices.empty:
sub = prices[prices["ticker"] == "005930"].set_index("date")["close"]
kospi = sub.copy()
kospi.name = "kospi"
return cls(master=master, prices=prices, flow=flow, kospi=kospi, asof=asof)
def restrict(self, tickers) -> "ScreenContext":
tickers = pd.Index(tickers)
return replace(
self,
master=self.master.loc[self.master.index.intersection(tickers)],
prices=self.prices[self.prices["ticker"].isin(tickers)],
flow=self.flow[self.flow["ticker"].isin(tickers)],
)
def latest_close(self) -> pd.Series:
if self.prices.empty:
return pd.Series(dtype=float)
return self.prices.sort_values("date").groupby("ticker")["close"].last()
def latest_high(self) -> pd.Series:
if self.prices.empty:
return pd.Series(dtype=float)
return self.prices.sort_values("date").groupby("ticker")["high"].last()