70 lines
2.7 KiB
Python
70 lines
2.7 KiB
Python
"""Screener engine — ScreenContext (Phase 0) + Screener/combine (Phase 2)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import datetime as dt
|
|
import sqlite3
|
|
from dataclasses import dataclass, replace
|
|
|
|
import pandas as pd
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ScreenContext:
|
|
"""1회 실행 동안 공유되는 읽기 전용 데이터 컨테이너."""
|
|
master: pd.DataFrame # index=ticker
|
|
prices: pd.DataFrame # cols: ticker,date,open,high,low,close,volume,value
|
|
flow: pd.DataFrame # cols: ticker,date,foreign_net,institution_net
|
|
kospi: pd.Series # index=date(str), name="kospi"
|
|
asof: dt.date
|
|
|
|
@classmethod
|
|
def load(cls, conn: sqlite3.Connection, asof: dt.date,
|
|
lookback_days: int = 252 * 2) -> "ScreenContext":
|
|
cutoff = (asof - dt.timedelta(days=int(lookback_days * 1.5))).isoformat()
|
|
asof_iso = asof.isoformat()
|
|
|
|
master = pd.read_sql_query(
|
|
"SELECT * FROM krx_master",
|
|
conn, index_col="ticker",
|
|
)
|
|
prices = pd.read_sql_query(
|
|
"SELECT ticker,date,open,high,low,close,volume,value "
|
|
"FROM krx_daily_prices WHERE date BETWEEN ? AND ? ORDER BY date",
|
|
conn, params=(cutoff, asof_iso),
|
|
)
|
|
flow = pd.read_sql_query(
|
|
"SELECT ticker,date,foreign_net,institution_net "
|
|
"FROM krx_flow WHERE date BETWEEN ? AND ? ORDER BY date",
|
|
conn, params=(cutoff, asof_iso),
|
|
)
|
|
|
|
# KOSPI 지수: MVP에서는 005930(삼성전자) 종가를 시장 대용으로 사용.
|
|
# 후속 슬라이스에서 ^KS11 별도 캐시.
|
|
kospi = pd.Series(dtype=float, name="kospi")
|
|
if "005930" in master.index and not prices.empty:
|
|
sub = prices[prices["ticker"] == "005930"].set_index("date")["close"]
|
|
kospi = sub.copy()
|
|
kospi.name = "kospi"
|
|
|
|
return cls(master=master, prices=prices, flow=flow, kospi=kospi, asof=asof)
|
|
|
|
def restrict(self, tickers) -> "ScreenContext":
|
|
tickers = pd.Index(tickers)
|
|
return replace(
|
|
self,
|
|
master=self.master.loc[self.master.index.intersection(tickers)],
|
|
prices=self.prices[self.prices["ticker"].isin(tickers)],
|
|
flow=self.flow[self.flow["ticker"].isin(tickers)],
|
|
)
|
|
|
|
def latest_close(self) -> pd.Series:
|
|
if self.prices.empty:
|
|
return pd.Series(dtype=float)
|
|
return self.prices.sort_values("date").groupby("ticker")["close"].last()
|
|
|
|
def latest_high(self) -> pd.Series:
|
|
if self.prices.empty:
|
|
return pd.Series(dtype=float)
|
|
return self.prices.sort_values("date").groupby("ticker")["high"].last()
|