From d8b3267b98fe1d23494698e7ca399b7206c466dd Mon Sep 17 00:00:00 2001 From: gahusb Date: Thu, 2 Jul 2026 15:51:06 +0900 Subject: [PATCH] =?UTF-8?q?feat(stock):=20=EA=B0=90=EC=8B=9C=EB=8C=80?= =?UTF-8?q?=EC=83=81(monitor-set)=20=EC=A1=B0=EB=A6=BD=20=EB=A1=9C?= =?UTF-8?q?=EC=A7=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- stock/app/trade_alerts.py | 78 +++++++++++++++ stock/tests/test_trade_alerts_monitorset.py | 101 ++++++++++++++++++++ 2 files changed, 179 insertions(+) create mode 100644 stock/app/trade_alerts.py create mode 100644 stock/tests/test_trade_alerts_monitorset.py diff --git a/stock/app/trade_alerts.py b/stock/app/trade_alerts.py new file mode 100644 index 0000000..8e617cc --- /dev/null +++ b/stock/app/trade_alerts.py @@ -0,0 +1,78 @@ +"""매매 알람 — 감시대상(monitor-set) 조립. 순수 조립 로직(HTTP/텔레그램 없음). + +계약 §5.1 (docs/superpowers/specs/2026-07-02-realtime-trade-alerts-design.md) — +Windows 워커가 GET /api/webai/trade-alert/monitor-set 로 받는 응답을 조립한다. +NAS는 watchlist ∪ screener 최신 성공 run 후보를 buy_targets로, 보유 종목을 +sell_targets로 병합해 넘긴다. TA/조건판정은 워커 쪽 책임. +""" +from datetime import datetime, timedelta, timezone +from typing import Optional + +from app.db import get_all_portfolio, get_watchlist + +_KST = timezone(timedelta(hours=9)) + + +def latest_screener_candidates(conn) -> list: + """최신 성공(status='success') screener run의 후보 {ticker,name} 목록.""" + row = conn.execute( + "SELECT id FROM screener_runs WHERE status='success' ORDER BY asof DESC, id DESC LIMIT 1" + ).fetchone() + if not row: + return [] + run_id = row[0] + rows = conn.execute( + "SELECT ticker, name FROM screener_results WHERE run_id=? ORDER BY rank", (run_id,) + ).fetchall() + return [{"ticker": r[0], "name": r[1]} for r in rows] + + +def holding_high(conn, ticker: str, lookback_days: int = 60) -> Optional[float]: + """보유기간 고점(트레일링 스톱용) — krx_daily_prices 최근 lookback_days 최고 high.""" + row = conn.execute( + "SELECT MAX(high) FROM krx_daily_prices WHERE ticker=? " + "AND date >= date('now', ?)", + (ticker, f"-{int(lookback_days)} days"), + ).fetchone() + return row[0] if row and row[0] is not None else None + + +def build_monitor_set(conn, session: str, exit_params: dict, buy_params: dict) -> dict: + """계약 §5.1 monitor-set 응답 dict 조립. + + buy_targets = watchlist ∪ 최신 screener 후보 (ticker 기준 중복 제거, watchlist 우선) + sell_targets = 보유 종목(portfolio) + avg_price/qty/holding_high + """ + buy: dict[str, dict] = {} + for w in get_watchlist(): + buy[w["ticker"]] = { + "ticker": w["ticker"], "name": w["name"], + "source": "watch", "params": w.get("params") or {}, + } + for c in latest_screener_candidates(conn): + if c["ticker"] not in buy: + buy[c["ticker"]] = { + "ticker": c["ticker"], "name": c["name"], + "source": "screener", "params": {}, + } + + sell_targets = [] + for p in get_all_portfolio(): + ticker = p["ticker"] + sell_targets.append({ + "ticker": ticker, + "name": p.get("name"), + "avg_price": p.get("avg_price"), + "qty": p.get("quantity"), + "holding_high": holding_high(conn, ticker), + "params": {}, + }) + + return { + "session": session, + "as_of": datetime.now(_KST).isoformat(), + "buy_targets": list(buy.values()), + "sell_targets": sell_targets, + "buy_params": buy_params, + "exit_params": exit_params, + } diff --git a/stock/tests/test_trade_alerts_monitorset.py b/stock/tests/test_trade_alerts_monitorset.py new file mode 100644 index 0000000..87a1fb6 --- /dev/null +++ b/stock/tests/test_trade_alerts_monitorset.py @@ -0,0 +1,101 @@ +import sqlite3 +import pytest + + +@pytest.fixture +def conn(monkeypatch, tmp_path): + from app import db as _db + monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db")) + _db.init_db() + c = sqlite3.connect(_db.DB_PATH) + c.row_factory = sqlite3.Row + # 보유 1종목 (add_portfolio_item 실제 시그니처: broker/ticker/name/quantity/avg_price — market 파라미터 없음) + _db.add_portfolio_item(ticker="000660", name="SK하이닉스", quantity=10, + avg_price=180000, broker="kis") + # watchlist 1종목 + _db.add_watchlist("005930", "삼성전자") + yield c + c.close() + + +def test_build_monitor_set_merges_sources(conn): + from app import trade_alerts as ta + ms = ta.build_monitor_set(conn, session="regular", + exit_params={"stop_pct": 0.08}, buy_params={"rsi_oversold": 30}) + buy_tickers = {t["ticker"] for t in ms["buy_targets"]} + sell_tickers = {t["ticker"] for t in ms["sell_targets"]} + assert "005930" in buy_tickers # watchlist + assert "000660" in sell_tickers # 보유 + assert ms["session"] == "regular" + assert ms["exit_params"]["stop_pct"] == 0.08 + sell = next(t for t in ms["sell_targets"] if t["ticker"] == "000660") + assert sell["avg_price"] == 180000 and sell["qty"] == 10 + + +def test_latest_screener_candidates_empty_when_no_run(conn): + from app import trade_alerts as ta + assert ta.latest_screener_candidates(conn) == [] + + +def test_latest_screener_candidates_picks_latest_success_run(conn): + from app import trade_alerts as ta + now = "2026-07-02T09:00:00Z" + conn.execute( + "INSERT INTO screener_runs (asof, mode, status, started_at, weights_json, " + "node_params_json, gate_params_json, top_n) VALUES (?,?,?,?,?,?,?,?)", + (now, "manual", "failed", now, "{}", "{}", "{}", 20), + ) + conn.execute( + "INSERT INTO screener_runs (asof, mode, status, started_at, weights_json, " + "node_params_json, gate_params_json, top_n) VALUES (?,?,?,?,?,?,?,?)", + (now, "manual", "success", now, "{}", "{}", "{}", 20), + ) + run_id = conn.execute("SELECT id FROM screener_runs WHERE status='success'").fetchone()[0] + conn.execute( + "INSERT INTO screener_results (run_id, rank, ticker, name, total_score, scores_json) " + "VALUES (?,?,?,?,?,?)", + (run_id, 1, "035720", "카카오", 88.5, "{}"), + ) + conn.commit() + candidates = ta.latest_screener_candidates(conn) + assert candidates == [{"ticker": "035720", "name": "카카오"}] + + +def test_holding_high_returns_max_high_within_lookback(conn): + from app import trade_alerts as ta + conn.execute( + "INSERT INTO krx_daily_prices (ticker, date, high) VALUES (?,?,?)", + ("000660", "2026-06-01", 200000), + ) + conn.execute( + "INSERT INTO krx_daily_prices (ticker, date, high) VALUES (?,?,?)", + ("000660", "2026-06-20", 210000), + ) + conn.commit() + assert ta.holding_high(conn, "000660", lookback_days=60) == 210000 + + +def test_holding_high_none_when_no_price_history(conn): + from app import trade_alerts as ta + assert ta.holding_high(conn, "999999") is None + + +def test_build_monitor_set_dedupes_watchlist_and_screener_overlap(conn): + from app import trade_alerts as ta + now = "2026-07-02T09:00:00Z" + cur = conn.execute( + "INSERT INTO screener_runs (asof, mode, status, started_at, weights_json, " + "node_params_json, gate_params_json, top_n) VALUES (?,?,?,?,?,?,?,?)", + (now, "manual", "success", now, "{}", "{}", "{}", 20), + ) + run_id = cur.lastrowid + # 스크리너 후보가 watchlist와 중복(005930) + conn.execute( + "INSERT INTO screener_results (run_id, rank, ticker, name, total_score, scores_json) " + "VALUES (?,?,?,?,?,?)", + (run_id, 1, "005930", "삼성전자", 90.0, "{}"), + ) + conn.commit() + ms = ta.build_monitor_set(conn, session="regular", exit_params={}, buy_params={}) + buy_tickers = [t["ticker"] for t in ms["buy_targets"]] + assert buy_tickers.count("005930") == 1