23 Commits

Author SHA1 Message Date
4a333434ac Merge feature/stock-screener-board: Stock Screener Board MVP (backend + agent-office)
stock-lab:
- pykrx→FDR/네이버 데이터 전환 (KRX 인증 회피)
- 스키마 7테이블 + 디폴트 시드
- snapshot.py (FDR 마스터·일봉 + 네이버 외국인 수급, 시총 상위 500종목)
- ScreenContext, ScoreNode/GateNode 추상, percentile_rank
- 게이트 1 (HygieneGate) + 점수 노드 7 (ForeignBuy/VolumeSurge/Momentum20/
  High52WProximity/RsRating/MaAlignment/VcpLite)
- Screener 엔진 + combine + position_sizer (ATR Wilder) + telegram 빌더
- FastAPI 라우터: /nodes, /settings, /run (preview/manual_save/auto),
  /snapshot/refresh, /runs (리스트·상세), 공휴일·주말 skipped_holiday

agent-office:
- StockAgent.on_screener_schedule + run_screener 명령
- 평일 16:30 KST APScheduler cron (Asia/Seoul)
- service_proxy 헬퍼, send_raw parse_mode 확장 (MarkdownV2 지원)
- 5 신규 테스트, 38 회귀 통과
2026-05-13 07:23:43 +09:00
119ac88e1e feat(agent-office): stock screener 평일 16:30 KST 자동 잡 + 텔레그램 전송
- StockAgent.on_screener_schedule: snapshot/refresh → screener/run(mode=auto)
  → telegram_payload(MarkdownV2) 발송. skipped_holiday는 무발신,
  실패 시 운영자 HTML 알림.
- service_proxy: refresh_screener_snapshot, run_stock_screener 추가
  (각각 180s timeout, STOCK_LAB_URL 기존 env 재사용).
- telegram.messaging.send_raw: parse_mode 파라미터 추가
  (기본 HTML 유지, MarkdownV2 페이로드 직접 전달용).
- scheduler: cron day_of_week=mon-fri hour=16 minute=30 id=stock_screener
  (Asia/Seoul TZ).
- on_command 'run_screener' 수동 트리거 추가.
- tests: 성공/휴일/스냅샷실패/run실패/이상status 5케이스.
2026-05-12 14:54:24 +09:00
c4cb18a25c feat(stock-lab): /run mode=auto 공휴일·주말 skipped_holiday 처리 2026-05-12 13:49:45 +09:00
50e811c5dd feat(stock-lab): /snapshot/refresh + /runs 리스트·상세 라우터 2026-05-12 13:47:16 +09:00
5ec7c2461b feat(stock-lab): /run 엔드포인트 — preview/manual_save/auto 모드 매트릭스 2026-05-12 13:44:21 +09:00
5f0fed7f13 feat(stock-lab): /nodes + /settings 라우터 + main.py include
- screener/router.py: APIRouter prefix=/api/stock/screener
  - GET /nodes: NODE_REGISTRY + GATE_REGISTRY 메타 노출 (7 score + 1 gate)
  - GET /settings: screener_settings 싱글톤 row 조회
  - PUT /settings: 가중치/노드/게이트 파라미터 round-trip
- main.py: screener_router include (FastAPI 생성 직후)
- db.py: STOCK_DB_PATH 환경변수 지원 (테스트 격리, 기본값 /app/data/stock.db 유지)
- test_screener_router.py: 3 tests (nodes list, settings GET, PUT round-trip)
2026-05-12 13:41:24 +09:00
070f2de3f1 feat(stock-lab): screener Pydantic 스키마 2026-05-12 13:37:23 +09:00
01ebd2e7d9 feat(stock-lab): telegram.py 메시지 빌더 (Top10 + 아이콘 + 페이지 링크) 2026-05-12 09:34:53 +09:00
7db9869722 feat(stock-lab): Screener 엔진 + combine + ScreenerResult + 노드 레지스트리
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 09:29:10 +09:00
97cb38ca7f feat(stock-lab): position_sizer — ATR Wilder + entry/stop/target 2026-05-12 09:25:49 +09:00
90c408aa77 feat(stock-lab): VcpLite 노드 — 변동성 수축률 백분위 2026-05-12 09:07:59 +09:00
55f2fa9cff feat(stock-lab): MaAlignment 노드 — 이평선 정배열 5조건 룰 점수 2026-05-12 09:06:45 +09:00
3ded781059 feat(stock-lab): RsRating 노드 — IBD 가중 시장초과수익 백분위 2026-05-12 09:02:28 +09:00
4eaeea9833 feat(stock-lab): High52WProximity 노드 — 신고가 대비 근접도 룰 점수 2026-05-12 08:59:55 +09:00
9709e5b019 feat(stock-lab): Momentum20 노드 — N일 수익률 백분위 2026-05-12 08:58:43 +09:00
94d6a39ce8 feat(stock-lab): VolumeSurge 노드 — log(최근/평균) 거래량 급증 2026-05-12 08:54:47 +09:00
804fdcba26 feat(stock-lab): ForeignBuy 노드 — 외국인 N일 누적 순매수 강도 2026-05-12 08:19:44 +09:00
779e78405e feat(stock-lab): HygieneGate — 위생 필터 (시총/거래대금/우선주/관리종목) 2026-05-12 07:59:32 +09:00
16a651f670 feat(stock-lab): ScoreNode/GateNode 추상 + percentile_rank 유틸 2026-05-12 07:52:01 +09:00
e508b7dc35 feat(stock-lab): ScreenContext.load/restrict + 합성 픽스쳐 2026-05-12 07:49:15 +09:00
6c5481971b feat(stock-lab): FDR 종목 마스터+일봉 + naver 외국인 수급 (snapshot) 2026-05-12 07:41:40 +09:00
d7e235c008 feat(stock-lab): screener 스키마 7테이블 + 디폴트 설정 시드 2026-05-12 04:10:36 +09:00
8707d322e4 chore(stock-lab): FDR/네이버 데이터 의존성 + screener 패키지 골격 2026-05-12 04:07:52 +09:00
44 changed files with 2732 additions and 6 deletions

View File

@@ -119,7 +119,125 @@ class StockAgent(BaseAgent):
update_task_status(task_id, "failed", {"error": str(e)})
await self.transition("idle", f"오류: {e}")
async def on_screener_schedule(self) -> None:
"""KRX 강세주 스크리너 자동 잡 (평일 16:30 KST).
흐름:
1) snapshot/refresh — 일봉 갱신 (실패해도 진행, 경고 로그)
2) screener/run mode='auto' — 실행 + 결과 영구화 + telegram_payload 응답
3) status=='skipped_holiday' → 종료 (텔레그램 미발신)
4) status=='success' → telegram_payload.text 를 parse_mode 그대로 전송
5) 예외/실패 → 운영자에게 별도 텔레그램 알림 (HTML)
"""
if self.state not in ("idle", "break"):
return
task_id = create_task(self.agent_id, "screener_run", {"mode": "auto"})
await self.transition("working", "스크리너 스냅샷 갱신 중...", task_id)
try:
# 1) 스냅샷 갱신 — 실패해도 기존 일봉 데이터로 진행
try:
snap = await service_proxy.refresh_screener_snapshot()
add_log(
self.agent_id,
f"snapshot refreshed: status={snap.get('status', '?')}",
"info", task_id,
)
except Exception as e:
add_log(
self.agent_id,
f"스냅샷 갱신 실패 (기존 데이터로 진행): {e}",
"warning", task_id,
)
await self.transition("working", "스크리너 실행 중...")
# 2) 스크리너 실행
body = await service_proxy.run_stock_screener(mode="auto")
status = body.get("status")
asof = body.get("asof")
# 3) 공휴일 — 종료
if status == "skipped_holiday":
update_task_status(task_id, "succeeded", {
"status": status,
"asof": asof,
"telegram_sent": False,
})
add_log(self.agent_id, f"스크리너 건너뜀 (휴일): {asof}", "info", task_id)
await self.transition("idle", "휴일 — 스크리너 건너뜀")
return
# 4) 성공 → 텔레그램 전송
if status == "success":
payload = body.get("telegram_payload") or {}
text = payload.get("text") or ""
parse_mode = payload.get("parse_mode", "MarkdownV2")
if not text:
raise RuntimeError("telegram_payload.text 누락")
await self.transition("reporting", "스크리너 결과 전송 중...")
from ..telegram.messaging import send_raw
tg = await send_raw(text, parse_mode=parse_mode)
update_task_status(task_id, "succeeded", {
"status": status,
"asof": asof,
"run_id": body.get("run_id"),
"survivors_count": body.get("survivors_count"),
"telegram_sent": tg.get("ok", False),
"telegram_message_id": tg.get("message_id"),
})
if not tg.get("ok"):
desc = tg.get("description") or "unknown"
code = tg.get("error_code")
add_log(
self.agent_id,
f"Screener telegram send failed: [{code}] {desc}",
"warning", task_id,
)
if self._ws_manager:
await self._ws_manager.send_notification(
self.agent_id, "telegram_failed", task_id,
"스크리너 텔레그램 전송 실패",
)
await self.transition("idle", "스크리너 완료")
return
# 5) 기타 status — failed 취급
raise RuntimeError(f"unexpected screener status: {status}")
except Exception as e:
err_msg = str(e)
add_log(self.agent_id, f"Screener job failed: {err_msg}", "error", task_id)
update_task_status(task_id, "failed", {"error": err_msg})
# 운영자 알림 — 기본 HTML parse_mode 사용
try:
from ..telegram.messaging import send_raw
await send_raw(
f"⚠️ <b>KRX 스크리너 실패</b>\n"
f"<code>{html.escape(err_msg)[:500]}</code>"
)
except Exception as notify_err:
add_log(
self.agent_id,
f"operator notify failed: {notify_err}",
"warning", task_id,
)
await self.transition("idle", f"스크리너 오류: {err_msg[:80]}")
async def on_command(self, command: str, params: dict) -> dict:
if command == "run_screener":
await self.on_screener_schedule()
return {"ok": True, "message": "스크리너 실행 트리거 완료"}
if command == "test_telegram":
from ..telegram import send_agent_message
result = await send_agent_message(

View File

@@ -14,6 +14,11 @@ async def _run_stock_schedule():
if agent:
await agent.on_schedule()
async def _run_stock_screener():
agent = AGENT_REGISTRY.get("stock")
if agent:
await agent.on_screener_schedule()
async def _run_blog_schedule():
agent = AGENT_REGISTRY.get("blog")
if agent:
@@ -41,6 +46,14 @@ async def _poll_pipelines():
def init_scheduler():
scheduler.add_job(_run_stock_schedule, "cron", hour=7, minute=30, id="stock_news")
scheduler.add_job(
_run_stock_screener,
"cron",
day_of_week="mon-fri",
hour=16,
minute=30,
id="stock_screener",
)
scheduler.add_job(_run_blog_schedule, "cron", hour=10, minute=0, id="blog_pipeline")
scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=9, minute=0, id="lotto_curate")
scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=0, id="youtube_research")

View File

