Files
web-page-backend/stock/app/trade_alerts.py

138 lines
4.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""매매 알람 — 감시대상(monitor-set) 조립. 순수 조립 로직(HTTP/텔레그램 없음).
계약 §5.1 (docs/superpowers/specs/2026-07-02-realtime-trade-alerts-design.md) —
Windows 워커가 GET /api/webai/trade-alert/monitor-set 로 받는 응답을 조립한다.
NAS는 watchlist screener 최신 성공 run 후보를 buy_targets로, 보유 종목을
sell_targets로 병합해 넘긴다. TA/조건판정은 워커 쪽 책임.
"""
import os
import httpx
from datetime import datetime, timedelta, timezone, time as _time
from typing import Optional
from app.db import get_all_portfolio, get_watchlist
_KST = timezone(timedelta(hours=9))
# KST 세션 창(시:분) — 평일+휴장 판정은 호출부에서 is_market_open으로 별도 게이팅
_SESSIONS = [
("pre", (8, 30), (9, 0)),
("regular", (9, 0), (15, 30)),
("after", (16, 0), (18, 0)),
]
def current_session(now_kst) -> str:
"""now_kst의 time만으로 pre/regular/after/closed 세션 판정 (요일·휴장 무관)."""
t = now_kst.time()
for name, (sh, sm), (eh, em) in _SESSIONS:
if _time(sh, sm) <= t < _time(eh, em):
return name
return "closed"
DEFAULT_EXIT_PARAMS = {"stop_pct": 0.08, "take_pct": 0.25, "trailing_pct": 0.10}
DEFAULT_BUY_PARAMS = {"rsi_oversold": 30, "breakout_vol_mult": 1.5, "pullback_pct": 0.02}
def latest_screener_candidates(conn) -> list:
"""최신 성공(status='success') screener run의 후보 {ticker,name} 목록."""
row = conn.execute(
"SELECT id FROM screener_runs WHERE status='success' ORDER BY asof DESC, id DESC LIMIT 1"
).fetchone()
if not row:
return []
run_id = row[0]
rows = conn.execute(
"SELECT ticker, name FROM screener_results WHERE run_id=? ORDER BY rank", (run_id,)
).fetchall()
return [{"ticker": r[0], "name": r[1]} for r in rows]
def holding_high(conn, ticker: str, lookback_days: int = 60) -> Optional[float]:
"""보유기간 고점(트레일링 스톱용) — krx_daily_prices 최근 lookback_days 최고 high."""
row = conn.execute(
"SELECT MAX(high) FROM krx_daily_prices WHERE ticker=? "
"AND date >= date('now', ?)",
(ticker, f"-{int(lookback_days)} days"),
).fetchone()
return row[0] if row and row[0] is not None else None
def build_monitor_set(conn, session: str, exit_params: dict, buy_params: dict) -> dict:
"""계약 §5.1 monitor-set 응답 dict 조립.
buy_targets = watchlist 최신 screener 후보 (ticker 기준 중복 제거, watchlist 우선)
sell_targets = 보유 종목(portfolio) + avg_price/qty/holding_high
"""
buy: dict[str, dict] = {}
for w in get_watchlist():
buy[w["ticker"]] = {
"ticker": w["ticker"], "name": w["name"],
"source": "watch", "params": w.get("params") or {},
}
for c in latest_screener_candidates(conn):
if c["ticker"] not in buy:
buy[c["ticker"]] = {
"ticker": c["ticker"], "name": c["name"],
"source": "screener", "params": {},
}
sell_targets = []
for p in get_all_portfolio():
ticker = p["ticker"]
sell_targets.append({
"ticker": ticker,
"name": p.get("name"),
"avg_price": p.get("avg_price"),
"qty": p.get("quantity"),
"holding_high": holding_high(conn, ticker),
"params": {},
})
return {
"session": session,
"as_of": datetime.now(_KST).isoformat(),
"buy_targets": list(buy.values()),
"sell_targets": sell_targets,
"buy_params": buy_params,
"exit_params": exit_params,
}
def diff_firing(reported: list, prev: set) -> dict:
"""워커 발화집합(reported) vs 직전 발화상태(prev) edge diff.
reported 각 항목: {ticker,kind,condition,price,detail,name?}.
key = (ticker,kind,condition).
반환 {"new":[신규 alert...], "cleared":[해제 key...], "seen":[현재 key...]}.
"""
cur = {}
for a in reported:
key = (a["ticker"], a["kind"], a["condition"])
cur[key] = a
cur_keys = set(cur.keys())
new_keys = cur_keys - prev
cleared = sorted(prev - cur_keys)
return {
"new": [cur[k] for k in cur_keys if k in new_keys],
"cleared": cleared,
"seen": sorted(cur_keys),
}
def notify_agent_office(alerts: list) -> bool:
"""신규 alert들을 agent-office로 push (계약 §5.2). 전송 성공 시 True.
실패(네트워크 오류/비-200)는 False — 호출부가 상태/이력 미채택 후 다음
사이클에 동일 alert를 재시도하도록 한다(멱등, at-least-once).
"""
url = os.getenv("AGENT_OFFICE_URL", "http://agent-office:8000") + "/api/agent-office/stock/trade-alert"
try:
with httpx.Client(timeout=10) as c:
resp = c.post(url, json={"alerts": alerts})
return resp.status_code == 200
except httpx.HTTPError:
return False