diff --git a/docs/superpowers/plans/2026-07-02-realtime-trade-alerts.md b/docs/superpowers/plans/2026-07-02-realtime-trade-alerts.md new file mode 100644 index 0000000..c14d152 --- /dev/null +++ b/docs/superpowers/plans/2026-07-02-realtime-trade-alerts.md @@ -0,0 +1,1055 @@ +# 실시간 매매 알람 (BE 쪽) Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** web-backend(stock + agent-office)에 실시간 매매 알람의 NAS 쪽 절반을 구현한다 — watchlist 관리, 감시대상(monitor-set) 조립, 워커 발화 report의 edge 중복판정(영속), 텔레그램(너+아내) 발송, 봇 명령. + +**Architecture:** Windows docker 워커(별도 repo)가 매 1분 monitor-set을 pull → KIS+TA로 발화집합 F를 계산 → NAS stock `/report`에 POST. stock이 F를 영속 `trade_alert_state`와 diff해 신규 edge만 agent-office로 push → 텔레그램. TA/조건판정은 워커, edge 중복판정 상태는 NAS 영속. + +**Tech Stack:** Python 3.12, FastAPI, SQLite, httpx, pytest. stock=Alpine 컨테이너(UTC — KST는 `_today_kst` 관용), agent-office=telegram 봇. + +**스펙:** `docs/superpowers/specs/2026-07-02-realtime-trade-alerts-design.md` (§5 계약이 web-ai/web-ui 세션 인터페이스). + +## Global Constraints + +- **DB 마이그레이션은 멱등** — `CREATE TABLE IF NOT EXISTS`, startup `init_db()`에서 생성. +- **KST 날짜/시각은 `_today_kst`류 명시 변환**(`utcnow()+9h`) — stock 컨테이너는 Alpine+tzdata無라 `date.today()`=UTC (`feedback_alpine_tzdata_utc`). +- **텔레그램 발송 성공 시에만 `trade_alert_state` 갱신** — 실패 시 재시도 보장(node_monitor 관용). +- **edge 중복판정 상태는 영속** — 워커/NAS 재시작해도 재알림 없음. +- **webai 엔드포인트는 `Depends(verify_webai_key)`** (stock `app/auth.py`). +- 로컬 stock 테스트: `PYTHONPATH=":/stock" python -m pytest`. +- 조건 키 네이밍(계약 고정): buy=`buy_ma20_pullback`|`buy_breakout`|`buy_rsi_bounce`, sell=`sell_stop_loss`|`sell_ma_break`|`sell_take_profit`|`sell_climax`|`sell_trailing_stop`. + +--- + +### Task 1: stock DB — watchlist / trade_alert_state / trade_alert_history 테이블 + 헬퍼 + +**Files:** +- Modify: `stock/app/db.py` (init_db에 테이블 3개 추가 + 헬퍼 함수) +- Test: `stock/tests/test_trade_alerts_db.py` (Create) + +**Interfaces:** +- Produces: + - `add_watchlist(ticker: str, name: str|None=None, note: str|None=None) -> None` (INSERT OR IGNORE) + - `remove_watchlist(ticker: str) -> bool` + - `get_watchlist() -> list[dict]` (각 `{ticker,name,note,params,added_at}`) + - `get_alert_state_firing() -> set[tuple[str,str,str]]` (currently_firing=1인 `(ticker,kind,condition)`) + - `set_alert_firing(ticker,kind,condition, firing: bool, at_iso: str|None=None) -> None` (upsert; firing=True면 first/last_fired 갱신) + - `touch_alert_seen(keys: list[tuple], at_iso: str) -> None` + - `add_alert_history(ticker,name,kind,condition,price,detail: dict) -> int` + - `get_alert_history(days: int=7) -> list[dict]` + +- [ ] **Step 1: 실패 테스트 작성** — `stock/tests/test_trade_alerts_db.py` + +```python +import os, sqlite3, tempfile, datetime as dt +import pytest + +@pytest.fixture +def db(monkeypatch, tmp_path): + from app import db as _db + monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db")) + _db.init_db() + return _db + +def test_watchlist_add_get_remove(db): + db.add_watchlist("005930", "삼성전자", note="관심") + db.add_watchlist("005930", "삼성전자") # 멱등 + wl = db.get_watchlist() + assert [w["ticker"] for w in wl] == ["005930"] + assert wl[0]["name"] == "삼성전자" + assert db.remove_watchlist("005930") is True + assert db.get_watchlist() == [] + +def test_alert_state_edge_firing_and_clear(db): + key = ("005930", "buy", "buy_breakout") + assert db.get_alert_state_firing() == set() + db.set_alert_firing(*key, firing=True, at_iso="2026-07-02T00:01:00Z") + assert key in db.get_alert_state_firing() + db.set_alert_firing(*key, firing=False) + assert key not in db.get_alert_state_firing() + +def test_alert_history_records_and_reads(db): + db.add_alert_history("005930", "삼성전자", "buy", "buy_breakout", 71500, {"vol": 2.1}) + rows = db.get_alert_history(days=7) + assert len(rows) == 1 + assert rows[0]["ticker"] == "005930" and rows[0]["kind"] == "buy" + assert rows[0]["detail"]["vol"] == 2.1 +``` + +- [ ] **Step 2: 실패 확인** + +Run: `PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_db.py -q` +Expected: FAIL (`AttributeError: module 'app.db' has no attribute 'add_watchlist'`) + +- [ ] **Step 3: 최소 구현** — `stock/app/db.py` + +`init_db()` 안(기존 CREATE 블록들 뒤, seed 전)에 추가: + +```python + conn.execute(""" + CREATE TABLE IF NOT EXISTS watchlist ( + ticker TEXT PRIMARY KEY, + name TEXT, + note TEXT, + params_json TEXT NOT NULL DEFAULT '{}', + added_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) + ) + """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS trade_alert_state ( + ticker TEXT NOT NULL, + kind TEXT NOT NULL, + condition TEXT NOT NULL, + currently_firing INTEGER NOT NULL DEFAULT 0, + first_fired_at TEXT, + last_fired_at TEXT, + last_seen_at TEXT, + PRIMARY KEY (ticker, kind, condition) + ) + """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS trade_alert_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ticker TEXT NOT NULL, + name TEXT, + kind TEXT NOT NULL, + condition TEXT NOT NULL, + price REAL, + detail_json TEXT, + fired_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) + ) + """) + conn.execute("CREATE INDEX IF NOT EXISTS idx_tah_fired ON trade_alert_history(fired_at DESC)") +``` + +모듈 하단에 헬퍼 추가 (`import json` 상단에 이미 있음): + +```python +def add_watchlist(ticker: str, name: str = None, note: str = None) -> None: + with _conn() as conn: + conn.execute( + "INSERT OR IGNORE INTO watchlist(ticker,name,note) VALUES(?,?,?)", + (ticker, name, note), + ) + # 이름/노트 갱신(이미 있으면) + conn.execute( + "UPDATE watchlist SET name=COALESCE(?,name), note=COALESCE(?,note) WHERE ticker=?", + (name, note, ticker), + ) + + +def remove_watchlist(ticker: str) -> bool: + with _conn() as conn: + cur = conn.execute("DELETE FROM watchlist WHERE ticker=?", (ticker,)) + return cur.rowcount > 0 + + +def get_watchlist() -> list: + with _conn() as conn: + rows = conn.execute("SELECT * FROM watchlist ORDER BY added_at").fetchall() + return [ + {"ticker": r["ticker"], "name": r["name"], "note": r["note"], + "params": json.loads(r["params_json"] or "{}"), "added_at": r["added_at"]} + for r in rows + ] + + +def get_alert_state_firing() -> set: + with _conn() as conn: + rows = conn.execute( + "SELECT ticker,kind,condition FROM trade_alert_state WHERE currently_firing=1" + ).fetchall() + return {(r["ticker"], r["kind"], r["condition"]) for r in rows} + + +def set_alert_firing(ticker: str, kind: str, condition: str, firing: bool, at_iso: str = None) -> None: + now = at_iso or _now_iso() + with _conn() as conn: + if firing: + conn.execute( + """INSERT INTO trade_alert_state(ticker,kind,condition,currently_firing,first_fired_at,last_fired_at,last_seen_at) + VALUES(?,?,?,1,?,?,?) + ON CONFLICT(ticker,kind,condition) DO UPDATE SET + currently_firing=1, + first_fired_at=COALESCE(first_fired_at,excluded.first_fired_at), + last_fired_at=excluded.last_fired_at, + last_seen_at=excluded.last_seen_at""", + (ticker, kind, condition, now, now, now), + ) + else: + conn.execute( + "UPDATE trade_alert_state SET currently_firing=0, last_seen_at=? WHERE ticker=? AND kind=? AND condition=?", + (now, ticker, kind, condition), + ) + + +def touch_alert_seen(keys: list, at_iso: str) -> None: + with _conn() as conn: + for (ticker, kind, condition) in keys: + conn.execute( + "UPDATE trade_alert_state SET last_seen_at=? WHERE ticker=? AND kind=? AND condition=?", + (at_iso, ticker, kind, condition), + ) + + +def add_alert_history(ticker: str, name: str, kind: str, condition: str, price, detail: dict) -> int: + with _conn() as conn: + cur = conn.execute( + "INSERT INTO trade_alert_history(ticker,name,kind,condition,price,detail_json) VALUES(?,?,?,?,?,?)", + (ticker, name, kind, condition, price, json.dumps(detail or {}, ensure_ascii=False)), + ) + return cur.lastrowid + + +def get_alert_history(days: int = 7) -> list: + with _conn() as conn: + rows = conn.execute( + "SELECT * FROM trade_alert_history WHERE fired_at >= datetime('now', ?) ORDER BY fired_at DESC", + (f"-{int(days)} days",), + ).fetchall() + return [ + {"id": r["id"], "ticker": r["ticker"], "name": r["name"], "kind": r["kind"], + "condition": r["condition"], "price": r["price"], + "detail": json.loads(r["detail_json"] or "{}"), "fired_at": r["fired_at"]} + for r in rows + ] +``` + +`_now_iso` 헬퍼가 없으면 추가: +```python +def _now_iso() -> str: + return dt.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%fZ") +``` +(파일 상단에 `import datetime as dt` 확인 — 없으면 추가) + +- [ ] **Step 4: 통과 확인** + +Run: `PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_db.py -q` +Expected: PASS (3 passed) + +- [ ] **Step 5: 커밋** + +```bash +git add stock/app/db.py stock/tests/test_trade_alerts_db.py +git commit -m "feat(stock): 매매알람 DB — watchlist/alert_state/history 테이블+헬퍼" +``` + +--- + +### Task 2: stock — watchlist CRUD API + 알람 이력 API (public) + +**Files:** +- Modify: `stock/app/main.py` (엔드포인트 4개 추가) +- Test: `stock/tests/test_watchlist_api.py` (Create) + +**Interfaces:** +- Consumes: Task 1의 `add_watchlist`/`remove_watchlist`/`get_watchlist`/`get_alert_history` +- Produces (계약 §5.3): + - `GET /api/stock/watchlist` → `{"watchlist":[...]}` + - `POST /api/stock/watchlist` body `{ticker, name?, note?}` → 201 `{"ok":true}` + - `DELETE /api/stock/watchlist/{ticker}` → 200/404 + - `GET /api/stock/trade-alerts?days=N` → `{"alerts":[...]}` + +- [ ] **Step 1: 실패 테스트** — `stock/tests/test_watchlist_api.py` + +```python +import pytest +from fastapi.testclient import TestClient + +@pytest.fixture +def client(monkeypatch, tmp_path): + from app import db as _db + monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db")) + _db.init_db() + from app.main import app + return TestClient(app) + +def test_watchlist_crud(client): + assert client.get("/api/stock/watchlist").json()["watchlist"] == [] + r = client.post("/api/stock/watchlist", json={"ticker": "005930", "name": "삼성전자"}) + assert r.status_code == 201 + wl = client.get("/api/stock/watchlist").json()["watchlist"] + assert wl[0]["ticker"] == "005930" + assert client.delete("/api/stock/watchlist/005930").status_code == 200 + assert client.delete("/api/stock/watchlist/005930").status_code == 404 + +def test_trade_alerts_history_empty(client): + assert client.get("/api/stock/trade-alerts?days=7").json()["alerts"] == [] +``` + +- [ ] **Step 2: 실패 확인** + +Run: `PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_watchlist_api.py -q` +Expected: FAIL (404 on /api/stock/watchlist) + +- [ ] **Step 3: 구현** — `stock/app/main.py` (기존 portfolio 엔드포인트 근처에 추가) + +상단 import에 db 헬퍼 추가: +```python +from .db import add_watchlist, remove_watchlist, get_watchlist, get_alert_history +``` + +Pydantic 모델(파일의 다른 Request 모델 근처): +```python +class WatchlistItemRequest(BaseModel): + ticker: str + name: str | None = None + note: str | None = None +``` + +엔드포인트: +```python +@app.get("/api/stock/watchlist") +def list_watchlist(): + return {"watchlist": get_watchlist()} + + +@app.post("/api/stock/watchlist", status_code=201) +def create_watchlist_item(req: WatchlistItemRequest): + add_watchlist(req.ticker, req.name, req.note) + return {"ok": True} + + +@app.delete("/api/stock/watchlist/{ticker}") +def delete_watchlist_item(ticker: str): + if not remove_watchlist(ticker): + raise HTTPException(status_code=404, detail="not in watchlist") + return {"ok": True} + + +@app.get("/api/stock/trade-alerts") +def list_trade_alerts(days: int = 7): + return {"alerts": get_alert_history(days)} +``` + +- [ ] **Step 4: 통과 확인** + +Run: `PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_watchlist_api.py -q` +Expected: PASS + +- [ ] **Step 5: 커밋** + +```bash +git add stock/app/main.py stock/tests/test_watchlist_api.py +git commit -m "feat(stock): watchlist CRUD + 알람 이력 API" +``` + +--- + +### Task 3: stock — 감시대상 조립 헬퍼 (monitor-set 데이터) + +**Files:** +- Create: `stock/app/trade_alerts.py` (순수 조립 로직 — 테스트 쉬움) +- Test: `stock/tests/test_trade_alerts_monitorset.py` (Create) + +**Interfaces:** +- Consumes: `db.get_watchlist()`, `db.get_all_portfolio()`, 최신 screener_results +- Produces: + - `def latest_screener_candidates(conn) -> list[dict]` — 최신 성공 run의 `{ticker,name}` + - `def build_monitor_set(conn, session: str, exit_params: dict, buy_params: dict) -> dict` — 계약 §5.1 응답 dict(`buy_targets`=watchlist∪screener 중복제거, `sell_targets`=보유+avg_price/qty/holding_high) + - `def holding_high(conn, ticker: str, lookback_days: int = 60) -> float|None` — `krx_daily_prices` 최근 lookback 최고 high + +- [ ] **Step 1: 실패 테스트** — `stock/tests/test_trade_alerts_monitorset.py` + +```python +import sqlite3, pytest + +@pytest.fixture +def conn(monkeypatch, tmp_path): + from app import db as _db + monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db")) + _db.init_db() + c = sqlite3.connect(_db.DB_PATH); c.row_factory = sqlite3.Row + # 보유 1종목 + _db.add_portfolio_item(ticker="000660", name="SK하이닉스", quantity=10, + avg_price=180000, broker="kis", market="KR") + # watchlist 1종목 + _db.add_watchlist("005930", "삼성전자") + yield c + c.close() + +def test_build_monitor_set_merges_sources(conn): + from app import trade_alerts as ta + ms = ta.build_monitor_set(conn, session="regular", + exit_params={"stop_pct": 0.08}, buy_params={"rsi_oversold": 30}) + buy_tickers = {t["ticker"] for t in ms["buy_targets"]} + sell_tickers = {t["ticker"] for t in ms["sell_targets"]} + assert "005930" in buy_tickers # watchlist + assert "000660" in sell_tickers # 보유 + assert ms["session"] == "regular" + assert ms["exit_params"]["stop_pct"] == 0.08 + sell = next(t for t in ms["sell_targets"] if t["ticker"] == "000660") + assert sell["avg_price"] == 180000 and sell["qty"] == 10 +``` + +(참고: `add_portfolio_item` 실제 시그니처는 `stock/app/db.py:167`에서 확인해 인자명 맞출 것.) + +- [ ] **Step 2: 실패 확인** + +Run: `PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_monitorset.py -q` +Expected: FAIL (`ModuleNotFoundError: app.trade_alerts`) + +- [ ] **Step 3: 구현** — `stock/app/trade_alerts.py` + +```python +"""매매 알람 — 감시대상(monitor-set) 조립. 순수 조립 로직(HTTP/텔레그램 없음).""" +from typing import Optional +from .db import get_watchlist, get_all_portfolio + + +def latest_screener_candidates(conn) -> list: + row = conn.execute( + "SELECT id FROM screener_runs WHERE status='success' ORDER BY asof DESC, id DESC LIMIT 1" + ).fetchone() + if not row: + return [] + run_id = row[0] + rows = conn.execute( + "SELECT ticker,name FROM screener_results WHERE run_id=? ORDER BY rank", (run_id,) + ).fetchall() + return [{"ticker": r[0], "name": r[1]} for r in rows] + + +def holding_high(conn, ticker: str, lookback_days: int = 60) -> Optional[float]: + row = conn.execute( + "SELECT MAX(high) FROM krx_daily_prices WHERE ticker=? " + "AND date >= date('now', ?)", + (ticker, f"-{int(lookback_days)} days"), + ).fetchone() + return row[0] if row and row[0] is not None else None + + +def build_monitor_set(conn, session: str, exit_params: dict, buy_params: dict) -> dict: + from datetime import datetime, timezone, timedelta + kst = timezone(timedelta(hours=9)) + watch = {w["ticker"]: {"ticker": w["ticker"], "name": w["name"], "source": "watch", + "params": w.get("params", {})} + for w in get_watchlist()} + for c in latest_screener_candidates(conn): + if c["ticker"] not in watch: + watch[c["ticker"]] = {"ticker": c["ticker"], "name": c["name"], + "source": "screener", "params": {}} + sell_targets = [] + for p in get_all_portfolio(): + if (p.get("market") or "KR") != "KR": + continue # KRX만 실시간 TA 대상(미국주 skip — 스펙 §12/제약) + t = p["ticker"] + sell_targets.append({ + "ticker": t, "name": p.get("name"), + "avg_price": p.get("avg_price"), "qty": p.get("quantity"), + "holding_high": holding_high(conn, t), "params": {}, + }) + return { + "session": session, + "as_of": datetime.now(kst).isoformat(), + "buy_targets": list(watch.values()), + "sell_targets": sell_targets, + "buy_params": buy_params, + "exit_params": exit_params, + } +``` + +(주의: `get_all_portfolio()` 반환 dict의 키명(`avg_price`/`quantity`/`market`/`name`)을 `stock/app/db.py:182`에서 확인해 맞출 것. 다르면 매핑 조정.) + +- [ ] **Step 4: 통과 확인** + +Run: `PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_monitorset.py -q` +Expected: PASS + +- [ ] **Step 5: 커밋** + +```bash +git add stock/app/trade_alerts.py stock/tests/test_trade_alerts_monitorset.py +git commit -m "feat(stock): 감시대상(monitor-set) 조립 로직" +``` + +--- + +### Task 4: stock — edge diff 로직 (신규/해제/재무장, 순수 함수) + +**Files:** +- Modify: `stock/app/trade_alerts.py` (`diff_firing` 추가) +- Test: `stock/tests/test_trade_alerts_edge.py` (Create) + +**Interfaces:** +- Produces: + - `def diff_firing(reported: list[dict], prev: set[tuple]) -> dict` — 반환 `{"new":[alert...], "cleared":[key...], "seen":[key...]}`. reported 각 항목 `{ticker,kind,condition,price,detail,name?}`. key=`(ticker,kind,condition)`. + +- [ ] **Step 1: 실패 테스트** — `stock/tests/test_trade_alerts_edge.py` + +```python +def test_diff_new_and_cleared_and_rearm(): + from app.trade_alerts import diff_firing + reported = [{"ticker": "005930", "kind": "buy", "condition": "buy_breakout", + "price": 71500, "detail": {}}] + # 최초: prev 비어있음 → 신규 + d1 = diff_firing(reported, prev=set()) + assert [a["condition"] for a in d1["new"]] == ["buy_breakout"] + assert d1["cleared"] == [] + # 유지: prev에 이미 있음 → 신규 없음 + prev = {("005930", "buy", "buy_breakout")} + d2 = diff_firing(reported, prev=prev) + assert d2["new"] == [] + # 해제: reported 비었고 prev에 있음 → cleared + d3 = diff_firing([], prev=prev) + assert d3["cleared"] == [("005930", "buy", "buy_breakout")] + # 재무장 후 재발화: prev 다시 비면 신규 + d4 = diff_firing(reported, prev=set()) + assert len(d4["new"]) == 1 +``` + +- [ ] **Step 2: 실패 확인** + +Run: `PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_edge.py -q` +Expected: FAIL (`ImportError: cannot import name 'diff_firing'`) + +- [ ] **Step 3: 구현** — `stock/app/trade_alerts.py` 하단에 추가 + +```python +def diff_firing(reported: list, prev: set) -> dict: + cur = {} + for a in reported: + key = (a["ticker"], a["kind"], a["condition"]) + cur[key] = a + cur_keys = set(cur.keys()) + new_keys = cur_keys - prev + cleared = sorted(prev - cur_keys) + return { + "new": [cur[k] for k in cur_keys if k in new_keys], + "cleared": cleared, + "seen": sorted(cur_keys), + } +``` + +- [ ] **Step 4: 통과 확인** + +Run: `PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_edge.py -q` +Expected: PASS + +- [ ] **Step 5: 커밋** + +```bash +git add stock/app/trade_alerts.py stock/tests/test_trade_alerts_edge.py +git commit -m "feat(stock): edge diff(신규/해제/재무장) 순수 함수" +``` + +--- + +### Task 5: stock — session 판정 + webai monitor-set 엔드포인트 + +**Files:** +- Modify: `stock/app/trade_alerts.py` (`current_session`) +- Modify: `stock/app/main.py` (`GET /api/webai/trade-alert/monitor-set`) +- Test: `stock/tests/test_trade_alerts_monitorset_api.py` (Create) + +**Interfaces:** +- Consumes: Task3 `build_monitor_set`, 기존 `is_market_open`, `verify_webai_key` +- Produces: + - `def current_session(now_kst) -> str` — pre/regular/after/closed (평일+휴장 반영은 호출부에서 `is_market_open`으로 게이팅) + - `GET /api/webai/trade-alert/monitor-set` (webai 인증) → 계약 §5.1 + +- [ ] **Step 1: 실패 테스트** — `stock/tests/test_trade_alerts_monitorset_api.py` + +```python +import datetime as dt, pytest +from fastapi.testclient import TestClient + +def test_current_session_windows(): + from app.trade_alerts import current_session + d = dt.date(2026, 7, 2) + assert current_session(dt.datetime.combine(d, dt.time(8, 40))) == "pre" + assert current_session(dt.datetime.combine(d, dt.time(10, 0))) == "regular" + assert current_session(dt.datetime.combine(d, dt.time(17, 0))) == "after" + assert current_session(dt.datetime.combine(d, dt.time(20, 0))) == "closed" + +@pytest.fixture +def client(monkeypatch, tmp_path): + from app import db as _db + monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db")) + _db.init_db() + monkeypatch.setenv("WEBAI_API_KEY", "k") + from app.main import app + return TestClient(app) + +def test_monitor_set_requires_auth(client): + assert client.get("/api/webai/trade-alert/monitor-set").status_code == 401 + +def test_monitor_set_ok(client): + r = client.get("/api/webai/trade-alert/monitor-set", headers={"X-WebAI-Key": "k"}) + assert r.status_code == 200 + body = r.json() + assert body["session"] in ("pre", "regular", "after", "closed") + assert "buy_targets" in body and "sell_targets" in body + assert body["exit_params"]["trailing_pct"] == 0.10 +``` + +- [ ] **Step 2: 실패 확인** + +Run: `PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_monitorset_api.py -q` +Expected: FAIL + +- [ ] **Step 3: 구현** + +`stock/app/trade_alerts.py`: +```python +import datetime as _dt + +# KST 세션 창(시:분) +_SESSIONS = [ + ("pre", (8, 30), (9, 0)), + ("regular", (9, 0), (15, 30)), + ("after", (16, 0), (18, 0)), +] + +def current_session(now_kst) -> str: + t = now_kst.time() + for name, (sh, sm), (eh, em) in _SESSIONS: + if _dt.time(sh, sm) <= t < _dt.time(eh, em): + return name + return "closed" + +DEFAULT_EXIT_PARAMS = {"stop_pct": 0.08, "take_pct": 0.25, "trailing_pct": 0.10} +DEFAULT_BUY_PARAMS = {"rsi_oversold": 30, "breakout_vol_mult": 1.5, "pullback_pct": 0.02} +``` + +`stock/app/main.py` (webai 섹션): +```python +from .trade_alerts import ( + build_monitor_set, current_session, DEFAULT_EXIT_PARAMS, DEFAULT_BUY_PARAMS, +) + +@app.get("/api/webai/trade-alert/monitor-set", dependencies=[Depends(verify_webai_key)]) +def get_trade_alert_monitor_set(): + from datetime import datetime, timezone, timedelta, date as _date + kst = timezone(timedelta(hours=9)) + now_kst = datetime.now(kst) + session = current_session(now_kst) + if not is_market_open(now_kst.date()): + session = "closed" + with _conn() as c: + return build_monitor_set(c, session, DEFAULT_EXIT_PARAMS, DEFAULT_BUY_PARAMS) +``` +(주의: `_conn`은 stock main.py에서 db._conn 사용 형태 확인 — 기존 엔드포인트가 `with _conn() as c:` 쓰는지 보고 동일 패턴. 없으면 `from .db import _conn`.) + +- [ ] **Step 4: 통과 확인** + +Run: `PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_monitorset_api.py -q` +Expected: PASS + +- [ ] **Step 5: 커밋** + +```bash +git add stock/app/trade_alerts.py stock/app/main.py stock/tests/test_trade_alerts_monitorset_api.py +git commit -m "feat(stock): session 판정 + webai monitor-set 엔드포인트" +``` + +--- + +### Task 6: stock — webai report 엔드포인트 (edge diff → agent-office push → 상태/이력) + +**Files:** +- Modify: `stock/app/main.py` (`POST /api/webai/trade-alert/report`) +- Modify: `stock/app/trade_alerts.py` (`notify_agent_office` httpx 헬퍼) +- Test: `stock/tests/test_trade_alerts_report_api.py` (Create) + +**Interfaces:** +- Consumes: Task1 `get_alert_state_firing`/`set_alert_firing`/`touch_alert_seen`/`add_alert_history`, Task4 `diff_firing` +- Produces (계약 §5.2): + - `POST /api/webai/trade-alert/report` body `{as_of, firing:[...]}` → `{"new_alerts":N,"cleared":M}` + - agent-office로 신규 alert push (`AGENT_OFFICE_URL` env, 기본 `http://agent-office:8000`), **전송 성공 시에만** `set_alert_firing(True)` + `add_alert_history` + +- [ ] **Step 1: 실패 테스트** — `stock/tests/test_trade_alerts_report_api.py` + +```python +import pytest +from unittest.mock import patch +from fastapi.testclient import TestClient + +@pytest.fixture +def client(monkeypatch, tmp_path): + from app import db as _db + monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db")) + _db.init_db() + monkeypatch.setenv("WEBAI_API_KEY", "k") + from app.main import app + return TestClient(app) + +def _report(client, firing): + return client.post("/api/webai/trade-alert/report", + headers={"X-WebAI-Key": "k"}, + json={"as_of": "2026-07-02T09:01:00+09:00", "firing": firing}) + +def test_report_new_edge_sends_and_persists(client): + firing = [{"ticker": "005930", "name": "삼성전자", "kind": "buy", + "condition": "buy_breakout", "price": 71500, "detail": {"vol": 2.0}}] + with patch("app.trade_alerts.notify_agent_office", return_value=True) as m: + r1 = _report(client, firing) + assert r1.json()["new_alerts"] == 1 + assert m.called + # 2번째 동일 firing → 유지, 신규 0 + with patch("app.trade_alerts.notify_agent_office", return_value=True): + r2 = _report(client, firing) + assert r2.json()["new_alerts"] == 0 + # 이력 1건 + assert len(client.get("/api/stock/trade-alerts?days=1").json()["alerts"]) == 1 + +def test_report_send_failure_does_not_persist(client): + firing = [{"ticker": "005930", "name": "삼성전자", "kind": "buy", + "condition": "buy_breakout", "price": 71500, "detail": {}}] + with patch("app.trade_alerts.notify_agent_office", return_value=False): + r = _report(client, firing) + assert r.json()["new_alerts"] == 0 # 전송 실패 → 미채택 + # 다음 사이클(전송 성공) 재시도되어 알림 + with patch("app.trade_alerts.notify_agent_office", return_value=True): + r2 = _report(client, firing) + assert r2.json()["new_alerts"] == 1 + +def test_report_cleared_rearm(client): + firing = [{"ticker": "005930", "name": "삼성", "kind": "buy", + "condition": "buy_breakout", "price": 71500, "detail": {}}] + with patch("app.trade_alerts.notify_agent_office", return_value=True): + _report(client, firing) + _report(client, []) # 해제 + r = _report(client, firing) # 재발화 + assert r.json()["new_alerts"] == 1 +``` + +- [ ] **Step 2: 실패 확인** + +Run: `PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_report_api.py -q` +Expected: FAIL (404) + +- [ ] **Step 3: 구현** + +`stock/app/trade_alerts.py`: +```python +import os +import httpx + +def notify_agent_office(alerts: list) -> bool: + """신규 alert들을 agent-office로 push. 성공 True.""" + url = os.getenv("AGENT_OFFICE_URL", "http://agent-office:8000") + "/api/agent-office/stock/trade-alert" + try: + with httpx.Client(timeout=10) as c: + resp = c.post(url, json={"alerts": alerts}) + return resp.status_code == 200 + except httpx.HTTPError: + return False +``` + +`stock/app/main.py`: +```python +from .trade_alerts import diff_firing, notify_agent_office +from .db import ( + get_alert_state_firing, set_alert_firing, touch_alert_seen, add_alert_history, +) + +class TradeAlertReport(BaseModel): + as_of: str | None = None + firing: list[dict] = [] + +@app.post("/api/webai/trade-alert/report", dependencies=[Depends(verify_webai_key)]) +def post_trade_alert_report(body: TradeAlertReport): + prev = get_alert_state_firing() + d = diff_firing(body.firing, prev) + sent = 0 + for a in d["new"]: + ok = notify_agent_office([a]) + if ok: + set_alert_firing(a["ticker"], a["kind"], a["condition"], firing=True, at_iso=body.as_of) + add_alert_history(a["ticker"], a.get("name"), a["kind"], a["condition"], + a.get("price"), a.get("detail", {})) + sent += 1 + for key in d["cleared"]: + set_alert_firing(key[0], key[1], key[2], firing=False) + touch_alert_seen(d["seen"], body.as_of or "") + return {"new_alerts": sent, "cleared": len(d["cleared"])} +``` + +- [ ] **Step 4: 통과 확인** + +Run: `PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_report_api.py -q` +Expected: PASS (3 passed) + +- [ ] **Step 5: 커밋** + +```bash +git add stock/app/main.py stock/app/trade_alerts.py stock/tests/test_trade_alerts_report_api.py +git commit -m "feat(stock): webai report — edge diff→agent-office push→상태/이력(전송성공시만)" +``` + +--- + +### Task 7: agent-office — trade-alert notify 엔드포인트 (텔레그램 너+아내) + +**Files:** +- Modify: `agent-office/app/main.py` (`POST /api/agent-office/stock/trade-alert`) +- Create: `agent-office/app/notifiers/telegram_trade.py` (포맷+전송) +- Test: `agent-office/tests/test_trade_alert_notify.py` (Create) + +**Interfaces:** +- Consumes: `telegram.messaging.send_raw`, `config.TELEGRAM_CHAT_ID`, `config.TELEGRAM_WIFE_CHAT_ID` +- Produces: + - `format_trade_alert(alert: dict) -> str` (HTML) + - `async def send_trade_alerts(alerts: list) -> dict` — 너+아내 각각 send_raw. `{"sent": n, "ok": bool}` + - `POST /api/agent-office/stock/trade-alert` body `{alerts:[...]}` → 위 호출 + `add_log` + +- [ ] **Step 1: 실패 테스트** — `agent-office/tests/test_trade_alert_notify.py` + +(파일 상단은 기존 `test_youtube_publisher_retry.py`의 `_TMP`/`_init_db` 픽스처 패턴 복사 — `monkeypatch.setattr(app.db, "DB_PATH", _TMP)` + `-wal/-shm` 삭제) + +```python +import pytest +from unittest.mock import AsyncMock, patch + +@pytest.mark.asyncio +async def test_send_trade_alerts_to_user_and_wife(): + from app.notifiers import telegram_trade + alerts = [{"ticker": "005930", "name": "삼성전자", "kind": "buy", + "condition": "buy_breakout", "price": 71500, "detail": {}}] + with patch("app.notifiers.telegram_trade.send_raw", + new=AsyncMock(return_value={"ok": True})) as m, \ + patch("app.notifiers.telegram_trade.TELEGRAM_CHAT_ID", "U"), \ + patch("app.notifiers.telegram_trade.TELEGRAM_WIFE_CHAT_ID", "W"): + res = await telegram_trade.send_trade_alerts(alerts) + assert res["ok"] is True + chat_ids = {c.kwargs.get("chat_id") for c in m.await_args_list} + assert chat_ids == {"U", "W"} # 둘 다 발송 + +@pytest.mark.asyncio +async def test_format_trade_alert_has_direction(): + from app.notifiers.telegram_trade import format_trade_alert + txt = format_trade_alert({"ticker": "005930", "name": "삼성전자", "kind": "sell", + "condition": "sell_stop_loss", "price": 60000, "detail": {}}) + assert "매도" in txt and "삼성전자" in txt +``` + +- [ ] **Step 2: 실패 확인** + +Run: `cd agent-office && python -m pytest tests/test_trade_alert_notify.py -q` +Expected: FAIL (`ModuleNotFoundError: app.notifiers.telegram_trade`) + +- [ ] **Step 3: 구현** — `agent-office/app/notifiers/telegram_trade.py` + +```python +"""매매 알람 텔레그램 포맷+전송 (너+아내).""" +from ..telegram.messaging import send_raw +from ..config import TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID + +_KIND_LABEL = {"buy": "🟢 매수", "sell": "🔴 매도"} +_COND_LABEL = { + "buy_ma20_pullback": "지지선 되돌림", "buy_breakout": "돌파", "buy_rsi_bounce": "RSI 과매도 반등", + "sell_stop_loss": "손절", "sell_ma_break": "이평 이탈", "sell_take_profit": "익절", + "sell_climax": "급등 소진", "sell_trailing_stop": "트레일링 스톱", +} + +def format_trade_alert(a: dict) -> str: + kind = _KIND_LABEL.get(a["kind"], a["kind"]) + cond = _COND_LABEL.get(a["condition"], a["condition"]) + name = a.get("name") or a["ticker"] + price = a.get("price") + price_s = f"{int(price):,}원" if price else "-" + return f"{kind} 알람\n{name} ({a['ticker']})\n조건: {cond}\n현재가: {price_s}" + +async def send_trade_alerts(alerts: list) -> dict: + sent = 0 + all_ok = True + chat_ids = [c for c in (TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID) if c] + for a in alerts: + text = format_trade_alert(a) + for cid in chat_ids: + r = await send_raw(text=text, chat_id=cid) + if r.get("ok"): + sent += 1 + else: + all_ok = False + return {"sent": sent, "ok": all_ok} +``` + +`agent-office/app/main.py`: +```python +class TradeAlertBody(BaseModel): + alerts: list[dict] = [] + +@app.post("/api/agent-office/stock/trade-alert") +async def stock_trade_alert(body: TradeAlertBody): + from .notifiers.telegram_trade import send_trade_alerts + res = await send_trade_alerts(body.alerts) + for a in body.alerts: + add_log("stock", f"매매알람 {a.get('kind')} {a.get('ticker')} {a.get('condition')}", "info") + return res +``` +(`add_log` import 확인 — main.py 상단에 이미 있을 것. 없으면 `from .db import add_log`.) + +- [ ] **Step 4: 통과 확인** + +Run: `cd agent-office && python -m pytest tests/test_trade_alert_notify.py -q` +Expected: PASS + +- [ ] **Step 5: 커밋** + +```bash +git add agent-office/app/notifiers/telegram_trade.py agent-office/app/main.py agent-office/tests/test_trade_alert_notify.py +git commit -m "feat(agent-office): 매매알람 텔레그램 notify(너+아내) 엔드포인트" +``` + +--- + +### Task 8: agent-office — 봇 명령 /watch /unwatch /watchlist + +**Files:** +- Modify: `agent-office/app/service_proxy.py` (stock watchlist CRUD 호출 헬퍼) +- Modify: `agent-office/app/telegram/webhook.py` (`_handle_message`에 /watch류 처리) +- Test: `agent-office/tests/test_watch_commands.py` (Create) + +**Interfaces:** +- Consumes: 기존 `service_proxy` HTTP 패턴, stock `POST/DELETE/GET /api/stock/watchlist` +- Produces: + - `service_proxy.watchlist_add(ticker) / watchlist_remove(ticker) / watchlist_list()` + - webhook `_handle_message`가 `/watch `, `/unwatch `, `/watchlist`를 인식해 텔레그램 응답 + +- [ ] **Step 1: 실패 테스트** — `agent-office/tests/test_watch_commands.py` + +```python +import pytest +from unittest.mock import AsyncMock, patch + +@pytest.mark.asyncio +async def test_watch_command_calls_add(): + from app.telegram import webhook + msg = {"chat": {"id": 1}, "text": "/watch 005930"} + with patch("app.telegram.webhook.service_proxy.watchlist_add", + new=AsyncMock(return_value={"ok": True})) as m, \ + patch("app.telegram.webhook.api_call", new=AsyncMock(return_value={"ok": True})): + handled = await webhook.handle_watch_command(msg) + assert handled is True + m.assert_awaited_once_with("005930") + +@pytest.mark.asyncio +async def test_non_watch_text_ignored(): + from app.telegram import webhook + msg = {"chat": {"id": 1}, "text": "안녕"} + assert await webhook.handle_watch_command(msg) is False +``` + +- [ ] **Step 2: 실패 확인** + +Run: `cd agent-office && python -m pytest tests/test_watch_commands.py -q` +Expected: FAIL (`AttributeError: handle_watch_command`) + +- [ ] **Step 3: 구현** + +`agent-office/app/service_proxy.py` (기존 httpx 패턴 따라 — STOCK_URL 사용): +```python +async def watchlist_add(ticker: str) -> dict: + return await _post(f"{STOCK_URL}/api/stock/watchlist", {"ticker": ticker}) + +async def watchlist_remove(ticker: str) -> dict: + return await _delete(f"{STOCK_URL}/api/stock/watchlist/{ticker}") + +async def watchlist_list() -> dict: + return await _get(f"{STOCK_URL}/api/stock/watchlist") +``` +(`_post/_get/_delete`·`STOCK_URL`은 service_proxy.py 기존 헬퍼 확인해 맞춤. 없으면 httpx로 직접.) + +`agent-office/app/telegram/webhook.py` (상단 `from .. import service_proxy`, `from .client import api_call`): +```python +async def handle_watch_command(message: dict) -> bool: + """/watch /unwatch /watchlist 처리. 처리했으면 True.""" + text = (message.get("text") or "").strip() + chat_id = message.get("chat", {}).get("id") + parts = text.split() + cmd = parts[0].lower() if parts else "" + if cmd == "/watch" and len(parts) >= 2: + await service_proxy.watchlist_add(parts[1]) + reply = f"관심종목 추가: {parts[1]}" + elif cmd == "/unwatch" and len(parts) >= 2: + await service_proxy.watchlist_remove(parts[1]) + reply = f"관심종목 삭제: {parts[1]}" + elif cmd == "/watchlist": + res = await service_proxy.watchlist_list() + items = res.get("watchlist", []) + reply = "관심종목:\n" + ("\n".join(f"- {w['name'] or ''} ({w['ticker']})" for w in items) or "(없음)") + else: + return False + await api_call("sendMessage", {"chat_id": chat_id, "text": reply}) + return True +``` + +`handle_webhook`의 message 분기(현재 line 26 근처)에서 slash 명령보다 먼저 시도: +```python + if message and message.get("text"): + if await handle_watch_command(message): + return None + if message and message.get("text") and agent_dispatcher is not None: + return await _handle_message(message, agent_dispatcher) +``` + +- [ ] **Step 4: 통과 확인** + +Run: `cd agent-office && python -m pytest tests/test_watch_commands.py -q` +Expected: PASS + +- [ ] **Step 5: 커밋** + +```bash +git add agent-office/app/service_proxy.py agent-office/app/telegram/webhook.py agent-office/tests/test_watch_commands.py +git commit -m "feat(agent-office): /watch /unwatch /watchlist 봇 명령" +``` + +--- + +### Task 9: 회귀 + 메모리/카탈로그 갱신 + 배포 + +**Files:** +- Modify: `CLAUDE.md` (§9 stock/agent-office 엔드포인트 표에 신규 항목) +- Modify: 메모리 `service_stock.md`, `service_agent_office.md`, `infra_distributed_workers.md`(trade-monitor 워커 계약) + +- [ ] **Step 1: stock 전체 회귀** + +Run: `PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/ -q` +Expected: 기존 149 + 신규 통과, 0 fail + +- [ ] **Step 2: agent-office 전체 회귀** + +Run: `cd agent-office && python -m pytest -q` +Expected: 기존 140 + 신규 통과, 0 fail + +- [ ] **Step 3: CLAUDE.md + 메모리 갱신** + +- CLAUDE.md §9 stock: `GET/POST/DELETE /api/stock/watchlist`, `GET /api/stock/trade-alerts`, `GET/POST /api/webai/trade-alert/{monitor-set,report}`. +- CLAUDE.md §9 agent-office: `POST /api/agent-office/stock/trade-alert` + 봇 명령 `/watch`류. +- `service_stock.md`: 신규 3테이블 + trade_alerts.py + AGENT_OFFICE_URL env. +- `service_agent_office.md`: trade-alert notify + 봇 명령 + telegram_trade.py. +- `infra_distributed_workers.md`: `trade-monitor` 워커 계약(monitor-set/report/heartbeat) 추가. + +- [ ] **Step 4: 커밋 + 배포** + +```bash +git add CLAUDE.md +git commit -m "docs(CLAUDE.md): 매매알람 엔드포인트 카탈로그 등재" +# nas-deploy 락 획득 후 push (자동배포) +git push origin main +``` + +- [ ] **Step 5: 계약 핸드오프** + +co-gahusb 팀버스로 AI 세션(web-ai `trade-monitor` 워커) + FE 세션(web-ui 관심종목 탭)에 스펙 §5 계약 전달. `WEBAI_API_KEY`·`AGENT_OFFICE_URL` env 운영 주입 확인. + +--- + +## Self-Review + +**Spec coverage:** +- §2 요구사항 6종 → watchlist(T1-2), monitor-set 조립 buy=watchlist∪screener·sell=보유(T3), edge dedup(T4,6), session/시간외(T5), 텔레그램 너+아내(T7), 봇 명령(T8). ✓ +- §4 DB 3테이블 → T1. §5 계약 4종 → monitor-set(T5)·report(T6)·notify(T7)·watchlist CRUD(T2)·heartbeat(워커측, BE 범위 밖 명시). ✓ +- §6 조건 로직 = 워커(web-ai) 담당 → BE 계획 범위 밖(계약만). ✓ +- §7 edge 흐름 → T6. §8 세션 게이팅 → T5. §9 에러(전송성공시만) → T6 test. §10 테스트 → 각 Task TDD. ✓ +- 트레일링 스톱 holding_high → T3 `holding_high(lookback 60d)`. ✓ + +**Placeholder scan:** 각 Step에 실제 코드/명령/기대출력 포함. "기존 확인" 주석은 시그니처 매칭 지시(placeholder 아님). ✓ + +**Type consistency:** 조건 키 문자열(buy_breakout 등) 전 Task 동일. `diff_firing` 반환 `{new,cleared,seen}` T4정의→T6소비 일치. `notify_agent_office`/`send_trade_alerts` 시그니처 T6/T7 일치. ✓ + +**범위:** BE(web-backend)만. 워커(web-ai)·탭(web-ui)은 계약 정의 후 각 세션(스펙 §11). 단일 실행 계획으로 적정.