web-page-backend/docs/superpowers/plans/2026-07-02-realtime-trade-alerts.md
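gahusb baa3a3075d docs(stock): BE implementation plan for real-time trade alerts (9 tasks, TDD)
watchlist/alert_state/history DB → CRUD API → monitor-set assembly → edge diff →
webai monitor-set/report → agent-office Telegram (you + wife) → /watch bot commands → regression/deploy.
The worker (web-ai) and the tab (web-ui) are defined only by the contract (spec §5) and handed off to their own sessions.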

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-02 15:25:26 +09:00

Real-Time Trade Alerts (BE Side) Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

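Goal: Implement the NAS-side half of real-time trade alerts in web-backend (stock + agent-office): watchlist management, monitor-set assembly, persistent edge-based deduplication of the firing reports sent by the worker, Telegram delivery (you + wife), and bot commands.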

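Architecture: A Windows Docker worker (separate repo) pulls the monitor-set every minute → computes the firing set F from KIS + TA → POSTs it to /report on the NAS stock service. stock diffs F against the persisted trade_alert_state and pushes only the new edges to agent-office → Telegram. TA and condition evaluation live in the worker; the edge-deduplication state is persisted on the NAS.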
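
The one-minute cycle described above can be sketched from the worker's side as follows. This is an illustration under assumptions, not the worker's actual code: the worker lives in a separate repo, so `run_cycle`, `fetch_monitor_set`, `evaluate`, and `post_report` are hypothetical names, and the HTTP and KIS/TA parts are injected as plain callables so the flow can be followed without a network.

```python
from typing import Callable


def run_cycle(
    fetch_monitor_set: Callable[[], dict],
    evaluate: Callable[[dict, str], list],
    post_report: Callable[[dict], dict],
) -> dict:
    """One worker cycle: pull the monitor-set, compute the firing set F, report it."""
    ms = fetch_monitor_set()
    firing = []
    for target in ms["buy_targets"]:
        firing.extend(evaluate(target, "buy"))
    for target in ms["sell_targets"]:
        firing.extend(evaluate(target, "sell"))
    # The full firing set is sent every cycle; deduplication happens on the NAS.
    return post_report({"as_of": ms["as_of"], "firing": firing})


# Stand-ins for the real HTTP client and the KIS/TA evaluation (hypothetical).
def fake_fetch() -> dict:
    return {
        "session": "regular",
        "as_of": "2026-07-02T09:01:00+09:00",
        "buy_targets": [{"ticker": "005930", "name": "삼성전자"}],
        "sell_targets": [],
    }


def fake_evaluate(target: dict, kind: str) -> list:
    # Pretend every buy target has just broken out.
    return [{"ticker": target["ticker"], "kind": kind,
             "condition": "buy_breakout", "price": 71500, "detail": {}}]


reports = []


def fake_post(body: dict) -> dict:
    reports.append(body)
    return {"new_alerts": len(body["firing"]), "cleared": 0}


result = run_cycle(fake_fetch, fake_evaluate, fake_post)
print(result)
```

Keeping the worker stateless in this way is what lets the NAS own the deduplication state: the worker can restart at any time and simply resume sending F.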

Tech Stack: Python 3.12, FastAPI, SQLite, httpx, pytest. stock = Alpine container (runs in UTC; KST is obtained through the _today_kst idiom), agent-office = Telegram bot.

Spec: docs/superpowers/specs/2026-07-02-realtime-trade-alerts-design.md (the §5 contract is the interface for the web-ai/web-ui sessions).

Global Constraints

  • DB migrations are idempotent: CREATE TABLE IF NOT EXISTS, created from init_db() at startup.
  • KST dates/times use an explicit _today_kst-style conversion (utcnow() + 9h): the stock container is Alpine without tzdata, so date.today() returns the UTC date (feedback_alpine_tzdata_utc).
  • Update trade_alert_state only when the Telegram send succeeds; a failed send is then guaranteed to be retried (node_monitor idiom).
  • The edge-deduplication state is persistent: no re-alert after a worker or NAS restart.
  • webai endpoints use Depends(verify_webai_key) (stock app/auth.py).
  • Local stock tests: PYTHONPATH="<repo-root>:<repo-root>/stock" python -m pytest.
  • Condition key naming (fixed by the contract): buy=buy_ma20_pullback|buy_breakout|buy_rsi_bounce, sell=sell_stop_loss|sell_ma_break|sell_take_profit|sell_climax|sell_trailing_stop.

Task 1 (stock DB): watchlist / trade_alert_state / trade_alert_history tables + helpers

Files:

  • Modify: stock/app/db.py (add 3 tables to init_db + helper functions)
  • Test: stock/tests/test_trade_alerts_db.py (Create)

Interfaces:

  • Produces:

    • add_watchlist(ticker: str, name: str|None=None, note: str|None=None) -> None (INSERT OR IGNORE)
    • remove_watchlist(ticker: str) -> bool
    • get_watchlist() -> list[dict] (each item is {ticker,name,note,params,added_at})
    • get_alert_state_firing() -> set[tuple[str,str,str]] ((ticker,kind,condition) tuples with currently_firing=1)
    • set_alert_firing(ticker,kind,condition, firing: bool, at_iso: str|None=None) -> None (upsert; firing=True updates first/last_fired)
    • touch_alert_seen(keys: list[tuple], at_iso: str) -> None
    • add_alert_history(ticker,name,kind,condition,price,detail: dict) -> int
    • get_alert_history(days: int=7) -> list[dict]
  • Step 1: Write the failing test: stock/tests/test_trade_alerts_db.py

import pytest

@pytest.fixture
def db(monkeypatch, tmp_path):
    from app import db as _db
    monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
    _db.init_db()
    return _db

def test_watchlist_add_get_remove(db):
    db.add_watchlist("005930", "삼성전자", note="관심")
    db.add_watchlist("005930", "삼성전자")  # idempotent
    wl = db.get_watchlist()
    assert [w["ticker"] for w in wl] == ["005930"]
    assert wl[0]["name"] == "삼성전자"
    assert db.remove_watchlist("005930") is True
    assert db.get_watchlist() == []

def test_alert_state_edge_firing_and_clear(db):
    key = ("005930", "buy", "buy_breakout")
    assert db.get_alert_state_firing() == set()
    db.set_alert_firing(*key, firing=True, at_iso="2026-07-02T00:01:00Z")
    assert key in db.get_alert_state_firing()
    db.set_alert_firing(*key, firing=False)
    assert key not in db.get_alert_state_firing()

def test_alert_history_records_and_reads(db):
    db.add_alert_history("005930", "삼성전자", "buy", "buy_breakout", 71500, {"vol": 2.1})
    rows = db.get_alert_history(days=7)
    assert len(rows) == 1
    assert rows[0]["ticker"] == "005930" and rows[0]["kind"] == "buy"
    assert rows[0]["detail"]["vol"] == 2.1
  • Step 2: Confirm failure

Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_db.py -q Expected: FAIL (AttributeError: module 'app.db' has no attribute 'add_watchlist')

  • Step 3: Minimal implementation: stock/app/db.py

Add inside init_db() (after the existing CREATE blocks, before the seed):

        conn.execute("""
            CREATE TABLE IF NOT EXISTS watchlist (
                ticker      TEXT PRIMARY KEY,
                name        TEXT,
                note        TEXT,
                params_json TEXT NOT NULL DEFAULT '{}',
                added_at    TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS trade_alert_state (
                ticker           TEXT NOT NULL,
                kind             TEXT NOT NULL,
                condition        TEXT NOT NULL,
                currently_firing INTEGER NOT NULL DEFAULT 0,
                first_fired_at   TEXT,
                last_fired_at    TEXT,
                last_seen_at     TEXT,
                PRIMARY KEY (ticker, kind, condition)
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS trade_alert_history (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                ticker      TEXT NOT NULL,
                name        TEXT,
                kind        TEXT NOT NULL,
                condition   TEXT NOT NULL,
                price       REAL,
                detail_json TEXT,
                fired_at    TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_tah_fired ON trade_alert_history(fired_at DESC)")

Add the helpers at the bottom of the module (import json is already at the top):

def add_watchlist(ticker: str, name: str | None = None, note: str | None = None) -> None:
    with _conn() as conn:
        conn.execute(
            "INSERT OR IGNORE INTO watchlist(ticker,name,note) VALUES(?,?,?)",
            (ticker, name, note),
        )
        # If the row already exists, refresh name/note; a None argument keeps the stored value.
        conn.execute(
            "UPDATE watchlist SET name=COALESCE(?,name), note=COALESCE(?,note) WHERE ticker=?",
            (name, note, ticker),
        )


def remove_watchlist(ticker: str) -> bool:
    with _conn() as conn:
        cur = conn.execute("DELETE FROM watchlist WHERE ticker=?", (ticker,))
        return cur.rowcount > 0


def get_watchlist() -> list:
    with _conn() as conn:
        rows = conn.execute("SELECT * FROM watchlist ORDER BY added_at").fetchall()
    return [
        {"ticker": r["ticker"], "name": r["name"], "note": r["note"],
         "params": json.loads(r["params_json"] or "{}"), "added_at": r["added_at"]}
        for r in rows
    ]


def get_alert_state_firing() -> set:
    with _conn() as conn:
        rows = conn.execute(
            "SELECT ticker,kind,condition FROM trade_alert_state WHERE currently_firing=1"
        ).fetchall()
    return {(r["ticker"], r["kind"], r["condition"]) for r in rows}


def set_alert_firing(ticker: str, kind: str, condition: str, firing: bool, at_iso: str = None) -> None:
    now = at_iso or _now_iso()
    with _conn() as conn:
        if firing:
            conn.execute(
                """INSERT INTO trade_alert_state(ticker,kind,condition,currently_firing,first_fired_at,last_fired_at,last_seen_at)
                   VALUES(?,?,?,1,?,?,?)
                   ON CONFLICT(ticker,kind,condition) DO UPDATE SET
                     currently_firing=1,
                     first_fired_at=COALESCE(first_fired_at,excluded.first_fired_at),
                     last_fired_at=excluded.last_fired_at,
                     last_seen_at=excluded.last_seen_at""",
                (ticker, kind, condition, now, now, now),
            )
        else:
            conn.execute(
                "UPDATE trade_alert_state SET currently_firing=0, last_seen_at=? WHERE ticker=? AND kind=? AND condition=?",
                (now, ticker, kind, condition),
            )


def touch_alert_seen(keys: list, at_iso: str) -> None:
    with _conn() as conn:
        for (ticker, kind, condition) in keys:
            conn.execute(
                "UPDATE trade_alert_state SET last_seen_at=? WHERE ticker=? AND kind=? AND condition=?",
                (at_iso, ticker, kind, condition),
            )


def add_alert_history(ticker: str, name: str, kind: str, condition: str, price, detail: dict) -> int:
    with _conn() as conn:
        cur = conn.execute(
            "INSERT INTO trade_alert_history(ticker,name,kind,condition,price,detail_json) VALUES(?,?,?,?,?,?)",
            (ticker, name, kind, condition, price, json.dumps(detail or {}, ensure_ascii=False)),
        )
        return cur.lastrowid


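def get_alert_history(days: int = 7) -> list:
    with _conn() as conn:
        rows = conn.execute(
            # Build the cutoff in the same 'YYYY-MM-DDTHH:MM:SS.SSSZ' layout as fired_at so the
            # string comparison is exact (datetime() would emit a space instead of 'T').
            "SELECT * FROM trade_alert_history "
            "WHERE fired_at >= strftime('%Y-%m-%dT%H:%M:%fZ', 'now', ?) ORDER BY fired_at DESC",
            (f"-{int(days)} days",),
        ).fetchall()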
    return [
        {"id": r["id"], "ticker": r["ticker"], "name": r["name"], "kind": r["kind"],
         "condition": r["condition"], "price": r["price"],
         "detail": json.loads(r["detail_json"] or "{}"), "fired_at": r["fired_at"]}
        for r in rows
    ]
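
A minimal sketch of the upsert used by set_alert_firing, run against an in-memory SQLite database so it can be tried in isolation. The table mirrors trade_alert_state from this step; the `fire` helper and the sample timestamps are made up for illustration, and the sketch assumes an SQLite build with upsert support (3.24 or later).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE trade_alert_state (
        ticker           TEXT NOT NULL,
        kind             TEXT NOT NULL,
        condition        TEXT NOT NULL,
        currently_firing INTEGER NOT NULL DEFAULT 0,
        first_fired_at   TEXT,
        last_fired_at    TEXT,
        last_seen_at     TEXT,
        PRIMARY KEY (ticker, kind, condition)
    )
""")

UPSERT = """
    INSERT INTO trade_alert_state
        (ticker, kind, condition, currently_firing, first_fired_at, last_fired_at, last_seen_at)
    VALUES (?, ?, ?, 1, ?, ?, ?)
    ON CONFLICT(ticker, kind, condition) DO UPDATE SET
        currently_firing = 1,
        first_fired_at = COALESCE(first_fired_at, excluded.first_fired_at),
        last_fired_at = excluded.last_fired_at,
        last_seen_at = excluded.last_seen_at
"""


def fire(at_iso: str) -> None:
    """Hypothetical helper: mark one fixed key as firing at the given timestamp."""
    conn.execute(UPSERT, ("005930", "buy", "buy_breakout", at_iso, at_iso, at_iso))


fire("2026-07-02T00:01:00.000Z")
fire("2026-07-02T00:05:00.000Z")

row = conn.execute(
    "SELECT first_fired_at, last_fired_at, currently_firing FROM trade_alert_state"
).fetchone()
count = conn.execute("SELECT COUNT(*) FROM trade_alert_state").fetchone()[0]
print(row, count)
```

Firing the same key twice leaves first_fired_at at the first timestamp and moves last_fired_at forward, which is what the COALESCE in the upsert is there for; the primary key keeps it to a single row.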

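Add the _now_iso helper if it does not exist:

def _now_iso() -> str:
    # Same layout as SQLite's strftime('%Y-%m-%dT%H:%M:%fZ'): SQLite's %f means SS.SSS,
    # whereas Python's %f is microseconds only, so seconds and milliseconds are written explicitly.
    now = dt.datetime.now(dt.timezone.utc)
    return now.strftime("%Y-%m-%dT%H:%M:%S.") + f"{now.microsecond // 1000:03d}Z"

(Check that import datetime as dt is at the top of the file; add it if missing.)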

  • Step 4: Confirm pass

Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_db.py -q Expected: PASS (3 passed)

  • Step 5: Commit
git add stock/app/db.py stock/tests/test_trade_alerts_db.py
git commit -m "feat(stock): 매매알람 DB — watchlist/alert_state/history 테이블+헬퍼"

Task 2 (stock): watchlist CRUD API + alert history API (public)

Files:

  • Modify: stock/app/main.py (add 4 endpoints)
  • Test: stock/tests/test_watchlist_api.py (Create)

Interfaces:

  • Consumes: add_watchlist/remove_watchlist/get_watchlist/get_alert_history from Task 1

  • Produces (contract §5.3):

    • GET /api/stock/watchlist → {"watchlist":[...]}
    • POST /api/stock/watchlist body {ticker, name?, note?} → 201 {"ok":true}
    • DELETE /api/stock/watchlist/{ticker} → 200/404
    • GET /api/stock/trade-alerts?days=N → {"alerts":[...]}
  • Step 1: Failing test: stock/tests/test_watchlist_api.py

import pytest
from fastapi.testclient import TestClient

@pytest.fixture
def client(monkeypatch, tmp_path):
    from app import db as _db
    monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
    _db.init_db()
    from app.main import app
    return TestClient(app)

def test_watchlist_crud(client):
    assert client.get("/api/stock/watchlist").json()["watchlist"] == []
    r = client.post("/api/stock/watchlist", json={"ticker": "005930", "name": "삼성전자"})
    assert r.status_code == 201
    wl = client.get("/api/stock/watchlist").json()["watchlist"]
    assert wl[0]["ticker"] == "005930"
    assert client.delete("/api/stock/watchlist/005930").status_code == 200
    assert client.delete("/api/stock/watchlist/005930").status_code == 404

def test_trade_alerts_history_empty(client):
    assert client.get("/api/stock/trade-alerts?days=7").json()["alerts"] == []
  • Step 2: Confirm failure

Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_watchlist_api.py -q Expected: FAIL (404 on /api/stock/watchlist)

  • Step 3: Implementation: stock/app/main.py (add near the existing portfolio endpoints)

Add the db helpers to the imports at the top:

from .db import add_watchlist, remove_watchlist, get_watchlist, get_alert_history

Pydantic model (near the other Request models in the file):

class WatchlistItemRequest(BaseModel):
    ticker: str
    name: str | None = None
    note: str | None = None

Endpoints:

@app.get("/api/stock/watchlist")
def list_watchlist():
    return {"watchlist": get_watchlist()}


@app.post("/api/stock/watchlist", status_code=201)
def create_watchlist_item(req: WatchlistItemRequest):
    add_watchlist(req.ticker, req.name, req.note)
    return {"ok": True}


@app.delete("/api/stock/watchlist/{ticker}")
def delete_watchlist_item(ticker: str):
    if not remove_watchlist(ticker):
        raise HTTPException(status_code=404, detail="not in watchlist")
    return {"ok": True}


@app.get("/api/stock/trade-alerts")
def list_trade_alerts(days: int = 7):
    return {"alerts": get_alert_history(days)}
  • Step 4: Confirm pass

Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_watchlist_api.py -q Expected: PASS

  • Step 5: Commit
git add stock/app/main.py stock/tests/test_watchlist_api.py
git commit -m "feat(stock): watchlist CRUD + 알람 이력 API"

Task 3 (stock): monitor-set assembly helpers (monitor-set data)

Files:

  • Create: stock/app/trade_alerts.py (pure assembly logic, easy to test)
  • Test: stock/tests/test_trade_alerts_monitorset.py (Create)

Interfaces:

  • Consumes: db.get_watchlist(), db.get_all_portfolio(), the latest screener_results

  • Produces:

    • def latest_screener_candidates(conn) -> list[dict]: {ticker,name} of the latest successful run
    • def build_monitor_set(conn, session: str, exit_params: dict, buy_params: dict) -> dict: response dict per contract §5.1 (buy_targets = watchlist ∪ screener, deduplicated; sell_targets = holdings + avg_price/qty/holding_high)
    • def holding_high(conn, ticker: str, lookback_days: int = 60) -> float|None: highest high in krx_daily_prices over the last lookback_days days
  • Step 1: Failing test: stock/tests/test_trade_alerts_monitorset.py

import sqlite3, pytest

@pytest.fixture
def conn(monkeypatch, tmp_path):
    from app import db as _db
    monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
    _db.init_db()
    c = sqlite3.connect(_db.DB_PATH); c.row_factory = sqlite3.Row
    # one holding
    _db.add_portfolio_item(ticker="000660", name="SK하이닉스", quantity=10,
                           avg_price=180000, broker="kis", market="KR")
    # one watchlist entry
    _db.add_watchlist("005930", "삼성전자")
    yield c
    c.close()

def test_build_monitor_set_merges_sources(conn):
    from app import trade_alerts as ta
    ms = ta.build_monitor_set(conn, session="regular",
                              exit_params={"stop_pct": 0.08}, buy_params={"rsi_oversold": 30})
    buy_tickers = {t["ticker"] for t in ms["buy_targets"]}
    sell_tickers = {t["ticker"] for t in ms["sell_targets"]}
    assert "005930" in buy_tickers            # watchlist
    assert "000660" in sell_tickers           # holding
    assert ms["session"] == "regular"
    assert ms["exit_params"]["stop_pct"] == 0.08
    sell = next(t for t in ms["sell_targets"] if t["ticker"] == "000660")
    assert sell["avg_price"] == 180000 and sell["qty"] == 10

(Note: check the actual signature of add_portfolio_item at stock/app/db.py:167 and match the argument names.)

  • Step 2: Confirm failure

Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_monitorset.py -q Expected: FAIL (ModuleNotFoundError: app.trade_alerts)

  • Step 3: Implementation: stock/app/trade_alerts.py
"""Trade alerts: monitor-set assembly. Pure assembly logic (no HTTP/Telegram)."""
from typing import Optional
from .db import get_watchlist, get_all_portfolio


def latest_screener_candidates(conn) -> list:
    row = conn.execute(
        "SELECT id FROM screener_runs WHERE status='success' ORDER BY asof DESC, id DESC LIMIT 1"
    ).fetchone()
    if not row:
        return []
    run_id = row[0]
    rows = conn.execute(
        "SELECT ticker,name FROM screener_results WHERE run_id=? ORDER BY rank", (run_id,)
    ).fetchall()
    return [{"ticker": r[0], "name": r[1]} for r in rows]


def holding_high(conn, ticker: str, lookback_days: int = 60) -> Optional[float]:
    row = conn.execute(
        "SELECT MAX(high) FROM krx_daily_prices WHERE ticker=? "
        "AND date >= date('now', ?)",
        (ticker, f"-{int(lookback_days)} days"),
    ).fetchone()
    return row[0] if row and row[0] is not None else None


def build_monitor_set(conn, session: str, exit_params: dict, buy_params: dict) -> dict:
    from datetime import datetime, timezone, timedelta
    kst = timezone(timedelta(hours=9))
    watch = {w["ticker"]: {"ticker": w["ticker"], "name": w["name"], "source": "watch",
                            "params": w.get("params", {})}
             for w in get_watchlist()}
    for c in latest_screener_candidates(conn):
        if c["ticker"] not in watch:
            watch[c["ticker"]] = {"ticker": c["ticker"], "name": c["name"],
                                   "source": "screener", "params": {}}
    sell_targets = []
    for p in get_all_portfolio():
        if (p.get("market") or "KR") != "KR":
            continue  # only KRX is covered by real-time TA (US stocks are skipped; spec §12/constraints)
        t = p["ticker"]
        sell_targets.append({
            "ticker": t, "name": p.get("name"),
            "avg_price": p.get("avg_price"), "qty": p.get("quantity"),
            "holding_high": holding_high(conn, t), "params": {},
        })
    return {
        "session": session,
        "as_of": datetime.now(kst).isoformat(),
        "buy_targets": list(watch.values()),
        "sell_targets": sell_targets,
        "buy_params": buy_params,
        "exit_params": exit_params,
    }

(Caution: check the key names of the dicts returned by get_all_portfolio() (avg_price/quantity/market/name) at stock/app/db.py:182 and match them. Adjust the mapping if they differ.)
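
The deduplication of buy targets in build_monitor_set can be looked at on its own. The sketch below is a standalone restatement with plain lists instead of DB calls; `merge_buy_targets` is a hypothetical name used only for illustration, and the input rows are sample data.

```python
def merge_buy_targets(watchlist: list, screener: list) -> list:
    """Union of both sources keyed by ticker; watchlist entries take precedence."""
    merged = {}
    for w in watchlist:
        merged[w["ticker"]] = {"ticker": w["ticker"], "name": w.get("name"),
                               "source": "watch", "params": w.get("params", {})}
    for c in screener:
        # setdefault leaves an existing watchlist entry untouched.
        merged.setdefault(c["ticker"], {"ticker": c["ticker"], "name": c.get("name"),
                                        "source": "screener", "params": {}})
    return list(merged.values())


targets = merge_buy_targets(
    [{"ticker": "005930", "name": "삼성전자", "params": {}}],
    [{"ticker": "005930", "name": "삼성전자"}, {"ticker": "000660", "name": "SK하이닉스"}],
)
print([(t["ticker"], t["source"]) for t in targets])
```

A ticker present in both sources appears once, tagged with source "watch", so per-ticker params from the watchlist are never overwritten by a screener row.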

  • Step 4: Confirm pass

Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_monitorset.py -q Expected: PASS

  • Step 5: Commit
git add stock/app/trade_alerts.py stock/tests/test_trade_alerts_monitorset.py
git commit -m "feat(stock): 감시대상(monitor-set) 조립 로직"

Task 4 (stock): edge diff logic (new / cleared / re-armed, pure function)

Files:

  • Modify: stock/app/trade_alerts.py (add diff_firing)
  • Test: stock/tests/test_trade_alerts_edge.py (Create)

Interfaces:

  • Produces:

    • def diff_firing(reported: list[dict], prev: set[tuple]) -> dict: returns {"new":[alert...], "cleared":[key...], "seen":[key...]}. Each reported item is {ticker,kind,condition,price,detail,name?}. key=(ticker,kind,condition).
  • Step 1: Failing test: stock/tests/test_trade_alerts_edge.py

def test_diff_new_and_cleared_and_rearm():
    from app.trade_alerts import diff_firing
    reported = [{"ticker": "005930", "kind": "buy", "condition": "buy_breakout",
                 "price": 71500, "detail": {}}]
    # first report: prev is empty → new
    d1 = diff_firing(reported, prev=set())
    assert [a["condition"] for a in d1["new"]] == ["buy_breakout"]
    assert d1["cleared"] == []
    # still firing: already in prev → nothing new
    prev = {("005930", "buy", "buy_breakout")}
    d2 = diff_firing(reported, prev=prev)
    assert d2["new"] == []
    # cleared: reported is empty but the key is in prev → cleared
    d3 = diff_firing([], prev=prev)
    assert d3["cleared"] == [("005930", "buy", "buy_breakout")]
    # fires again after re-arming: once prev is empty again, it is new
    d4 = diff_firing(reported, prev=set())
    assert len(d4["new"]) == 1
  • Step 2: Confirm failure

Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_edge.py -q Expected: FAIL (ImportError: cannot import name 'diff_firing')

  • Step 3: Implementation: add at the bottom of stock/app/trade_alerts.py
def diff_firing(reported: list, prev: set) -> dict:
    # Index reported alerts by key; a later duplicate of the same key overwrites an earlier one.
    cur = {}
    for a in reported:
        key = (a["ticker"], a["kind"], a["condition"])
        cur[key] = a
    cur_keys = set(cur)
    cleared = sorted(prev - cur_keys)
    return {
        # Iterate the dict (insertion-ordered) so "new" keeps the reported order.
        "new": [a for k, a in cur.items() if k not in prev],
        "cleared": cleared,
        "seen": sorted(cur_keys),
    }
  • Step 4: Confirm pass

Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_edge.py -q Expected: PASS

  • Step 5: Commit
git add stock/app/trade_alerts.py stock/tests/test_trade_alerts_edge.py
git commit -m "feat(stock): edge diff(신규/해제/재무장) 순수 함수"

Task 5 (stock): session detection + webai monitor-set endpoint

Files:

  • Modify: stock/app/trade_alerts.py (current_session)
  • Modify: stock/app/main.py (GET /api/webai/trade-alert/monitor-set)
  • Test: stock/tests/test_trade_alerts_monitorset_api.py (Create)

Interfaces:

  • Consumes: build_monitor_set from Task 3, the existing is_market_open and verify_webai_key

  • Produces:

    • def current_session(now_kst) -> str: pre/regular/after/closed (weekday and market-holiday handling is left to the caller, which gates on is_market_open)
    • GET /api/webai/trade-alert/monitor-set (webai auth) → contract §5.1
  • Step 1: Failing test: stock/tests/test_trade_alerts_monitorset_api.py

import datetime as dt, pytest
from fastapi.testclient import TestClient

def test_current_session_windows():
    from app.trade_alerts import current_session
    d = dt.date(2026, 7, 2)
    assert current_session(dt.datetime.combine(d, dt.time(8, 40))) == "pre"
    assert current_session(dt.datetime.combine(d, dt.time(10, 0))) == "regular"
    assert current_session(dt.datetime.combine(d, dt.time(17, 0))) == "after"
    assert current_session(dt.datetime.combine(d, dt.time(20, 0))) == "closed"

@pytest.fixture
def client(monkeypatch, tmp_path):
    from app import db as _db
    monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
    _db.init_db()
    monkeypatch.setenv("WEBAI_API_KEY", "k")
    from app.main import app
    return TestClient(app)

def test_monitor_set_requires_auth(client):
    assert client.get("/api/webai/trade-alert/monitor-set").status_code == 401

def test_monitor_set_ok(client):
    r = client.get("/api/webai/trade-alert/monitor-set", headers={"X-WebAI-Key": "k"})
    assert r.status_code == 200
    body = r.json()
    assert body["session"] in ("pre", "regular", "after", "closed")
    assert "buy_targets" in body and "sell_targets" in body
    assert body["exit_params"]["trailing_pct"] == 0.10
  • Step 2: Confirm failure

Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_monitorset_api.py -q Expected: FAIL

  • Step 3: Implementation

stock/app/trade_alerts.py:

import datetime as _dt

# KST session windows as (hour, minute); start inclusive, end exclusive
_SESSIONS = [
    ("pre",     (8, 30), (9, 0)),
    ("regular", (9, 0),  (15, 30)),
    ("after",   (16, 0), (18, 0)),
]

def current_session(now_kst) -> str:
    t = now_kst.time()
    for name, (sh, sm), (eh, em) in _SESSIONS:
        if _dt.time(sh, sm) <= t < _dt.time(eh, em):
            return name
    return "closed"

DEFAULT_EXIT_PARAMS = {"stop_pct": 0.08, "take_pct": 0.25, "trailing_pct": 0.10}
DEFAULT_BUY_PARAMS = {"rsi_oversold": 30, "breakout_vol_mult": 1.5, "pullback_pct": 0.02}

stock/app/main.py (webai section):

from .trade_alerts import (
    build_monitor_set, current_session, DEFAULT_EXIT_PARAMS, DEFAULT_BUY_PARAMS,
)

@app.get("/api/webai/trade-alert/monitor-set", dependencies=[Depends(verify_webai_key)])
def get_trade_alert_monitor_set():
    from datetime import datetime, timezone, timedelta
    kst = timezone(timedelta(hours=9))
    now_kst = datetime.now(kst)
    session = current_session(now_kst)
    if not is_market_open(now_kst.date()):
        session = "closed"
    with _conn() as c:
        return build_monitor_set(c, session, DEFAULT_EXIT_PARAMS, DEFAULT_BUY_PARAMS)

(Caution: check how stock main.py gets at db._conn. See whether existing endpoints use with _conn() as c: and follow the same pattern. If it is not imported, add from .db import _conn.)

  • Step 4: Confirm pass

Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_monitorset_api.py -q Expected: PASS

  • Step 5: Commit
git add stock/app/trade_alerts.py stock/app/main.py stock/tests/test_trade_alerts_monitorset_api.py
git commit -m "feat(stock): session 판정 + webai monitor-set 엔드포인트"

Task 6 (stock): webai report endpoint (edge diff → agent-office push → state/history)

Files:

  • Modify: stock/app/main.py (POST /api/webai/trade-alert/report)
  • Modify: stock/app/trade_alerts.py (notify_agent_office httpx helper)
  • Test: stock/tests/test_trade_alerts_report_api.py (Create)

Interfaces:

  • Consumes: Task 1 get_alert_state_firing/set_alert_firing/touch_alert_seen/add_alert_history, Task 4 diff_firing

  • Produces (contract §5.2):

    • POST /api/webai/trade-alert/report body {as_of, firing:[...]} → {"new_alerts":N,"cleared":M}
    • Pushes new alerts to agent-office (AGENT_OFFICE_URL env, default http://agent-office:8000); set_alert_firing(True) + add_alert_history run only when the send succeeds
  • Step 1: Failing test: stock/tests/test_trade_alerts_report_api.py

import pytest
from unittest.mock import patch
from fastapi.testclient import TestClient

@pytest.fixture
def client(monkeypatch, tmp_path):
    from app import db as _db
    monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
    _db.init_db()
    monkeypatch.setenv("WEBAI_API_KEY", "k")
    from app.main import app
    return TestClient(app)

def _report(client, firing):
    return client.post("/api/webai/trade-alert/report",
                       headers={"X-WebAI-Key": "k"},
                       json={"as_of": "2026-07-02T09:01:00+09:00", "firing": firing})

def test_report_new_edge_sends_and_persists(client):
    firing = [{"ticker": "005930", "name": "삼성전자", "kind": "buy",
               "condition": "buy_breakout", "price": 71500, "detail": {"vol": 2.0}}]
    with patch("app.trade_alerts.notify_agent_office", return_value=True) as m:
        r1 = _report(client, firing)
    assert r1.json()["new_alerts"] == 1
    assert m.called
    # second report with the same firing set → still firing, 0 new
    with patch("app.trade_alerts.notify_agent_office", return_value=True):
        r2 = _report(client, firing)
    assert r2.json()["new_alerts"] == 0
    # exactly one history row
    assert len(client.get("/api/stock/trade-alerts?days=1").json()["alerts"]) == 1

def test_report_send_failure_does_not_persist(client):
    firing = [{"ticker": "005930", "name": "삼성전자", "kind": "buy",
               "condition": "buy_breakout", "price": 71500, "detail": {}}]
    with patch("app.trade_alerts.notify_agent_office", return_value=False):
        r = _report(client, firing)
    assert r.json()["new_alerts"] == 0            # send failed → not recorded
    # retried on the next cycle (send succeeds) and alerted
    with patch("app.trade_alerts.notify_agent_office", return_value=True):
        r2 = _report(client, firing)
    assert r2.json()["new_alerts"] == 1

def test_report_cleared_rearm(client):
    firing = [{"ticker": "005930", "name": "삼성", "kind": "buy",
               "condition": "buy_breakout", "price": 71500, "detail": {}}]
    with patch("app.trade_alerts.notify_agent_office", return_value=True):
        _report(client, firing)
        _report(client, [])                        # cleared
        r = _report(client, firing)                # fires again
    assert r.json()["new_alerts"] == 1
  • Step 2: Confirm failure

Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_report_api.py -q Expected: FAIL (404)

  • Step 3: Implementation

stock/app/trade_alerts.py:

import os
import httpx

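def notify_agent_office(alerts: list) -> bool:
    """Push new alerts to agent-office. Returns True only if delivery succeeded."""
    url = os.getenv("AGENT_OFFICE_URL", "http://agent-office:8000") + "/api/agent-office/stock/trade-alert"
    try:
        with httpx.Client(timeout=10) as c:
            resp = c.post(url, json={"alerts": alerts})
        # agent-office answers 200 with {"sent": n, "ok": bool} even when a Telegram
        # send failed, so the "ok" flag has to be checked as well as the status code.
        return resp.status_code == 200 and bool(resp.json().get("ok"))
    except (httpx.HTTPError, ValueError):
        return False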

stock/app/main.py:

from . import trade_alerts
from .trade_alerts import diff_firing
from .db import (
    get_alert_state_firing, set_alert_firing, touch_alert_seen, add_alert_history,
)

class TradeAlertReport(BaseModel):
    as_of: str | None = None
    firing: list[dict] = []

@app.post("/api/webai/trade-alert/report", dependencies=[Depends(verify_webai_key)])
def post_trade_alert_report(body: TradeAlertReport):
    prev = get_alert_state_firing()
    d = diff_firing(body.firing, prev)
    sent = 0
    for a in d["new"]:
        # Look the notifier up on the module at call time; a name imported with
        # "from .trade_alerts import ..." would not be replaced by tests that patch
        # app.trade_alerts.notify_agent_office.
        ok = trade_alerts.notify_agent_office([a])
        if ok:
            set_alert_firing(a["ticker"], a["kind"], a["condition"], firing=True, at_iso=body.as_of)
            add_alert_history(a["ticker"], a.get("name"), a["kind"], a["condition"],
                              a.get("price"), a.get("detail", {}))
            sent += 1
    for key in d["cleared"]:
        set_alert_firing(key[0], key[1], key[2], firing=False)
    if body.as_of:
        # Skip when the worker sent no timestamp, rather than storing an empty string.
        touch_alert_seen(d["seen"], body.as_of)
    return {"new_alerts": sent, "cleared": len(d["cleared"])}
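
The rule this endpoint follows (record an edge only after a successful send, so a failed send is retried on the next report) can be traced with an in-memory sketch. `process_report` is a hypothetical simplification: a plain set stands in for trade_alert_state, and `send` stands in for notify_agent_office.

```python
def process_report(firing: list, state: set, send) -> dict:
    """Apply one report to the in-memory firing state; returns counts like the endpoint."""
    cur = {(a["ticker"], a["kind"], a["condition"]): a for a in firing}
    sent = 0
    for key, alert in cur.items():
        if key in state:
            continue                  # still firing: no new edge
        if send(alert):
            state.add(key)            # record the edge only after a successful send
            sent += 1
    cleared = state - set(cur)
    state.difference_update(cleared)  # re-arm: the next firing counts as new again
    return {"new_alerts": sent, "cleared": len(cleared)}


alert = {"ticker": "005930", "kind": "buy", "condition": "buy_breakout"}
state = set()
r1 = process_report([alert], state, send=lambda a: False)  # send fails: nothing recorded
r2 = process_report([alert], state, send=lambda a: True)   # retried and delivered
r3 = process_report([alert], state, send=lambda a: True)   # still firing: silent
r4 = process_report([], state, send=lambda a: True)        # condition cleared
r5 = process_report([alert], state, send=lambda a: True)   # fires again: alerted
print(r1, r2, r3, r4, r5)
```

Because the failed key never enters the state, the second report sees it as new again, which is the retry behavior the tests above check.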
  • Step 4: Confirm pass

Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_report_api.py -q Expected: PASS (3 passed)

  • Step 5: Commit
git add stock/app/main.py stock/app/trade_alerts.py stock/tests/test_trade_alerts_report_api.py
git commit -m "feat(stock): webai report — edge diff→agent-office push→상태/이력(전송성공시만)"

Task 7 (agent-office): trade-alert notify endpoint (Telegram, you + wife)

Files:

  • Modify: agent-office/app/main.py (POST /api/agent-office/stock/trade-alert)
  • Create: agent-office/app/notifiers/telegram_trade.py (formatting + sending)
  • Test: agent-office/tests/test_trade_alert_notify.py (Create)

Interfaces:

  • Consumes: telegram.messaging.send_raw, config.TELEGRAM_CHAT_ID, config.TELEGRAM_WIFE_CHAT_ID

  • Produces:

    • format_trade_alert(alert: dict) -> str (HTML)
    • async def send_trade_alerts(alerts: list) -> dict: calls send_raw once for you and once for your wife. Returns {"sent": n, "ok": bool}
    • POST /api/agent-office/stock/trade-alert body {alerts:[...]} → calls the above + add_log
  • Step 1: Failing test: agent-office/tests/test_trade_alert_notify.py

(For the top of the file, copy the _TMP/_init_db fixture pattern from the existing test_youtube_publisher_retry.py: monkeypatch.setattr(app.db, "DB_PATH", _TMP) + delete the -wal/-shm files)

import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_send_trade_alerts_to_user_and_wife():
    from app.notifiers import telegram_trade
    alerts = [{"ticker": "005930", "name": "삼성전자", "kind": "buy",
               "condition": "buy_breakout", "price": 71500, "detail": {}}]
    with patch("app.notifiers.telegram_trade.send_raw",
               new=AsyncMock(return_value={"ok": True})) as m, \
         patch("app.notifiers.telegram_trade.TELEGRAM_CHAT_ID", "U"), \
         patch("app.notifiers.telegram_trade.TELEGRAM_WIFE_CHAT_ID", "W"):
        res = await telegram_trade.send_trade_alerts(alerts)
    assert res["ok"] is True
    chat_ids = {c.kwargs.get("chat_id") for c in m.await_args_list}
    assert chat_ids == {"U", "W"}      # sent to both chats

@pytest.mark.asyncio
async def test_format_trade_alert_has_direction():
    from app.notifiers.telegram_trade import format_trade_alert
    txt = format_trade_alert({"ticker": "005930", "name": "삼성전자", "kind": "sell",
                              "condition": "sell_stop_loss", "price": 60000, "detail": {}})
    assert "매도" in txt and "삼성전자" in txt
  • Step 2: Confirm failure

Run: cd agent-office && python -m pytest tests/test_trade_alert_notify.py -q Expected: FAIL (ModuleNotFoundError: app.notifiers.telegram_trade)

  • Step 3: Implementation: agent-office/app/notifiers/telegram_trade.py
"""Trade alert Telegram formatting + sending (you + wife)."""
from html import escape

from ..telegram.messaging import send_raw
from ..config import TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID

_KIND_LABEL = {"buy": "🟢 매수", "sell": "🔴 매도"}
_COND_LABEL = {
    "buy_ma20_pullback": "지지선 되돌림", "buy_breakout": "돌파", "buy_rsi_bounce": "RSI 과매도 반등",
    "sell_stop_loss": "손절", "sell_ma_break": "이평 이탈", "sell_take_profit": "익절",
    "sell_climax": "급등 소진", "sell_trailing_stop": "트레일링 스톱",
}

def format_trade_alert(a: dict) -> str:
    kind = _KIND_LABEL.get(a["kind"], a["kind"])
    cond = _COND_LABEL.get(a["condition"], a["condition"])
    # The message is HTML, so escape free-text fields: a stray < or & would break parsing.
    name = escape(str(a.get("name") or a["ticker"]))
    ticker = escape(str(a["ticker"]))
    price = a.get("price")
    price_s = f"{int(price):,}원" if price else "-"
    return f"{kind} 알람\n<b>{name}</b> ({ticker})\n조건: {cond}\n현재가: {price_s}"

async def send_trade_alerts(alerts: list) -> dict:
    sent = 0
    all_ok = True
    chat_ids = [c for c in (TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID) if c]
    for a in alerts:
        text = format_trade_alert(a)
        for cid in chat_ids:
            r = await send_raw(text=text, chat_id=cid)
            if r.get("ok"):
                sent += 1
            else:
                all_ok = False
    return {"sent": sent, "ok": all_ok}

agent-office/app/main.py:

class TradeAlertBody(BaseModel):
    alerts: list[dict] = []

@app.post("/api/agent-office/stock/trade-alert")
async def stock_trade_alert(body: TradeAlertBody):
    from .notifiers.telegram_trade import send_trade_alerts
    res = await send_trade_alerts(body.alerts)
    for a in body.alerts:
        add_log("stock", f"매매알람 {a.get('kind')} {a.get('ticker')} {a.get('condition')}", "info")
    return res

(Check the add_log import; it should already be at the top of main.py. If not, add from .db import add_log.)
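
The {"sent", "ok"} response is what allows the stock side to update trade_alert_state only after a successful send. The sketch below reproduces just that aggregation rule in isolation. It is not the production module: send_all, make_fake_sender, and the chat ids 1 and 2 are hypothetical names introduced for illustration, and the fake sender stands in for send_raw.

```python
import asyncio


async def send_all(alerts, chat_ids, send):
    """Send every alert to every chat; ok is True only if all sends succeed."""
    sent, all_ok = 0, True
    for alert in alerts:
        for cid in chat_ids:
            result = await send(text=alert["ticker"], chat_id=cid)
            if result.get("ok"):
                sent += 1
            else:
                all_ok = False
    return {"sent": sent, "ok": all_ok}


def make_fake_sender(failing_chat_ids):
    """Hypothetical stand-in for send_raw: fails for the listed chat ids."""
    async def fake_send(text, chat_id):
        return {"ok": chat_id not in failing_chat_ids}
    return fake_send


alerts = [{"ticker": "005930", "kind": "sell", "condition": "sell_stop_loss"}]
all_delivered = asyncio.run(send_all(alerts, [1, 2], make_fake_sender(set())))
one_failed = asyncio.run(send_all(alerts, [1, 2], make_fake_sender({2})))
print(all_delivered)
print(one_failed)
```

Under this rule a partial failure reports ok as false, so a caller that retries the whole batch would deliver the message a second time to the chat that already received it.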

  • Step 4: Confirm it passes

Run: cd agent-office && python -m pytest tests/test_trade_alert_notify.py -q
Expected: PASS

  • Step 5: Commit
git add agent-office/app/notifiers/telegram_trade.py agent-office/app/main.py agent-office/tests/test_trade_alert_notify.py
git commit -m "feat(agent-office): 매매알람 텔레그램 notify(너+아내) 엔드포인트"

Task 8: agent-office bot commands /watch, /unwatch, /watchlist

Files:

  • Modify: agent-office/app/service_proxy.py (helpers that call the stock watchlist CRUD API)
  • Modify: agent-office/app/telegram/webhook.py (handle /watch-style commands ahead of _handle_message)
  • Test: agent-office/tests/test_watch_commands.py (Create)

Interfaces:

  • Consumes: the existing service_proxy HTTP pattern, stock POST/DELETE/GET /api/stock/watchlist

  • Produces:

    • service_proxy.watchlist_add(ticker) / watchlist_remove(ticker) / watchlist_list()
    • webhook recognizes /watch <t>, /unwatch <t>, and /watchlist (via handle_watch_command, tried before _handle_message) and replies through Telegram
  • Step 1: Failing test in agent-office/tests/test_watch_commands.py

import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_watch_command_calls_add():
    from app.telegram import webhook
    msg = {"chat": {"id": 1}, "text": "/watch 005930"}
    with patch("app.telegram.webhook.service_proxy.watchlist_add",
               new=AsyncMock(return_value={"ok": True})) as m, \
         patch("app.telegram.webhook.api_call", new=AsyncMock(return_value={"ok": True})):
        handled = await webhook.handle_watch_command(msg)
    assert handled is True
    m.assert_awaited_once_with("005930")

@pytest.mark.asyncio
async def test_non_watch_text_ignored():
    from app.telegram import webhook
    msg = {"chat": {"id": 1}, "text": "안녕"}
    assert await webhook.handle_watch_command(msg) is False
  • Step 2: Confirm the failure

Run: cd agent-office && python -m pytest tests/test_watch_commands.py -q
Expected: FAIL (AttributeError: handle_watch_command)

  • Step 3: Implement

agent-office/app/service_proxy.py (follow the existing httpx pattern; uses STOCK_URL):

async def watchlist_add(ticker: str) -> dict:
    return await _post(f"{STOCK_URL}/api/stock/watchlist", {"ticker": ticker})

async def watchlist_remove(ticker: str) -> dict:
    return await _delete(f"{STOCK_URL}/api/stock/watchlist/{ticker}")

async def watchlist_list() -> dict:
    return await _get(f"{STOCK_URL}/api/stock/watchlist")

(Match _post/_get/_delete and STOCK_URL to the existing helpers in service_proxy.py. If they do not exist, call httpx directly.)

agent-office/app/telegram/webhook.py (at the top: from .. import service_proxy, from .client import api_call):

async def handle_watch_command(message: dict) -> bool:
    """Handle /watch, /unwatch, and /watchlist. Return True if the message was handled."""
    text = (message.get("text") or "").strip()
    chat_id = message.get("chat", {}).get("id")
    parts = text.split()
    cmd = parts[0].lower() if parts else ""
    if cmd == "/watch" and len(parts) >= 2:
        await service_proxy.watchlist_add(parts[1])
        reply = f"관심종목 추가: {parts[1]}"
    elif cmd == "/unwatch" and len(parts) >= 2:
        await service_proxy.watchlist_remove(parts[1])
        reply = f"관심종목 삭제: {parts[1]}"
    elif cmd == "/watchlist":
        res = await service_proxy.watchlist_list()
        items = res.get("watchlist", [])
        # .get() tolerates rows without a name; an empty list shows "(없음)".
        lines = "\n".join(f"- {w.get('name') or ''} ({w['ticker']})" for w in items)
        reply = "관심종목:\n" + (lines or "(없음)")
    else:
        # Not a watch command (or missing its ticker argument): let other handlers try.
        return False
    await api_call("sendMessage", {"chat_id": chat_id, "text": reply})
    return True

In the message branch of handle_webhook (currently around line 26), try this before the slash-command handling:

    if message and message.get("text"):
        if await handle_watch_command(message):
            return None
    if message and message.get("text") and agent_dispatcher is not None:
        return await _handle_message(message, agent_dispatcher)
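
The command parsing in handle_watch_command can also be isolated into a pure function, which makes it testable without mocks. The following is a minimal sketch under stated assumptions, not part of the plan: parse_watch_command is a hypothetical helper, and it assumes tickers are 6-digit codes such as 005930. It also strips the "@BotName" suffix that Telegram appends to commands in group chats.

```python
import re

# Assumption: tickers are 6-digit codes such as 005930.
_TICKER_RE = re.compile(r"\d{6}")
_COMMANDS = {"/watch", "/unwatch", "/watchlist"}


def parse_watch_command(text):
    """Return (command, ticker) for a watch command, or None otherwise."""
    parts = (text or "").strip().split()
    if not parts:
        return None
    # Telegram appends "@BotName" to commands sent in group chats.
    cmd = parts[0].lower().split("@", 1)[0]
    if cmd not in _COMMANDS:
        return None
    if cmd == "/watchlist":
        return (cmd, None)
    # /watch and /unwatch need a well-formed ticker argument.
    if len(parts) < 2 or not _TICKER_RE.fullmatch(parts[1]):
        return None
    return (cmd, parts[1])
```

Unlike the handler above, this sketch rejects a malformed ticker before any call to the stock API; whether that stricter behavior is wanted is a design choice for the implementer.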
  • Step 4: Confirm it passes

Run: cd agent-office && python -m pytest tests/test_watch_commands.py -q
Expected: PASS

  • Step 5: Commit
git add agent-office/app/service_proxy.py agent-office/app/telegram/webhook.py agent-office/tests/test_watch_commands.py
git commit -m "feat(agent-office): /watch /unwatch /watchlist 봇 명령"

Task 9: Regression + memory/catalog updates + deployment

Files:

  • Modify: CLAUDE.md (add the new entries to the §9 stock/agent-office endpoint tables)

  • Modify: memory files service_stock.md, service_agent_office.md, infra_distributed_workers.md (trade-monitor worker contract)

  • Step 1: Full stock regression

Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/ -q
Expected: the existing 149 tests plus the new ones pass, 0 failures

  • Step 2: Full agent-office regression

Run: cd agent-office && python -m pytest -q
Expected: the existing 140 tests plus the new ones pass, 0 failures

  • Step 3: Update CLAUDE.md and memory

  • CLAUDE.md §9 stock: GET/POST/DELETE /api/stock/watchlist, GET /api/stock/trade-alerts, GET/POST /api/webai/trade-alert/{monitor-set,report}.

  • CLAUDE.md §9 agent-office: POST /api/agent-office/stock/trade-alert + the /watch-style bot commands.

  • service_stock.md: the 3 new tables + trade_alerts.py + the AGENT_OFFICE_URL env var.

  • service_agent_office.md: trade-alert notify + bot commands + telegram_trade.py.

  • infra_distributed_workers.md: add the trade-monitor worker contract (monitor-set/report/heartbeat).

  • Step 4: Commit + deploy

git add CLAUDE.md
git commit -m "docs(CLAUDE.md): 매매알람 엔드포인트 카탈로그 등재"
# Acquire the nas-deploy lock, then push (auto-deploys)
git push origin main
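  • Step 5: Contract handoff

Pass the spec §5 contract over the co-gahusb team bus to the AI session (web-ai trade-monitor worker) and the FE session (web-ui watchlist tab). Confirm that the WEBAI_API_KEY and AGENT_OFFICE_URL env vars are injected in production.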
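
The env check in this step can be scripted. The snippet below is a small sketch rather than an existing tool: missing_env is a hypothetical helper, and only the two variable names come from this plan.

```python
import os

# The two variables this plan says must be injected in production.
REQUIRED_ENV = ("WEBAI_API_KEY", "AGENT_OFFICE_URL")


def missing_env(env=None, required=REQUIRED_ENV):
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env()
    if missing:
        print("missing env vars: " + ", ".join(missing))
    else:
        print("all required env vars are set")
```

Running it inside each container before the handoff would show whether the injection actually took effect.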


Self-Review

Spec coverage:

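  • §2 the six requirements → watchlist (T1-2), monitor-set assembly with buy = watchlist ∪ screener and sell = holdings (T3), edge dedup (T4, 6), session/after-hours (T5), Telegram to you + wife (T7), bot commands (T8). ✓
  • §4 the three DB tables → T1. §5 the four contracts → monitor-set (T5), report (T6), notify (T7), watchlist CRUD (T2); heartbeat is worker-side and explicitly outside BE scope. ✓
  • §6 condition logic is owned by the worker (web-ai) → outside the scope of this BE plan (contract only). ✓
  • §7 edge flow → T6. §8 session gating → T5. §9 errors (state updated only on a successful send) → T6 test. §10 tests → TDD in each Task. ✓
  • Trailing stop holding_high → T3 holding_high (lookback 60d). ✓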

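Placeholder scan: every Step includes actual code, commands, and expected output. The "check the existing code" notes are instructions to match signatures, not placeholders. ✓

Type consistency: the condition key strings (buy_breakout, etc.) are identical across all Tasks. The diff_firing return value {new,cleared,seen} is defined in T4 and consumed in T6 consistently. The notify_agent_office/send_trade_alerts signatures match between T6 and T7. ✓

Scope: BE (web-backend) only. The worker (web-ai) and the tab (web-ui) are handled in their own sessions once the contract is defined (spec §11). Appropriately sized for a single execution plan.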