"""Trade alerts — monitor-set assembly and firing-state helpers.

Contract §5.1 (docs/superpowers/specs/2026-07-02-realtime-trade-alerts-design.md):
assembles the response that the Windows worker receives from
GET /api/webai/trade-alert/monitor-set. The NAS merges the watchlist with the
candidates of the latest successful screener run into ``buy_targets`` and the
current holdings into ``sell_targets``. Technical analysis and condition
evaluation are the worker's responsibility.

The assembly and diff helpers are pure logic (no HTTP/Telegram); the only
function in this module that performs network I/O is ``notify_agent_office``.
"""
import os
from datetime import datetime, timedelta, timezone, time as _time
from typing import Optional

import httpx

from app.db import get_all_portfolio, get_watchlist

_KST = timezone(timedelta(hours=9))

# KST session windows as (name, (start_hour, start_minute), (end_hour, end_minute)).
# Start is inclusive, end is exclusive. Weekday/holiday checks are gated
# separately by the caller via is_market_open.
_SESSIONS = [
    ("pre", (8, 30), (9, 0)),
    ("regular", (9, 0), (15, 30)),
    ("after", (16, 0), (18, 0)),
]


def current_session(now_kst: datetime) -> str:
    """Classify a KST timestamp into a trading session by time of day only.

    Args:
        now_kst: Datetime whose wall-clock time is interpreted as KST. Only
            ``now_kst.time()`` is used; weekday and holidays are ignored.

    Returns:
        ``"pre"``, ``"regular"`` or ``"after"`` when the time falls inside the
        matching window of ``_SESSIONS``, otherwise ``"closed"`` (this includes
        the 15:30–16:00 gap between the regular and after sessions).
    """
    t = now_kst.time()
    for name, (sh, sm), (eh, em) in _SESSIONS:
        if _time(sh, sm) <= t < _time(eh, em):
            return name
    return "closed"


DEFAULT_EXIT_PARAMS = {"stop_pct": 0.08, "take_pct": 0.25, "trailing_pct": 0.10}
DEFAULT_BUY_PARAMS = {"rsi_oversold": 30, "breakout_vol_mult": 1.5, "pullback_pct": 0.02}


def latest_screener_candidates(conn) -> list:
    """Return the candidates of the most recent successful screener run.

    Args:
        conn: DB connection exposing ``execute(sql, params)`` with ``?``
            placeholders (sqlite3-style).

    Returns:
        A list of ``{"ticker": ..., "name": ...}`` dicts ordered by ``rank``,
        or an empty list when no run with ``status='success'`` exists.
    """
    # Latest run = highest asof, ties broken by highest id.
    row = conn.execute(
        "SELECT id FROM screener_runs WHERE status='success' ORDER BY asof DESC, id DESC LIMIT 1"
    ).fetchone()
    if not row:
        return []
    run_id = row[0]
    rows = conn.execute(
        "SELECT ticker, name FROM screener_results WHERE run_id=? ORDER BY rank", (run_id,)
    ).fetchall()
    return [{"ticker": r[0], "name": r[1]} for r in rows]


def holding_high(conn, ticker: str, lookback_days: int = 60) -> Optional[float]:
    """Return the highest daily ``high`` of *ticker* within the lookback window.

    Used as the reference high for the trailing stop.

    Args:
        conn: DB connection exposing ``execute(sql, params)``.
        ticker: Ticker to look up in ``krx_daily_prices``.
        lookback_days: Window size in days, counted back from today.

    Returns:
        The maximum ``high`` in the window, or ``None`` when there are no rows.
    """
    # NOTE: SQLite's date('now') is evaluated in UTC, not KST.
    row = conn.execute(
        "SELECT MAX(high) FROM krx_daily_prices WHERE ticker=? "
        "AND date >= date('now', ?)",
        (ticker, f"-{int(lookback_days)} days"),
    ).fetchone()
    return row[0] if row and row[0] is not None else None


