"""매매 알람 — 감시대상(monitor-set) 조립. 순수 조립 로직(HTTP/텔레그램 없음). 계약 §5.1 (docs/superpowers/specs/2026-07-02-realtime-trade-alerts-design.md) — Windows 워커가 GET /api/webai/trade-alert/monitor-set 로 받는 응답을 조립한다. NAS는 watchlist ∪ screener 최신 성공 run 후보를 buy_targets로, 보유 종목을 sell_targets로 병합해 넘긴다. TA/조건판정은 워커 쪽 책임. """ from datetime import datetime, timedelta, timezone from typing import Optional from app.db import get_all_portfolio, get_watchlist _KST = timezone(timedelta(hours=9)) def latest_screener_candidates(conn) -> list: """최신 성공(status='success') screener run의 후보 {ticker,name} 목록.""" row = conn.execute( "SELECT id FROM screener_runs WHERE status='success' ORDER BY asof DESC, id DESC LIMIT 1" ).fetchone() if not row: return [] run_id = row[0] rows = conn.execute( "SELECT ticker, name FROM screener_results WHERE run_id=? ORDER BY rank", (run_id,) ).fetchall() return [{"ticker": r[0], "name": r[1]} for r in rows] def holding_high(conn, ticker: str, lookback_days: int = 60) -> Optional[float]: """보유기간 고점(트레일링 스톱용) — krx_daily_prices 최근 lookback_days 최고 high.""" row = conn.execute( "SELECT MAX(high) FROM krx_daily_prices WHERE ticker=? " "AND date >= date('now', ?)", (ticker, f"-{int(lookback_days)} days"), ).fetchone() return row[0] if row and row[0] is not None else None def build_monitor_set(conn, session: str, exit_params: dict, buy_params: dict) -> dict: """계약 §5.1 monitor-set 응답 dict 조립. buy_targets = watchlist ∪ 최신 screener 후보 (ticker 기준 중복 제거, watchlist 우선) sell_targets = 보유 종목(portfolio) + avg_price/qty/holding_high """ buy: dict[str, dict] = {} for w in get_watchlist(): buy[w["ticker"]] = { "ticker": w["ticker"], "name": w["name"], "source": "watch", "params": w.get("params") or {}, } for c in latest_screener_candidates(conn): if c["ticker"] not in buy: buy[c["ticker"]] = { "ticker": c["ticker"], "name": c["name"], "source": "screener", "params": {}, } sell_targets = [] for p in get_all_portfolio(): ticker = p["ticker"] sell_targets.append({ "ticker": ticker, "name": p.get("name"), "avg_price": p.get("avg_price"), "qty": p.get("quantity"), "holding_high": holding_high(conn, ticker), "params": {}, }) return { "session": session, "as_of": datetime.now(_KST).isoformat(), "buy_targets": list(buy.values()), "sell_targets": sell_targets, "buy_params": buy_params, "exit_params": exit_params, } def diff_firing(reported: list, prev: set) -> dict: """워커 발화집합(reported) vs 직전 발화상태(prev) edge diff. reported 각 항목: {ticker,kind,condition,price,detail,name?}. key = (ticker,kind,condition). 반환 {"new":[신규 alert...], "cleared":[해제 key...], "seen":[현재 key...]}. """ cur = {} for a in reported: key = (a["ticker"], a["kind"], a["condition"]) cur[key] = a cur_keys = set(cur.keys()) new_keys = cur_keys - prev cleared = sorted(prev - cur_keys) return { "new": [cur[k] for k in cur_keys if k in new_keys], "cleared": cleared, "seen": sorted(cur_keys), }