"""Screener engine — ScreenContext (Phase 0) + Screener/combine (Phase 2).""" from __future__ import annotations import datetime as dt import sqlite3 from dataclasses import dataclass, replace import pandas as pd @dataclass(frozen=True) class ScreenContext: """1회 실행 동안 공유되는 읽기 전용 데이터 컨테이너.""" master: pd.DataFrame # index=ticker prices: pd.DataFrame # cols: ticker,date,open,high,low,close,volume,value flow: pd.DataFrame # cols: ticker,date,foreign_net,institution_net kospi: pd.Series # index=date(str), name="kospi" asof: dt.date @classmethod def load(cls, conn: sqlite3.Connection, asof: dt.date, lookback_days: int = 252 * 2) -> "ScreenContext": cutoff = (asof - dt.timedelta(days=int(lookback_days * 1.5))).isoformat() asof_iso = asof.isoformat() master = pd.read_sql_query( "SELECT * FROM krx_master", conn, index_col="ticker", ) prices = pd.read_sql_query( "SELECT ticker,date,open,high,low,close,volume,value " "FROM krx_daily_prices WHERE date BETWEEN ? AND ? ORDER BY date", conn, params=(cutoff, asof_iso), ) flow = pd.read_sql_query( "SELECT ticker,date,foreign_net,institution_net " "FROM krx_flow WHERE date BETWEEN ? AND ? ORDER BY date", conn, params=(cutoff, asof_iso), ) # KOSPI 지수: MVP에서는 005930(삼성전자) 종가를 시장 대용으로 사용. # 후속 슬라이스에서 ^KS11 별도 캐시. kospi = pd.Series(dtype=float, name="kospi") if "005930" in master.index and not prices.empty: sub = prices[prices["ticker"] == "005930"].set_index("date")["close"] kospi = sub.copy() kospi.name = "kospi" return cls(master=master, prices=prices, flow=flow, kospi=kospi, asof=asof) def restrict(self, tickers) -> "ScreenContext": tickers = pd.Index(tickers) return replace( self, master=self.master.loc[self.master.index.intersection(tickers)], prices=self.prices[self.prices["ticker"].isin(tickers)], flow=self.flow[self.flow["ticker"].isin(tickers)], ) def latest_close(self) -> pd.Series: if self.prices.empty: return pd.Series(dtype=float) return self.prices.sort_values("date").groupby("ticker")["close"].last() def latest_high(self) -> pd.Series: if self.prices.empty: return pd.Series(dtype=float) return self.prices.sort_values("date").groupby("ticker")["high"].last()