"""FastAPI router for /api/stock/screener/*""" from __future__ import annotations import datetime as dt import json import os import sqlite3 from typing import Optional from fastapi import APIRouter, HTTPException from . import schemas from .registry import NODE_REGISTRY, GATE_REGISTRY router = APIRouter(prefix="/api/stock/screener") import json as _json import pathlib as _pathlib _HOLIDAYS_CACHE = None def _holidays(): global _HOLIDAYS_CACHE if _HOLIDAYS_CACHE is None: path = _pathlib.Path(__file__).resolve().parent.parent / "holidays.json" try: with path.open(encoding="utf-8") as f: data = _json.load(f) _HOLIDAYS_CACHE = set(data) if isinstance(data, list) else set(data.keys()) except FileNotFoundError: _HOLIDAYS_CACHE = set() return _HOLIDAYS_CACHE def _is_holiday(d: dt.date) -> bool: return d.weekday() >= 5 or d.isoformat() in _holidays() def _db_path() -> str: return os.environ.get("STOCK_DB_PATH", "/app/data/stock.db") def _conn() -> sqlite3.Connection: # WAL 모드 + busy_timeout으로 동시 read/write lock 회피 # WAL은 reader vs writer 동시성만 해결 — writer 두 명은 직렬이므로 busy_timeout이 # snapshot/refresh의 write 시간보다 길어야 함 (네이버 스크래핑 ~20초 + DB upsert). conn = sqlite3.connect(_db_path(), timeout=120.0) conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA busy_timeout=120000") return conn # ---------- /nodes ---------- @router.get("/nodes", response_model=schemas.NodesResponse) def get_nodes(): score_nodes = [ schemas.NodeMeta( name=cls.name, label=cls.label, default_params=cls.default_params, param_schema=cls.param_schema, ) for cls in NODE_REGISTRY.values() ] gate_nodes = [ schemas.NodeMeta( name=cls.name, label=cls.label, default_params=cls.default_params, param_schema=cls.param_schema, ) for cls in GATE_REGISTRY.values() ] return schemas.NodesResponse(score_nodes=score_nodes, gate_nodes=gate_nodes) # ---------- /settings ---------- @router.get("/settings", response_model=schemas.SettingsResponse) def get_settings(): with _conn() as c: row = c.execute( "SELECT weights_json, node_params_json, gate_params_json, " "top_n, rr_ratio, atr_window, atr_stop_mult, updated_at " "FROM screener_settings WHERE id=1" ).fetchone() if row is None: raise HTTPException(503, "settings not initialized") return schemas.SettingsResponse( weights=json.loads(row[0]), node_params=json.loads(row[1]), gate_params=json.loads(row[2]), top_n=row[3], rr_ratio=row[4], atr_window=row[5], atr_stop_mult=row[6], updated_at=row[7], ) @router.put("/settings", response_model=schemas.SettingsResponse) def put_settings(body: schemas.SettingsBody): now = dt.datetime.utcnow().isoformat() with _conn() as c: c.execute( """UPDATE screener_settings SET weights_json=?, node_params_json=?, gate_params_json=?, top_n=?, rr_ratio=?, atr_window=?, atr_stop_mult=?, updated_at=? WHERE id=1""", ( json.dumps(body.weights), json.dumps(body.node_params), json.dumps(body.gate_params), body.top_n, body.rr_ratio, body.atr_window, body.atr_stop_mult, now, ), ) c.commit() return schemas.SettingsResponse(**body.model_dump(), updated_at=now) # ---------- /run ---------- from . import telegram as _tg from .engine import Screener, ScreenContext def _resolve_asof(asof_str, conn: sqlite3.Connection) -> dt.date: if asof_str: return dt.date.fromisoformat(asof_str) row = conn.execute("SELECT max(date) FROM krx_daily_prices").fetchone() if not row or row[0] is None: raise HTTPException(503, "no snapshot available — run /snapshot/refresh first") return dt.date.fromisoformat(row[0]) def _load_settings(conn) -> dict: row = conn.execute( "SELECT weights_json,node_params_json,gate_params_json,top_n," "rr_ratio,atr_window,atr_stop_mult FROM screener_settings WHERE id=1" ).fetchone() return { "weights": json.loads(row[0]), "node_params": json.loads(row[1]), "gate_params": json.loads(row[2]), "top_n": row[3], "rr_ratio": row[4], "atr_window": row[5], "atr_stop_mult": row[6], } def _persist_run(conn, asof, mode, weights, node_params, gate_params, top_n, result, started_at, finished_at) -> int: cur = conn.execute( """INSERT INTO screener_runs (asof,mode,status,started_at,finished_at, weights_json,node_params_json,gate_params_json,top_n,survivors_count,telegram_sent) VALUES (?,?,?,?,?,?,?,?,?,?,0)""", (asof.isoformat(), mode, "success", started_at, finished_at, json.dumps(weights), json.dumps(node_params), json.dumps(gate_params), top_n, result.survivors_count), ) run_id = cur.lastrowid for row in result.rows: conn.execute( """INSERT INTO screener_results (run_id,rank,ticker,name,total_score, scores_json,close,market_cap,entry_price,stop_price,target_price,atr14) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""", (run_id, row["rank"], row["ticker"], row["name"], row["total_score"], json.dumps(row["scores"]), row["close"], row["market_cap"], row["entry_price"], row["stop_price"], row["target_price"], row["atr14"]), ) conn.commit() return run_id @router.post("/run", response_model=schemas.RunResponse) def post_run(body: schemas.RunRequest): from .registry import NODE_REGISTRY as _NR, GATE_REGISTRY as _GR started_at = dt.datetime.utcnow().isoformat() with _conn() as c: asof = _resolve_asof(body.asof, c) # Skipped holiday handling for mode='auto' if body.mode == "auto" and _is_holiday(asof): return schemas.RunResponse( asof=asof.isoformat(), mode="auto", status="skipped_holiday", run_id=None, survivors_count=None, weights={}, top_n=0, results=[], telegram_payload=None, warnings=[f"{asof.isoformat()} is a holiday — skipped"], ) defaults = _load_settings(c) if body.mode == "auto": weights = defaults["weights"] node_params = defaults["node_params"] gate_params = defaults["gate_params"] top_n = defaults["top_n"] else: weights = body.weights if body.weights is not None else defaults["weights"] node_params = body.node_params if body.node_params is not None else defaults["node_params"] gate_params = body.gate_params if body.gate_params is not None else defaults["gate_params"] top_n = body.top_n if body.top_n is not None else defaults["top_n"] sizer_params = { "atr_window": defaults["atr_window"], "atr_stop_mult": defaults["atr_stop_mult"], "rr_ratio": defaults["rr_ratio"], } ctx = ScreenContext.load(c, asof) score_nodes = [cls() for name, cls in _NR.items() if weights.get(name, 0) > 0] gate = _GR["hygiene"]() try: screener = Screener( gate=gate, score_nodes=score_nodes, weights=weights, node_params=node_params, gate_params=gate_params, top_n=top_n, sizer_params=sizer_params, ) result = screener.run(ctx) except ValueError as e: raise HTTPException(422, str(e)) finished_at = dt.datetime.utcnow().isoformat() run_id = None if body.mode in ("manual_save", "auto"): run_id = _persist_run(c, asof, body.mode, weights, node_params, gate_params, top_n, result, started_at, finished_at) payload = _tg.build_telegram_payload( asof=asof, mode=body.mode, survivors_count=result.survivors_count, top_n=top_n, rows=result.rows, run_id=run_id, ) return schemas.RunResponse( asof=asof.isoformat(), mode=body.mode, status="success", run_id=run_id, survivors_count=result.survivors_count, weights=weights, top_n=top_n, results=result.rows, telegram_payload=schemas.TelegramPayload(**payload), warnings=result.warnings, ) # ---------- /snapshot/refresh ---------- from . import snapshot as _snap @router.post("/snapshot/refresh") def post_snapshot_refresh(asof: Optional[str] = None): asof_date = dt.date.fromisoformat(asof) if asof else dt.date.today() if asof_date.weekday() >= 5: return {"asof": asof_date.isoformat(), "status": "skipped_weekend"} with _conn() as c: summary = _snap.refresh_daily(c, asof_date) return summary # ---------- /runs ---------- @router.get("/runs", response_model=list[schemas.RunSummary]) def list_runs(limit: int = 30): with _conn() as c: rows = c.execute( "SELECT id,asof,mode,status,started_at,finished_at,top_n," "survivors_count,telegram_sent FROM screener_runs " "ORDER BY asof DESC, id DESC LIMIT ?", (limit,), ).fetchall() return [ schemas.RunSummary( id=r[0], asof=r[1], mode=r[2], status=r[3], started_at=r[4], finished_at=r[5], top_n=r[6], survivors_count=r[7], telegram_sent=bool(r[8]), ) for r in rows ] @router.get("/runs/{run_id}") def get_run(run_id: int): with _conn() as c: meta = c.execute( "SELECT id,asof,mode,status,started_at,finished_at,top_n," "survivors_count,telegram_sent,weights_json,node_params_json,gate_params_json " "FROM screener_runs WHERE id=?", (run_id,), ).fetchone() if not meta: raise HTTPException(404, "run not found") rows = c.execute( "SELECT rank,ticker,name,total_score,scores_json,close,market_cap," "entry_price,stop_price,target_price,atr14 " "FROM screener_results WHERE run_id=? ORDER BY rank", (run_id,), ).fetchall() return { "meta": { "id": meta[0], "asof": meta[1], "mode": meta[2], "status": meta[3], "started_at": meta[4], "finished_at": meta[5], "top_n": meta[6], "survivors_count": meta[7], "telegram_sent": bool(meta[8]), "weights": json.loads(meta[9]), "node_params": json.loads(meta[10]), "gate_params": json.loads(meta[11]), }, "results": [ { "rank": r[0], "ticker": r[1], "name": r[2], "total_score": r[3], "scores": json.loads(r[4]), "close": r[5], "market_cap": r[6], "entry_price": r[7], "stop_price": r[8], "target_price": r[9], "atr14": r[10], } for r in rows ], }