diff --git a/agent-office/app/agents/stock.py b/agent-office/app/agents/stock.py
index eaa1405..629440a 100644
--- a/agent-office/app/agents/stock.py
+++ b/agent-office/app/agents/stock.py
@@ -119,7 +119,125 @@ class StockAgent(BaseAgent):
update_task_status(task_id, "failed", {"error": str(e)})
await self.transition("idle", f"오류: {e}")
+ async def on_screener_schedule(self) -> None:
+ """KRX 강세주 스크리너 자동 잡 (평일 16:30 KST).
+
+ 흐름:
+ 1) snapshot/refresh — 일봉 갱신 (실패해도 진행, 경고 로그)
+ 2) screener/run mode='auto' — 실행 + 결과 영구화 + telegram_payload 응답
+ 3) status=='skipped_holiday' → 종료 (텔레그램 미발신)
+ 4) status=='success' → telegram_payload.text 를 parse_mode 그대로 전송
+ 5) 예외/실패 → 운영자에게 별도 텔레그램 알림 (HTML)
+ """
+ if self.state not in ("idle", "break"):
+ return
+
+ task_id = create_task(self.agent_id, "screener_run", {"mode": "auto"})
+ await self.transition("working", "스크리너 스냅샷 갱신 중...", task_id)
+
+ try:
+ # 1) 스냅샷 갱신 — 실패해도 기존 일봉 데이터로 진행
+ try:
+ snap = await service_proxy.refresh_screener_snapshot()
+ add_log(
+ self.agent_id,
+ f"snapshot refreshed: status={snap.get('status', '?')}",
+ "info", task_id,
+ )
+ except Exception as e:
+ add_log(
+ self.agent_id,
+ f"스냅샷 갱신 실패 (기존 데이터로 진행): {e}",
+ "warning", task_id,
+ )
+
+ await self.transition("working", "스크리너 실행 중...")
+
+ # 2) 스크리너 실행
+ body = await service_proxy.run_stock_screener(mode="auto")
+ status = body.get("status")
+ asof = body.get("asof")
+
+ # 3) 공휴일 — 종료
+ if status == "skipped_holiday":
+ update_task_status(task_id, "succeeded", {
+ "status": status,
+ "asof": asof,
+ "telegram_sent": False,
+ })
+ add_log(self.agent_id, f"스크리너 건너뜀 (휴일): {asof}", "info", task_id)
+ await self.transition("idle", "휴일 — 스크리너 건너뜀")
+ return
+
+ # 4) 성공 → 텔레그램 전송
+ if status == "success":
+ payload = body.get("telegram_payload") or {}
+ text = payload.get("text") or ""
+ parse_mode = payload.get("parse_mode", "MarkdownV2")
+
+ if not text:
+ raise RuntimeError("telegram_payload.text 누락")
+
+ await self.transition("reporting", "스크리너 결과 전송 중...")
+
+ from ..telegram.messaging import send_raw
+ tg = await send_raw(text, parse_mode=parse_mode)
+
+ update_task_status(task_id, "succeeded", {
+ "status": status,
+ "asof": asof,
+ "run_id": body.get("run_id"),
+ "survivors_count": body.get("survivors_count"),
+ "telegram_sent": tg.get("ok", False),
+ "telegram_message_id": tg.get("message_id"),
+ })
+
+ if not tg.get("ok"):
+ desc = tg.get("description") or "unknown"
+ code = tg.get("error_code")
+ add_log(
+ self.agent_id,
+ f"Screener telegram send failed: [{code}] {desc}",
+ "warning", task_id,
+ )
+ if self._ws_manager:
+ await self._ws_manager.send_notification(
+ self.agent_id, "telegram_failed", task_id,
+ "스크리너 텔레그램 전송 실패",
+ )
+
+ await self.transition("idle", "스크리너 완료")
+ return
+
+ # 5) 기타 status — failed 취급
+ raise RuntimeError(f"unexpected screener status: {status}")
+
+ except Exception as e:
+ err_msg = str(e)
+ add_log(self.agent_id, f"Screener job failed: {err_msg}", "error", task_id)
+ update_task_status(task_id, "failed", {"error": err_msg})
+
+ # 운영자 알림 — 기본 HTML parse_mode 사용
+ try:
+ from ..telegram.messaging import send_raw
+ await send_raw(
+ f"⚠️ KRX 스크리너 실패\n"
+ f"{html.escape(err_msg)[:500]}"
+ )
+ except Exception as notify_err:
+ add_log(
+ self.agent_id,
+ f"operator notify failed: {notify_err}",
+ "warning", task_id,
+ )
+
+ await self.transition("idle", f"스크리너 오류: {err_msg[:80]}")
+
async def on_command(self, command: str, params: dict) -> dict:
+ if command == "run_screener":
+ await self.on_screener_schedule()
+ return {"ok": True, "message": "스크리너 실행 트리거 완료"}
+
if command == "test_telegram":
from ..telegram import send_agent_message
result = await send_agent_message(
diff --git a/agent-office/app/scheduler.py b/agent-office/app/scheduler.py
index c1c265c..37d51f2 100644
--- a/agent-office/app/scheduler.py
+++ b/agent-office/app/scheduler.py
@@ -14,6 +14,11 @@ async def _run_stock_schedule():
if agent:
await agent.on_schedule()
+async def _run_stock_screener():
+ agent = AGENT_REGISTRY.get("stock")
+ if agent:
+ await agent.on_screener_schedule()
+
async def _run_blog_schedule():
agent = AGENT_REGISTRY.get("blog")
if agent:
@@ -41,6 +46,14 @@ async def _poll_pipelines():
def init_scheduler():
scheduler.add_job(_run_stock_schedule, "cron", hour=7, minute=30, id="stock_news")
+ scheduler.add_job(
+ _run_stock_screener,
+ "cron",
+ day_of_week="mon-fri",
+ hour=16,
+ minute=30,
+ id="stock_screener",
+ )
scheduler.add_job(_run_blog_schedule, "cron", hour=10, minute=0, id="blog_pipeline")
scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=9, minute=0, id="lotto_curate")
scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=0, id="youtube_research")
diff --git a/agent-office/app/service_proxy.py b/agent-office/app/service_proxy.py
index 4ff6dc0..31a1d54 100644
--- a/agent-office/app/service_proxy.py
+++ b/agent-office/app/service_proxy.py
@@ -32,6 +32,34 @@ async def summarize_stock_news(limit: int = 15) -> Dict[str, Any]:
return resp.json()
+async def refresh_screener_snapshot() -> Dict[str, Any]:
+ """stock-lab의 KRX 일봉 스냅샷 갱신 (스크리너 실행 전 호출).
+
+ 네이버 금융 일괄 다운로드라 보통 30~120s, 여유있게 180s.
+ """
+ async with httpx.AsyncClient(timeout=180.0) as client:
+ resp = await client.post(f"{STOCK_LAB_URL}/api/stock/screener/snapshot/refresh")
+ resp.raise_for_status()
+ return resp.json()
+
+
+async def run_stock_screener(mode: str = "auto") -> Dict[str, Any]:
+ """stock-lab의 스크리너 실행.
+
+ 반환 status:
+ - 'skipped_holiday': 공휴일/주말 — telegram_payload 없음
+ - 'success': telegram_payload 동봉
+ 엔진 자체는 수 초 내 끝나지만, 컨텍스트 로드+200종목 처리 여유 180s.
+ """
+ async with httpx.AsyncClient(timeout=180.0) as client:
+ resp = await client.post(
+ f"{STOCK_LAB_URL}/api/stock/screener/run",
+ json={"mode": mode},
+ )
+ resp.raise_for_status()
+ return resp.json()
+
+
async def scrape_stock_news() -> Dict[str, Any]:
"""stock-lab의 수동 뉴스 스크랩 트리거 — DB에 최신 뉴스 저장.
diff --git a/agent-office/app/telegram/messaging.py b/agent-office/app/telegram/messaging.py
index 969bd28..7f962ed 100644
--- a/agent-office/app/telegram/messaging.py
+++ b/agent-office/app/telegram/messaging.py
@@ -8,14 +8,22 @@ from .client import _enabled, api_call
from .formatter import MessageKind, format_agent_message
-async def send_raw(text: str, reply_markup: Optional[dict] = None, chat_id: Optional[str] = None) -> dict:
- """가장 저수준. 원문 텍스트 그대로 전송. chat_id 생략 시 기본 TELEGRAM_CHAT_ID로."""
+async def send_raw(
+ text: str,
+ reply_markup: Optional[dict] = None,
+ chat_id: Optional[str] = None,
+ parse_mode: str = "HTML",
+) -> dict:
+ """가장 저수준. 원문 텍스트 그대로 전송. chat_id 생략 시 기본 TELEGRAM_CHAT_ID로.
+
+ parse_mode: 기본 'HTML'. MarkdownV2 페이로드(예: 스크리너) 전송 시 명시 지정.
+ """
if not _enabled():
return {"ok": False, "message_id": None}
payload = {
"chat_id": chat_id or TELEGRAM_CHAT_ID,
"text": text,
- "parse_mode": "HTML",
+ "parse_mode": parse_mode,
}
if reply_markup:
payload["reply_markup"] = reply_markup
diff --git a/agent-office/tests/test_stock_screener_job.py b/agent-office/tests/test_stock_screener_job.py
new file mode 100644
index 0000000..1e33402
--- /dev/null
+++ b/agent-office/tests/test_stock_screener_job.py
@@ -0,0 +1,177 @@
+"""StockAgent.on_screener_schedule — 평일 16:30 KST 자동 잡 단위 테스트.
+
+stock-lab HTTP 호출은 service_proxy mock, 텔레그램은 messaging.send_raw mock.
+"""
+import os
+import sys
+import tempfile
+
+_fd, _TMP = tempfile.mkstemp(suffix=".db")
+os.close(_fd)
+os.unlink(_TMP)
+os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+
+import asyncio
+from unittest.mock import AsyncMock, patch
+import pytest
+
+
+@pytest.fixture(autouse=True)
+def _init_db():
+ import gc
+ gc.collect()
+ if os.path.exists(_TMP):
+ os.remove(_TMP)
+ from app.db import init_db
+ init_db()
+ yield
+ gc.collect()
+
+
+def _success_body(asof="2026-05-12"):
+ return {
+ "asof": asof,
+ "mode": "auto",
+ "status": "success",
+ "run_id": 42,
+ "survivors_count": 600,
+ "top_n": 20,
+ "results": [],
+ "telegram_payload": {
+ "chat_target": "default",
+ "parse_mode": "MarkdownV2",
+ "text": "*KRX 강세주 스크리너* test body",
+ },
+ "warnings": [],
+ }
+
+
+def _holiday_body(asof="2026-05-05"):
+ return {
+ "asof": asof,
+ "mode": "auto",
+ "status": "skipped_holiday",
+ "run_id": None,
+ "survivors_count": None,
+ "top_n": 0,
+ "results": [],
+ "telegram_payload": None,
+ "warnings": [f"{asof} is a holiday — skipped"],
+ }
+
+
+def test_screener_success_sends_markdownv2_telegram():
+ from app.agents.stock import StockAgent
+ from app import service_proxy
+ from app.telegram import messaging
+
+ fake_snap = AsyncMock(return_value={"status": "ok"})
+ fake_run = AsyncMock(return_value=_success_body())
+ fake_send = AsyncMock(return_value={"ok": True, "message_id": 7777})
+
+ with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \
+ patch.object(service_proxy, "run_stock_screener", fake_run), \
+ patch.object(messaging, "send_raw", fake_send):
+ agent = StockAgent()
+ asyncio.run(agent.on_screener_schedule())
+
+ fake_snap.assert_awaited_once()
+ fake_run.assert_awaited_once_with(mode="auto")
+ fake_send.assert_awaited_once()
+ args, kwargs = fake_send.call_args
+ # 첫 인자(text) 또는 kwargs로 전달
+ text = args[0] if args else kwargs.get("text")
+ assert "KRX 강세주 스크리너" in text
+ assert kwargs.get("parse_mode") == "MarkdownV2"
+ assert agent.state == "idle"
+
+
+def test_screener_holiday_skips_telegram():
+ from app.agents.stock import StockAgent
+ from app import service_proxy
+ from app.telegram import messaging
+
+ fake_snap = AsyncMock(return_value={"status": "skipped_weekend"})
+ fake_run = AsyncMock(return_value=_holiday_body())
+ fake_send = AsyncMock(return_value={"ok": True, "message_id": 1})
+
+ with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \
+ patch.object(service_proxy, "run_stock_screener", fake_run), \
+ patch.object(messaging, "send_raw", fake_send):
+ agent = StockAgent()
+ asyncio.run(agent.on_screener_schedule())
+
+ fake_run.assert_awaited_once()
+ # 휴일이면 텔레그램 미발신
+ fake_send.assert_not_awaited()
+ assert agent.state == "idle"
+
+
+def test_screener_snapshot_failure_still_runs_screener():
+ """스냅샷 실패는 경고만 남기고 screener 호출은 계속됨."""
+ from app.agents.stock import StockAgent
+ from app import service_proxy
+ from app.telegram import messaging
+
+ fake_snap = AsyncMock(side_effect=RuntimeError("snapshot upstream down"))
+ fake_run = AsyncMock(return_value=_success_body())
+ fake_send = AsyncMock(return_value={"ok": True, "message_id": 8888})
+
+ with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \
+ patch.object(service_proxy, "run_stock_screener", fake_run), \
+ patch.object(messaging, "send_raw", fake_send):
+ agent = StockAgent()
+ asyncio.run(agent.on_screener_schedule())
+
+ fake_snap.assert_awaited_once()
+ fake_run.assert_awaited_once_with(mode="auto")
+ fake_send.assert_awaited_once()
+
+
+def test_screener_run_failure_notifies_operator():
+ """screener/run 실패 시 운영자 알림 텔레그램 발송."""
+ from app.agents.stock import StockAgent
+ from app import service_proxy
+ from app.telegram import messaging
+
+ fake_snap = AsyncMock(return_value={"status": "ok"})
+ fake_run = AsyncMock(side_effect=RuntimeError("stock-lab 500"))
+ fake_send = AsyncMock(return_value={"ok": True, "message_id": 1})
+
+ with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \
+ patch.object(service_proxy, "run_stock_screener", fake_run), \
+ patch.object(messaging, "send_raw", fake_send):
+ agent = StockAgent()
+ asyncio.run(agent.on_screener_schedule())
+
+ # 운영자 알림 1회는 호출
+ assert fake_send.await_count == 1
+ args, kwargs = fake_send.call_args
+ text = args[0] if args else kwargs.get("text")
+ assert "스크리너 실패" in text
+ assert agent.state == "idle"
+
+
+def test_screener_unexpected_status_treated_as_failure():
+ from app.agents.stock import StockAgent
+ from app import service_proxy
+ from app.telegram import messaging
+
+ fake_snap = AsyncMock(return_value={"status": "ok"})
+ fake_run = AsyncMock(return_value={"status": "weird", "asof": "2026-05-12"})
+ fake_send = AsyncMock(return_value={"ok": True, "message_id": 1})
+
+ with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \
+ patch.object(service_proxy, "run_stock_screener", fake_run), \
+ patch.object(messaging, "send_raw", fake_send):
+ agent = StockAgent()
+ asyncio.run(agent.on_screener_schedule())
+
+ # 운영자 알림 1회 + screener payload 미발송
+ assert fake_send.await_count == 1
+ args, kwargs = fake_send.call_args
+ text = args[0] if args else kwargs.get("text")
+ assert "스크리너 실패" in text
diff --git a/stock-lab/app/db.py b/stock-lab/app/db.py
index a6d85c9..4183a10 100644
--- a/stock-lab/app/db.py
+++ b/stock-lab/app/db.py
@@ -3,11 +3,16 @@ import os
import hashlib
from typing import List, Dict, Any, Optional
-DB_PATH = "/app/data/stock.db"
+from app.screener.schema import ensure_screener_schema
+
+DB_PATH = os.environ.get("STOCK_DB_PATH", "/app/data/stock.db")
def _conn() -> sqlite3.Connection:
- os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
- conn = sqlite3.connect(DB_PATH)
+ db_path = os.environ.get("STOCK_DB_PATH", DB_PATH)
+ parent = os.path.dirname(db_path)
+ if parent:
+ os.makedirs(parent, exist_ok=True)
+ conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
return conn
@@ -96,6 +101,9 @@ def init_db():
if "commission" not in sh_cols:
conn.execute("ALTER TABLE sell_history ADD COLUMN commission REAL NOT NULL DEFAULT 0")
+ # Screener 스키마 부트스트랩 (7테이블 + 디폴트 설정 시드)
+ ensure_screener_schema(conn)
+
def save_articles(articles: List[Dict[str, str]]) -> int:
count = 0
with _conn() as conn:
diff --git a/stock-lab/app/main.py b/stock-lab/app/main.py
index 76cedcc..f66ffd6 100644
--- a/stock-lab/app/main.py
+++ b/stock-lab/app/main.py
@@ -27,6 +27,10 @@ from .ai_summarizer import summarize_news, OllamaError
app = FastAPI()
+# Screener 라우터 등록
+from app.screener.router import router as screener_router
+app.include_router(screener_router)
+
# CORS 설정 (프론트엔드 접근 허용)
_cors_origins = os.getenv("CORS_ALLOW_ORIGINS", "http://localhost:3007,http://localhost:8080").split(",")
app.add_middleware(
diff --git a/stock-lab/app/screener/__init__.py b/stock-lab/app/screener/__init__.py
new file mode 100644
index 0000000..e00112f
--- /dev/null
+++ b/stock-lab/app/screener/__init__.py
@@ -0,0 +1,12 @@
+"""Stock screener — KRX 강세주 분석 노드 기반 보드.
+
+See docs/superpowers/specs/2026-05-12-stock-screener-board-design.md
+"""
+
+from .engine import Screener, ScreenContext, ScreenerResult
+from .registry import NODE_REGISTRY, GATE_REGISTRY
+
+__all__ = [
+ "Screener", "ScreenContext", "ScreenerResult",
+ "NODE_REGISTRY", "GATE_REGISTRY",
+]
diff --git a/stock-lab/app/screener/_test_fixtures.py b/stock-lab/app/screener/_test_fixtures.py
new file mode 100644
index 0000000..ca90608
--- /dev/null
+++ b/stock-lab/app/screener/_test_fixtures.py
@@ -0,0 +1,76 @@
+"""Synthetic fixtures for screener tests — no DB / no FDR / no naver."""
+
+import datetime as dt
+import pandas as pd
+
+
+def make_master(tickers: list[str], market_caps: dict | None = None,
+ preferred: set | None = None, managed: set | None = None) -> pd.DataFrame:
+ market_caps = market_caps or {t: 100_000_000_000 for t in tickers}
+ preferred = preferred or set()
+ managed = managed or set()
+ return pd.DataFrame([
+ {
+ "ticker": t,
+ "name": f"테스트{t}",
+ "market": "KOSPI",
+ "market_cap": market_caps.get(t),
+ "is_managed": int(t in managed),
+ "is_preferred": int(t in preferred),
+ "is_spac": 0,
+ "listed_date": None,
+ }
+ for t in tickers
+ ]).set_index("ticker")
+
+
+def make_prices(tickers: list[str], days: int = 260, start_close: int = 50000,
+ trend_pct: float = 0.0,
+ asof: dt.date = dt.date(2026, 5, 12)) -> pd.DataFrame:
+ """trend_pct: 일별 종가 등락률(%). 양수면 상승 추세."""
+ rows = []
+ for t in tickers:
+ close = start_close
+ for i in range(days):
+ day_idx = days - 1 - i # asof가 마지막
+ date = asof - dt.timedelta(days=day_idx)
+ high = int(close * 1.012)
+ low = int(close * 0.988)
+ rows.append({
+ "ticker": t, "date": date.isoformat(),
+ "open": close, "high": high, "low": low, "close": close,
+ "volume": 1_000_000, "value": close * 1_000_000,
+ })
+ close = int(close * (1 + trend_pct / 100))
+ return pd.DataFrame(rows)
+
+
+def make_flow(tickers: list[str], days: int = 260,
+ foreign_per_day: dict | None = None,
+ asof: dt.date = dt.date(2026, 5, 12)) -> pd.DataFrame:
+ foreign_per_day = foreign_per_day or {t: 0 for t in tickers}
+ rows = []
+ for t in tickers:
+ for i in range(days):
+ day_idx = days - 1 - i
+ date = asof - dt.timedelta(days=day_idx)
+ rows.append({
+ "ticker": t, "date": date.isoformat(),
+ "foreign_net": foreign_per_day.get(t, 0),
+ "institution_net": 0,
+ })
+ return pd.DataFrame(rows)
+
+
+def make_kospi(days: int = 260, start: int = 2500, trend_pct: float = 0.0,
+ asof: dt.date = dt.date(2026, 5, 12)) -> pd.Series:
+ values = []
+ dates = []
+ v = start
+ for i in range(days):
+ day_idx = days - 1 - i
+ d = asof - dt.timedelta(days=day_idx)
+ dates.append(d.isoformat())
+ values.append(v)
+ v = v * (1 + trend_pct / 100)
+ return pd.Series(values, index=dates, name="kospi")
diff --git a/stock-lab/app/screener/engine.py b/stock-lab/app/screener/engine.py
new file mode 100644
index 0000000..0f3b373
--- /dev/null
+++ b/stock-lab/app/screener/engine.py
@@ -0,0 +1,161 @@
+"""Screener engine — ScreenContext (Phase 0) + Screener/combine (Phase 2)."""
+
+from __future__ import annotations
+
+import datetime as dt
+import sqlite3
+from dataclasses import dataclass, replace
+
+import pandas as pd
+
+
+@dataclass(frozen=True)
+class ScreenContext:
+ """1회 실행 동안 공유되는 읽기 전용 데이터 컨테이너."""
+ master: pd.DataFrame # index=ticker
+ prices: pd.DataFrame # cols: ticker,date,open,high,low,close,volume,value
+ flow: pd.DataFrame # cols: ticker,date,foreign_net,institution_net
+ kospi: pd.Series # index=date(str), name="kospi"
+ asof: dt.date
+
+ @classmethod
+ def load(cls, conn: sqlite3.Connection, asof: dt.date,
+ lookback_days: int = 252 * 2) -> "ScreenContext":
+ cutoff = (asof - dt.timedelta(days=int(lookback_days * 1.5))).isoformat()
+ asof_iso = asof.isoformat()
+
+ master = pd.read_sql_query(
+ "SELECT * FROM krx_master",
+ conn, index_col="ticker",
+ )
+ prices = pd.read_sql_query(
+ "SELECT ticker,date,open,high,low,close,volume,value "
+ "FROM krx_daily_prices WHERE date BETWEEN ? AND ? ORDER BY date",
+ conn, params=(cutoff, asof_iso),
+ )
+ flow = pd.read_sql_query(
+ "SELECT ticker,date,foreign_net,institution_net "
+ "FROM krx_flow WHERE date BETWEEN ? AND ? ORDER BY date",
+ conn, params=(cutoff, asof_iso),
+ )
+
+ # KOSPI 지수: MVP에서는 005930(삼성전자) 종가를 시장 대용으로 사용.
+ # 후속 슬라이스에서 ^KS11 별도 캐시.
+ kospi = pd.Series(dtype=float, name="kospi")
+ if "005930" in master.index and not prices.empty:
+ sub = prices[prices["ticker"] == "005930"].set_index("date")["close"]
+ kospi = sub.copy()
+ kospi.name = "kospi"
+
+ return cls(master=master, prices=prices, flow=flow, kospi=kospi, asof=asof)
+
+ def restrict(self, tickers) -> "ScreenContext":
+ tickers = pd.Index(tickers)
+ return replace(
+ self,
+ master=self.master.loc[self.master.index.intersection(tickers)],
+ prices=self.prices[self.prices["ticker"].isin(tickers)],
+ flow=self.flow[self.flow["ticker"].isin(tickers)],
+ )
+
+ def latest_close(self) -> pd.Series:
+ if self.prices.empty:
+ return pd.Series(dtype=float)
+ return self.prices.sort_values("date").groupby("ticker")["close"].last()
+
+ def latest_high(self) -> pd.Series:
+ if self.prices.empty:
+ return pd.Series(dtype=float)
+ return self.prices.sort_values("date").groupby("ticker")["high"].last()
+
+
+# ---- combine + Screener (Phase 2) ----
+
+from . import position_sizer as _ps
+
+
+def combine(scores: dict, weights: dict) -> pd.Series:
+ """Weighted average across score nodes. ValueError if all weights = 0."""
+ active = {k: w for k, w in weights.items() if w > 0 and k in scores}
+ if not active:
+ raise ValueError("no active score nodes (all weights = 0)")
+
+ df = pd.DataFrame({k: scores[k] for k in active})
+ w = pd.Series(active)
+ weighted = (df.fillna(0).multiply(w, axis=1)).sum(axis=1) / w.sum()
+ return weighted
+
+
+@dataclass
+class ScreenerResult:
+ asof: dt.date
+ survivors_count: int
+ scores: dict # node name → pd.Series
+ weights: dict
+ ranked: pd.Series # ticker → total_score (sorted desc, head=top_n)
+ rows: list # list of dicts (for serialization)
+ warnings: list
+
+
+class Screener:
+ def __init__(self, gate, score_nodes, weights: dict, node_params: dict,
+ gate_params: dict, top_n: int = 20, sizer_params: dict = None):
+ self.gate = gate
+ self.score_nodes = score_nodes
+ self.weights = weights
+ self.node_params = node_params
+ self.gate_params = gate_params
+ self.top_n = top_n
+ self.sizer_params = sizer_params or {"atr_window": 14, "atr_stop_mult": 2.0, "rr_ratio": 2.0}
+
+ def run(self, ctx: ScreenContext) -> ScreenerResult:
+ warnings: list = []
+
+ survivors = self.gate.filter(ctx, self.gate_params)
+ if len(survivors) == 0:
+ raise ValueError("no survivors after hygiene gate")
+ if len(survivors) < 100:
+ warnings.append(f"survivors_count={len(survivors)} < 100 — 백분위 정규화 신뢰도 낮음")
+
+ scoped = ctx.restrict(survivors)
+ scores: dict = {}
+ for n in self.score_nodes:
+ w = self.weights.get(n.name, 0)
+ if w <= 0:
+ continue
+ try:
+ scores[n.name] = n.compute(scoped, self.node_params.get(n.name, {}))
+ except Exception as e:
+ warnings.append(f"node '{n.name}' failed: {e}")
+ scores[n.name] = pd.Series(0.0, index=scoped.master.index)
+
+ total = combine(scores, self.weights)
+ ranked = total.sort_values(ascending=False).head(self.top_n)
+
+ sizing = _ps.plan_positions(scoped, list(ranked.index), self.sizer_params)
+ latest_close = scoped.latest_close()
+
+ rows = []
+ for rank_idx, ticker in enumerate(ranked.index, start=1):
+ s = sizing.get(ticker, {})
+ row = {
+ "rank": rank_idx,
+ "ticker": ticker,
+ "name": str(scoped.master.loc[ticker, "name"]),
+ "total_score": float(ranked.loc[ticker]),
+ "scores": {k: float(v.get(ticker, 0.0)) for k, v in scores.items()},
+ "close": int(latest_close.get(ticker, 0)),
+ "market_cap": int(scoped.master.loc[ticker, "market_cap"] or 0),
+ "entry_price": s.get("entry_price"),
+ "stop_price": s.get("stop_price"),
+ "target_price": s.get("target_price"),
+ "atr14": s.get("atr14"),
+ "r_pct": s.get("r_pct"),
+ }
+ rows.append(row)
+
+ return ScreenerResult(
+ asof=ctx.asof, survivors_count=len(survivors),
+ scores=scores, weights=self.weights,
+ ranked=ranked, rows=rows, warnings=warnings,
+ )
diff --git a/stock-lab/app/screener/nodes/__init__.py b/stock-lab/app/screener/nodes/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/stock-lab/app/screener/nodes/base.py b/stock-lab/app/screener/nodes/base.py
new file mode 100644
index 0000000..fbe1050
--- /dev/null
+++ b/stock-lab/app/screener/nodes/base.py
@@ -0,0 +1,40 @@
+"""Node base classes + helpers."""
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from typing import Any, ClassVar
+
+import pandas as pd
+
+
+class ScoreNode(ABC):
+ name: ClassVar[str]
+ label: ClassVar[str]
+ default_params: ClassVar[dict]
+ param_schema: ClassVar[dict]
+
+ @abstractmethod
+ def compute(self, ctx: "Any", params: dict) -> pd.Series:
+ """returns Series indexed by ticker, 0..100 float."""
+
+
+class GateNode(ABC):
+ name: ClassVar[str]
+ label: ClassVar[str]
+ default_params: ClassVar[dict]
+ param_schema: ClassVar[dict]
+
+ @abstractmethod
+ def filter(self, ctx: "Any", params: dict) -> pd.Index:
+ """returns surviving tickers."""
+
+
+def percentile_rank(series: pd.Series) -> pd.Series:
+ """Percentile rank in [0, 100]. All-equal → 50. NaN preserved."""
+ if series.empty:
+ return series.astype(float)
+ if series.dropna().nunique() == 1:
+ return pd.Series(50.0, index=series.index)
+ ranked = series.rank(pct=True, na_option="keep") * 100.0
+ return ranked
diff --git a/stock-lab/app/screener/nodes/foreign_buy.py b/stock-lab/app/screener/nodes/foreign_buy.py
new file mode 100644
index 0000000..f035a96
--- /dev/null
+++ b/stock-lab/app/screener/nodes/foreign_buy.py
@@ -0,0 +1,33 @@
+"""외국인 N일 누적 순매수 강도 (시총 대비)."""
+
+import pandas as pd
+
+from .base import ScoreNode, percentile_rank
+
+
+class ForeignBuy(ScoreNode):
+ name = "foreign_buy"
+ label = "외국인 누적 순매수"
+ default_params = {"window_days": 5}
+ param_schema = {
+ "type": "object",
+ "properties": {
+ "window_days": {"type": "integer", "minimum": 1, "maximum": 60, "default": 5}
+ },
+ }
+
+ def compute(self, ctx, params: dict) -> pd.Series:
+ window = int(params.get("window_days", 5))
+ flow = ctx.flow
+ if flow.empty:
+ return pd.Series(dtype=float)
+
+ last_dates = (
+ flow.sort_values("date").groupby("ticker").tail(window)
+ )
+ net_sum = last_dates.groupby("ticker")["foreign_net"].sum()
+
+ market_cap = ctx.master["market_cap"].fillna(0).reindex(net_sum.index)
+ raw = (net_sum / market_cap.replace(0, pd.NA)).astype(float)
+
+ return percentile_rank(raw).fillna(50.0)
diff --git a/stock-lab/app/screener/nodes/high52w.py b/stock-lab/app/screener/nodes/high52w.py
new file mode 100644
index 0000000..9b6f86d
--- /dev/null
+++ b/stock-lab/app/screener/nodes/high52w.py
@@ -0,0 +1,30 @@
+"""52주 신고가 근접도 (룰 기반: 70% 미만 0점, 100% 도달 100점, 선형)."""
+
+import pandas as pd
+
+from .base import ScoreNode
+
+
+class High52WProximity(ScoreNode):
+ name = "high52w"
+ label = "52주 신고가 근접도"
+ default_params = {"window_days": 252}
+ param_schema = {
+ "type": "object",
+ "properties": {
+ "window_days": {"type": "integer", "minimum": 60, "maximum": 504, "default": 252}
+ },
+ }
+
+ def compute(self, ctx, params: dict) -> pd.Series:
+ window = int(params.get("window_days", 252))
+ prices = ctx.prices
+ if prices.empty:
+ return pd.Series(dtype=float)
+
+ ordered = prices.sort_values("date")
+ last = ordered.groupby("ticker").tail(window)
+ agg = last.groupby("ticker").agg(close=("close", "last"), high=("high", "max"))
+ proximity = (agg["close"] / agg["high"]).clip(upper=1.0)
+ score = ((proximity - 0.7) / 0.3).clip(lower=0.0, upper=1.0) * 100.0
+ return score.fillna(0.0)
diff --git a/stock-lab/app/screener/nodes/hygiene.py b/stock-lab/app/screener/nodes/hygiene.py
new file mode 100644
index 0000000..066fafe
--- /dev/null
+++ b/stock-lab/app/screener/nodes/hygiene.py
@@ -0,0 +1,81 @@
+"""HygieneGate — pre-filter for screener."""
+
+from __future__ import annotations
+
+import pandas as pd
+
+from .base import GateNode
+
+
+class HygieneGate(GateNode):
+ name = "hygiene"
+ label = "위생 게이트"
+ default_params = {
+ "min_market_cap_won": 50_000_000_000,
+ "min_avg_value_won": 500_000_000,
+ "min_listed_days": 60,
+ "skip_managed": True,
+ "skip_preferred": True,
+ "skip_spac": True,
+ "skip_halted_days": 3,
+ }
+ param_schema = {
+ "type": "object",
+ "properties": {
+ "min_market_cap_won": {"type": "integer", "minimum": 0},
+ "min_avg_value_won": {"type": "integer", "minimum": 0},
+ "min_listed_days": {"type": "integer", "minimum": 0},
+ "skip_managed": {"type": "boolean"},
+ "skip_preferred": {"type": "boolean"},
+ "skip_spac": {"type": "boolean"},
+ "skip_halted_days": {"type": "integer", "minimum": 0},
+ },
+ }
+
+ def filter(self, ctx, params: dict) -> pd.Index:
+ master = ctx.master.copy()
+ prices = ctx.prices
+
+ # 시총
+ master = master[master["market_cap"].fillna(0) >= params["min_market_cap_won"]]
+
+ # 우선주·관리·스팩
+ if params.get("skip_preferred", True):
+ master = master[master["is_preferred"] == 0]
+ if params.get("skip_managed", True):
+ master = master[master["is_managed"] == 0]
+ if params.get("skip_spac", True):
+ master = master[master["is_spac"] == 0]
+
+ candidates = master.index
+
+ # 20일 평균 거래대금
+ if not prices.empty:
+ recent20 = (
+ prices[prices["ticker"].isin(candidates)]
+ .sort_values("date")
+ .groupby("ticker")
+ .tail(20)
+ )
+ avg_value = recent20.groupby("ticker")["value"].mean()
+ ok = avg_value[avg_value >= params["min_avg_value_won"]].index
+ candidates = candidates.intersection(ok)
+
+ # 최근 N일 거래정지 (volume==0 N일 이상)
+ halted_days = params.get("skip_halted_days", 3)
+ if halted_days > 0 and not prices.empty:
+ recent = (
+ prices[prices["ticker"].isin(candidates)]
+ .sort_values("date")
+ .groupby("ticker")
+ .tail(halted_days)
+ )
+ zero_count = (
+ recent.assign(z=lambda d: (d["volume"] == 0).astype(int))
+ .groupby("ticker")["z"].sum()
+ )
+ healthy = zero_count[zero_count < halted_days].index
+ candidates = candidates.intersection(healthy)
+
+ # 상장 N일 — MVP에선 listed_date null 허용, null이면 통과
+ return pd.Index(candidates)
diff --git a/stock-lab/app/screener/nodes/ma_alignment.py b/stock-lab/app/screener/nodes/ma_alignment.py
new file mode 100644
index 0000000..768da7b
--- /dev/null
+++ b/stock-lab/app/screener/nodes/ma_alignment.py
@@ -0,0 +1,51 @@
+"""이평선 정배열 점수 — 5개 조건 충족 개수 / 5 × 100."""
+
+import pandas as pd
+
+from .base import ScoreNode
+
+
+class MaAlignment(ScoreNode):
+ name = "ma_alignment"
+ label = "이평선 정배열"
+ default_params = {"ma_periods": [50, 150, 200]}
+ param_schema = {
+ "type": "object",
+ "properties": {
+ "ma_periods": {"type": "array", "items": {"type": "integer"}}
+ },
+ }
+
+ def compute(self, ctx, params: dict) -> pd.Series:
+ ma_periods = params.get("ma_periods", self.default_params["ma_periods"])
+ if len(ma_periods) != 3:
+ raise ValueError("ma_periods must have 3 entries (short, medium, long)")
+ ma_s, ma_m, ma_l = (int(x) for x in ma_periods)
+
+ prices = ctx.prices
+ if prices.empty:
+ return pd.Series(dtype=float)
+
+ ordered = prices.sort_values("date")
+ min_history = max(252, ma_l)
+
+ def _score(s: pd.Series) -> float:
+ closes = s.astype(float).reset_index(drop=True)
+ if len(closes) < min_history:
+ return float("nan")
+ close = closes.iloc[-1]
+ ma_short = closes.rolling(ma_s).mean().iloc[-1]
+ ma_medium = closes.rolling(ma_m).mean().iloc[-1]
+ ma_long = closes.rolling(ma_l).mean().iloc[-1]
+ low52 = closes.iloc[-252:].min()
+ conds = [
+ close > ma_short,
+ ma_short > ma_medium,
+ ma_medium > ma_long,
+ close > ma_long,
+ close >= low52 * 1.25,
+ ]
+ return sum(conds) / 5 * 100.0
+
+ raw = ordered.groupby("ticker", group_keys=False)["close"].apply(_score)
+ return raw.fillna(0.0)
diff --git a/stock-lab/app/screener/nodes/momentum.py b/stock-lab/app/screener/nodes/momentum.py
new file mode 100644
index 0000000..ab09a46
--- /dev/null
+++ b/stock-lab/app/screener/nodes/momentum.py
@@ -0,0 +1,34 @@
+"""20일 모멘텀."""
+
+import pandas as pd
+
+from .base import ScoreNode, percentile_rank
+
+
+class Momentum20(ScoreNode):
+ name = "momentum"
+ label = "20일 모멘텀"
+ default_params = {"window_days": 20}
+ param_schema = {
+ "type": "object",
+ "properties": {
+ "window_days": {"type": "integer", "minimum": 5, "maximum": 120, "default": 20}
+ },
+ }
+
+ def compute(self, ctx, params: dict) -> pd.Series:
+ window = int(params.get("window_days", 20))
+ prices = ctx.prices
+ if prices.empty:
+ return pd.Series(dtype=float)
+
+ ordered = prices.sort_values("date")
+ last = ordered.groupby("ticker").tail(window + 1)
+
+ def _ret(s):
+ if len(s) < window + 1:
+ return float("nan")
+ return s.iloc[-1] / s.iloc[0] - 1
+
+ raw = last.groupby("ticker")["close"].apply(_ret)
+ return percentile_rank(raw).fillna(50.0)
diff --git a/stock-lab/app/screener/nodes/rs_rating.py b/stock-lab/app/screener/nodes/rs_rating.py
new file mode 100644
index 0000000..895002d
--- /dev/null
+++ b/stock-lab/app/screener/nodes/rs_rating.py
@@ -0,0 +1,48 @@
+"""RS Rating — IBD 가중 (3m=2,6m=1,9m=1,12m=1)."""
+
+import pandas as pd
+
+from .base import ScoreNode, percentile_rank
+
+
+_PERIOD_TO_DAYS = {"3m": 63, "6m": 126, "9m": 189, "12m": 252}
+
+
+class RsRating(ScoreNode):
+ name = "rs_rating"
+ label = "RS Rating (시장 대비 상대강도)"
+ default_params = {"weights": {"3m": 2, "6m": 1, "9m": 1, "12m": 1}}
+ param_schema = {
+ "type": "object",
+ "properties": {
+ "weights": {"type": "object"}
+ },
+ }
+
+ def compute(self, ctx, params: dict) -> pd.Series:
+ weights: dict = params.get("weights", self.default_params["weights"])
+ prices = ctx.prices
+ kospi = ctx.kospi
+ if prices.empty or kospi.empty:
+ return pd.Series(dtype=float)
+
+ ordered = prices.sort_values("date")
+
+ def _excess_for_ticker(g: pd.DataFrame) -> float:
+ closes = g.set_index("date")["close"]
+ total = 0.0
+ wsum = 0.0
+ for period, w in weights.items():
+ k = _PERIOD_TO_DAYS.get(period, 0)
+ if len(closes) <= k or len(kospi) <= k:
+ continue
+ r_stock = closes.iloc[-1] / closes.iloc[-(k + 1)] - 1
+ r_market = kospi.iloc[-1] / kospi.iloc[-(k + 1)] - 1
+ total += w * (r_stock - r_market)
+ wsum += w
+ return total / wsum if wsum else float("nan")
+
+ raw = ordered.groupby("ticker", group_keys=False).apply(
+ _excess_for_ticker, include_groups=False
+ )
+ return percentile_rank(raw).fillna(50.0)
diff --git a/stock-lab/app/screener/nodes/vcp_lite.py b/stock-lab/app/screener/nodes/vcp_lite.py
new file mode 100644
index 0000000..efa1537
--- /dev/null
+++ b/stock-lab/app/screener/nodes/vcp_lite.py
@@ -0,0 +1,40 @@
+"""VCP-lite — 단기/장기 일중 변동성 비율 기반 수축률."""
+
+import pandas as pd
+
+from .base import ScoreNode, percentile_rank
+
+
+class VcpLite(ScoreNode):
+ name = "vcp_lite"
+ label = "VCP-lite (변동성 수축)"
+ default_params = {"short_window": 40, "long_window": 252}
+ param_schema = {
+ "type": "object",
+ "properties": {
+ "short_window": {"type": "integer", "minimum": 10, "maximum": 120, "default": 40},
+ "long_window": {"type": "integer", "minimum": 60, "maximum": 504, "default": 252},
+ },
+ }
+
+ def compute(self, ctx, params: dict) -> pd.Series:
+ short_w = int(params.get("short_window", 40))
+ long_w = int(params.get("long_window", 252))
+ prices = ctx.prices
+ if prices.empty:
+ return pd.Series(dtype=float)
+
+ ordered = prices.sort_values("date").copy()
+ ordered["range_pct"] = (ordered["high"] - ordered["low"]) / ordered["close"]
+
+ def _ratio(s: pd.Series) -> float:
+ if len(s) < long_w:
+ return float("nan")
+ short_vol = s.tail(short_w).mean()
+ long_vol = s.tail(long_w).mean()
+ if long_vol == 0 or pd.isna(long_vol):
+ return float("nan")
+ return 1 - (short_vol / long_vol)
+
+ raw = ordered.groupby("ticker", group_keys=False)["range_pct"].apply(_ratio)
+ return percentile_rank(raw).fillna(50.0)
diff --git a/stock-lab/app/screener/nodes/volume_surge.py b/stock-lab/app/screener/nodes/volume_surge.py
new file mode 100644
index 0000000..4b4393f
--- /dev/null
+++ b/stock-lab/app/screener/nodes/volume_surge.py
@@ -0,0 +1,40 @@
+"""거래량 급증 — log1p(recent/baseline)."""
+
+import numpy as np
+import pandas as pd
+
+from .base import ScoreNode, percentile_rank
+
+
+class VolumeSurge(ScoreNode):
+ name = "volume_surge"
+ label = "거래량 급증"
+ default_params = {"baseline_days": 20, "eval_days": 3}
+ param_schema = {
+ "type": "object",
+ "properties": {
+ "baseline_days": {"type": "integer", "minimum": 5, "maximum": 60, "default": 20},
+ "eval_days": {"type": "integer", "minimum": 1, "maximum": 10, "default": 3},
+ },
+ }
+
+ def compute(self, ctx, params: dict) -> pd.Series:
+ baseline = int(params.get("baseline_days", 20))
+ eval_d = int(params.get("eval_days", 3))
+ prices = ctx.prices
+ if prices.empty:
+ return pd.Series(dtype=float)
+
+ ordered = prices.sort_values("date")
+ last_recent = ordered.groupby("ticker").tail(eval_d).groupby("ticker")["volume"].mean()
+ last_baseline = (
+ ordered.groupby("ticker")
+ .tail(baseline + eval_d)
+ .groupby("ticker")
+ .head(baseline)
+ .groupby("ticker")["volume"]
+ .mean()
+ )
+ ratio = last_recent / last_baseline.replace(0, pd.NA)
+ raw = np.log1p(ratio.astype(float))
+ return percentile_rank(raw).fillna(50.0)
diff --git a/stock-lab/app/screener/position_sizer.py b/stock-lab/app/screener/position_sizer.py
new file mode 100644
index 0000000..f14d845
--- /dev/null
+++ b/stock-lab/app/screener/position_sizer.py
@@ -0,0 +1,51 @@
+"""ATR Wilder smoothing + entry/stop/target 계산."""
+
+import pandas as pd
+
+
+def compute_atr_wilder(df_one_ticker: pd.DataFrame, window: int = 14) -> float:
+ """단일 종목 DataFrame(date·open·high·low·close)에 대해 Wilder ATR 마지막 값."""
+ g = df_one_ticker.sort_values("date").copy()
+ high = g["high"].astype(float)
+ low = g["low"].astype(float)
+ close = g["close"].astype(float)
+ prev_close = close.shift(1)
+ tr = pd.concat([
+ (high - low),
+ (high - prev_close).abs(),
+ (low - prev_close).abs(),
+ ], axis=1).max(axis=1)
+ atr = tr.ewm(alpha=1 / window, adjust=False).mean()
+ return float(atr.iloc[-1])
+
+
+def round_won(x: float) -> int:
+ return int(round(x))
+
+
+def plan_positions(ctx, tickers: list, params: dict) -> dict:
+ """각 ticker 에 대해 entry/stop/target/atr14 반환."""
+ atr_window = int(params.get("atr_window", 14))
+ stop_mult = float(params.get("atr_stop_mult", 2.0))
+ rr = float(params.get("rr_ratio", 2.0))
+
+ prices = ctx.prices.sort_values("date")
+ out: dict = {}
+ for t in tickers:
+ sub = prices[prices["ticker"] == t]
+ if sub.empty:
+ continue
+ close = float(sub["close"].iloc[-1])
+ atr14 = compute_atr_wilder(sub, window=atr_window)
+ entry = round_won(close * 1.005)
+ stop = round_won(close - stop_mult * atr14)
+ target = round_won(entry + rr * (entry - stop))
+ r_pct = (entry - stop) / entry * 100 if entry else 0.0
+ out[t] = {
+ "entry_price": entry,
+ "stop_price": stop,
+ "target_price": target,
+ "atr14": atr14,
+ "r_pct": r_pct,
+ }
+ return out
diff --git a/stock-lab/app/screener/registry.py b/stock-lab/app/screener/registry.py
new file mode 100644
index 0000000..58d4073
--- /dev/null
+++ b/stock-lab/app/screener/registry.py
@@ -0,0 +1,24 @@
+"""Registry of node classes (single source of truth for /nodes endpoint)."""
+
+from .nodes.hygiene import HygieneGate
+from .nodes.foreign_buy import ForeignBuy
+from .nodes.volume_surge import VolumeSurge
+from .nodes.momentum import Momentum20
+from .nodes.high52w import High52WProximity
+from .nodes.rs_rating import RsRating
+from .nodes.ma_alignment import MaAlignment
+from .nodes.vcp_lite import VcpLite
+
+NODE_REGISTRY: dict = {
+ "foreign_buy": ForeignBuy,
+ "volume_surge": VolumeSurge,
+ "momentum": Momentum20,
+ "high52w": High52WProximity,
+ "rs_rating": RsRating,
+ "ma_alignment": MaAlignment,
+ "vcp_lite": VcpLite,
+}
+
+GATE_REGISTRY: dict = {
+ "hygiene": HygieneGate,
+}
diff --git a/stock-lab/app/screener/router.py b/stock-lab/app/screener/router.py
new file mode 100644
index 0000000..aadd10f
--- /dev/null
+++ b/stock-lab/app/screener/router.py
@@ -0,0 +1,310 @@
+"""FastAPI router for /api/stock/screener/*"""
+
+from __future__ import annotations
+
+import datetime as dt
+import json
+import os
+import sqlite3
+from typing import Optional
+
+from fastapi import APIRouter, HTTPException
+
+from . import schemas
+from .registry import NODE_REGISTRY, GATE_REGISTRY
+
+
+router = APIRouter(prefix="/api/stock/screener")
+
+
+import json as _json
+import pathlib as _pathlib
+
+_HOLIDAYS_CACHE = None
+
+
+def _holidays():
+ global _HOLIDAYS_CACHE
+ if _HOLIDAYS_CACHE is None:
+ path = _pathlib.Path(__file__).resolve().parent.parent / "holidays.json"
+ try:
+ with path.open(encoding="utf-8") as f:
+ data = _json.load(f)
+ _HOLIDAYS_CACHE = set(data) if isinstance(data, list) else set(data.keys())
+ except FileNotFoundError:
+ _HOLIDAYS_CACHE = set()
+ return _HOLIDAYS_CACHE
+
+
+def _is_holiday(d: dt.date) -> bool:
+ return d.weekday() >= 5 or d.isoformat() in _holidays()
+
+
+def _db_path() -> str:
+ return os.environ.get("STOCK_DB_PATH", "/app/data/stock.db")
+
+
+def _conn() -> sqlite3.Connection:
+ return sqlite3.connect(_db_path())
+
+
+# ---------- /nodes ----------
+
+@router.get("/nodes", response_model=schemas.NodesResponse)
+def get_nodes():
+ score_nodes = [
+ schemas.NodeMeta(
+ name=cls.name, label=cls.label,
+ default_params=cls.default_params, param_schema=cls.param_schema,
+ )
+ for cls in NODE_REGISTRY.values()
+ ]
+ gate_nodes = [
+ schemas.NodeMeta(
+ name=cls.name, label=cls.label,
+ default_params=cls.default_params, param_schema=cls.param_schema,
+ )
+ for cls in GATE_REGISTRY.values()
+ ]
+ return schemas.NodesResponse(score_nodes=score_nodes, gate_nodes=gate_nodes)
+
+
+# ---------- /settings ----------
+
+@router.get("/settings", response_model=schemas.SettingsResponse)
+def get_settings():
+ with _conn() as c:
+ row = c.execute(
+ "SELECT weights_json, node_params_json, gate_params_json, "
+ "top_n, rr_ratio, atr_window, atr_stop_mult, updated_at "
+ "FROM screener_settings WHERE id=1"
+ ).fetchone()
+ if row is None:
+ raise HTTPException(503, "settings not initialized")
+ return schemas.SettingsResponse(
+ weights=json.loads(row[0]),
+ node_params=json.loads(row[1]),
+ gate_params=json.loads(row[2]),
+ top_n=row[3], rr_ratio=row[4], atr_window=row[5], atr_stop_mult=row[6],
+ updated_at=row[7],
+ )
+
+
+@router.put("/settings", response_model=schemas.SettingsResponse)
+def put_settings(body: schemas.SettingsBody):
+ now = dt.datetime.utcnow().isoformat()
+ with _conn() as c:
+ c.execute(
+ """UPDATE screener_settings SET
+ weights_json=?, node_params_json=?, gate_params_json=?,
+ top_n=?, rr_ratio=?, atr_window=?, atr_stop_mult=?, updated_at=?
+ WHERE id=1""",
+ (
+ json.dumps(body.weights), json.dumps(body.node_params),
+ json.dumps(body.gate_params),
+ body.top_n, body.rr_ratio, body.atr_window, body.atr_stop_mult, now,
+ ),
+ )
+ c.commit()
+ return schemas.SettingsResponse(**body.model_dump(), updated_at=now)
+
+
+# ---------- /run ----------
+
+from . import telegram as _tg
+from .engine import Screener, ScreenContext
+
+
+def _resolve_asof(asof_str, conn: sqlite3.Connection) -> dt.date:
+ if asof_str:
+ return dt.date.fromisoformat(asof_str)
+ row = conn.execute("SELECT max(date) FROM krx_daily_prices").fetchone()
+ if not row or row[0] is None:
+ raise HTTPException(503, "no snapshot available — run /snapshot/refresh first")
+ return dt.date.fromisoformat(row[0])
+
+
+def _load_settings(conn) -> dict:
+ row = conn.execute(
+ "SELECT weights_json,node_params_json,gate_params_json,top_n,"
+ "rr_ratio,atr_window,atr_stop_mult FROM screener_settings WHERE id=1"
+ ).fetchone()
+ return {
+ "weights": json.loads(row[0]),
+ "node_params": json.loads(row[1]),
+ "gate_params": json.loads(row[2]),
+ "top_n": row[3],
+ "rr_ratio": row[4],
+ "atr_window": row[5],
+ "atr_stop_mult": row[6],
+ }
+
+
+def _persist_run(conn, asof, mode, weights, node_params, gate_params, top_n,
+ result, started_at, finished_at) -> int:
+ cur = conn.execute(
+ """INSERT INTO screener_runs (asof,mode,status,started_at,finished_at,
+ weights_json,node_params_json,gate_params_json,top_n,survivors_count,telegram_sent)
+ VALUES (?,?,?,?,?,?,?,?,?,?,0)""",
+ (asof.isoformat(), mode, "success", started_at, finished_at,
+ json.dumps(weights), json.dumps(node_params), json.dumps(gate_params),
+ top_n, result.survivors_count),
+ )
+ run_id = cur.lastrowid
+ for row in result.rows:
+ conn.execute(
+ """INSERT INTO screener_results (run_id,rank,ticker,name,total_score,
+ scores_json,close,market_cap,entry_price,stop_price,target_price,atr14)
+ VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
+ (run_id, row["rank"], row["ticker"], row["name"], row["total_score"],
+ json.dumps(row["scores"]), row["close"], row["market_cap"],
+ row["entry_price"], row["stop_price"], row["target_price"], row["atr14"]),
+ )
+ conn.commit()
+ return run_id
+
+
+@router.post("/run", response_model=schemas.RunResponse)
+def post_run(body: schemas.RunRequest):
+ from .registry import NODE_REGISTRY as _NR, GATE_REGISTRY as _GR
+ started_at = dt.datetime.utcnow().isoformat()
+ with _conn() as c:
+ asof = _resolve_asof(body.asof, c)
+
+ # Skipped holiday handling for mode='auto'
+ if body.mode == "auto" and _is_holiday(asof):
+ return schemas.RunResponse(
+ asof=asof.isoformat(), mode="auto", status="skipped_holiday",
+ run_id=None, survivors_count=None,
+ weights={}, top_n=0,
+ results=[], telegram_payload=None,
+ warnings=[f"{asof.isoformat()} is a holiday — skipped"],
+ )
+
+ defaults = _load_settings(c)
+
+ if body.mode == "auto":
+ weights = defaults["weights"]
+ node_params = defaults["node_params"]
+ gate_params = defaults["gate_params"]
+ top_n = defaults["top_n"]
+ else:
+ weights = body.weights if body.weights is not None else defaults["weights"]
+ node_params = body.node_params if body.node_params is not None else defaults["node_params"]
+ gate_params = body.gate_params if body.gate_params is not None else defaults["gate_params"]
+ top_n = body.top_n if body.top_n is not None else defaults["top_n"]
+
+ sizer_params = {
+ "atr_window": defaults["atr_window"],
+ "atr_stop_mult": defaults["atr_stop_mult"],
+ "rr_ratio": defaults["rr_ratio"],
+ }
+
+ ctx = ScreenContext.load(c, asof)
+ score_nodes = [cls() for name, cls in _NR.items() if weights.get(name, 0) > 0]
+ gate = _GR["hygiene"]()
+
+ try:
+ screener = Screener(
+ gate=gate, score_nodes=score_nodes, weights=weights,
+ node_params=node_params, gate_params=gate_params,
+ top_n=top_n, sizer_params=sizer_params,
+ )
+ result = screener.run(ctx)
+ except ValueError as e:
+ raise HTTPException(422, str(e))
+
+ finished_at = dt.datetime.utcnow().isoformat()
+ run_id = None
+ if body.mode in ("manual_save", "auto"):
+ run_id = _persist_run(c, asof, body.mode, weights, node_params, gate_params,
+ top_n, result, started_at, finished_at)
+
+ payload = _tg.build_telegram_payload(
+ asof=asof, mode=body.mode, survivors_count=result.survivors_count,
+ top_n=top_n, rows=result.rows, run_id=run_id,
+ )
+
+ return schemas.RunResponse(
+ asof=asof.isoformat(), mode=body.mode, status="success",
+ run_id=run_id, survivors_count=result.survivors_count,
+ weights=weights, top_n=top_n,
+ results=result.rows,
+ telegram_payload=schemas.TelegramPayload(**payload),
+ warnings=result.warnings,
+ )
+
+
+# ---------- /snapshot/refresh ----------
+
+from . import snapshot as _snap
+
+
+@router.post("/snapshot/refresh")
+def post_snapshot_refresh(asof: Optional[str] = None):
+ asof_date = dt.date.fromisoformat(asof) if asof else dt.date.today()
+ if asof_date.weekday() >= 5:
+ return {"asof": asof_date.isoformat(), "status": "skipped_weekend"}
+ with _conn() as c:
+ summary = _snap.refresh_daily(c, asof_date)
+ return summary
+
+
+# ---------- /runs ----------
+
+@router.get("/runs", response_model=list[schemas.RunSummary])
+def list_runs(limit: int = 30):
+ with _conn() as c:
+ rows = c.execute(
+ "SELECT id,asof,mode,status,started_at,finished_at,top_n,"
+ "survivors_count,telegram_sent FROM screener_runs "
+ "ORDER BY asof DESC, id DESC LIMIT ?", (limit,),
+ ).fetchall()
+ return [
+ schemas.RunSummary(
+ id=r[0], asof=r[1], mode=r[2], status=r[3],
+ started_at=r[4], finished_at=r[5], top_n=r[6],
+ survivors_count=r[7], telegram_sent=bool(r[8]),
+ )
+ for r in rows
+ ]
+
+
+@router.get("/runs/{run_id}")
+def get_run(run_id: int):
+ with _conn() as c:
+ meta = c.execute(
+ "SELECT id,asof,mode,status,started_at,finished_at,top_n,"
+ "survivors_count,telegram_sent,weights_json,node_params_json,gate_params_json "
+ "FROM screener_runs WHERE id=?",
+ (run_id,),
+ ).fetchone()
+ if not meta:
+ raise HTTPException(404, "run not found")
+ rows = c.execute(
+ "SELECT rank,ticker,name,total_score,scores_json,close,market_cap,"
+ "entry_price,stop_price,target_price,atr14 "
+ "FROM screener_results WHERE run_id=? ORDER BY rank",
+ (run_id,),
+ ).fetchall()
+
+ return {
+ "meta": {
+ "id": meta[0], "asof": meta[1], "mode": meta[2], "status": meta[3],
+ "started_at": meta[4], "finished_at": meta[5], "top_n": meta[6],
+ "survivors_count": meta[7], "telegram_sent": bool(meta[8]),
+ "weights": json.loads(meta[9]),
+ "node_params": json.loads(meta[10]),
+ "gate_params": json.loads(meta[11]),
+ },
+ "results": [
+ {
+ "rank": r[0], "ticker": r[1], "name": r[2],
+ "total_score": r[3], "scores": json.loads(r[4]),
+ "close": r[5], "market_cap": r[6],
+ "entry_price": r[7], "stop_price": r[8], "target_price": r[9],
+ "atr14": r[10],
+ }
+ for r in rows
+ ],
+ }
diff --git a/stock-lab/app/screener/schema.py b/stock-lab/app/screener/schema.py
new file mode 100644
index 0000000..cde379d
--- /dev/null
+++ b/stock-lab/app/screener/schema.py
@@ -0,0 +1,136 @@
+"""Screener schema bootstrap. Called once at module import via db.py."""
+
+import json
+import sqlite3
+from datetime import datetime, timezone
+
+DEFAULT_WEIGHTS = {
+ "foreign_buy": 1.0,
+ "volume_surge": 1.0,
+ "momentum": 1.0,
+ "high52w": 1.2,
+ "rs_rating": 1.2,
+ "ma_alignment": 1.0,
+ "vcp_lite": 0.8,
+}
+DEFAULT_NODE_PARAMS = {
+ "foreign_buy": {"window_days": 5},
+ "volume_surge": {"baseline_days": 20, "eval_days": 3},
+ "momentum": {"window_days": 20},
+ "high52w": {"window_days": 252},
+ "rs_rating": {"weights": {"3m": 2, "6m": 1, "9m": 1, "12m": 1}},
+ "ma_alignment": {"ma_periods": [50, 150, 200]},
+ "vcp_lite": {"short_window": 40, "long_window": 252},
+}
+DEFAULT_GATE_PARAMS = {
+ "min_market_cap_won": 50_000_000_000,
+ "min_avg_value_won": 500_000_000,
+ "min_listed_days": 60,
+ "skip_managed": True,
+ "skip_preferred": True,
+ "skip_spac": True,
+ "skip_halted_days": 3,
+}
+
+DDL = """
+CREATE TABLE IF NOT EXISTS krx_master (
+ ticker TEXT PRIMARY KEY,
+ name TEXT NOT NULL,
+ market TEXT NOT NULL,
+ market_cap INTEGER,
+ is_managed INTEGER NOT NULL DEFAULT 0,
+ is_preferred INTEGER NOT NULL DEFAULT 0,
+ is_spac INTEGER NOT NULL DEFAULT 0,
+ listed_date TEXT,
+ updated_at TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS krx_daily_prices (
+ ticker TEXT NOT NULL,
+ date TEXT NOT NULL,
+ open INTEGER, high INTEGER, low INTEGER, close INTEGER,
+ volume INTEGER,
+ value INTEGER,
+ PRIMARY KEY (ticker, date)
+);
+CREATE INDEX IF NOT EXISTS idx_prices_date ON krx_daily_prices(date);
+
+CREATE TABLE IF NOT EXISTS krx_flow (
+ ticker TEXT NOT NULL,
+ date TEXT NOT NULL,
+ foreign_net INTEGER,
+ institution_net INTEGER,
+ PRIMARY KEY (ticker, date)
+);
+CREATE INDEX IF NOT EXISTS idx_flow_date ON krx_flow(date);
+
+CREATE TABLE IF NOT EXISTS screener_settings (
+ id INTEGER PRIMARY KEY CHECK (id = 1),
+ weights_json TEXT NOT NULL,
+ node_params_json TEXT NOT NULL,
+ gate_params_json TEXT NOT NULL,
+ top_n INTEGER NOT NULL DEFAULT 20,
+ rr_ratio REAL NOT NULL DEFAULT 2.0,
+ atr_window INTEGER NOT NULL DEFAULT 14,
+ atr_stop_mult REAL NOT NULL DEFAULT 2.0,
+ updated_at TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS screener_runs (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ asof TEXT NOT NULL,
+ mode TEXT NOT NULL,
+ status TEXT NOT NULL,
+ error TEXT,
+ started_at TEXT NOT NULL,
+ finished_at TEXT,
+ weights_json TEXT NOT NULL,
+ node_params_json TEXT NOT NULL,
+ gate_params_json TEXT NOT NULL,
+ top_n INTEGER NOT NULL,
+ survivors_count INTEGER,
+ telegram_sent INTEGER NOT NULL DEFAULT 0
+);
+CREATE INDEX IF NOT EXISTS idx_runs_asof ON screener_runs(asof DESC);
+
+CREATE TABLE IF NOT EXISTS screener_results (
+ run_id INTEGER NOT NULL,
+ rank INTEGER NOT NULL,
+ ticker TEXT NOT NULL,
+ name TEXT NOT NULL,
+ total_score REAL NOT NULL,
+ scores_json TEXT NOT NULL,
+ close INTEGER,
+ market_cap INTEGER,
+ entry_price INTEGER,
+ stop_price INTEGER,
+ target_price INTEGER,
+ atr14 REAL,
+ PRIMARY KEY (run_id, ticker),
+ FOREIGN KEY (run_id) REFERENCES screener_runs(id) ON DELETE CASCADE
+);
+CREATE INDEX IF NOT EXISTS idx_results_run_rank ON screener_results(run_id, rank);
+"""
+
+
+def ensure_screener_schema(conn: sqlite3.Connection) -> None:
+ """Create tables and seed default settings (idempotent)."""
+ conn.executescript(DDL)
+ existing = conn.execute("SELECT id FROM screener_settings WHERE id=1").fetchone()
+ if existing is None:
+ now = datetime.now(timezone.utc).isoformat()
+ conn.execute(
+ """
+ INSERT INTO screener_settings (
+ id, weights_json, node_params_json, gate_params_json,
+ top_n, rr_ratio, atr_window, atr_stop_mult, updated_at
+ ) VALUES (1, ?, ?, ?, 20, 2.0, 14, 2.0, ?)
+ """,
+ (
+ json.dumps(DEFAULT_WEIGHTS),
+ json.dumps(DEFAULT_NODE_PARAMS),
+ json.dumps(DEFAULT_GATE_PARAMS),
+ now,
+ ),
+ )
+ conn.commit()
diff --git a/stock-lab/app/screener/schemas.py b/stock-lab/app/screener/schemas.py
new file mode 100644
index 0000000..b18835d
--- /dev/null
+++ b/stock-lab/app/screener/schemas.py
@@ -0,0 +1,85 @@
+from __future__ import annotations
+from typing import Literal, Optional
+from pydantic import BaseModel, Field
+
+
+class NodeMeta(BaseModel):
+ name: str
+ label: str
+ default_params: dict
+ param_schema: dict
+
+
+class NodesResponse(BaseModel):
+ score_nodes: list[NodeMeta]
+ gate_nodes: list[NodeMeta]
+
+
+class SettingsBody(BaseModel):
+ weights: dict[str, float]
+ node_params: dict[str, dict] = Field(default_factory=dict)
+ gate_params: dict
+ top_n: int = 20
+ rr_ratio: float = 2.0
+ atr_window: int = 14
+ atr_stop_mult: float = 2.0
+
+
+class SettingsResponse(SettingsBody):
+ updated_at: str
+
+
+class RunRequest(BaseModel):
+ mode: Literal["preview", "manual_save", "auto"] = "preview"
+ asof: Optional[str] = None
+ weights: Optional[dict[str, float]] = None
+ node_params: Optional[dict[str, dict]] = None
+ gate_params: Optional[dict] = None
+ top_n: Optional[int] = None
+
+
+class ResultRow(BaseModel):
+ rank: int
+ ticker: str
+ name: str
+ total_score: float
+ scores: dict[str, float]
+ close: int
+ market_cap: int
+ entry_price: Optional[int] = None
+ stop_price: Optional[int] = None
+ target_price: Optional[int] = None
+ atr14: Optional[float] = None
+ r_pct: Optional[float] = None
+
+
+class TelegramPayload(BaseModel):
+ chat_target: str
+ parse_mode: str
+ text: str
+
+
+class RunResponse(BaseModel):
+ asof: str
+ mode: str
+ status: Literal["success", "failed", "skipped_holiday"]
+ run_id: Optional[int] = None
+ survivors_count: Optional[int] = None
+ weights: dict[str, float]
+ top_n: int
+ results: list[ResultRow] = Field(default_factory=list)
+ telegram_payload: Optional[TelegramPayload] = None
+ warnings: list[str] = Field(default_factory=list)
+ error: Optional[str] = None
+
+
+class RunSummary(BaseModel):
+ id: int
+ asof: str
+ mode: str
+ status: str
+ started_at: str
+ finished_at: Optional[str] = None
+ top_n: int
+ survivors_count: Optional[int] = None
+ telegram_sent: bool
diff --git a/stock-lab/app/screener/snapshot.py b/stock-lab/app/screener/snapshot.py
new file mode 100644
index 0000000..6633f6a
--- /dev/null
+++ b/stock-lab/app/screener/snapshot.py
@@ -0,0 +1,247 @@
+"""KRX daily snapshot loader (FDR + naver finance scraping)."""
+
+from __future__ import annotations
+
+import datetime as dt
+import logging
+import re
+import sqlite3
+import time
+from dataclasses import dataclass
+
+import FinanceDataReader as fdr
+import httpx
+import pandas as pd
+from bs4 import BeautifulSoup
+
+log = logging.getLogger(__name__)
+
+NAVER_FRGN_URL = "https://finance.naver.com/item/frgn.naver"
+NAVER_HEADERS = {
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
+ "Referer": "https://finance.naver.com/",
+}
+
+DEFAULT_FLOW_TOP_N = 500
+DEFAULT_RATE_LIMIT_SEC = 0.2
+
+
+@dataclass
+class RefreshSummary:
+ asof: dt.date
+ master_count: int
+ prices_count: int
+ flow_count: int
+ failures: list[str]
+
+ def asdict(self) -> dict:
+ return {
+ "asof": self.asof.isoformat(),
+ "master_count": self.master_count,
+ "prices_count": self.prices_count,
+ "flow_count": self.flow_count,
+ "failures": self.failures,
+ }
+
+
+def _iso(d: dt.date) -> str:
+ return d.isoformat()
+
+
+def _is_preferred(name: str) -> int:
+ """우선주 휴리스틱: 종목명이 '우'로 끝나거나 '우[A-Z]?'/'우\\d?' 패턴."""
+ n = name or ""
+ return 1 if re.search(r"우[A-Z]?$|우\d?$", n) else 0
+
+
+def _is_spac(name: str) -> int:
+ return 1 if "스팩" in (name or "") else 0
+
+
+def fetch_master_listing() -> pd.DataFrame:
+ """fdr.StockListing('KRX'). Wrapped for stub-ability in tests."""
+ return fdr.StockListing("KRX")
+
+
+def fetch_ohlcv_for_ticker(ticker: str, start: str, end: str) -> pd.DataFrame:
+ """fdr.DataReader for backfill."""
+ return fdr.DataReader(ticker, start, end)
+
+
+def fetch_flow_naver(ticker: str, *, client) -> dict | None:
+ """Scrape naver frgn page; return latest-day flow dict, or None."""
+ r = client.get(NAVER_FRGN_URL, params={"code": ticker, "page": 1})
+ if r.status_code != 200:
+ return None
+ soup = BeautifulSoup(r.text, "lxml")
+ for row in soup.select("table.type2 tr"):
+ cells = [c.get_text(strip=True).replace(",", "") for c in row.select("td")]
+ if not cells or not cells[0]:
+ continue
+ if not re.match(r"\d{4}\.\d{2}\.\d{2}", cells[0]):
+ continue
+ try:
+ inst = int(cells[5]) if cells[5] not in ("", "-") else 0
+ foreign = int(cells[6]) if cells[6] not in ("", "-") else 0
+ return {
+ "date": cells[0].replace(".", "-"),
+ "foreign_net": foreign,
+ "institution_net": inst,
+ }
+ except (IndexError, ValueError):
+ return None
+ return None
+
+
+def _master_and_prices_rows(asof: dt.date,
+ df: pd.DataFrame) -> tuple[list[tuple], list[tuple]]:
+ iso = _iso(asof)
+ now_iso = dt.datetime.utcnow().isoformat()
+ master_rows: list[tuple] = []
+ price_rows: list[tuple] = []
+ for _, row in df.iterrows():
+ ticker = str(row.get("Code") or "").strip()
+ name = str(row.get("Name") or "").strip()
+ if not ticker or not name:
+ continue
+ market_raw = str(row.get("Market") or "").upper()
+ market = "KOSDAQ" if "KOSDAQ" in market_raw else "KOSPI"
+ try:
+ market_cap = int(row["Marcap"]) if pd.notna(row.get("Marcap")) else None
+ except (TypeError, ValueError):
+ market_cap = None
+ master_rows.append((
+ ticker, name, market, market_cap,
+ 0, _is_preferred(name), _is_spac(name),
+ None, now_iso,
+ ))
+ try:
+ o = int(row["Open"]) if pd.notna(row.get("Open")) else None
+ h = int(row["High"]) if pd.notna(row.get("High")) else None
+ l = int(row["Low"]) if pd.notna(row.get("Low")) else None
+ c = int(row["Close"]) if pd.notna(row.get("Close")) else None
+ v = int(row["Volume"]) if pd.notna(row.get("Volume")) else None
+ amt = row.get("Amount")
+ a = int(amt) if pd.notna(amt) else None
+ if c is not None and v is not None:
+ price_rows.append((ticker, iso, o, h, l, c, v, a))
+ except (TypeError, KeyError):
+ pass
+ return master_rows, price_rows
+
+
+def _gather_flow_naver(asof: dt.date, tickers: list[str],
+ *, rate_limit_sec: float = DEFAULT_RATE_LIMIT_SEC) -> list[tuple]:
+ iso = _iso(asof)
+ rows: list[tuple] = []
+ if not tickers:
+ return rows
+ with httpx.Client(timeout=10, headers=NAVER_HEADERS) as client:
+ for t in tickers:
+ try:
+ data = fetch_flow_naver(t, client=client)
+ if data and data["date"] == iso:
+ rows.append((t, iso, data["foreign_net"], data["institution_net"]))
+ except Exception as e:
+ log.warning("flow scrape failed for %s: %s", t, e)
+ if rate_limit_sec > 0:
+ time.sleep(rate_limit_sec)
+ return rows
+
+
+def refresh_daily(conn: sqlite3.Connection, asof: dt.date,
+ flow_top_n: int = DEFAULT_FLOW_TOP_N,
+ rate_limit_sec: float = DEFAULT_RATE_LIMIT_SEC) -> dict:
+ """Pull master + prices (FDR) + flow (naver scraping for top N by market cap)."""
+ df = fetch_master_listing()
+ master_rows, price_rows = _master_and_prices_rows(asof, df)
+
+ conn.executemany("""
+ INSERT INTO krx_master (
+ ticker, name, market, market_cap,
+ is_managed, is_preferred, is_spac,
+ listed_date, updated_at
+ ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+ ON CONFLICT(ticker) DO UPDATE SET
+ name=excluded.name, market=excluded.market,
+ market_cap=excluded.market_cap,
+ is_managed=excluded.is_managed,
+ is_preferred=excluded.is_preferred,
+ is_spac=excluded.is_spac,
+ updated_at=excluded.updated_at
+ """, master_rows)
+ conn.executemany("""
+ INSERT OR REPLACE INTO krx_daily_prices
+ (ticker, date, open, high, low, close, volume, value)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+ """, price_rows)
+
+ # 외국인/기관: 시총 상위 N종목만 (rate limit 보호)
+ if flow_top_n > 0:
+ top = sorted(master_rows, key=lambda r: r[3] or 0, reverse=True)[:flow_top_n]
+ flow_tickers = [r[0] for r in top]
+ else:
+ flow_tickers = []
+ flow_rows = _gather_flow_naver(asof, flow_tickers, rate_limit_sec=rate_limit_sec)
+ conn.executemany("""
+ INSERT OR REPLACE INTO krx_flow
+ (ticker, date, foreign_net, institution_net)
+ VALUES (?, ?, ?, ?)
+ """, flow_rows)
+ conn.commit()
+
+ return RefreshSummary(
+ asof=asof, master_count=len(master_rows),
+ prices_count=len(price_rows), flow_count=len(flow_rows),
+ failures=[],
+ ).asdict()
+
+
+def backfill(conn: sqlite3.Connection, start: dt.date, end: dt.date) -> list[dict]:
+ """5년치 일봉 백필 — 종목별 fdr.DataReader 호출. Master는 end 기준 (FDR은 historical master 미지원)."""
+ df = fetch_master_listing()
+ master_rows, _ = _master_and_prices_rows(end, df)
+ conn.executemany("""
+ INSERT INTO krx_master (
+ ticker, name, market, market_cap,
+ is_managed, is_preferred, is_spac,
+ listed_date, updated_at
+ ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+ ON CONFLICT(ticker) DO UPDATE SET name=excluded.name
+ """, master_rows)
+
+ iso_start = start.isoformat()
+ iso_end = end.isoformat()
+ results = []
+ for r in master_rows:
+ t = r[0]
+ try:
+ ddf = fetch_ohlcv_for_ticker(t, iso_start, iso_end)
+ if ddf is None or ddf.empty:
+ continue
+ ddf = ddf.reset_index()
+ ddf["Date"] = pd.to_datetime(ddf["Date"]).dt.strftime("%Y-%m-%d")
+ rows = []
+ for _, rr in ddf.iterrows():
+ if pd.isna(rr["Close"]) or pd.isna(rr["Volume"]):
+ continue
+ rows.append((
+ t, rr["Date"],
+ int(rr["Open"]) if pd.notna(rr["Open"]) else None,
+ int(rr["High"]) if pd.notna(rr["High"]) else None,
+ int(rr["Low"]) if pd.notna(rr["Low"]) else None,
+ int(rr["Close"]),
+ int(rr["Volume"]),
+ int(rr["Close"] * rr["Volume"]),
+ ))
+ conn.executemany("""
+ INSERT OR REPLACE INTO krx_daily_prices
+ (ticker, date, open, high, low, close, volume, value)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+ """, rows)
+ results.append({"ticker": t, "count": len(rows)})
+ except Exception as e:
+ log.error("backfill failed for %s: %s", t, e)
+ results.append({"ticker": t, "error": str(e)})
+ conn.commit()
+ return results
diff --git a/stock-lab/app/screener/telegram.py b/stock-lab/app/screener/telegram.py
new file mode 100644
index 0000000..842bb69
--- /dev/null
+++ b/stock-lab/app/screener/telegram.py
@@ -0,0 +1,72 @@
+"""Telegram payload builder. Caller (agent-office) handles actual delivery."""
+
+from __future__ import annotations
+
+import datetime as dt
+
+NODE_ICONS = {
+ "foreign_buy": "👤외",
+ "volume_surge": "⚡거",
+ "momentum": "🚀모",
+ "high52w": "🆙고",
+ "rs_rating": "💪RS",
+ "ma_alignment": "📈MA",
+ "vcp_lite": "🌀VCP",
+}
+
+PAGE_BASE = "https://gahusb.synology.me/stock/screener"
+
+
+def _escape_md(s: str) -> str:
+ """Minimal MarkdownV2 escape — extend if formatting breaks."""
+ for ch in r"\_*[]()~`>#+-=|{}.!":
+ s = s.replace(ch, "\\" + ch)
+ return s
+
+
+def _format_won(n) -> str:
+ if n is None:
+ return "-"
+ return f"{int(n):,}"
+
+
+def build_telegram_payload(asof: dt.date, mode: str, survivors_count: int,
+ top_n: int, rows: list, run_id) -> dict:
+ title = "*KRX 강세주 스크리너*"
+ header = (
+ f"🎯 {title} — {_escape_md(asof.isoformat())} \\({_escape_md(mode)}\\)\n"
+ f"통과 {survivors_count}종 / Top {top_n} / 본문 1\\-10"
+ )
+
+ lines = []
+ for r in rows[:10]:
+ icons = " ".join(
+ NODE_ICONS[name] for name, sc in r["scores"].items()
+ if sc >= 70 and name in NODE_ICONS
+ )
+ score_str = f"{r['total_score']:.1f}"
+ r_pct = r.get("r_pct")
+ r_pct_str = f"{r_pct:.1f}" if r_pct is not None else "-"
+ lines.append(
+ f"{r['rank']}\\. *{_escape_md(r['name'])}* `{r['ticker']}` "
+ f"⭐ {_escape_md(score_str)}\n"
+ f" {icons}\n"
+ f" 진입 {_format_won(r.get('entry_price'))} "
+ f"손절 {_format_won(r.get('stop_price'))} "
+ f"익절 {_format_won(r.get('target_price'))} "
+ f"\\(R {_escape_md(r_pct_str)}%\\)"
+ )
+
+ # URL은 inline link로 감싸 URL 내부 . - ? = 이스케이프 회피
+ link = (
+ f"🔗 [전체 결과·11\\~20위]({PAGE_BASE}?run_id={run_id})"
+ if run_id else ""
+ )
+
+ text = header + "\n\n" + "\n\n".join(lines) + ("\n\n" + link if link else "")
+
+ return {
+ "chat_target": "default",
+ "parse_mode": "MarkdownV2",
+ "text": text,
+ }
diff --git a/stock-lab/app/test_screener_context.py b/stock-lab/app/test_screener_context.py
new file mode 100644
index 0000000..13cbc4f
--- /dev/null
+++ b/stock-lab/app/test_screener_context.py
@@ -0,0 +1,61 @@
+import datetime as dt
+import sqlite3
+
+import pandas as pd
+import pytest
+
+from app.screener.engine import ScreenContext
+from app.screener.schema import ensure_screener_schema
+from app.screener._test_fixtures import make_master, make_prices, make_flow
+
+
+@pytest.fixture
+def conn(tmp_path):
+ db_path = tmp_path / "ctx.db"
+ c = sqlite3.connect(db_path)
+ ensure_screener_schema(c)
+ yield c
+ c.close()
+
+
+def _seed(conn, master_df, prices_df, flow_df):
+ now = dt.datetime.utcnow().isoformat()
+ for t, row in master_df.iterrows():
+ conn.execute("""INSERT INTO krx_master (ticker,name,market,market_cap,
+ is_managed,is_preferred,is_spac,listed_date,updated_at)
+ VALUES (?,?,?,?,?,?,?,?,?)""",
+ (t, row["name"], row["market"], row["market_cap"],
+ row["is_managed"], row["is_preferred"], row["is_spac"], None, now))
+ prices_df.to_sql("krx_daily_prices", conn, if_exists="append", index=False)
+ flow_df.to_sql("krx_flow", conn, if_exists="append", index=False)
+ conn.commit()
+
+
+def test_load_returns_dataframes(conn):
+ asof = dt.date(2026, 5, 12)
+ _seed(conn,
+ make_master(["005930", "035420"]),
+ make_prices(["005930", "035420"], days=30, asof=asof),
+ make_flow(["005930", "035420"], days=30, asof=asof))
+
+ ctx = ScreenContext.load(conn, asof, lookback_days=30)
+
+ assert ctx.asof == asof
+ assert set(ctx.master.index) == {"005930", "035420"}
+ assert ctx.prices.shape[0] == 60 # 2 종목 × 30일
+ assert ctx.flow.shape[0] == 60
+
+
+def test_restrict_filters_tickers(conn):
+ asof = dt.date(2026, 5, 12)
+ _seed(conn,
+ make_master(["005930", "035420", "091990"]),
+ make_prices(["005930", "035420", "091990"], days=30, asof=asof),
+ make_flow(["005930", "035420", "091990"], days=30, asof=asof))
+
+ ctx = ScreenContext.load(conn, asof, lookback_days=30)
+ scoped = ctx.restrict(pd.Index(["005930"]))
+
+ assert list(scoped.master.index) == ["005930"]
+ assert (scoped.prices["ticker"] == "005930").all()
+ assert (scoped.flow["ticker"] == "005930").all()
diff --git a/stock-lab/app/test_screener_engine.py b/stock-lab/app/test_screener_engine.py
new file mode 100644
index 0000000..24288bb
--- /dev/null
+++ b/stock-lab/app/test_screener_engine.py
@@ -0,0 +1,55 @@
+import datetime as dt
+import pandas as pd
+import pytest
+
+from app.screener.engine import ScreenContext, Screener, combine
+from app.screener.nodes.hygiene import HygieneGate
+from app.screener.nodes.foreign_buy import ForeignBuy
+from app.screener.nodes.momentum import Momentum20
+from app.screener._test_fixtures import make_master, make_prices, make_flow, make_kospi
+
+
+def _ctx(master, prices, flow):
+ return ScreenContext(master=master, prices=prices, flow=flow,
+ kospi=make_kospi(days=260),
+ asof=dt.date(2026, 5, 12))
+
+
+def test_combine_weighted_average():
+ scores = {
+ "foreign_buy": pd.Series({"A": 80, "B": 20}),
+ "momentum": pd.Series({"A": 60, "B": 40}),
+ }
+ weights = {"foreign_buy": 2.0, "momentum": 1.0}
+ out = combine(scores, weights)
+ # A: (80*2 + 60*1)/3 = 73.33
+ assert abs(out["A"] - 73.333) < 0.1
+ assert abs(out["B"] - 26.666) < 0.1
+
+
+def test_combine_all_zero_weight_raises():
+ scores = {"foreign_buy": pd.Series({"A": 80})}
+ with pytest.raises(ValueError, match="no active"):
+ combine(scores, {"foreign_buy": 0})
+
+
+def test_screener_run_end_to_end():
+ asof = dt.date(2026, 5, 12)
+ master = make_master(["GOOD", "SMALL"],
+ market_caps={"GOOD": 200_000_000_000, "SMALL": 1_000_000_000})
+ prices = make_prices(["GOOD", "SMALL"], days=260, asof=asof, trend_pct=0.1)
+ flow = make_flow(["GOOD", "SMALL"], days=260, asof=asof,
+ foreign_per_day={"GOOD": 100_000_000, "SMALL": 0})
+ ctx = _ctx(master, prices, flow)
+
+ screener = Screener(
+ gate=HygieneGate(),
+ score_nodes=[ForeignBuy(), Momentum20()],
+ weights={"foreign_buy": 1.0, "momentum": 1.0},
+ node_params={"foreign_buy": {"window_days": 5}, "momentum": {"window_days": 20}},
+ gate_params={**HygieneGate.default_params, "min_listed_days": 0},
+ top_n=10,
+ )
+ result = screener.run(ctx)
+ assert result.survivors_count == 1 # SMALL은 게이트 탈락
+ assert result.ranked.index[0] == "GOOD"
diff --git a/stock-lab/app/test_screener_nodes_base.py b/stock-lab/app/test_screener_nodes_base.py
new file mode 100644
index 0000000..d3ee746
--- /dev/null
+++ b/stock-lab/app/test_screener_nodes_base.py
@@ -0,0 +1,24 @@
+import pandas as pd
+import pytest
+
+from app.screener.nodes.base import percentile_rank
+
+
+def test_percentile_rank_basic():
+ s = pd.Series([10, 20, 30, 40, 50])
+ out = percentile_rank(s)
+ assert (out >= 0).all() and (out <= 100).all()
+ assert out.iloc[0] < out.iloc[-1] # smallest gets lowest rank
+
+
+def test_percentile_rank_all_equal_returns_50():
+ s = pd.Series([42, 42, 42, 42])
+ out = percentile_rank(s)
+ assert (out == 50.0).all()
+
+
+def test_percentile_rank_handles_nan():
+ s = pd.Series([1.0, float("nan"), 3.0, 5.0])
+ out = percentile_rank(s)
+ assert pd.isna(out.iloc[1])
+ assert (out.dropna() >= 0).all()
diff --git a/stock-lab/app/test_screener_nodes_foreign_buy.py b/stock-lab/app/test_screener_nodes_foreign_buy.py
new file mode 100644
index 0000000..f795f2e
--- /dev/null
+++ b/stock-lab/app/test_screener_nodes_foreign_buy.py
@@ -0,0 +1,32 @@
+import datetime as dt
+import pandas as pd
+
+from app.screener.engine import ScreenContext
+from app.screener.nodes.foreign_buy import ForeignBuy
+from app.screener._test_fixtures import make_master, make_prices, make_flow
+
+
+def _ctx(master, prices, flow):
+ return ScreenContext(master=master, prices=prices, flow=flow,
+ kospi=pd.Series(dtype=float, name="kospi"),
+ asof=dt.date(2026, 5, 12))
+
+
+def test_higher_foreign_buy_gets_higher_score():
+ asof = dt.date(2026, 5, 12)
+ master = make_master(["A", "B"])
+ prices = make_prices(["A", "B"], days=30, asof=asof)
+ flow = make_flow(["A", "B"], days=30, asof=asof,
+ foreign_per_day={"A": 100_000_000, "B": 0})
+ out = ForeignBuy().compute(_ctx(master, prices, flow), {"window_days": 5})
+ assert out["A"] > out["B"]
+ assert 0 <= out.min() <= out.max() <= 100
+
+
+def test_all_zero_returns_50():
+ asof = dt.date(2026, 5, 12)
+ master = make_master(["A", "B"])
+ prices = make_prices(["A", "B"], days=30, asof=asof)
+ flow = make_flow(["A", "B"], days=30, asof=asof, foreign_per_day={"A": 0, "B": 0})
+ out = ForeignBuy().compute(_ctx(master, prices, flow), {"window_days": 5})
+ assert (out == 50.0).all()
diff --git a/stock-lab/app/test_screener_nodes_high52w.py b/stock-lab/app/test_screener_nodes_high52w.py
new file mode 100644
index 0000000..70f0ff1
--- /dev/null
+++ b/stock-lab/app/test_screener_nodes_high52w.py
@@ -0,0 +1,32 @@
+import datetime as dt
+import pandas as pd
+
+from app.screener.engine import ScreenContext
+from app.screener.nodes.high52w import High52WProximity
+from app.screener._test_fixtures import make_master, make_prices, make_flow
+
+
+def _ctx(master, prices, flow):
+ return ScreenContext(master=master, prices=prices, flow=flow,
+ kospi=pd.Series(dtype=float, name="kospi"),
+ asof=dt.date(2026, 5, 12))
+
+
+def test_proximity_at_high_returns_100():
+ asof = dt.date(2026, 5, 12)
+ master = make_master(["A"])
+ prices = make_prices(["A"], days=260, asof=asof, trend_pct=0.05)
+ flow = make_flow(["A"], days=260, asof=asof)
+
+ out = High52WProximity().compute(_ctx(master, prices, flow), {"window_days": 252})
+ assert out["A"] >= 95
+
+
+def test_proximity_below_70pct_returns_0():
+ asof = dt.date(2026, 5, 12)
+ master = make_master(["A"])
+ prices = make_prices(["A"], days=260, asof=asof, start_close=100000, trend_pct=-0.5)
+ flow = make_flow(["A"], days=260, asof=asof)
+
+ out = High52WProximity().compute(_ctx(master, prices, flow), {"window_days": 252})
+ assert out["A"] == 0
diff --git a/stock-lab/app/test_screener_nodes_hygiene.py b/stock-lab/app/test_screener_nodes_hygiene.py
new file mode 100644
index 0000000..b43db2e
--- /dev/null
+++ b/stock-lab/app/test_screener_nodes_hygiene.py
@@ -0,0 +1,46 @@
+import datetime as dt
+
+import pandas as pd
+
+from app.screener.nodes.hygiene import HygieneGate
+from app.screener.engine import ScreenContext
+from app.screener._test_fixtures import make_master, make_prices, make_flow
+
+
+def _ctx(master, prices, flow):
+ return ScreenContext(
+ master=master, prices=prices, flow=flow,
+ kospi=pd.Series(dtype=float, name="kospi"),
+ asof=dt.date(2026, 5, 12),
+ )
+
+
+def test_filter_excludes_small_cap():
+ g = HygieneGate()
+ ctx = _ctx(
+ make_master(["A", "B"], market_caps={"A": 1_000_000_000, "B": 100_000_000_000}),
+ make_prices(["A", "B"], days=30),
+ make_flow(["A", "B"], days=30),
+ )
+ out = g.filter(ctx, {**g.default_params, "min_listed_days": 0})
+ assert list(out) == ["B"]
+
+
+def test_filter_excludes_preferred():
+ g = HygieneGate()
+ ctx = _ctx(
+ make_master(["A", "B"], preferred={"B"}),
+ make_prices(["A", "B"], days=30),
+ make_flow(["A", "B"], days=30),
+ )
+ out = g.filter(ctx, {**g.default_params, "min_listed_days": 0})
+ assert list(out) == ["A"]
+
+
+def test_filter_excludes_low_value():
+ g = HygieneGate()
+ prices = make_prices(["A", "B"], days=30)
+ prices.loc[prices["ticker"] == "A", "value"] = 100_000 # 매우 작음
+ ctx = _ctx(make_master(["A", "B"]), prices, make_flow(["A", "B"], days=30))
+ out = g.filter(ctx, {**g.default_params, "min_listed_days": 0})
+ assert list(out) == ["B"]
diff --git a/stock-lab/app/test_screener_nodes_ma_alignment.py b/stock-lab/app/test_screener_nodes_ma_alignment.py
new file mode 100644
index 0000000..431ad3e
--- /dev/null
+++ b/stock-lab/app/test_screener_nodes_ma_alignment.py
@@ -0,0 +1,30 @@
+import datetime as dt
+import pandas as pd
+
+from app.screener.engine import ScreenContext
+from app.screener.nodes.ma_alignment import MaAlignment
+from app.screener._test_fixtures import make_master, make_prices, make_flow
+
+
+def _ctx(master, prices, flow):
+ return ScreenContext(master=master, prices=prices, flow=flow,
+ kospi=pd.Series(dtype=float, name="kospi"),
+ asof=dt.date(2026, 5, 12))
+
+
+def test_strong_uptrend_returns_100():
+ asof = dt.date(2026, 5, 12)
+ master = make_master(["UP"])
+ prices = make_prices(["UP"], days=260, asof=asof, start_close=50000, trend_pct=0.2)
+ flow = make_flow(["UP"], days=260, asof=asof)
+ out = MaAlignment().compute(_ctx(master, prices, flow), MaAlignment.default_params)
+ assert out["UP"] == 100.0
+
+
+def test_downtrend_returns_low():
+ asof = dt.date(2026, 5, 12)
+ master = make_master(["DN"])
+ prices = make_prices(["DN"], days=260, asof=asof, start_close=100000, trend_pct=-0.1)
+ flow = make_flow(["DN"], days=260, asof=asof)
+ out = MaAlignment().compute(_ctx(master, prices, flow), MaAlignment.default_params)
+ assert out["DN"] <= 20.0
diff --git a/stock-lab/app/test_screener_nodes_momentum.py b/stock-lab/app/test_screener_nodes_momentum.py
new file mode 100644
index 0000000..2e358d5
--- /dev/null
+++ b/stock-lab/app/test_screener_nodes_momentum.py
@@ -0,0 +1,24 @@
+import datetime as dt
+import pandas as pd
+
+from app.screener.engine import ScreenContext
+from app.screener.nodes.momentum import Momentum20
+from app.screener._test_fixtures import make_master, make_prices, make_flow
+
+
+def _ctx(master, prices, flow):
+ return ScreenContext(master=master, prices=prices, flow=flow,
+ kospi=pd.Series(dtype=float, name="kospi"),
+ asof=dt.date(2026, 5, 12))
+
+
+def test_higher_momentum_gets_higher_score():
+ asof = dt.date(2026, 5, 12)
+ master = make_master(["UP", "DN"])
+ up = make_prices(["UP"], days=30, asof=asof, trend_pct=0.5)
+ dn = make_prices(["DN"], days=30, asof=asof, trend_pct=-0.3)
+ prices = pd.concat([up, dn], ignore_index=True)
+ flow = make_flow(["UP", "DN"], days=30, asof=asof)
+
+ out = Momentum20().compute(_ctx(master, prices, flow), {"window_days": 20})
+ assert out["UP"] > out["DN"]
diff --git a/stock-lab/app/test_screener_nodes_rs_rating.py b/stock-lab/app/test_screener_nodes_rs_rating.py
new file mode 100644
index 0000000..ddbcaf0
--- /dev/null
+++ b/stock-lab/app/test_screener_nodes_rs_rating.py
@@ -0,0 +1,25 @@
+import datetime as dt
+import pandas as pd
+
+from app.screener.engine import ScreenContext
+from app.screener.nodes.rs_rating import RsRating
+from app.screener._test_fixtures import make_master, make_prices, make_flow, make_kospi
+
+
+def _ctx(master, prices, flow, kospi):
+ return ScreenContext(master=master, prices=prices, flow=flow,
+ kospi=kospi, asof=dt.date(2026, 5, 12))
+
+
+def test_outperformer_gets_higher_score():
+ asof = dt.date(2026, 5, 12)
+ master = make_master(["UP", "DN"])
+ up = make_prices(["UP"], days=260, asof=asof, trend_pct=0.3)
+ dn = make_prices(["DN"], days=260, asof=asof, trend_pct=-0.1)
+ prices = pd.concat([up, dn], ignore_index=True)
+ flow = make_flow(["UP", "DN"], days=260, asof=asof)
+ kospi = make_kospi(days=260, asof=asof, trend_pct=0.0)
+
+ out = RsRating().compute(_ctx(master, prices, flow, kospi),
+ RsRating.default_params)
+ assert out["UP"] > out["DN"]
diff --git a/stock-lab/app/test_screener_nodes_vcp_lite.py b/stock-lab/app/test_screener_nodes_vcp_lite.py
new file mode 100644
index 0000000..7c8c31f
--- /dev/null
+++ b/stock-lab/app/test_screener_nodes_vcp_lite.py
@@ -0,0 +1,36 @@
+import datetime as dt
+import pandas as pd
+
+from app.screener.engine import ScreenContext
+from app.screener.nodes.vcp_lite import VcpLite
+from app.screener._test_fixtures import make_master, make_prices, make_flow
+
+
+def _ctx(master, prices, flow):
+ return ScreenContext(master=master, prices=prices, flow=flow,
+ kospi=pd.Series(dtype=float, name="kospi"),
+ asof=dt.date(2026, 5, 12))
+
+
+def test_contracting_stock_scores_higher_than_expanding():
+ asof = dt.date(2026, 5, 12)
+ master = make_master(["CON", "EXP"])
+ prices = make_prices(["CON", "EXP"], days=260, asof=asof)
+
+ # CON: 최근 40일 변동성 축소 (high/low 좁힘)
+ mask_recent_con = (prices["ticker"] == "CON") & (
+ prices["date"] >= (asof - dt.timedelta(days=40)).isoformat()
+ )
+ prices.loc[mask_recent_con, "high"] = (prices.loc[mask_recent_con, "close"] * 1.003).astype(int)
+ prices.loc[mask_recent_con, "low"] = (prices.loc[mask_recent_con, "close"] * 0.997).astype(int)
+
+ # EXP: 최근 40일 변동성 확대
+ mask_recent_exp = (prices["ticker"] == "EXP") & (
+ prices["date"] >= (asof - dt.timedelta(days=40)).isoformat()
+ )
+ prices.loc[mask_recent_exp, "high"] = (prices.loc[mask_recent_exp, "close"] * 1.05).astype(int)
+ prices.loc[mask_recent_exp, "low"] = (prices.loc[mask_recent_exp, "close"] * 0.95).astype(int)
+
+ flow = make_flow(["CON", "EXP"], days=260, asof=asof)
+ out = VcpLite().compute(_ctx(master, prices, flow), VcpLite.default_params)
+ assert out["CON"] > out["EXP"]
diff --git a/stock-lab/app/test_screener_nodes_volume_surge.py b/stock-lab/app/test_screener_nodes_volume_surge.py
new file mode 100644
index 0000000..daabcf4
--- /dev/null
+++ b/stock-lab/app/test_screener_nodes_volume_surge.py
@@ -0,0 +1,28 @@
+import datetime as dt
+import pandas as pd
+
+from app.screener.engine import ScreenContext
+from app.screener.nodes.volume_surge import VolumeSurge
+from app.screener._test_fixtures import make_master, make_prices, make_flow
+
+
+def _ctx(master, prices, flow):
+ return ScreenContext(master=master, prices=prices, flow=flow,
+ kospi=pd.Series(dtype=float, name="kospi"),
+ asof=dt.date(2026, 5, 12))
+
+
+def test_recent_volume_surge_gets_higher_score():
+ asof = dt.date(2026, 5, 12)
+ master = make_master(["A", "B"])
+ prices = make_prices(["A", "B"], days=30, asof=asof)
+ # A는 최근 3일 거래량 10배로
+ mask = (prices["ticker"] == "A") & (prices["date"] >= (asof - dt.timedelta(days=3)).isoformat())
+ prices.loc[mask, "volume"] *= 10
+ flow = make_flow(["A", "B"], days=30, asof=asof)
+
+ out = VolumeSurge().compute(
+ _ctx(master, prices, flow),
+ {"baseline_days": 20, "eval_days": 3},
+ )
+ assert out["A"] > out["B"]
diff --git a/stock-lab/app/test_screener_position_sizer.py b/stock-lab/app/test_screener_position_sizer.py
new file mode 100644
index 0000000..1d0680d
--- /dev/null
+++ b/stock-lab/app/test_screener_position_sizer.py
@@ -0,0 +1,33 @@
+import datetime as dt
+import pandas as pd
+
+from app.screener.engine import ScreenContext
+from app.screener.position_sizer import compute_atr_wilder, plan_positions
+from app.screener._test_fixtures import make_master, make_prices, make_flow
+
+
+def _ctx(master, prices, flow):
+ return ScreenContext(master=master, prices=prices, flow=flow,
+ kospi=pd.Series(dtype=float, name="kospi"),
+ asof=dt.date(2026, 5, 12))
+
+
+def test_atr_wilder_positive_and_smooth():
+ df = make_prices(["A"], days=30)
+ atr = compute_atr_wilder(df[df["ticker"] == "A"], window=14)
+ assert atr > 0
+
+
+def test_plan_positions_returns_entry_stop_target():
+ asof = dt.date(2026, 5, 12)
+ master = make_master(["A"])
+ prices = make_prices(["A"], days=30, asof=asof, start_close=50000)
+ flow = make_flow(["A"], days=30, asof=asof)
+ ctx = _ctx(master, prices, flow)
+ sizing = plan_positions(ctx, ["A"], {"atr_window": 14, "atr_stop_mult": 2.0, "rr_ratio": 2.0})
+
+ row = sizing["A"]
+ assert row["entry_price"] > 0
+ assert row["stop_price"] < row["entry_price"]
+ assert row["target_price"] > row["entry_price"]
+ assert row["atr14"] > 0
diff --git a/stock-lab/app/test_screener_router.py b/stock-lab/app/test_screener_router.py
new file mode 100644
index 0000000..dd444d2
--- /dev/null
+++ b/stock-lab/app/test_screener_router.py
@@ -0,0 +1,154 @@
+import os
+import sqlite3
+import pytest
+from fastapi.testclient import TestClient
+
+from app.screener.schema import ensure_screener_schema
+
+
+@pytest.fixture(autouse=True)
+def isolated_db(tmp_path, monkeypatch):
+ db_path = tmp_path / "screener_router.db"
+ c = sqlite3.connect(db_path)
+ ensure_screener_schema(c)
+ c.close()
+ monkeypatch.setenv("STOCK_DB_PATH", str(db_path))
+
+
+@pytest.fixture
+def client():
+ from app.main import app
+ return TestClient(app)
+
+
+def test_get_nodes_lists_7_score_and_1_gate(client):
+ r = client.get("/api/stock/screener/nodes")
+ assert r.status_code == 200
+ body = r.json()
+ assert len(body["score_nodes"]) == 7
+ assert len(body["gate_nodes"]) == 1
+ assert {n["name"] for n in body["score_nodes"]} == {
+ "foreign_buy", "volume_surge", "momentum",
+ "high52w", "rs_rating", "ma_alignment", "vcp_lite",
+ }
+
+
+def test_settings_get_returns_defaults(client):
+ r = client.get("/api/stock/screener/settings")
+ assert r.status_code == 200
+ body = r.json()
+ assert body["weights"]["foreign_buy"] == 1.0
+ assert body["top_n"] == 20
+
+
+def test_settings_put_then_get_round_trip(client):
+ new_settings = {
+ "weights": {"foreign_buy": 2.5, "momentum": 1.0, "volume_surge": 1.0,
+ "high52w": 1.2, "rs_rating": 1.2, "ma_alignment": 1.0, "vcp_lite": 0.8},
+ "node_params": {"foreign_buy": {"window_days": 7}},
+ "gate_params": {"min_market_cap_won": 100_000_000_000,
+ "min_avg_value_won": 500_000_000,
+ "min_listed_days": 60,
+ "skip_managed": True, "skip_preferred": True, "skip_spac": True,
+ "skip_halted_days": 3},
+ "top_n": 30,
+ "rr_ratio": 2.5,
+ "atr_window": 14,
+ "atr_stop_mult": 2.0,
+ }
+ r = client.put("/api/stock/screener/settings", json=new_settings)
+ assert r.status_code == 200
+ r2 = client.get("/api/stock/screener/settings")
+ body = r2.json()
+ assert body["weights"]["foreign_buy"] == 2.5
+ assert body["top_n"] == 30
+
+
+# ---- /run tests ----
+
+from app.screener._test_fixtures import make_master, make_prices, make_flow
+
+
+def _seed_min(conn, asof_iso="2026-05-12"):
+ import datetime as dt
+ now = dt.datetime.utcnow().isoformat()
+ rows = [
+ ("BIG1", "큰주식1", "KOSPI", 200_000_000_000, 0, 0, 0, None, now),
+ ("BIG2", "큰주식2", "KOSPI", 100_000_000_000, 0, 0, 0, None, now),
+ ("SMALL", "작은주식", "KOSPI", 1_000_000_000, 0, 0, 0, None, now),
+ ]
+ for r in rows:
+ conn.execute("""INSERT INTO krx_master (ticker,name,market,market_cap,
+ is_managed,is_preferred,is_spac,listed_date,updated_at)
+ VALUES (?,?,?,?,?,?,?,?,?)""", r)
+ asof = dt.date(2026, 5, 12)
+ p = make_prices(["BIG1", "BIG2", "SMALL"], days=260, asof=asof)
+ f = make_flow(["BIG1", "BIG2", "SMALL"], days=260, asof=asof,
+ foreign_per_day={"BIG1": 100_000_000, "BIG2": 50_000_000, "SMALL": 0})
+ p.to_sql("krx_daily_prices", conn, if_exists="append", index=False)
+ f.to_sql("krx_flow", conn, if_exists="append", index=False)
+ conn.commit()
+
+
+def test_run_preview_no_save(client):
+ db_path = os.environ["STOCK_DB_PATH"]
+ c = sqlite3.connect(db_path)
+ _seed_min(c)
+ c.close()
+
+ r = client.post("/api/stock/screener/run", json={"mode": "preview", "asof": "2026-05-12"})
+ assert r.status_code == 200
+ body = r.json()
+ assert body["status"] == "success"
+ assert body["run_id"] is None
+ assert body["telegram_payload"] is not None
+
+ c = sqlite3.connect(db_path)
+ cnt = c.execute("SELECT count(*) FROM screener_runs").fetchone()[0]
+ assert cnt == 0
+
+
+def test_run_manual_save_writes_row(client):
+ db_path = os.environ["STOCK_DB_PATH"]
+ c = sqlite3.connect(db_path)
+ _seed_min(c)
+ c.close()
+
+ r = client.post("/api/stock/screener/run",
+ json={"mode": "manual_save", "asof": "2026-05-12"})
+ assert r.status_code == 200
+ assert r.json()["run_id"] is not None
+
+ c = sqlite3.connect(db_path)
+ cnt = c.execute("SELECT count(*) FROM screener_runs").fetchone()[0]
+ assert cnt == 1
+
+
+def test_runs_list_and_detail(client):
+ db_path = os.environ["STOCK_DB_PATH"]
+ c = sqlite3.connect(db_path)
+ _seed_min(c)
+ c.close()
+
+ saved = client.post(
+ "/api/stock/screener/run",
+ json={"mode": "manual_save", "asof": "2026-05-12"},
+ ).json()
+ run_id = saved["run_id"]
+
+ list_r = client.get("/api/stock/screener/runs?limit=5")
+ assert list_r.status_code == 200
+ assert any(r["id"] == run_id for r in list_r.json())
+
+ detail = client.get(f"/api/stock/screener/runs/{run_id}")
+ assert detail.status_code == 200
+ assert detail.json()["meta"]["id"] == run_id
+ assert isinstance(detail.json()["results"], list)
+
+
+def test_run_holiday_returns_skipped(client):
+ # 2026-05-09는 토요일 (주말). _is_holiday 가 weekday>=5를 잡음.
+ r = client.post("/api/stock/screener/run",
+ json={"mode": "auto", "asof": "2026-05-09"})
+ assert r.status_code == 200
+ assert r.json()["status"] == "skipped_holiday"
diff --git a/stock-lab/app/test_screener_schema.py b/stock-lab/app/test_screener_schema.py
new file mode 100644
index 0000000..46cf9d0
--- /dev/null
+++ b/stock-lab/app/test_screener_schema.py
@@ -0,0 +1,37 @@
+import sqlite3
+from app.screener.schema import ensure_screener_schema
+
+
+def test_creates_all_tables(tmp_path):
+ db_path = tmp_path / "test.db"
+ conn = sqlite3.connect(db_path)
+ ensure_screener_schema(conn)
+
+ tables = {r[0] for r in conn.execute(
+ "SELECT name FROM sqlite_master WHERE type='table'"
+ ).fetchall()}
+
+ expected = {
+ "krx_master", "krx_daily_prices", "krx_flow",
+ "screener_settings", "screener_runs", "screener_results",
+ }
+ assert expected.issubset(tables)
+
+
+def test_settings_seeded_with_singleton_row(tmp_path):
+ db_path = tmp_path / "test.db"
+ conn = sqlite3.connect(db_path)
+ ensure_screener_schema(conn)
+
+ rows = conn.execute("SELECT id FROM screener_settings").fetchall()
+ assert rows == [(1,)]
+
+
+def test_idempotent(tmp_path):
+ db_path = tmp_path / "test.db"
+ conn = sqlite3.connect(db_path)
+ ensure_screener_schema(conn)
+ ensure_screener_schema(conn) # 두 번 호출해도 에러 없어야 함
+
+ rows = conn.execute("SELECT count(*) FROM screener_settings").fetchall()
+ assert rows == [(1,)]
diff --git a/stock-lab/app/test_screener_snapshot.py b/stock-lab/app/test_screener_snapshot.py
new file mode 100644
index 0000000..224a716
--- /dev/null
+++ b/stock-lab/app/test_screener_snapshot.py
@@ -0,0 +1,129 @@
+import datetime as dt
+import sqlite3
+
+import pandas as pd
+import pytest
+
+from app.screener import snapshot as snap
+from app.screener.schema import ensure_screener_schema
+
+
+@pytest.fixture
+def conn(tmp_path):
+ db_path = tmp_path / "snap.db"
+ c = sqlite3.connect(db_path)
+ ensure_screener_schema(c)
+ yield c
+ c.close()
+
+
+def _stub_listing(monkeypatch):
+ df = pd.DataFrame([
+ {"Code": "005930", "Name": "삼성전자", "Market": "KOSPI",
+ "Marcap": 420_000_000_000_000,
+ "Open": 70000, "High": 72000, "Low": 69500, "Close": 71000,
+ "Volume": 12_000_000, "Amount": 840_000_000_000},
+ {"Code": "035420", "Name": "NAVER", "Market": "KOSPI",
+ "Marcap": 30_000_000_000_000,
+ "Open": 215000, "High": 220000, "Low": 213000, "Close": 218000,
+ "Volume": 1_000_000, "Amount": 218_000_000_000},
+ {"Code": "091990", "Name": "셀트리온헬스케어우", "Market": "KOSDAQ",
+ "Marcap": 10_000_000_000_000,
+ "Open": 60000, "High": 61000, "Low": 59500, "Close": 60500,
+ "Volume": 500_000, "Amount": 30_250_000_000},
+ ])
+ monkeypatch.setattr(snap, "fetch_master_listing", lambda: df)
+
+
+def _stub_flow(monkeypatch, mapping):
+ def fake_flow(ticker, *, client):
+ if mapping is None:
+ return None
+ v = mapping.get(ticker)
+ if v is None:
+ return None
+ return {
+ "date": dt.date(2026, 5, 12).isoformat(),
+ "foreign_net": v["foreign_net"],
+ "institution_net": v["institution_net"],
+ }
+ monkeypatch.setattr(snap, "fetch_flow_naver", fake_flow)
+
+
+def test_refresh_daily_writes_master_and_prices(conn, monkeypatch):
+ _stub_listing(monkeypatch)
+ _stub_flow(monkeypatch, None)
+ summary = snap.refresh_daily(conn, dt.date(2026, 5, 12),
+ flow_top_n=10, rate_limit_sec=0)
+ assert summary["master_count"] == 3
+ assert summary["prices_count"] == 3
+ assert summary["flow_count"] == 0
+
+ row = conn.execute(
+ "SELECT close FROM krx_daily_prices WHERE ticker='005930' AND date='2026-05-12'"
+ ).fetchone()
+ assert row[0] == 71000
+
+
+def test_refresh_daily_writes_flow_for_top_n(conn, monkeypatch):
+ _stub_listing(monkeypatch)
+ _stub_flow(monkeypatch, {
+ "005930": {"foreign_net": 12_000_000_000, "institution_net": 4_000_000_000},
+ "035420": {"foreign_net": -3_000_000_000, "institution_net": 8_000_000_000},
+ })
+ summary = snap.refresh_daily(conn, dt.date(2026, 5, 12),
+ flow_top_n=2, rate_limit_sec=0)
+ assert summary["flow_count"] == 2
+ row = conn.execute(
+ "SELECT foreign_net FROM krx_flow WHERE ticker='005930'"
+ ).fetchone()
+ assert row[0] == 12_000_000_000
+
+
+def test_master_flags_preferred(conn, monkeypatch):
+ _stub_listing(monkeypatch)
+ _stub_flow(monkeypatch, None)
+ snap.refresh_daily(conn, dt.date(2026, 5, 12), flow_top_n=0, rate_limit_sec=0)
+ pref = conn.execute(
+ "SELECT is_preferred FROM krx_master WHERE ticker='091990'"
+ ).fetchone()
+ assert pref[0] == 1
+
+
+def test_refresh_daily_is_idempotent(conn, monkeypatch):
+ _stub_listing(monkeypatch)
+ _stub_flow(monkeypatch, None)
+ snap.refresh_daily(conn, dt.date(2026, 5, 12), flow_top_n=0, rate_limit_sec=0)
+ snap.refresh_daily(conn, dt.date(2026, 5, 12), flow_top_n=0, rate_limit_sec=0)
+ cnt = conn.execute(
+ "SELECT count(*) FROM krx_daily_prices WHERE date='2026-05-12'"
+ ).fetchone()[0]
+ assert cnt == 3
+
+
+def test_fetch_flow_naver_parses_html():
+ """Real HTML structure parse with synthetic naver-like markup."""
+ html = """
+
| 날짜 | ||||||||
|---|---|---|---|---|---|---|---|---|
| 2026.05.12 | 71,000 | 500 | 0.71% | +12,000,000 | 4,000,000,000 | 12,000,000,000 | +1 | 53.0 |
| 2026.05.09 | 70,500 | -200 | -0.28% | +10,000,000 | 2,000,000,000 | 5,000,000,000 | +1 | 52.8 |