@@ -32,6 +32,34 @@ async def summarize_stock_news(limit: int = 15) -> Dict[str, Any]:
return resp.json()
async def refresh_screener_snapshot() -> Dict[str, Any]:
"""stock-lab의 KRX 일봉 스냅샷 갱신 (스크리너 실행 전 호출).
네이버 금융 일괄 다운로드라 보통 30~120s, 여유있게 180s.
"""
async with httpx.AsyncClient(timeout=180.0) as client:
resp = await client.post(f"{STOCK_LAB_URL}/api/stock/screener/snapshot/refresh")
resp.raise_for_status()
return resp.json()
async def run_stock_screener(mode: str = "auto") -> Dict[str, Any]:
"""stock-lab의 스크리너 실행.
반환 status:
- 'skipped_holiday': 공휴일/주말 — telegram_payload 없음
- 'success': telegram_payload 동봉
엔진 자체는 수 초 내 끝나지만, 컨텍스트 로드+200종목 처리 여유 180s.
"""
async with httpx.AsyncClient(timeout=180.0) as client:
resp = await client.post(
f"{STOCK_LAB_URL}/api/stock/screener/run",
json={"mode": mode},
)
resp.raise_for_status()
return resp.json()
async def scrape_stock_news() -> Dict[str, Any]:
"""stock-lab의 수동 뉴스 스크랩 트리거 — DB에 최신 뉴스 저장.

View File

@@ -8,14 +8,22 @@ from .client import _enabled, api_call
from .formatter import MessageKind, format_agent_message
async def send_raw(text: str, reply_markup: Optional[dict] = None, chat_id: Optional[str] = None) -> dict:
"""가장 저수준. 원문 텍스트 그대로 전송. chat_id 생략 시 기본 TELEGRAM_CHAT_ID로."""
async def send_raw(
text: str,
reply_markup: Optional[dict] = None,
chat_id: Optional[str] = None,
parse_mode: str = "HTML",
) -> dict:
"""가장 저수준. 원문 텍스트 그대로 전송. chat_id 생략 시 기본 TELEGRAM_CHAT_ID로.
parse_mode: 기본 'HTML'. MarkdownV2 페이로드(예: 스크리너) 전송 시 명시 지정.
"""
if not _enabled():
return {"ok": False, "message_id": None}
payload = {
"chat_id": chat_id or TELEGRAM_CHAT_ID,
"text": text,
"parse_mode": "HTML",
"parse_mode": parse_mode,
}
if reply_markup:
payload["reply_markup"] = reply_markup

View File

@@ -0,0 +1,177 @@
"""StockAgent.on_screener_schedule — 평일 16:30 KST 자동 잡 단위 테스트.
stock-lab HTTP 호출은 service_proxy mock, 텔레그램은 messaging.send_raw mock.
"""
import os
import sys
import tempfile
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import asyncio
from unittest.mock import AsyncMock, patch
import pytest
@pytest.fixture(autouse=True)
def _init_db():
import gc
gc.collect()
if os.path.exists(_TMP):
os.remove(_TMP)
from app.db import init_db
init_db()
yield
gc.collect()
def _success_body(asof="2026-05-12"):
return {
"asof": asof,
"mode": "auto",
"status": "success",
"run_id": 42,
"survivors_count": 600,
"top_n": 20,
"results": [],
"telegram_payload": {
"chat_target": "default",
"parse_mode": "MarkdownV2",
"text": "*KRX 강세주 스크리너* test body",
},
"warnings": [],
}
def _holiday_body(asof="2026-05-05"):
return {
"asof": asof,
"mode": "auto",
"status": "skipped_holiday",
"run_id": None,
"survivors_count": None,
"top_n": 0,
"results": [],
"telegram_payload": None,
"warnings": [f"{asof} is a holiday — skipped"],
}
def test_screener_success_sends_markdownv2_telegram():
from app.agents.stock import StockAgent
from app import service_proxy
from app.telegram import messaging
fake_snap = AsyncMock(return_value={"status": "ok"})
fake_run = AsyncMock(return_value=_success_body())
fake_send = AsyncMock(return_value={"ok": True, "message_id": 7777})
with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \
patch.object(service_proxy, "run_stock_screener", fake_run), \
patch.object(messaging, "send_raw", fake_send):
agent = StockAgent()
asyncio.run(agent.on_screener_schedule())
fake_snap.assert_awaited_once()
fake_run.assert_awaited_once_with(mode="auto")
fake_send.assert_awaited_once()
args, kwargs = fake_send.call_args
# 첫 인자(text) 또는 kwargs로 전달
text = args[0] if args else kwargs.get("text")
assert "KRX 강세주 스크리너" in text
assert kwargs.get("parse_mode") == "MarkdownV2"
assert agent.state == "idle"
def test_screener_holiday_skips_telegram():
from app.agents.stock import StockAgent
from app import service_proxy
from app.telegram import messaging
fake_snap = AsyncMock(return_value={"status": "skipped_weekend"})
fake_run = AsyncMock(return_value=_holiday_body())
fake_send = AsyncMock(return_value={"ok": True, "message_id": 1})
with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \
patch.object(service_proxy, "run_stock_screener", fake_run), \
patch.object(messaging, "send_raw", fake_send):
agent = StockAgent()
asyncio.run(agent.on_screener_schedule())
fake_run.assert_awaited_once()
# 휴일이면 텔레그램 미발신
fake_send.assert_not_awaited()
assert agent.state == "idle"
def test_screener_snapshot_failure_still_runs_screener():
"""스냅샷 실패는 경고만 남기고 screener 호출은 계속됨."""
from app.agents.stock import StockAgent
from app import service_proxy
from app.telegram import messaging
fake_snap = AsyncMock(side_effect=RuntimeError("snapshot upstream down"))
fake_run = AsyncMock(return_value=_success_body())
fake_send = AsyncMock(return_value={"ok": True, "message_id": 8888})
with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \
patch.object(service_proxy, "run_stock_screener", fake_run), \
patch.object(messaging, "send_raw", fake_send):
agent = StockAgent()
asyncio.run(agent.on_screener_schedule())
fake_snap.assert_awaited_once()
fake_run.assert_awaited_once_with(mode="auto")
fake_send.assert_awaited_once()
def test_screener_run_failure_notifies_operator():
"""screener/run 실패 시 운영자 알림 텔레그램 발송."""
from app.agents.stock import StockAgent
from app import service_proxy
from app.telegram import messaging
fake_snap = AsyncMock(return_value={"status": "ok"})
fake_run = AsyncMock(side_effect=RuntimeError("stock-lab 500"))
fake_send = AsyncMock(return_value={"ok": True, "message_id": 1})
with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \
patch.object(service_proxy, "run_stock_screener", fake_run), \
patch.object(messaging, "send_raw", fake_send):
agent = StockAgent()
asyncio.run(agent.on_screener_schedule())
# 운영자 알림 1회는 호출
assert fake_send.await_count == 1
args, kwargs = fake_send.call_args
text = args[0] if args else kwargs.get("text")
assert "스크리너 실패" in text
assert agent.state == "idle"
def test_screener_unexpected_status_treated_as_failure():
from app.agents.stock import StockAgent
from app import service_proxy
from app.telegram import messaging
fake_snap = AsyncMock(return_value={"status": "ok"})
fake_run = AsyncMock(return_value={"status": "weird", "asof": "2026-05-12"})
fake_send = AsyncMock(return_value={"ok": True, "message_id": 1})
with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \
patch.object(service_proxy, "run_stock_screener", fake_run), \
patch.object(messaging, "send_raw", fake_send):
agent = StockAgent()
asyncio.run(agent.on_screener_schedule())
# 운영자 알림 1회 + screener payload 미발송
assert fake_send.await_count == 1
args, kwargs = fake_send.call_args
text = args[0] if args else kwargs.get("text")
assert "스크리너 실패" in text

View File

@@ -3,11 +3,16 @@ import os
import hashlib
from typing import List, Dict, Any, Optional
DB_PATH = "/app/data/stock.db"
from app.screener.schema import ensure_screener_schema
DB_PATH = os.environ.get("STOCK_DB_PATH", "/app/data/stock.db")
def _conn() -> sqlite3.Connection:
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
db_path = os.environ.get("STOCK_DB_PATH", DB_PATH)
parent = os.path.dirname(db_path)
if parent:
os.makedirs(parent, exist_ok=True)
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
return conn
@@ -96,6 +101,9 @@ def init_db():
if "commission" not in sh_cols:
conn.execute("ALTER TABLE sell_history ADD COLUMN commission REAL NOT NULL DEFAULT 0")
# Screener 스키마 부트스트랩 (7테이블 + 디폴트 설정 시드)
ensure_screener_schema(conn)
def save_articles(articles: List[Dict[str, str]]) -> int:
count = 0
with _conn() as conn:

View File

@@ -27,6 +27,10 @@ from .ai_summarizer import summarize_news, OllamaError
app = FastAPI()
# Screener 라우터 등록
from app.screener.router import router as screener_router
app.include_router(screener_router)
# CORS 설정 (프론트엔드 접근 허용)
_cors_origins = os.getenv("CORS_ALLOW_ORIGINS", "http://localhost:3007,http://localhost:8080").split(",")
app.add_middleware(

View File

@@ -0,0 +1,12 @@
"""Stock screener — KRX 강세주 분석 노드 기반 보드.
See docs/superpowers/specs/2026-05-12-stock-screener-board-design.md
"""
from .engine import Screener, ScreenContext, ScreenerResult
from .registry import NODE_REGISTRY, GATE_REGISTRY
__all__ = [
"Screener", "ScreenContext", "ScreenerResult",
"NODE_REGISTRY", "GATE_REGISTRY",
]

View File

@@ -0,0 +1,76 @@
"""Synthetic fixtures for screener tests — no DB / no FDR / no naver."""
import datetime as dt
import pandas as pd
def make_master(tickers: list[str], market_caps: dict | None = None,
preferred: set | None = None, managed: set | None = None) -> pd.DataFrame:
market_caps = market_caps or {t: 100_000_000_000 for t in tickers}
preferred = preferred or set()
managed = managed or set()
return pd.DataFrame([
{
"ticker": t,
"name": f"테스트{t}",
"market": "KOSPI",
"market_cap": market_caps.get(t),
"is_managed": int(t in managed),
"is_preferred": int(t in preferred),
"is_spac": 0,
"listed_date": None,
}
for t in tickers
]).set_index("ticker")
def make_prices(tickers: list[str], days: int = 260, start_close: int = 50000,
trend_pct: float = 0.0,
asof: dt.date = dt.date(2026, 5, 12)) -> pd.DataFrame:
"""trend_pct: 일별 종가 등락률(%). 양수면 상승 추세."""
rows = []
for t in tickers:
close = start_close
for i in range(days):
day_idx = days - 1 - i # asof가 마지막
date = asof - dt.timedelta(days=day_idx)
high = int(close * 1.012)
low = int(close * 0.988)
rows.append({
"ticker": t, "date": date.isoformat(),
"open": close, "high": high, "low": low, "close": close,
"volume": 1_000_000, "value": close * 1_000_000,
})
close = int(close * (1 + trend_pct / 100))
return pd.DataFrame(rows)
def make_flow(tickers: list[str], days: int = 260,
foreign_per_day: dict | None = None,
asof: dt.date = dt.date(2026, 5, 12)) -> pd.DataFrame:
foreign_per_day = foreign_per_day or {t: 0 for t in tickers}
rows = []
for t in tickers:
for i in range(days):
day_idx = days - 1 - i
date = asof - dt.timedelta(days=day_idx)
rows.append({
"ticker": t, "date": date.isoformat(),
"foreign_net": foreign_per_day.get(t, 0),
"institution_net": 0,
})
return pd.DataFrame(rows)
def make_kospi(days: int = 260, start: int = 2500, trend_pct: float = 0.0,
asof: dt.date = dt.date(2026, 5, 12)) -> pd.Series:
values = []
dates = []
v = start
for i in range(days):
day_idx = days - 1 - i
d = asof - dt.timedelta(days=day_idx)
dates.append(d.isoformat())
values.append(v)
v = v * (1 + trend_pct / 100)
return pd.Series(values, index=dates, name="kospi")

View File

@@ -0,0 +1,161 @@
"""Screener engine — ScreenContext (Phase 0) + Screener/combine (Phase 2)."""
from __future__ import annotations
import datetime as dt
import sqlite3
from dataclasses import dataclass, replace
import pandas as pd
@dataclass(frozen=True)
class ScreenContext:
"""1회 실행 동안 공유되는 읽기 전용 데이터 컨테이너."""
master: pd.DataFrame # index=ticker
prices: pd.DataFrame # cols: ticker,date,open,high,low,close,volume,value
flow: pd.DataFrame # cols: ticker,date,foreign_net,institution_net
kospi: pd.Series # index=date(str), name="kospi"
asof: dt.date
@classmethod
def load(cls, conn: sqlite3.Connection, asof: dt.date,
lookback_days: int = 252 * 2) -> "ScreenContext":
cutoff = (asof - dt.timedelta(days=int(lookback_days * 1.5))).isoformat()
asof_iso = asof.isoformat()
master = pd.read_sql_query(
"SELECT * FROM krx_master",
conn, index_col="ticker",
)
prices = pd.read_sql_query(
"SELECT ticker,date,open,high,low,close,volume,value "
"FROM krx_daily_prices WHERE date BETWEEN ? AND ? ORDER BY date",
conn, params=(cutoff, asof_iso),
)
flow = pd.read_sql_query(
"SELECT ticker,date,foreign_net,institution_net "
"FROM krx_flow WHERE date BETWEEN ? AND ? ORDER BY date",
conn, params=(cutoff, asof_iso),
)
# KOSPI 지수: MVP에서는 005930(삼성전자) 종가를 시장 대용으로 사용.
# 후속 슬라이스에서 ^KS11 별도 캐시.
kospi = pd.Series(dtype=float, name="kospi")
if "005930" in master.index and not prices.empty:
sub = prices[prices["ticker"] == "005930"].set_index("date")["close"]
kospi = sub.copy()
kospi.name = "kospi"
return cls(master=master, prices=prices, flow=flow, kospi=kospi, asof=asof)
def restrict(self, tickers) -> "ScreenContext":
tickers = pd.Index(tickers)
return replace(
self,
master=self.master.loc[self.master.index.intersection(tickers)],
prices=self.prices[self.prices["ticker"].isin(tickers)],
flow=self.flow[self.flow["ticker"].isin(tickers)],
)
def latest_close(self) -> pd.Series:
if self.prices.empty:
return pd.Series(dtype=float)
return self.prices.sort_values("date").groupby("ticker")["close"].last()
def latest_high(self) -> pd.Series:
if self.prices.empty:
return pd.Series(dtype=float)
return self.prices.sort_values("date").groupby("ticker")["high"].last()
# ---- combine + Screener (Phase 2) ----
from . import position_sizer as _ps
def combine(scores: dict, weights: dict) -> pd.Series:
"""Weighted average across score nodes. ValueError if all weights = 0."""
active = {k: w for k, w in weights.items() if w > 0 and k in scores}
if not active:
raise ValueError("no active score nodes (all weights = 0)")
df = pd.DataFrame({k: scores[k] for k in active})
w = pd.Series(active)
weighted = (df.fillna(0).multiply(w, axis=1)).sum(axis=1) / w.sum()
return weighted
@dataclass
class ScreenerResult:
asof: dt.date
survivors_count: int
scores: dict # node name → pd.Series
weights: dict
ranked: pd.Series # ticker → total_score (sorted desc, head=top_n)
rows: list # list of dicts (for serialization)
warnings: list
class Screener:
def __init__(self, gate, score_nodes, weights: dict, node_params: dict,
gate_params: dict, top_n: int = 20, sizer_params: dict = None):
self.gate = gate
self.score_nodes = score_nodes
self.weights = weights
self.node_params = node_params
self.gate_params = gate_params
self.top_n = top_n
self.sizer_params = sizer_params or {"atr_window": 14, "atr_stop_mult": 2.0, "rr_ratio": 2.0}
def run(self, ctx: ScreenContext) -> ScreenerResult:
warnings: list = []
survivors = self.gate.filter(ctx, self.gate_params)
if len(survivors) == 0:
raise ValueError("no survivors after hygiene gate")
if len(survivors) < 100:
warnings.append(f"survivors_count={len(survivors)} < 100 — 백분위 정규화 신뢰도 낮음")
scoped = ctx.restrict(survivors)
scores: dict = {}
for n in self.score_nodes:
w = self.weights.get(n.name, 0)
if w <= 0:
continue
try:
scores[n.name] = n.compute(scoped, self.node_params.get(n.name, {}))
except Exception as e:
warnings.append(f"node '{n.name}' failed: {e}")
scores[n.name] = pd.Series(0.0, index=scoped.master.index)
total = combine(scores, self.weights)
ranked = total.sort_values(ascending=False).head(self.top_n)
sizing = _ps.plan_positions(scoped, list(ranked.index), self.sizer_params)
latest_close = scoped.latest_close()
rows = []
for rank_idx, ticker in enumerate(ranked.index, start=1):
s = sizing.get(ticker, {})
row = {
"rank": rank_idx,
"ticker": ticker,
"name": str(scoped.master.loc[ticker, "name"]),
"total_score": float(ranked.loc[ticker]),
"scores": {k: float(v.get(ticker, 0.0)) for k, v in scores.items()},
"close": int(latest_close.get(ticker, 0)),
"market_cap": int(scoped.master.loc[ticker, "market_cap"] or 0),
"entry_price": s.get("entry_price"),
"stop_price": s.get("stop_price"),
"target_price": s.get("target_price"),
"atr14": s.get("atr14"),
"r_pct": s.get("r_pct"),
}
rows.append(row)
return ScreenerResult(
asof=ctx.asof, survivors_count=len(survivors),
scores=scores, weights=self.weights,
ranked=ranked, rows=rows, warnings=warnings,
)

View File

View File

@@ -0,0 +1,40 @@
"""Node base classes + helpers."""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any, ClassVar
import pandas as pd
class ScoreNode(ABC):
name: ClassVar[str]
label: ClassVar[str]
default_params: ClassVar[dict]
param_schema: ClassVar[dict]
@abstractmethod
def compute(self, ctx: "Any", params: dict) -> pd.Series:
"""returns Series indexed by ticker, 0..100 float."""
class GateNode(ABC):
name: ClassVar[str]
label: ClassVar[str]
default_params: ClassVar[dict]
param_schema: ClassVar[dict]
@abstractmethod
def filter(self, ctx: "Any", params: dict) -> pd.Index:
"""returns surviving tickers."""
def percentile_rank(series: pd.Series) -> pd.Series:
"""Percentile rank in [0, 100]. All-equal → 50. NaN preserved."""
if series.empty:
return series.astype(float)
if series.dropna().nunique() == 1:
return pd.Series(50.0, index=series.index)
ranked = series.rank(pct=True, na_option="keep") * 100.0
return ranked

View File

@@ -0,0 +1,33 @@
"""외국인 N일 누적 순매수 강도 (시총 대비)."""
import pandas as pd
from .base import ScoreNode, percentile_rank
class ForeignBuy(ScoreNode):
name = "foreign_buy"
label = "외국인 누적 순매수"
default_params = {"window_days": 5}
param_schema = {
"type": "object",
"properties": {
"window_days": {"type": "integer", "minimum": 1, "maximum": 60, "default": 5}
},
}
def compute(self, ctx, params: dict) -> pd.Series:
window = int(params.get("window_days", 5))
flow = ctx.flow
if flow.empty:
return pd.Series(dtype=float)
last_dates = (
flow.sort_values("date").groupby("ticker").tail(window)
)
net_sum = last_dates.groupby("ticker")["foreign_net"].sum()
market_cap = ctx.master["market_cap"].fillna(0).reindex(net_sum.index)
raw = (net_sum / market_cap.replace(0, pd.NA)).astype(float)
return percentile_rank(raw).fillna(50.0)

View File

@@ -0,0 +1,30 @@
"""52주 신고가 근접도 (룰 기반: 70% 미만 0점, 100% 도달 100점, 선형)."""
import pandas as pd
from .base import ScoreNode
class High52WProximity(ScoreNode):
name = "high52w"
label = "52주 신고가 근접도"
default_params = {"window_days": 252}
param_schema = {
"type": "object",
"properties": {
"window_days": {"type": "integer", "minimum": 60, "maximum": 504, "default": 252}
},
}
def compute(self, ctx, params: dict) -> pd.Series:
window = int(params.get("window_days", 252))
prices = ctx.prices
if prices.empty:
return pd.Series(dtype=float)
ordered = prices.sort_values("date")
last = ordered.groupby("ticker").tail(window)
agg = last.groupby("ticker").agg(close=("close", "last"), high=("high", "max"))
proximity = (agg["close"] / agg["high"]).clip(upper=1.0)
score = ((proximity - 0.7) / 0.3).clip(lower=0.0, upper=1.0) * 100.0
return score.fillna(0.0)

View File

@@ -0,0 +1,81 @@
"""HygieneGate — pre-filter for screener."""
from __future__ import annotations
import pandas as pd
from .base import GateNode
class HygieneGate(GateNode):
name = "hygiene"
label = "위생 게이트"
default_params = {
"min_market_cap_won": 50_000_000_000,
"min_avg_value_won": 500_000_000,
"min_listed_days": 60,
"skip_managed": True,
"skip_preferred": True,
"skip_spac": True,
"skip_halted_days": 3,
}
param_schema = {
"type": "object",
"properties": {
"min_market_cap_won": {"type": "integer", "minimum": 0},
"min_avg_value_won": {"type": "integer", "minimum": 0},
"min_listed_days": {"type": "integer", "minimum": 0},
"skip_managed": {"type": "boolean"},
"skip_preferred": {"type": "boolean"},
"skip_spac": {"type": "boolean"},
"skip_halted_days": {"type": "integer", "minimum": 0},
},
}
def filter(self, ctx, params: dict) -> pd.Index:
master = ctx.master.copy()
prices = ctx.prices
# 시총
master = master[master["market_cap"].fillna(0) >= params["min_market_cap_won"]]
# 우선주·관리·스팩
if params.get("skip_preferred", True):
master = master[master["is_preferred"] == 0]
if params.get("skip_managed", True):
master = master[master["is_managed"] == 0]
if params.get("skip_spac", True):
master = master[master["is_spac"] == 0]
candidates = master.index
# 20일 평균 거래대금
if not prices.empty:
recent20 = (
prices[prices["ticker"].isin(candidates)]
.sort_values("date")
.groupby("ticker")
.tail(20)
)
avg_value = recent20.groupby("ticker")["value"].mean()
ok = avg_value[avg_value >= params["min_avg_value_won"]].index
candidates = candidates.intersection(ok)
# 최근 N일 거래정지 (volume==0 N일 이상)
halted_days = params.get("skip_halted_days", 3)
if halted_days > 0 and not prices.empty:
recent = (
prices[prices["ticker"].isin(candidates)]
.sort_values("date")
.groupby("ticker")
.tail(halted_days)
)
zero_count = (
recent.assign(z=lambda d: (d["volume"] == 0).astype(int))
.groupby("ticker")["z"].sum()
)
healthy = zero_count[zero_count < halted_days].index
candidates = candidates.intersection(healthy)
# 상장 N일 — MVP에선 listed_date null 허용, null이면 통과
return pd.Index(candidates)

View File

@@ -0,0 +1,51 @@
"""이평선 정배열 점수 — 5개 조건 충족 개수 / 5 × 100."""
import pandas as pd
from .base import ScoreNode
class MaAlignment(ScoreNode):
name = "ma_alignment"
label = "이평선 정배열"
default_params = {"ma_periods": [50, 150, 200]}
param_schema = {
"type": "object",
"properties": {
"ma_periods": {"type": "array", "items": {"type": "integer"}}
},
}
def compute(self, ctx, params: dict) -> pd.Series:
ma_periods = params.get("ma_periods", self.default_params["ma_periods"])
if len(ma_periods) != 3:
raise ValueError("ma_periods must have 3 entries (short, medium, long)")
ma_s, ma_m, ma_l = (int(x) for x in ma_periods)
prices = ctx.prices
if prices.empty:
return pd.Series(dtype=float)
ordered = prices.sort_values("date")
min_history = max(252, ma_l)
def _score(s: pd.Series) -> float:
closes = s.astype(float).reset_index(drop=True)
if len(closes) < min_history:
return float("nan")
close = closes.iloc[-1]
ma_short = closes.rolling(ma_s).mean().iloc[-1]
ma_medium = closes.rolling(ma_m).mean().iloc[-1]
ma_long = closes.rolling(ma_l).mean().iloc[-1]
low52 = closes.iloc[-252:].min()
conds = [
close > ma_short,
ma_short > ma_medium,
ma_medium > ma_long,
close > ma_long,
close >= low52 * 1.25,
]
return sum(conds) / 5 * 100.0
raw = ordered.groupby("ticker", group_keys=False)["close"].apply(_score)
return raw.fillna(0.0)

View File

@@ -0,0 +1,34 @@
"""20일 모멘텀."""
import pandas as pd
from .base import ScoreNode, percentile_rank
class Momentum20(ScoreNode):
name = "momentum"
label = "20일 모멘텀"
default_params = {"window_days": 20}
param_schema = {
"type": "object",
"properties": {
"window_days": {"type": "integer", "minimum": 5, "maximum": 120, "default": 20}
},
}
def compute(self, ctx, params: dict) -> pd.Series:
window = int(params.get("window_days", 20))
prices = ctx.prices
if prices.empty:
return pd.Series(dtype=float)
ordered = prices.sort_values("date")
last = ordered.groupby("ticker").tail(window + 1)
def _ret(s):
if len(s) < window + 1:
return float("nan")
return s.iloc[-1] / s.iloc[0] - 1
raw = last.groupby("ticker")["close"].apply(_ret)
return percentile_rank(raw).fillna(50.0)

View File

@@ -0,0 +1,48 @@
"""RS Rating — IBD 가중 (3m=2,6m=1,9m=1,12m=1)."""
import pandas as pd
from .base import ScoreNode, percentile_rank
_PERIOD_TO_DAYS = {"3m": 63, "6m": 126, "9m": 189, "12m": 252}
class RsRating(ScoreNode):
name = "rs_rating"
label = "RS Rating (시장 대비 상대강도)"
default_params = {"weights": {"3m": 2, "6m": 1, "9m": 1, "12m": 1}}
param_schema = {
"type": "object",
"properties": {
"weights": {"type": "object"}
},
}
def compute(self, ctx, params: dict) -> pd.Series:
weights: dict = params.get("weights", self.default_params["weights"])
prices = ctx.prices
kospi = ctx.kospi
if prices.empty or kospi.empty:
return pd.Series(dtype=float)
ordered = prices.sort_values("date")
def _excess_for_ticker(g: pd.DataFrame) -> float:
closes = g.set_index("date")["close"]
total = 0.0
wsum = 0.0
for period, w in weights.items():
k = _PERIOD_TO_DAYS.get(period, 0)
if len(closes) <= k or len(kospi) <= k:
continue
r_stock = closes.iloc[-1] / closes.iloc[-(k + 1)] - 1
r_market = kospi.iloc[-1] / kospi.iloc[-(k + 1)] - 1
total += w * (r_stock - r_market)
wsum += w
return total / wsum if wsum else float("nan")
raw = ordered.groupby("ticker", group_keys=False).apply(
_excess_for_ticker, include_groups=False
)
return percentile_rank(raw).fillna(50.0)

View File

@@ -0,0 +1,40 @@
"""VCP-lite — 단기/장기 일중 변동성 비율 기반 수축률."""
import pandas as pd
from .base import ScoreNode, percentile_rank
class VcpLite(ScoreNode):
name = "vcp_lite"
label = "VCP-lite (변동성 수축)"
default_params = {"short_window": 40, "long_window": 252}
param_schema = {
"type": "object",
"properties": {
"short_window": {"type": "integer", "minimum": 10, "maximum": 120, "default": 40},
"long_window": {"type": "integer", "minimum": 60, "maximum": 504, "default": 252},
},
}
def compute(self, ctx, params: dict) -> pd.Series:
short_w = int(params.get("short_window", 40))
long_w = int(params.get("long_window", 252))
prices = ctx.prices
if prices.empty:
return pd.Series(dtype=float)
ordered = prices.sort_values("date").copy()
ordered["range_pct"] = (ordered["high"] - ordered["low"]) / ordered["close"]
def _ratio(s: pd.Series) -> float:
if len(s) < long_w:
return float("nan")
short_vol = s.tail(short_w).mean()
long_vol = s.tail(long_w).mean()
if long_vol == 0 or pd.isna(long_vol):
return float("nan")
return 1 - (short_vol / long_vol)
raw = ordered.groupby("ticker", group_keys=False)["range_pct"].apply(_ratio)
return percentile_rank(raw).fillna(50.0)

View File

@@ -0,0 +1,40 @@
"""거래량 급증 — log1p(recent/baseline)."""
import numpy as np
import pandas as pd
from .base import ScoreNode, percentile_rank
class VolumeSurge(ScoreNode):
name = "volume_surge"
label = "거래량 급증"
default_params = {"baseline_days": 20, "eval_days": 3}
param_schema = {
"type": "object",
"properties": {
"baseline_days": {"type": "integer", "minimum": 5, "maximum": 60, "default": 20},
"eval_days": {"type": "integer", "minimum": 1, "maximum": 10, "default": 3},
},
}
def compute(self, ctx, params: dict) -> pd.Series:
baseline = int(params.get("baseline_days", 20))
eval_d = int(params.get("eval_days", 3))
prices = ctx.prices
if prices.empty:
return pd.Series(dtype=float)
ordered = prices.sort_values("date")
last_recent = ordered.groupby("ticker").tail(eval_d).groupby("ticker")["volume"].mean()
last_baseline = (
ordered.groupby("ticker")
.tail(baseline + eval_d)
.groupby("ticker")
.head(baseline)
.groupby("ticker")["volume"]
.mean()
)
ratio = last_recent / last_baseline.replace(0, pd.NA)
raw = np.log1p(ratio.astype(float))
return percentile_rank(raw).fillna(50.0)

View File

@@ -0,0 +1,51 @@
"""ATR Wilder smoothing + entry/stop/target 계산."""
import pandas as pd
def compute_atr_wilder(df_one_ticker: pd.DataFrame, window: int = 14) -> float:
"""단일 종목 DataFrame(date·open·high·low·close)에 대해 Wilder ATR 마지막 값."""
g = df_one_ticker.sort_values("date").copy()
high = g["high"].astype(float)
low = g["low"].astype(float)
close = g["close"].astype(float)
prev_close = close.shift(1)
tr = pd.concat([
(high - low),
(high - prev_close).abs(),
(low - prev_close).abs(),
], axis=1).max(axis=1)
atr = tr.ewm(alpha=1 / window, adjust=False).mean()
return float(atr.iloc[-1])
def round_won(x: float) -> int:
return int(round(x))
def plan_positions(ctx, tickers: list, params: dict) -> dict:
"""각 ticker 에 대해 entry/stop/target/atr14 반환."""
atr_window = int(params.get("atr_window", 14))
stop_mult = float(params.get("atr_stop_mult", 2.0))
rr = float(params.get("rr_ratio", 2.0))
prices = ctx.prices.sort_values("date")
out: dict = {}
for t in tickers:
sub = prices[prices["ticker"] == t]
if sub.empty:
continue
close = float(sub["close"].iloc[-1])
atr14 = compute_atr_wilder(sub, window=atr_window)
entry = round_won(close * 1.005)
stop = round_won(close - stop_mult * atr14)
target = round_won(entry + rr * (entry - stop))
r_pct = (entry - stop) / entry * 100 if entry else 0.0
out[t] = {
"entry_price": entry,
"stop_price": stop,
"target_price": target,
"atr14": atr14,
"r_pct": r_pct,
}
return out

View File

@@ -0,0 +1,24 @@
"""Registry of node classes (single source of truth for /nodes endpoint)."""
from .nodes.hygiene import HygieneGate
from .nodes.foreign_buy import ForeignBuy
from .nodes.volume_surge import VolumeSurge
from .nodes.momentum import Momentum20
from .nodes.high52w import High52WProximity
from .nodes.rs_rating import RsRating
from .nodes.ma_alignment import MaAlignment
from .nodes.vcp_lite import VcpLite
NODE_REGISTRY: dict = {
"foreign_buy": ForeignBuy,
"volume_surge": VolumeSurge,
"momentum": Momentum20,
"high52w": High52WProximity,
"rs_rating": RsRating,
"ma_alignment": MaAlignment,
"vcp_lite": VcpLite,
}
GATE_REGISTRY: dict = {
"hygiene": HygieneGate,
}

View File

@@ -0,0 +1,310 @@
"""FastAPI router for /api/stock/screener/*"""
from __future__ import annotations
import datetime as dt
import json
import os
import sqlite3
from typing import Optional
from fastapi import APIRouter, HTTPException
from . import schemas
from .registry import NODE_REGISTRY, GATE_REGISTRY
router = APIRouter(prefix="/api/stock/screener")
import json as _json
import pathlib as _pathlib
_HOLIDAYS_CACHE = None
def _holidays():
global _HOLIDAYS_CACHE
if _HOLIDAYS_CACHE is None:
path = _pathlib.Path(__file__).resolve().parent.parent / "holidays.json"
try:
with path.open(encoding="utf-8") as f:
data = _json.load(f)
_HOLIDAYS_CACHE = set(data) if isinstance(data, list) else set(data.keys())
except FileNotFoundError:
_HOLIDAYS_CACHE = set()
return _HOLIDAYS_CACHE
def _is_holiday(d: dt.date) -> bool:
return d.weekday() >= 5 or d.isoformat() in _holidays()
def _db_path() -> str:
return os.environ.get("STOCK_DB_PATH", "/app/data/stock.db")
def _conn() -> sqlite3.Connection:
return sqlite3.connect(_db_path())
# ---------- /nodes ----------
@router.get("/nodes", response_model=schemas.NodesResponse)
def get_nodes():
score_nodes = [
schemas.NodeMeta(
name=cls.name, label=cls.label,
default_params=cls.default_params, param_schema=cls.param_schema,
)
for cls in NODE_REGISTRY.values()
]
gate_nodes = [
schemas.NodeMeta(
name=cls.name, label=cls.label,
default_params=cls.default_params, param_schema=cls.param_schema,
)
for cls in GATE_REGISTRY.values()
]
return schemas.NodesResponse(score_nodes=score_nodes, gate_nodes=gate_nodes)
# ---------- /settings ----------
@router.get("/settings", response_model=schemas.SettingsResponse)
def get_settings():
with _conn() as c:
row = c.execute(
"SELECT weights_json, node_params_json, gate_params_json, "
"top_n, rr_ratio, atr_window, atr_stop_mult, updated_at "
"FROM screener_settings WHERE id=1"
).fetchone()
if row is None:
raise HTTPException(503, "settings not initialized")
return schemas.SettingsResponse(
weights=json.loads(row[0]),
node_params=json.loads(row[1]),
gate_params=json.loads(row[2]),
top_n=row[3], rr_ratio=row[4], atr_window=row[5], atr_stop_mult=row[6],
updated_at=row[7],
)
@router.put("/settings", response_model=schemas.SettingsResponse)
def put_settings(body: schemas.SettingsBody):
now = dt.datetime.utcnow().isoformat()
with _conn() as c:
c.execute(
"""UPDATE screener_settings SET
weights_json=?, node_params_json=?, gate_params_json=?,
top_n=?, rr_ratio=?, atr_window=?, atr_stop_mult=?, updated_at=?
WHERE id=1""",
(
json.dumps(body.weights), json.dumps(body.node_params),
json.dumps(body.gate_params),
body.top_n, body.rr_ratio, body.atr_window, body.atr_stop_mult, now,
),
)
c.commit()
return schemas.SettingsResponse(**body.model_dump(), updated_at=now)
# ---------- /run ----------
from . import telegram as _tg
from .engine import Screener, ScreenContext
def _resolve_asof(asof_str, conn: sqlite3.Connection) -> dt.date:
if asof_str:
return dt.date.fromisoformat(asof_str)
row = conn.execute("SELECT max(date) FROM krx_daily_prices").fetchone()
if not row or row[0] is None:
raise HTTPException(503, "no snapshot available — run /snapshot/refresh first")
return dt.date.fromisoformat(row[0])
def _load_settings(conn) -> dict:
row = conn.execute(
"SELECT weights_json,node_params_json,gate_params_json,top_n,"
"rr_ratio,atr_window,atr_stop_mult FROM screener_settings WHERE id=1"
).fetchone()
return {
"weights": json.loads(row[0]),
"node_params": json.loads(row[1]),
"gate_params": json.loads(row[2]),
"top_n": row[3],
"rr_ratio": row[4],
"atr_window": row[5],
"atr_stop_mult": row[6],
}
def _persist_run(conn, asof, mode, weights, node_params, gate_params, top_n,
result, started_at, finished_at) -> int:
cur = conn.execute(
"""INSERT INTO screener_runs (asof,mode,status,started_at,finished_at,
weights_json,node_params_json,gate_params_json,top_n,survivors_count,telegram_sent)
VALUES (?,?,?,?,?,?,?,?,?,?,0)""",
(asof.isoformat(), mode, "success", started_at, finished_at,
json.dumps(weights), json.dumps(node_params), json.dumps(gate_params),
top_n, result.survivors_count),
)
run_id = cur.lastrowid
for row in result.rows:
conn.execute(
"""INSERT INTO screener_results (run_id,rank,ticker,name,total_score,
scores_json,close,market_cap,entry_price,stop_price,target_price,atr14)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
(run_id, row["rank"], row["ticker"], row["name"], row["total_score"],
json.dumps(row["scores"]), row["close"], row["market_cap"],
row["entry_price"], row["stop_price"], row["target_price"], row["atr14"]),
)
conn.commit()
return run_id
@router.post("/run", response_model=schemas.RunResponse)
def post_run(body: schemas.RunRequest):
from .registry import NODE_REGISTRY as _NR, GATE_REGISTRY as _GR
started_at = dt.datetime.utcnow().isoformat()
with _conn() as c:
asof = _resolve_asof(body.asof, c)
# Skipped holiday handling for mode='auto'
if body.mode == "auto" and _is_holiday(asof):
return schemas.RunResponse(
asof=asof.isoformat(), mode="auto", status="skipped_holiday",
run_id=None, survivors_count=None,
weights={}, top_n=0,
results=[], telegram_payload=None,
warnings=[f"{asof.isoformat()} is a holiday — skipped"],
)
defaults = _load_settings(c)
if body.mode == "auto":
weights = defaults["weights"]
node_params = defaults["node_params"]
gate_params = defaults["gate_params"]
top_n = defaults["top_n"]
else:
weights = body.weights if body.weights is not None else defaults["weights"]
node_params = body.node_params if body.node_params is not None else defaults["node_params"]
gate_params = body.gate_params if body.gate_params is not None else defaults["gate_params"]
top_n = body.top_n if body.top_n is not None else defaults["top_n"]
sizer_params = {
"atr_window": defaults["atr_window"],
"atr_stop_mult": defaults["atr_stop_mult"],
"rr_ratio": defaults["rr_ratio"],
}
ctx = ScreenContext.load(c, asof)
score_nodes = [cls() for name, cls in _NR.items() if weights.get(name, 0) > 0]
gate = _GR["hygiene"]()
try:
screener = Screener(
gate=gate, score_nodes=score_nodes, weights=weights,
node_params=node_params, gate_params=gate_params,
top_n=top_n, sizer_params=sizer_params,
)
result = screener.run(ctx)
except ValueError as e:
raise HTTPException(422, str(e))
finished_at = dt.datetime.utcnow().isoformat()
run_id = None
if body.mode in ("manual_save", "auto"):
run_id = _persist_run(c, asof, body.mode, weights, node_params, gate_params,
top_n, result, started_at, finished_at)
payload = _tg.build_telegram_payload(
asof=asof, mode=body.mode, survivors_count=result.survivors_count,
top_n=top_n, rows=result.rows, run_id=run_id,
)
return schemas.RunResponse(
asof=asof.isoformat(), mode=body.mode, status="success",
run_id=run_id, survivors_count=result.survivors_count,
weights=weights, top_n=top_n,
results=result.rows,
telegram_payload=schemas.TelegramPayload(**payload),
warnings=result.warnings,
)
# ---------- /snapshot/refresh ----------
from . import snapshot as _snap
@router.post("/snapshot/refresh")
def post_snapshot_refresh(asof: Optional[str] = None):
asof_date = dt.date.fromisoformat(asof) if asof else dt.date.today()
if asof_date.weekday() >= 5:
return {"asof": asof_date.isoformat(), "status": "skipped_weekend"}
with _conn() as c:
summary = _snap.refresh_daily(c, asof_date)
return summary
# ---------- /runs ----------
@router.get("/runs", response_model=list[schemas.RunSummary])
def list_runs(limit: int = 30):
with _conn() as c:
rows = c.execute(
"SELECT id,asof,mode,status,started_at,finished_at,top_n,"
"survivors_count,telegram_sent FROM screener_runs "
"ORDER BY asof DESC, id DESC LIMIT ?", (limit,),
).fetchall()
return [
schemas.RunSummary(
id=r[0], asof=r[1], mode=r[2], status=r[3],
started_at=r[4], finished_at=r[5], top_n=r[6],
survivors_count=r[7], telegram_sent=bool(r[8]),
)
for r in rows
]
@router.get("/runs/{run_id}")
def get_run(run_id: int):
with _conn() as c:
meta = c.execute(
"SELECT id,asof,mode,status,started_at,finished_at,top_n,"
"survivors_count,telegram_sent,weights_json,node_params_json,gate_params_json "
"FROM screener_runs WHERE id=?",
(run_id,),
).fetchone()
if not meta:
raise HTTPException(404, "run not found")
rows = c.execute(
"SELECT rank,ticker,name,total_score,scores_json,close,market_cap,"
"entry_price,stop_price,target_price,atr14 "
"FROM screener_results WHERE run_id=? ORDER BY rank",
(run_id,),
).fetchall()
return {
"meta": {
"id": meta[0], "asof": meta[1], "mode": meta[2], "status": meta[3],
"started_at": meta[4], "finished_at": meta[5], "top_n": meta[6],
"survivors_count": meta[7], "telegram_sent": bool(meta[8]),
"weights": json.loads(meta[9]),
"node_params": json.loads(meta[10]),
"gate_params": json.loads(meta[11]),
},
"results": [
{
"rank": r[0], "ticker": r[1], "name": r[2],
"total_score": r[3], "scores": json.loads(r[4]),
"close": r[5], "market_cap": r[6],
"entry_price": r[7], "stop_price": r[8], "target_price": r[9],
"atr14": r[10],
}
for r in rows
],
}

View File

@@ -0,0 +1,136 @@
"""Screener schema bootstrap. Called once at module import via db.py."""
import json
import sqlite3
from datetime import datetime, timezone
DEFAULT_WEIGHTS = {
"foreign_buy": 1.0,
"volume_surge": 1.0,
"momentum": 1.0,
"high52w": 1.2,
"rs_rating": 1.2,
"ma_alignment": 1.0,
"vcp_lite": 0.8,
}
DEFAULT_NODE_PARAMS = {
"foreign_buy": {"window_days": 5},
"volume_surge": {"baseline_days": 20, "eval_days": 3},
"momentum": {"window_days": 20},
"high52w": {"window_days": 252},
"rs_rating": {"weights": {"3m": 2, "6m": 1, "9m": 1, "12m": 1}},
"ma_alignment": {"ma_periods": [50, 150, 200]},
"vcp_lite": {"short_window": 40, "long_window": 252},
}
DEFAULT_GATE_PARAMS = {
"min_market_cap_won": 50_000_000_000,
"min_avg_value_won": 500_000_000,
"min_listed_days": 60,
"skip_managed": True,
"skip_preferred": True,
"skip_spac": True,
"skip_halted_days": 3,
}
DDL = """
CREATE TABLE IF NOT EXISTS krx_master (
ticker TEXT PRIMARY KEY,
name TEXT NOT NULL,
market TEXT NOT NULL,
market_cap INTEGER,
is_managed INTEGER NOT NULL DEFAULT 0,
is_preferred INTEGER NOT NULL DEFAULT 0,
is_spac INTEGER NOT NULL DEFAULT 0,
listed_date TEXT,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS krx_daily_prices (
ticker TEXT NOT NULL,
date TEXT NOT NULL,
open INTEGER, high INTEGER, low INTEGER, close INTEGER,
volume INTEGER,
value INTEGER,
PRIMARY KEY (ticker, date)
);
CREATE INDEX IF NOT EXISTS idx_prices_date ON krx_daily_prices(date);
CREATE TABLE IF NOT EXISTS krx_flow (
ticker TEXT NOT NULL,
date TEXT NOT NULL,
foreign_net INTEGER,
institution_net INTEGER,
PRIMARY KEY (ticker, date)
);
CREATE INDEX IF NOT EXISTS idx_flow_date ON krx_flow(date);
CREATE TABLE IF NOT EXISTS screener_settings (
id INTEGER PRIMARY KEY CHECK (id = 1),
weights_json TEXT NOT NULL,
node_params_json TEXT NOT NULL,
gate_params_json TEXT NOT NULL,
top_n INTEGER NOT NULL DEFAULT 20,
rr_ratio REAL NOT NULL DEFAULT 2.0,
atr_window INTEGER NOT NULL DEFAULT 14,
atr_stop_mult REAL NOT NULL DEFAULT 2.0,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS screener_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
asof TEXT NOT NULL,
mode TEXT NOT NULL,
status TEXT NOT NULL,
error TEXT,
started_at TEXT NOT NULL,
finished_at TEXT,
weights_json TEXT NOT NULL,
node_params_json TEXT NOT NULL,
gate_params_json TEXT NOT NULL,
top_n INTEGER NOT NULL,
survivors_count INTEGER,
telegram_sent INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_runs_asof ON screener_runs(asof DESC);
CREATE TABLE IF NOT EXISTS screener_results (
run_id INTEGER NOT NULL,
rank INTEGER NOT NULL,
ticker TEXT NOT NULL,
name TEXT NOT NULL,
total_score REAL NOT NULL,
scores_json TEXT NOT NULL,
close INTEGER,
market_cap INTEGER,
entry_price INTEGER,
stop_price INTEGER,
target_price INTEGER,
atr14 REAL,
PRIMARY KEY (run_id, ticker),
FOREIGN KEY (run_id) REFERENCES screener_runs(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_results_run_rank ON screener_results(run_id, rank);
"""
def ensure_screener_schema(conn: sqlite3.Connection) -> None:
"""Create tables and seed default settings (idempotent)."""
conn.executescript(DDL)
existing = conn.execute("SELECT id FROM screener_settings WHERE id=1").fetchone()
if existing is None:
now = datetime.now(timezone.utc).isoformat()
conn.execute(
"""
INSERT INTO screener_settings (
id, weights_json, node_params_json, gate_params_json,
top_n, rr_ratio, atr_window, atr_stop_mult, updated_at
) VALUES (1, ?, ?, ?, 20, 2.0, 14, 2.0, ?)
""",
(
json.dumps(DEFAULT_WEIGHTS),
json.dumps(DEFAULT_NODE_PARAMS),
json.dumps(DEFAULT_GATE_PARAMS),
now,
),
)
conn.commit()

View File

@@ -0,0 +1,85 @@
from __future__ import annotations
from typing import Literal, Optional
from pydantic import BaseModel, Field
class NodeMeta(BaseModel):
name: str
label: str
default_params: dict
param_schema: dict
class NodesResponse(BaseModel):
score_nodes: list[NodeMeta]
gate_nodes: list[NodeMeta]
class SettingsBody(BaseModel):
weights: dict[str, float]
node_params: dict[str, dict] = Field(default_factory=dict)
gate_params: dict
top_n: int = 20
rr_ratio: float = 2.0
atr_window: int = 14
atr_stop_mult: float = 2.0
class SettingsResponse(SettingsBody):
updated_at: str
class RunRequest(BaseModel):
mode: Literal["preview", "manual_save", "auto"] = "preview"
asof: Optional[str] = None
weights: Optional[dict[str, float]] = None
node_params: Optional[dict[str, dict]] = None
gate_params: Optional[dict] = None
top_n: Optional[int] = None
class ResultRow(BaseModel):
rank: int
ticker: str
name: str
total_score: float
scores: dict[str, float]
close: int
market_cap: int
entry_price: Optional[int] = None
stop_price: Optional[int] = None
target_price: Optional[int] = None
atr14: Optional[float] = None
r_pct: Optional[float] = None
class TelegramPayload(BaseModel):
chat_target: str
parse_mode: str
text: str
class RunResponse(BaseModel):
asof: str
mode: str
status: Literal["success", "failed", "skipped_holiday"]
run_id: Optional[int] = None
survivors_count: Optional[int] = None
weights: dict[str, float]
top_n: int
results: list[ResultRow] = Field(default_factory=list)
telegram_payload: Optional[TelegramPayload] = None
warnings: list[str] = Field(default_factory=list)
error: Optional[str] = None
class RunSummary(BaseModel):
id: int
asof: str
mode: str
status: str
started_at: str
finished_at: Optional[str] = None
top_n: int
survivors_count: Optional[int] = None
telegram_sent: bool

View File

@@ -0,0 +1,247 @@
"""KRX daily snapshot loader (FDR + naver finance scraping)."""
from __future__ import annotations
import datetime as dt
import logging
import re
import sqlite3
import time
from dataclasses import dataclass
import FinanceDataReader as fdr
import httpx
import pandas as pd
from bs4 import BeautifulSoup
log = logging.getLogger(__name__)
NAVER_FRGN_URL = "https://finance.naver.com/item/frgn.naver"
NAVER_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Referer": "https://finance.naver.com/",
}
DEFAULT_FLOW_TOP_N = 500
DEFAULT_RATE_LIMIT_SEC = 0.2
@dataclass
class RefreshSummary:
asof: dt.date
master_count: int
prices_count: int
flow_count: int
failures: list[str]
def asdict(self) -> dict:
return {
"asof": self.asof.isoformat(),
"master_count": self.master_count,
"prices_count": self.prices_count,
"flow_count": self.flow_count,
"failures": self.failures,
}
def _iso(d: dt.date) -> str:
return d.isoformat()
def _is_preferred(name: str) -> int:
"""우선주 휴리스틱: 종목명이 ''로 끝나거나 '우[A-Z]?'/'\\d?' 패턴."""
n = name or ""
return 1 if re.search(r"우[A-Z]?$|우\d?$", n) else 0
def _is_spac(name: str) -> int:
return 1 if "스팩" in (name or "") else 0
def fetch_master_listing() -> pd.DataFrame:
"""fdr.StockListing('KRX'). Wrapped for stub-ability in tests."""
return fdr.StockListing("KRX")
def fetch_ohlcv_for_ticker(ticker: str, start: str, end: str) -> pd.DataFrame:
"""fdr.DataReader for backfill."""
return fdr.DataReader(ticker, start, end)
def fetch_flow_naver(ticker: str, *, client) -> dict | None:
"""Scrape naver frgn page; return latest-day flow dict, or None."""
r = client.get(NAVER_FRGN_URL, params={"code": ticker, "page": 1})
if r.status_code != 200:
return None
soup = BeautifulSoup(r.text, "lxml")
for row in soup.select("table.type2 tr"):
cells = [c.get_text(strip=True).replace(",", "") for c in row.select("td")]
if not cells or not cells[0]:
continue
if not re.match(r"\d{4}\.\d{2}\.\d{2}", cells[0]):
continue
try:
inst = int(cells[5]) if cells[5] not in ("", "-") else 0
foreign = int(cells[6]) if cells[6] not in ("", "-") else 0
return {
"date": cells[0].replace(".", "-"),
"foreign_net": foreign,
"institution_net": inst,
}
except (IndexError, ValueError):
return None
return None
def _master_and_prices_rows(asof: dt.date,
df: pd.DataFrame) -> tuple[list[tuple], list[tuple]]:
iso = _iso(asof)
now_iso = dt.datetime.utcnow().isoformat()
master_rows: list[tuple] = []
price_rows: list[tuple] = []
for _, row in df.iterrows():
ticker = str(row.get("Code") or "").strip()
name = str(row.get("Name") or "").strip()
if not ticker or not name:
continue
market_raw = str(row.get("Market") or "").upper()
market = "KOSDAQ" if "KOSDAQ" in market_raw else "KOSPI"
try:
market_cap = int(row["Marcap"]) if pd.notna(row.get("Marcap")) else None
except (TypeError, ValueError):
market_cap = None
master_rows.append((
ticker, name, market, market_cap,
0, _is_preferred(name), _is_spac(name),
None, now_iso,
))
try:
o = int(row["Open"]) if pd.notna(row.get("Open")) else None
h = int(row["High"]) if pd.notna(row.get("High")) else None
l = int(row["Low"]) if pd.notna(row.get("Low")) else None
c = int(row["Close"]) if pd.notna(row.get("Close")) else None
v = int(row["Volume"]) if pd.notna(row.get("Volume")) else None
amt = row.get("Amount")
a = int(amt) if pd.notna(amt) else None
if c is not None and v is not None:
price_rows.append((ticker, iso, o, h, l, c, v, a))
except (TypeError, KeyError):
pass
return master_rows, price_rows
def _gather_flow_naver(asof: dt.date, tickers: list[str],
*, rate_limit_sec: float = DEFAULT_RATE_LIMIT_SEC) -> list[tuple]:
iso = _iso(asof)
rows: list[tuple] = []
if not tickers:
return rows
with httpx.Client(timeout=10, headers=NAVER_HEADERS) as client:
for t in tickers:
try:
data = fetch_flow_naver(t, client=client)
if data and data["date"] == iso:
rows.append((t, iso, data["foreign_net"], data["institution_net"]))
except Exception as e:
log.warning("flow scrape failed for %s: %s", t, e)
if rate_limit_sec > 0:
time.sleep(rate_limit_sec)
return rows
def refresh_daily(conn: sqlite3.Connection, asof: dt.date,
flow_top_n: int = DEFAULT_FLOW_TOP_N,
rate_limit_sec: float = DEFAULT_RATE_LIMIT_SEC) -> dict:
"""Pull master + prices (FDR) + flow (naver scraping for top N by market cap)."""
df = fetch_master_listing()
master_rows, price_rows = _master_and_prices_rows(asof, df)
conn.executemany("""
INSERT INTO krx_master (
ticker, name, market, market_cap,
is_managed, is_preferred, is_spac,
listed_date, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(ticker) DO UPDATE SET
name=excluded.name, market=excluded.market,
market_cap=excluded.market_cap,
is_managed=excluded.is_managed,
is_preferred=excluded.is_preferred,
is_spac=excluded.is_spac,
updated_at=excluded.updated_at
""", master_rows)
conn.executemany("""
INSERT OR REPLACE INTO krx_daily_prices
(ticker, date, open, high, low, close, volume, value)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", price_rows)
# 외국인/기관: 시총 상위 N종목만 (rate limit 보호)
if flow_top_n > 0:
top = sorted(master_rows, key=lambda r: r[3] or 0, reverse=True)[:flow_top_n]
flow_tickers = [r[0] for r in top]
else:
flow_tickers = []
flow_rows = _gather_flow_naver(asof, flow_tickers, rate_limit_sec=rate_limit_sec)
conn.executemany("""
INSERT OR REPLACE INTO krx_flow
(ticker, date, foreign_net, institution_net)
VALUES (?, ?, ?, ?)
""", flow_rows)
conn.commit()
return RefreshSummary(
asof=asof, master_count=len(master_rows),
prices_count=len(price_rows), flow_count=len(flow_rows),
failures=[],
).asdict()
def backfill(conn: sqlite3.Connection, start: dt.date, end: dt.date) -> list[dict]:
"""5년치 일봉 백필 — 종목별 fdr.DataReader 호출. Master는 end 기준 (FDR은 historical master 미지원)."""
df = fetch_master_listing()
master_rows, _ = _master_and_prices_rows(end, df)
conn.executemany("""
INSERT INTO krx_master (
ticker, name, market, market_cap,
is_managed, is_preferred, is_spac,
listed_date, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(ticker) DO UPDATE SET name=excluded.name
""", master_rows)
iso_start = start.isoformat()
iso_end = end.isoformat()
results = []
for r in master_rows:
t = r[0]
try:
ddf = fetch_ohlcv_for_ticker(t, iso_start, iso_end)
if ddf is None or ddf.empty:
continue
ddf = ddf.reset_index()
ddf["Date"] = pd.to_datetime(ddf["Date"]).dt.strftime("%Y-%m-%d")
rows = []
for _, rr in ddf.iterrows():
if pd.isna(rr["Close"]) or pd.isna(rr["Volume"]):
continue
rows.append((
t, rr["Date"],
int(rr["Open"]) if pd.notna(rr["Open"]) else None,
int(rr["High"]) if pd.notna(rr["High"]) else None,
int(rr["Low"]) if pd.notna(rr["Low"]) else None,
int(rr["Close"]),
int(rr["Volume"]),
int(rr["Close"] * rr["Volume"]),
))
conn.executemany("""
INSERT OR REPLACE INTO krx_daily_prices
(ticker, date, open, high, low, close, volume, value)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", rows)
results.append({"ticker": t, "count": len(rows)})
except Exception as e:
log.error("backfill failed for %s: %s", t, e)
results.append({"ticker": t, "error": str(e)})
conn.commit()
return results

View File

@@ -0,0 +1,72 @@
"""Telegram payload builder. Caller (agent-office) handles actual delivery."""
from __future__ import annotations
import datetime as dt
NODE_ICONS = {
"foreign_buy": "👤외",
"volume_surge": "⚡거",
"momentum": "🚀모",
"high52w": "🆙고",
"rs_rating": "💪RS",
"ma_alignment": "📈MA",
"vcp_lite": "🌀VCP",
}
PAGE_BASE = "https://gahusb.synology.me/stock/screener"
def _escape_md(s: str) -> str:
"""Minimal MarkdownV2 escape — extend if formatting breaks."""
for ch in r"\_*[]()~`>#+-=|{}.!":
s = s.replace(ch, "\\" + ch)
return s
def _format_won(n) -> str:
if n is None:
return "-"
return f"{int(n):,}"
def build_telegram_payload(asof: dt.date, mode: str, survivors_count: int,
top_n: int, rows: list, run_id) -> dict:
title = "*KRX 강세주 스크리너*"
header = (
f"🎯 {title}{_escape_md(asof.isoformat())} \\({_escape_md(mode)}\\)\n"
f"통과 {survivors_count}종 / Top {top_n} / 본문 1\\-10"
)
lines = []
for r in rows[:10]:
icons = " ".join(
NODE_ICONS[name] for name, sc in r["scores"].items()
if sc >= 70 and name in NODE_ICONS
)
score_str = f"{r['total_score']:.1f}"
r_pct = r.get("r_pct")
r_pct_str = f"{r_pct:.1f}" if r_pct is not None else "-"
lines.append(
f"{r['rank']}\\. *{_escape_md(r['name'])}* `{r['ticker']}` "
f"{_escape_md(score_str)}\n"
f" {icons}\n"
f" 진입 {_format_won(r.get('entry_price'))} "
f"손절 {_format_won(r.get('stop_price'))} "
f"익절 {_format_won(r.get('target_price'))} "
f"\\(R {_escape_md(r_pct_str)}%\\)"
)
# URL은 inline link로 감싸 URL 내부 . - ? = 이스케이프 회피
link = (
f"🔗 [전체 결과·11\\~20위]({PAGE_BASE}?run_id={run_id})"
if run_id else ""
)
text = header + "\n\n" + "\n\n".join(lines) + ("\n\n" + link if link else "")
return {
"chat_target": "default",
"parse_mode": "MarkdownV2",
"text": text,
}

View File

@@ -0,0 +1,61 @@
import datetime as dt
import sqlite3
import pandas as pd
import pytest
from app.screener.engine import ScreenContext
from app.screener.schema import ensure_screener_schema
from app.screener._test_fixtures import make_master, make_prices, make_flow
@pytest.fixture
def conn(tmp_path):
db_path = tmp_path / "ctx.db"
c = sqlite3.connect(db_path)
ensure_screener_schema(c)
yield c
c.close()
def _seed(conn, master_df, prices_df, flow_df):
now = dt.datetime.utcnow().isoformat()
for t, row in master_df.iterrows():
conn.execute("""INSERT INTO krx_master (ticker,name,market,market_cap,
is_managed,is_preferred,is_spac,listed_date,updated_at)
VALUES (?,?,?,?,?,?,?,?,?)""",
(t, row["name"], row["market"], row["market_cap"],
row["is_managed"], row["is_preferred"], row["is_spac"], None, now))
prices_df.to_sql("krx_daily_prices", conn, if_exists="append", index=False)
flow_df.to_sql("krx_flow", conn, if_exists="append", index=False)
conn.commit()
def test_load_returns_dataframes(conn):
asof = dt.date(2026, 5, 12)
_seed(conn,
make_master(["005930", "035420"]),
make_prices(["005930", "035420"], days=30, asof=asof),
make_flow(["005930", "035420"], days=30, asof=asof))
ctx = ScreenContext.load(conn, asof, lookback_days=30)
assert ctx.asof == asof
assert set(ctx.master.index) == {"005930", "035420"}
assert ctx.prices.shape[0] == 60 # 2 종목 × 30일
assert ctx.flow.shape[0] == 60
def test_restrict_filters_tickers(conn):
asof = dt.date(2026, 5, 12)
_seed(conn,
make_master(["005930", "035420", "091990"]),
make_prices(["005930", "035420", "091990"], days=30, asof=asof),
make_flow(["005930", "035420", "091990"], days=30, asof=asof))
ctx = ScreenContext.load(conn, asof, lookback_days=30)
scoped = ctx.restrict(pd.Index(["005930"]))
assert list(scoped.master.index) == ["005930"]
assert (scoped.prices["ticker"] == "005930").all()
assert (scoped.flow["ticker"] == "005930").all()

View File

@@ -0,0 +1,55 @@
import datetime as dt
import pandas as pd
import pytest
from app.screener.engine import ScreenContext, Screener, combine
from app.screener.nodes.hygiene import HygieneGate
from app.screener.nodes.foreign_buy import ForeignBuy
from app.screener.nodes.momentum import Momentum20
from app.screener._test_fixtures import make_master, make_prices, make_flow, make_kospi
def _ctx(master, prices, flow):
return ScreenContext(master=master, prices=prices, flow=flow,
kospi=make_kospi(days=260),
asof=dt.date(2026, 5, 12))
def test_combine_weighted_average():
scores = {
"foreign_buy": pd.Series({"A": 80, "B": 20}),
"momentum": pd.Series({"A": 60, "B": 40}),
}
weights = {"foreign_buy": 2.0, "momentum": 1.0}
out = combine(scores, weights)
# A: (80*2 + 60*1)/3 = 73.33
assert abs(out["A"] - 73.333) < 0.1
assert abs(out["B"] - 26.666) < 0.1
def test_combine_all_zero_weight_raises():
scores = {"foreign_buy": pd.Series({"A": 80})}
with pytest.raises(ValueError, match="no active"):
combine(scores, {"foreign_buy": 0})
def test_screener_run_end_to_end():
asof = dt.date(2026, 5, 12)
master = make_master(["GOOD", "SMALL"],
market_caps={"GOOD": 200_000_000_000, "SMALL": 1_000_000_000})
prices = make_prices(["GOOD", "SMALL"], days=260, asof=asof, trend_pct=0.1)
flow = make_flow(["GOOD", "SMALL"], days=260, asof=asof,
foreign_per_day={"GOOD": 100_000_000, "SMALL": 0})
ctx = _ctx(master, prices, flow)
screener = Screener(
gate=HygieneGate(),
score_nodes=[ForeignBuy(), Momentum20()],
weights={"foreign_buy": 1.0, "momentum": 1.0},
node_params={"foreign_buy": {"window_days": 5}, "momentum": {"window_days": 20}},
gate_params={**HygieneGate.default_params, "min_listed_days": 0},
top_n=10,
)
result = screener.run(ctx)
assert result.survivors_count == 1 # SMALL은 게이트 탈락
assert result.ranked.index[0] == "GOOD"

View File

@@ -0,0 +1,24 @@
import pandas as pd
import pytest
from app.screener.nodes.base import percentile_rank
def test_percentile_rank_basic():
s = pd.Series([10, 20, 30, 40, 50])
out = percentile_rank(s)
assert (out >= 0).all() and (out <= 100).all()
assert out.iloc[0] < out.iloc[-1] # smallest gets lowest rank
def test_percentile_rank_all_equal_returns_50():
s = pd.Series([42, 42, 42, 42])
out = percentile_rank(s)
assert (out == 50.0).all()
def test_percentile_rank_handles_nan():
s = pd.Series([1.0, float("nan"), 3.0, 5.0])
out = percentile_rank(s)
assert pd.isna(out.iloc[1])
assert (out.dropna() >= 0).all()

View File

@@ -0,0 +1,32 @@
import datetime as dt
import pandas as pd
from app.screener.engine import ScreenContext
from app.screener.nodes.foreign_buy import ForeignBuy
from app.screener._test_fixtures import make_master, make_prices, make_flow
def _ctx(master, prices, flow):
return ScreenContext(master=master, prices=prices, flow=flow,
kospi=pd.Series(dtype=float, name="kospi"),
asof=dt.date(2026, 5, 12))
def test_higher_foreign_buy_gets_higher_score():
asof = dt.date(2026, 5, 12)
master = make_master(["A", "B"])
prices = make_prices(["A", "B"], days=30, asof=asof)
flow = make_flow(["A", "B"], days=30, asof=asof,
foreign_per_day={"A": 100_000_000, "B": 0})
out = ForeignBuy().compute(_ctx(master, prices, flow), {"window_days": 5})
assert out["A"] > out["B"]
assert 0 <= out.min() <= out.max() <= 100
def test_all_zero_returns_50():
asof = dt.date(2026, 5, 12)
master = make_master(["A", "B"])
prices = make_prices(["A", "B"], days=30, asof=asof)
flow = make_flow(["A", "B"], days=30, asof=asof, foreign_per_day={"A": 0, "B": 0})
out = ForeignBuy().compute(_ctx(master, prices, flow), {"window_days": 5})
assert (out == 50.0).all()

View File

@@ -0,0 +1,32 @@
import datetime as dt
import pandas as pd
from app.screener.engine import ScreenContext
from app.screener.nodes.high52w import High52WProximity
from app.screener._test_fixtures import make_master, make_prices, make_flow
def _ctx(master, prices, flow):
return ScreenContext(master=master, prices=prices, flow=flow,
kospi=pd.Series(dtype=float, name="kospi"),
asof=dt.date(2026, 5, 12))
def test_proximity_at_high_returns_100():
asof = dt.date(2026, 5, 12)
master = make_master(["A"])
prices = make_prices(["A"], days=260, asof=asof, trend_pct=0.05)
flow = make_flow(["A"], days=260, asof=asof)
out = High52WProximity().compute(_ctx(master, prices, flow), {"window_days": 252})
assert out["A"] >= 95
def test_proximity_below_70pct_returns_0():
asof = dt.date(2026, 5, 12)
master = make_master(["A"])
prices = make_prices(["A"], days=260, asof=asof, start_close=100000, trend_pct=-0.5)
flow = make_flow(["A"], days=260, asof=asof)
out = High52WProximity().compute(_ctx(master, prices, flow), {"window_days": 252})
assert out["A"] == 0

View File

@@ -0,0 +1,46 @@
import datetime as dt
import pandas as pd
from app.screener.nodes.hygiene import HygieneGate
from app.screener.engine import ScreenContext
from app.screener._test_fixtures import make_master, make_prices, make_flow
def _ctx(master, prices, flow):
return ScreenContext(
master=master, prices=prices, flow=flow,
kospi=pd.Series(dtype=float, name="kospi"),
asof=dt.date(2026, 5, 12),
)
def test_filter_excludes_small_cap():
g = HygieneGate()
ctx = _ctx(
make_master(["A", "B"], market_caps={"A": 1_000_000_000, "B": 100_000_000_000}),
make_prices(["A", "B"], days=30),
make_flow(["A", "B"], days=30),
)
out = g.filter(ctx, {**g.default_params, "min_listed_days": 0})
assert list(out) == ["B"]
def test_filter_excludes_preferred():
g = HygieneGate()
ctx = _ctx(
make_master(["A", "B"], preferred={"B"}),
make_prices(["A", "B"], days=30),
make_flow(["A", "B"], days=30),
)
out = g.filter(ctx, {**g.default_params, "min_listed_days": 0})
assert list(out) == ["A"]
def test_filter_excludes_low_value():
g = HygieneGate()
prices = make_prices(["A", "B"], days=30)
prices.loc[prices["ticker"] == "A", "value"] = 100_000 # 매우 작음
ctx = _ctx(make_master(["A", "B"]), prices, make_flow(["A", "B"], days=30))
out = g.filter(ctx, {**g.default_params, "min_listed_days": 0})
assert list(out) == ["B"]

View File

@@ -0,0 +1,30 @@
import datetime as dt
import pandas as pd
from app.screener.engine import ScreenContext
from app.screener.nodes.ma_alignment import MaAlignment
from app.screener._test_fixtures import make_master, make_prices, make_flow
def _ctx(master, prices, flow):
return ScreenContext(master=master, prices=prices, flow=flow,
kospi=pd.Series(dtype=float, name="kospi"),
asof=dt.date(2026, 5, 12))
def test_strong_uptrend_returns_100():
asof = dt.date(2026, 5, 12)
master = make_master(["UP"])
prices = make_prices(["UP"], days=260, asof=asof, start_close=50000, trend_pct=0.2)
flow = make_flow(["UP"], days=260, asof=asof)
out = MaAlignment().compute(_ctx(master, prices, flow), MaAlignment.default_params)
assert out["UP"] == 100.0
def test_downtrend_returns_low():
asof = dt.date(2026, 5, 12)
master = make_master(["DN"])
prices = make_prices(["DN"], days=260, asof=asof, start_close=100000, trend_pct=-0.1)
flow = make_flow(["DN"], days=260, asof=asof)
out = MaAlignment().compute(_ctx(master, prices, flow), MaAlignment.default_params)
assert out["DN"] <= 20.0

View File

@@ -0,0 +1,24 @@
import datetime as dt
import pandas as pd
from app.screener.engine import ScreenContext
from app.screener.nodes.momentum import Momentum20
from app.screener._test_fixtures import make_master, make_prices, make_flow
def _ctx(master, prices, flow):
return ScreenContext(master=master, prices=prices, flow=flow,
kospi=pd.Series(dtype=float, name="kospi"),
asof=dt.date(2026, 5, 12))
def test_higher_momentum_gets_higher_score():
asof = dt.date(2026, 5, 12)
master = make_master(["UP", "DN"])
up = make_prices(["UP"], days=30, asof=asof, trend_pct=0.5)
dn = make_prices(["DN"], days=30, asof=asof, trend_pct=-0.3)
prices = pd.concat([up, dn], ignore_index=True)
flow = make_flow(["UP", "DN"], days=30, asof=asof)
out = Momentum20().compute(_ctx(master, prices, flow), {"window_days": 20})
assert out["UP"] > out["DN"]

View File

@@ -0,0 +1,25 @@
import datetime as dt
import pandas as pd
from app.screener.engine import ScreenContext
from app.screener.nodes.rs_rating import RsRating
from app.screener._test_fixtures import make_master, make_prices, make_flow, make_kospi
def _ctx(master, prices, flow, kospi):
return ScreenContext(master=master, prices=prices, flow=flow,
kospi=kospi, asof=dt.date(2026, 5, 12))
def test_outperformer_gets_higher_score():
asof = dt.date(2026, 5, 12)
master = make_master(["UP", "DN"])
up = make_prices(["UP"], days=260, asof=asof, trend_pct=0.3)
dn = make_prices(["DN"], days=260, asof=asof, trend_pct=-0.1)
prices = pd.concat([up, dn], ignore_index=True)
flow = make_flow(["UP", "DN"], days=260, asof=asof)
kospi = make_kospi(days=260, asof=asof, trend_pct=0.0)
out = RsRating().compute(_ctx(master, prices, flow, kospi),
RsRating.default_params)
assert out["UP"] > out["DN"]

View File

@@ -0,0 +1,36 @@
import datetime as dt
import pandas as pd
from app.screener.engine import ScreenContext
from app.screener.nodes.vcp_lite import VcpLite
from app.screener._test_fixtures import make_master, make_prices, make_flow
def _ctx(master, prices, flow):
return ScreenContext(master=master, prices=prices, flow=flow,
kospi=pd.Series(dtype=float, name="kospi"),
asof=dt.date(2026, 5, 12))
def test_contracting_stock_scores_higher_than_expanding():
asof = dt.date(2026, 5, 12)
master = make_master(["CON", "EXP"])
prices = make_prices(["CON", "EXP"], days=260, asof=asof)
# CON: 최근 40일 변동성 축소 (high/low 좁힘)
mask_recent_con = (prices["ticker"] == "CON") & (
prices["date"] >= (asof - dt.timedelta(days=40)).isoformat()
)
prices.loc[mask_recent_con, "high"] = (prices.loc[mask_recent_con, "close"] * 1.003).astype(int)
prices.loc[mask_recent_con, "low"] = (prices.loc[mask_recent_con, "close"] * 0.997).astype(int)
# EXP: 최근 40일 변동성 확대
mask_recent_exp = (prices["ticker"] == "EXP") & (
prices["date"] >= (asof - dt.timedelta(days=40)).isoformat()
)
prices.loc[mask_recent_exp, "high"] = (prices.loc[mask_recent_exp, "close"] * 1.05).astype(int)
prices.loc[mask_recent_exp, "low"] = (prices.loc[mask_recent_exp, "close"] * 0.95).astype(int)
flow = make_flow(["CON", "EXP"], days=260, asof=asof)
out = VcpLite().compute(_ctx(master, prices, flow), VcpLite.default_params)
assert out["CON"] > out["EXP"]

View File

@@ -0,0 +1,28 @@
import datetime as dt
import pandas as pd
from app.screener.engine import ScreenContext
from app.screener.nodes.volume_surge import VolumeSurge
from app.screener._test_fixtures import make_master, make_prices, make_flow
def _ctx(master, prices, flow):
return ScreenContext(master=master, prices=prices, flow=flow,
kospi=pd.Series(dtype=float, name="kospi"),
asof=dt.date(2026, 5, 12))
def test_recent_volume_surge_gets_higher_score():
asof = dt.date(2026, 5, 12)
master = make_master(["A", "B"])
prices = make_prices(["A", "B"], days=30, asof=asof)
# A는 최근 3일 거래량 10배로
mask = (prices["ticker"] == "A") & (prices["date"] >= (asof - dt.timedelta(days=3)).isoformat())
prices.loc[mask, "volume"] *= 10
flow = make_flow(["A", "B"], days=30, asof=asof)
out = VolumeSurge().compute(
_ctx(master, prices, flow),
{"baseline_days": 20, "eval_days": 3},
)
assert out["A"] > out["B"]

View File

@@ -0,0 +1,33 @@
import datetime as dt
import pandas as pd
from app.screener.engine import ScreenContext
from app.screener.position_sizer import compute_atr_wilder, plan_positions
from app.screener._test_fixtures import make_master, make_prices, make_flow
def _ctx(master, prices, flow):
return ScreenContext(master=master, prices=prices, flow=flow,
kospi=pd.Series(dtype=float, name="kospi"),
asof=dt.date(2026, 5, 12))
def test_atr_wilder_positive_and_smooth():
df = make_prices(["A"], days=30)
atr = compute_atr_wilder(df[df["ticker"] == "A"], window=14)
assert atr > 0
def test_plan_positions_returns_entry_stop_target():
asof = dt.date(2026, 5, 12)
master = make_master(["A"])
prices = make_prices(["A"], days=30, asof=asof, start_close=50000)
flow = make_flow(["A"], days=30, asof=asof)
ctx = _ctx(master, prices, flow)
sizing = plan_positions(ctx, ["A"], {"atr_window": 14, "atr_stop_mult": 2.0, "rr_ratio": 2.0})
row = sizing["A"]
assert row["entry_price"] > 0
assert row["stop_price"] < row["entry_price"]
assert row["target_price"] > row["entry_price"]
assert row["atr14"] > 0

View File

@@ -0,0 +1,154 @@
import os
import sqlite3
import pytest
from fastapi.testclient import TestClient
from app.screener.schema import ensure_screener_schema
@pytest.fixture(autouse=True)
def isolated_db(tmp_path, monkeypatch):
db_path = tmp_path / "screener_router.db"
c = sqlite3.connect(db_path)
ensure_screener_schema(c)
c.close()
monkeypatch.setenv("STOCK_DB_PATH", str(db_path))
@pytest.fixture
def client():
from app.main import app
return TestClient(app)
def test_get_nodes_lists_7_score_and_1_gate(client):
r = client.get("/api/stock/screener/nodes")
assert r.status_code == 200
body = r.json()
assert len(body["score_nodes"]) == 7
assert len(body["gate_nodes"]) == 1
assert {n["name"] for n in body["score_nodes"]} == {
"foreign_buy", "volume_surge", "momentum",
"high52w", "rs_rating", "ma_alignment", "vcp_lite",
}
def test_settings_get_returns_defaults(client):
r = client.get("/api/stock/screener/settings")
assert r.status_code == 200
body = r.json()
assert body["weights"]["foreign_buy"] == 1.0
assert body["top_n"] == 20
def test_settings_put_then_get_round_trip(client):
new_settings = {
"weights": {"foreign_buy": 2.5, "momentum": 1.0, "volume_surge": 1.0,
"high52w": 1.2, "rs_rating": 1.2, "ma_alignment": 1.0, "vcp_lite": 0.8},
"node_params": {"foreign_buy": {"window_days": 7}},
"gate_params": {"min_market_cap_won": 100_000_000_000,
"min_avg_value_won": 500_000_000,
"min_listed_days": 60,
"skip_managed": True, "skip_preferred": True, "skip_spac": True,
"skip_halted_days": 3},
"top_n": 30,
"rr_ratio": 2.5,
"atr_window": 14,
"atr_stop_mult": 2.0,
}
r = client.put("/api/stock/screener/settings", json=new_settings)
assert r.status_code == 200
r2 = client.get("/api/stock/screener/settings")
body = r2.json()
assert body["weights"]["foreign_buy"] == 2.5
assert body["top_n"] == 30
# ---- /run tests ----
from app.screener._test_fixtures import make_master, make_prices, make_flow
def _seed_min(conn, asof_iso="2026-05-12"):
import datetime as dt
now = dt.datetime.utcnow().isoformat()
rows = [
("BIG1", "큰주식1", "KOSPI", 200_000_000_000, 0, 0, 0, None, now),
("BIG2", "큰주식2", "KOSPI", 100_000_000_000, 0, 0, 0, None, now),
("SMALL", "작은주식", "KOSPI", 1_000_000_000, 0, 0, 0, None, now),
]
for r in rows:
conn.execute("""INSERT INTO krx_master (ticker,name,market,market_cap,
is_managed,is_preferred,is_spac,listed_date,updated_at)
VALUES (?,?,?,?,?,?,?,?,?)""", r)
asof = dt.date(2026, 5, 12)
p = make_prices(["BIG1", "BIG2", "SMALL"], days=260, asof=asof)
f = make_flow(["BIG1", "BIG2", "SMALL"], days=260, asof=asof,
foreign_per_day={"BIG1": 100_000_000, "BIG2": 50_000_000, "SMALL": 0})
p.to_sql("krx_daily_prices", conn, if_exists="append", index=False)
f.to_sql("krx_flow", conn, if_exists="append", index=False)
conn.commit()
def test_run_preview_no_save(client):
db_path = os.environ["STOCK_DB_PATH"]
c = sqlite3.connect(db_path)
_seed_min(c)
c.close()
r = client.post("/api/stock/screener/run", json={"mode": "preview", "asof": "2026-05-12"})
assert r.status_code == 200
body = r.json()
assert body["status"] == "success"
assert body["run_id"] is None
assert body["telegram_payload"] is not None
c = sqlite3.connect(db_path)
cnt = c.execute("SELECT count(*) FROM screener_runs").fetchone()[0]
assert cnt == 0
def test_run_manual_save_writes_row(client):
db_path = os.environ["STOCK_DB_PATH"]
c = sqlite3.connect(db_path)
_seed_min(c)
c.close()
r = client.post("/api/stock/screener/run",
json={"mode": "manual_save", "asof": "2026-05-12"})
assert r.status_code == 200
assert r.json()["run_id"] is not None
c = sqlite3.connect(db_path)
cnt = c.execute("SELECT count(*) FROM screener_runs").fetchone()[0]
assert cnt == 1
def test_runs_list_and_detail(client):
db_path = os.environ["STOCK_DB_PATH"]
c = sqlite3.connect(db_path)
_seed_min(c)
c.close()
saved = client.post(
"/api/stock/screener/run",
json={"mode": "manual_save", "asof": "2026-05-12"},
).json()
run_id = saved["run_id"]
list_r = client.get("/api/stock/screener/runs?limit=5")
assert list_r.status_code == 200
assert any(r["id"] == run_id for r in list_r.json())
detail = client.get(f"/api/stock/screener/runs/{run_id}")
assert detail.status_code == 200
assert detail.json()["meta"]["id"] == run_id
assert isinstance(detail.json()["results"], list)
def test_run_holiday_returns_skipped(client):
# 2026-05-09는 토요일 (주말). _is_holiday 가 weekday>=5를 잡음.
r = client.post("/api/stock/screener/run",
json={"mode": "auto", "asof": "2026-05-09"})
assert r.status_code == 200
assert r.json()["status"] == "skipped_holiday"

View File

@@ -0,0 +1,37 @@
import sqlite3
from app.screener.schema import ensure_screener_schema
def test_creates_all_tables(tmp_path):
db_path = tmp_path / "test.db"
conn = sqlite3.connect(db_path)
ensure_screener_schema(conn)
tables = {r[0] for r in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()}
expected = {
"krx_master", "krx_daily_prices", "krx_flow",
"screener_settings", "screener_runs", "screener_results",
}
assert expected.issubset(tables)
def test_settings_seeded_with_singleton_row(tmp_path):
db_path = tmp_path / "test.db"
conn = sqlite3.connect(db_path)
ensure_screener_schema(conn)
rows = conn.execute("SELECT id FROM screener_settings").fetchall()
assert rows == [(1,)]
def test_idempotent(tmp_path):
db_path = tmp_path / "test.db"
conn = sqlite3.connect(db_path)
ensure_screener_schema(conn)
ensure_screener_schema(conn) # 두 번 호출해도 에러 없어야 함
rows = conn.execute("SELECT count(*) FROM screener_settings").fetchall()
assert rows == [(1,)]

View File

@@ -0,0 +1,129 @@
import datetime as dt
import sqlite3
import pandas as pd
import pytest
from app.screener import snapshot as snap
from app.screener.schema import ensure_screener_schema
@pytest.fixture
def conn(tmp_path):
db_path = tmp_path / "snap.db"
c = sqlite3.connect(db_path)
ensure_screener_schema(c)
yield c
c.close()
def _stub_listing(monkeypatch):
df = pd.DataFrame([
{"Code": "005930", "Name": "삼성전자", "Market": "KOSPI",
"Marcap": 420_000_000_000_000,
"Open": 70000, "High": 72000, "Low": 69500, "Close": 71000,
"Volume": 12_000_000, "Amount": 840_000_000_000},
{"Code": "035420", "Name": "NAVER", "Market": "KOSPI",
"Marcap": 30_000_000_000_000,
"Open": 215000, "High": 220000, "Low": 213000, "Close": 218000,
"Volume": 1_000_000, "Amount": 218_000_000_000},
{"Code": "091990", "Name": "셀트리온헬스케어우", "Market": "KOSDAQ",
"Marcap": 10_000_000_000_000,
"Open": 60000, "High": 61000, "Low": 59500, "Close": 60500,
"Volume": 500_000, "Amount": 30_250_000_000},
])
monkeypatch.setattr(snap, "fetch_master_listing", lambda: df)
def _stub_flow(monkeypatch, mapping):
def fake_flow(ticker, *, client):
if mapping is None:
return None
v = mapping.get(ticker)
if v is None:
return None
return {
"date": dt.date(2026, 5, 12).isoformat(),
"foreign_net": v["foreign_net"],
"institution_net": v["institution_net"],
}
monkeypatch.setattr(snap, "fetch_flow_naver", fake_flow)
def test_refresh_daily_writes_master_and_prices(conn, monkeypatch):
_stub_listing(monkeypatch)
_stub_flow(monkeypatch, None)
summary = snap.refresh_daily(conn, dt.date(2026, 5, 12),
flow_top_n=10, rate_limit_sec=0)
assert summary["master_count"] == 3
assert summary["prices_count"] == 3
assert summary["flow_count"] == 0
row = conn.execute(
"SELECT close FROM krx_daily_prices WHERE ticker='005930' AND date='2026-05-12'"
).fetchone()
assert row[0] == 71000
def test_refresh_daily_writes_flow_for_top_n(conn, monkeypatch):
_stub_listing(monkeypatch)
_stub_flow(monkeypatch, {
"005930": {"foreign_net": 12_000_000_000, "institution_net": 4_000_000_000},
"035420": {"foreign_net": -3_000_000_000, "institution_net": 8_000_000_000},
})
summary = snap.refresh_daily(conn, dt.date(2026, 5, 12),
flow_top_n=2, rate_limit_sec=0)
assert summary["flow_count"] == 2
row = conn.execute(
"SELECT foreign_net FROM krx_flow WHERE ticker='005930'"
).fetchone()
assert row[0] == 12_000_000_000
def test_master_flags_preferred(conn, monkeypatch):
_stub_listing(monkeypatch)
_stub_flow(monkeypatch, None)
snap.refresh_daily(conn, dt.date(2026, 5, 12), flow_top_n=0, rate_limit_sec=0)
pref = conn.execute(
"SELECT is_preferred FROM krx_master WHERE ticker='091990'"
).fetchone()
assert pref[0] == 1
def test_refresh_daily_is_idempotent(conn, monkeypatch):
_stub_listing(monkeypatch)
_stub_flow(monkeypatch, None)
snap.refresh_daily(conn, dt.date(2026, 5, 12), flow_top_n=0, rate_limit_sec=0)
snap.refresh_daily(conn, dt.date(2026, 5, 12), flow_top_n=0, rate_limit_sec=0)
cnt = conn.execute(
"SELECT count(*) FROM krx_daily_prices WHERE date='2026-05-12'"
).fetchone()[0]
assert cnt == 3
def test_fetch_flow_naver_parses_html():
"""Real HTML structure parse with synthetic naver-like markup."""
html = """
<html><body>
<table class="type2">
<tr><th>날짜</th></tr>
<tr><td>2026.05.12</td><td>71,000</td><td>500</td><td>0.71%</td>
<td>12,000,000</td><td>4,000,000,000</td><td>12,000,000,000</td>
<td>1</td><td>53.0</td></tr>
<tr><td>2026.05.09</td><td>70,500</td><td>-200</td><td>-0.28%</td>
<td>10,000,000</td><td>2,000,000,000</td><td>5,000,000,000</td>
<td>1</td><td>52.8</td></tr>
</table>
</body></html>
"""
class FakeResp:
status_code = 200
text = html
class FakeClient:
def get(self, url, params): return FakeResp()
out = snap.fetch_flow_naver("005930", client=FakeClient())
assert out == {
"date": "2026-05-12",
"foreign_net": 12_000_000_000,
"institution_net": 4_000_000_000,
}

View File

@@ -0,0 +1,51 @@
import datetime as dt
from app.screener.telegram import build_telegram_payload
def test_build_payload_includes_top10_and_link():
rows = [
{
"rank": i, "ticker": f"00{i:04}", "name": f"종목{i}",
"total_score": 90 - i,
"scores": {"foreign_buy": 80 + i, "volume_surge": 60, "momentum": 70,
"high52w": 75, "rs_rating": 85, "ma_alignment": 80, "vcp_lite": 30},
"close": 50000, "entry_price": 50250, "stop_price": 48500,
"target_price": 53750, "r_pct": 3.5,
}
for i in range(1, 21)
]
p = build_telegram_payload(
asof=dt.date(2026, 5, 12),
mode="auto",
survivors_count=612,
top_n=20,
rows=rows,
run_id=42,
)
assert p["parse_mode"] == "MarkdownV2"
text = p["text"]
assert "2026" in text and "05" in text and "12" in text
assert "종목1" in text
assert "종목10" in text
assert "종목11" not in text # 본문 1-10만
assert "42" in text # run_id 링크
def test_score_threshold_filters_icons():
rows = [{
"rank": 1, "ticker": "A", "name": "A주",
"total_score": 80,
"scores": {"foreign_buy": 90, "volume_surge": 50, "momentum": 70,
"high52w": 30, "rs_rating": 80, "ma_alignment": 80, "vcp_lite": 60},
"close": 50000, "entry_price": 50250, "stop_price": 48500,
"target_price": 53750, "r_pct": 3.5,
}]
p = build_telegram_payload(dt.date(2026, 5, 12), "auto", 100, 1, rows, run_id=1)
# foreign_buy(90), momentum(70), rs_rating(80), ma_alignment(80) 만 표시 (≥70)
assert "👤외" in p["text"]
assert "🚀모" in p["text"]
assert "💪RS" in p["text"]
assert "📈MA" in p["text"]
assert "⚡거" not in p["text"]
assert "🆙고" not in p["text"]
assert "🌀VCP" not in p["text"]

View File

@@ -6,4 +6,6 @@ fastapi==0.115.6
uvicorn[standard]==0.30.6
apscheduler==3.10.4
python-dotenv==1.0.1
finance-datareader==0.9.110
lxml==6.1.0