def build_monitor_set(conn, session: str, exit_params: dict, buy_params: dict) -> dict:
    """Assemble the contract §5.1 monitor-set response dict.

    ``buy_targets`` is the watchlist merged with the latest screener
    candidates, de-duplicated by ticker with the watchlist entry winning.
    ``sell_targets`` is the portfolio holdings with ``avg_price``, ``qty`` and
    ``holding_high`` attached.

    Args:
        conn: DB connection used for the screener and price queries.
        session: Session label to echo back in the response.
        exit_params: Exit parameters passed through unchanged.
        buy_params: Buy parameters passed through unchanged.

    Returns:
        Dict with keys ``session``, ``as_of`` (current KST time, ISO 8601),
        ``buy_targets``, ``sell_targets``, ``buy_params`` and ``exit_params``.
    """
    # Keyed by ticker; dict insertion order keeps watchlist entries first.
    buy: dict[str, dict] = {}
    for w in get_watchlist():
        buy[w["ticker"]] = {
            "ticker": w["ticker"],
            "name": w["name"],
            "source": "watch",
            "params": w.get("params") or {},
        }
    for c in latest_screener_candidates(conn):
        # Watchlist entries take precedence over screener candidates.
        if c["ticker"] not in buy:
            buy[c["ticker"]] = {
                "ticker": c["ticker"],
                "name": c["name"],
                "source": "screener",
                "params": {},
            }

    sell_targets = []
    for p in get_all_portfolio():
        ticker = p["ticker"]
        sell_targets.append({
            "ticker": ticker,
            "name": p.get("name"),
            "avg_price": p.get("avg_price"),
            "qty": p.get("quantity"),
            "holding_high": holding_high(conn, ticker),
            "params": {},
        })

    return {
        "session": session,
        "as_of": datetime.now(_KST).isoformat(),
        "buy_targets": list(buy.values()),
        "sell_targets": sell_targets,
        "buy_params": buy_params,
        "exit_params": exit_params,
    }


def diff_firing(reported: list, prev: set) -> dict:
    """Edge-diff the worker's currently firing alerts against the previous state.

    Each item of *reported* is a dict with at least ``ticker``, ``kind`` and
    ``condition`` (plus ``price``, ``detail`` and optionally ``name``). The
    identity key of an alert is ``(ticker, kind, condition)``; when the same
    key is reported more than once, the last item wins.

    Args:
        reported: Alerts the worker reports as currently firing.
        prev: Keys that were firing in the previous cycle. Any iterable of
            keys is accepted; ``None`` or empty means nothing was firing.

    Returns:
        ``{"new": [...], "cleared": [...], "seen": [...]}`` where ``new`` holds
        the alert dicts whose key is not in *prev* (in the order the worker
        reported them), ``cleared`` the sorted keys that stopped firing, and
        ``seen`` the sorted keys firing now.
    """
    cur = {}
    for a in reported:
        cur[(a["ticker"], a["kind"], a["condition"])] = a
    cur_keys = set(cur)
    prev_keys = set(prev) if prev else set()
    return {
        # Iterate the insertion-ordered dict rather than a set so the order of
        # new alerts is deterministic and follows the worker's report.
        "new": [alert for key, alert in cur.items() if key not in prev_keys],
        "cleared": sorted(prev_keys - cur_keys),
        "seen": sorted(cur_keys),
    }


def notify_agent_office(alerts: list) -> bool:
    """Push new alerts to agent-office (contract §5.2).

    The base URL is read from the ``AGENT_OFFICE_URL`` environment variable
    (default ``http://agent-office:8000``); a trailing slash is tolerated.

    Args:
        alerts: Alert dicts to send as ``{"alerts": alerts}`` JSON.

    Returns:
        ``True`` when the POST returns HTTP 200. ``False`` on an
        ``httpx.HTTPError`` or a non-200 status, so the caller can leave
        state/history uncommitted and retry the same alerts on the next cycle
        (idempotent, at-least-once).
    """
    # Strip trailing slashes so a configured "http://host/" does not yield "//api/...".
    base = os.getenv("AGENT_OFFICE_URL", "http://agent-office:8000").rstrip("/")
    url = base + "/api/agent-office/stock/trade-alert"
    try:
        with httpx.Client(timeout=10) as c:
            resp = c.post(url, json={"alerts": alerts})
    except httpx.HTTPError:
        return False
    return resp.status_code == 200