watchlist/alert_state/history DB → CRUD API → monitor-set 조립 → edge diff → webai monitor-set/report → agent-office 텔레그램(너+아내) → /watch 봇 명령 → 회귀/배포. 워커(web-ai)·탭(web-ui)은 계약(스펙 §5)만 정의해 각 세션 핸드오프. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
41 KiB
실시간 매매 알람 (BE 쪽) Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: web-backend(stock + agent-office)에 실시간 매매 알람의 NAS 쪽 절반을 구현한다 — watchlist 관리, 감시대상(monitor-set) 조립, 워커 발화 report의 edge 중복판정(영속), 텔레그램(너+아내) 발송, 봇 명령.
Architecture: Windows docker 워커(별도 repo)가 매 1분 monitor-set을 pull → KIS+TA로 발화집합 F를 계산 → NAS stock /report에 POST. stock이 F를 영속 trade_alert_state와 diff해 신규 edge만 agent-office로 push → 텔레그램. TA/조건판정은 워커, edge 중복판정 상태는 NAS 영속.
Tech Stack: Python 3.12, FastAPI, SQLite, httpx, pytest. stock=Alpine 컨테이너(UTC — KST는 _today_kst 관용), agent-office=telegram 봇.
스펙: docs/superpowers/specs/2026-07-02-realtime-trade-alerts-design.md (§5 계약이 web-ai/web-ui 세션 인터페이스).
Global Constraints
- DB 마이그레이션은 멱등 —
CREATE TABLE IF NOT EXISTS, startupinit_db()에서 생성. - KST 날짜/시각은
_today_kst류 명시 변환(utcnow()+9h) — stock 컨테이너는 Alpine+tzdata無라date.today()=UTC (feedback_alpine_tzdata_utc). - 텔레그램 발송 성공 시에만
trade_alert_state갱신 — 실패 시 재시도 보장(node_monitor 관용). - edge 중복판정 상태는 영속 — 워커/NAS 재시작해도 재알림 없음.
- webai 엔드포인트는
Depends(verify_webai_key)(stockapp/auth.py). - 로컬 stock 테스트:
PYTHONPATH="<repo-root>:<repo-root>/stock" python -m pytest. - 조건 키 네이밍(계약 고정): buy=
buy_ma20_pullback|buy_breakout|buy_rsi_bounce, sell=sell_stop_loss|sell_ma_break|sell_take_profit|sell_climax|sell_trailing_stop.
Task 1: stock DB — watchlist / trade_alert_state / trade_alert_history 테이블 + 헬퍼
Files:
- Modify:
stock/app/db.py(init_db에 테이블 3개 추가 + 헬퍼 함수) - Test:
stock/tests/test_trade_alerts_db.py(Create)
Interfaces:
-
Produces:
add_watchlist(ticker: str, name: str|None=None, note: str|None=None) -> None(INSERT OR IGNORE)remove_watchlist(ticker: str) -> boolget_watchlist() -> list[dict](각{ticker,name,note,params,added_at})get_alert_state_firing() -> set[tuple[str,str,str]](currently_firing=1인(ticker,kind,condition))set_alert_firing(ticker,kind,condition, firing: bool, at_iso: str|None=None) -> None(upsert; firing=True면 first/last_fired 갱신)touch_alert_seen(keys: list[tuple], at_iso: str) -> Noneadd_alert_history(ticker,name,kind,condition,price,detail: dict) -> intget_alert_history(days: int=7) -> list[dict]
-
Step 1: 실패 테스트 작성 —
stock/tests/test_trade_alerts_db.py
import os, sqlite3, tempfile, datetime as dt
import pytest
@pytest.fixture
def db(monkeypatch, tmp_path):
from app import db as _db
monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
_db.init_db()
return _db
def test_watchlist_add_get_remove(db):
db.add_watchlist("005930", "삼성전자", note="관심")
db.add_watchlist("005930", "삼성전자") # 멱등
wl = db.get_watchlist()
assert [w["ticker"] for w in wl] == ["005930"]
assert wl[0]["name"] == "삼성전자"
assert db.remove_watchlist("005930") is True
assert db.get_watchlist() == []
def test_alert_state_edge_firing_and_clear(db):
key = ("005930", "buy", "buy_breakout")
assert db.get_alert_state_firing() == set()
db.set_alert_firing(*key, firing=True, at_iso="2026-07-02T00:01:00Z")
assert key in db.get_alert_state_firing()
db.set_alert_firing(*key, firing=False)
assert key not in db.get_alert_state_firing()
def test_alert_history_records_and_reads(db):
db.add_alert_history("005930", "삼성전자", "buy", "buy_breakout", 71500, {"vol": 2.1})
rows = db.get_alert_history(days=7)
assert len(rows) == 1
assert rows[0]["ticker"] == "005930" and rows[0]["kind"] == "buy"
assert rows[0]["detail"]["vol"] == 2.1
- Step 2: 실패 확인
Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_db.py -q
Expected: FAIL (AttributeError: module 'app.db' has no attribute 'add_watchlist')
- Step 3: 최소 구현 —
stock/app/db.py
init_db() 안(기존 CREATE 블록들 뒤, seed 전)에 추가:
conn.execute("""
CREATE TABLE IF NOT EXISTS watchlist (
ticker TEXT PRIMARY KEY,
name TEXT,
note TEXT,
params_json TEXT NOT NULL DEFAULT '{}',
added_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS trade_alert_state (
ticker TEXT NOT NULL,
kind TEXT NOT NULL,
condition TEXT NOT NULL,
currently_firing INTEGER NOT NULL DEFAULT 0,
first_fired_at TEXT,
last_fired_at TEXT,
last_seen_at TEXT,
PRIMARY KEY (ticker, kind, condition)
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS trade_alert_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL,
name TEXT,
kind TEXT NOT NULL,
condition TEXT NOT NULL,
price REAL,
detail_json TEXT,
fired_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_tah_fired ON trade_alert_history(fired_at DESC)")
모듈 하단에 헬퍼 추가 (import json 상단에 이미 있음):
def add_watchlist(ticker: str, name: str = None, note: str = None) -> None:
with _conn() as conn:
conn.execute(
"INSERT OR IGNORE INTO watchlist(ticker,name,note) VALUES(?,?,?)",
(ticker, name, note),
)
# 이름/노트 갱신(이미 있으면)
conn.execute(
"UPDATE watchlist SET name=COALESCE(?,name), note=COALESCE(?,note) WHERE ticker=?",
(name, note, ticker),
)
def remove_watchlist(ticker: str) -> bool:
with _conn() as conn:
cur = conn.execute("DELETE FROM watchlist WHERE ticker=?", (ticker,))
return cur.rowcount > 0
def get_watchlist() -> list:
with _conn() as conn:
rows = conn.execute("SELECT * FROM watchlist ORDER BY added_at").fetchall()
return [
{"ticker": r["ticker"], "name": r["name"], "note": r["note"],
"params": json.loads(r["params_json"] or "{}"), "added_at": r["added_at"]}
for r in rows
]
def get_alert_state_firing() -> set:
with _conn() as conn:
rows = conn.execute(
"SELECT ticker,kind,condition FROM trade_alert_state WHERE currently_firing=1"
).fetchall()
return {(r["ticker"], r["kind"], r["condition"]) for r in rows}
def set_alert_firing(ticker: str, kind: str, condition: str, firing: bool, at_iso: str = None) -> None:
now = at_iso or _now_iso()
with _conn() as conn:
if firing:
conn.execute(
"""INSERT INTO trade_alert_state(ticker,kind,condition,currently_firing,first_fired_at,last_fired_at,last_seen_at)
VALUES(?,?,?,1,?,?,?)
ON CONFLICT(ticker,kind,condition) DO UPDATE SET
currently_firing=1,
first_fired_at=COALESCE(first_fired_at,excluded.first_fired_at),
last_fired_at=excluded.last_fired_at,
last_seen_at=excluded.last_seen_at""",
(ticker, kind, condition, now, now, now),
)
else:
conn.execute(
"UPDATE trade_alert_state SET currently_firing=0, last_seen_at=? WHERE ticker=? AND kind=? AND condition=?",
(now, ticker, kind, condition),
)
def touch_alert_seen(keys: list, at_iso: str) -> None:
with _conn() as conn:
for (ticker, kind, condition) in keys:
conn.execute(
"UPDATE trade_alert_state SET last_seen_at=? WHERE ticker=? AND kind=? AND condition=?",
(at_iso, ticker, kind, condition),
)
def add_alert_history(ticker: str, name: str, kind: str, condition: str, price, detail: dict) -> int:
with _conn() as conn:
cur = conn.execute(
"INSERT INTO trade_alert_history(ticker,name,kind,condition,price,detail_json) VALUES(?,?,?,?,?,?)",
(ticker, name, kind, condition, price, json.dumps(detail or {}, ensure_ascii=False)),
)
return cur.lastrowid
def get_alert_history(days: int = 7) -> list:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM trade_alert_history WHERE fired_at >= datetime('now', ?) ORDER BY fired_at DESC",
(f"-{int(days)} days",),
).fetchall()
return [
{"id": r["id"], "ticker": r["ticker"], "name": r["name"], "kind": r["kind"],
"condition": r["condition"], "price": r["price"],
"detail": json.loads(r["detail_json"] or "{}"), "fired_at": r["fired_at"]}
for r in rows
]
_now_iso 헬퍼가 없으면 추가:
def _now_iso() -> str:
return dt.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%fZ")
(파일 상단에 import datetime as dt 확인 — 없으면 추가)
- Step 4: 통과 확인
Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_db.py -q
Expected: PASS (3 passed)
- Step 5: 커밋
git add stock/app/db.py stock/tests/test_trade_alerts_db.py
git commit -m "feat(stock): 매매알람 DB — watchlist/alert_state/history 테이블+헬퍼"
Task 2: stock — watchlist CRUD API + 알람 이력 API (public)
Files:
- Modify:
stock/app/main.py(엔드포인트 4개 추가) - Test:
stock/tests/test_watchlist_api.py(Create)
Interfaces:
-
Consumes: Task 1의
add_watchlist/remove_watchlist/get_watchlist/get_alert_history -
Produces (계약 §5.3):
GET /api/stock/watchlist→{"watchlist":[...]}POST /api/stock/watchlistbody{ticker, name?, note?}→ 201{"ok":true}DELETE /api/stock/watchlist/{ticker}→ 200/404GET /api/stock/trade-alerts?days=N→{"alerts":[...]}
-
Step 1: 실패 테스트 —
stock/tests/test_watchlist_api.py
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def client(monkeypatch, tmp_path):
from app import db as _db
monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
_db.init_db()
from app.main import app
return TestClient(app)
def test_watchlist_crud(client):
assert client.get("/api/stock/watchlist").json()["watchlist"] == []
r = client.post("/api/stock/watchlist", json={"ticker": "005930", "name": "삼성전자"})
assert r.status_code == 201
wl = client.get("/api/stock/watchlist").json()["watchlist"]
assert wl[0]["ticker"] == "005930"
assert client.delete("/api/stock/watchlist/005930").status_code == 200
assert client.delete("/api/stock/watchlist/005930").status_code == 404
def test_trade_alerts_history_empty(client):
assert client.get("/api/stock/trade-alerts?days=7").json()["alerts"] == []
- Step 2: 실패 확인
Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_watchlist_api.py -q
Expected: FAIL (404 on /api/stock/watchlist)
- Step 3: 구현 —
stock/app/main.py(기존 portfolio 엔드포인트 근처에 추가)
상단 import에 db 헬퍼 추가:
from .db import add_watchlist, remove_watchlist, get_watchlist, get_alert_history
Pydantic 모델(파일의 다른 Request 모델 근처):
class WatchlistItemRequest(BaseModel):
ticker: str
name: str | None = None
note: str | None = None
엔드포인트:
@app.get("/api/stock/watchlist")
def list_watchlist():
return {"watchlist": get_watchlist()}
@app.post("/api/stock/watchlist", status_code=201)
def create_watchlist_item(req: WatchlistItemRequest):
add_watchlist(req.ticker, req.name, req.note)
return {"ok": True}
@app.delete("/api/stock/watchlist/{ticker}")
def delete_watchlist_item(ticker: str):
if not remove_watchlist(ticker):
raise HTTPException(status_code=404, detail="not in watchlist")
return {"ok": True}
@app.get("/api/stock/trade-alerts")
def list_trade_alerts(days: int = 7):
return {"alerts": get_alert_history(days)}
- Step 4: 통과 확인
Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_watchlist_api.py -q
Expected: PASS
- Step 5: 커밋
git add stock/app/main.py stock/tests/test_watchlist_api.py
git commit -m "feat(stock): watchlist CRUD + 알람 이력 API"
Task 3: stock — 감시대상 조립 헬퍼 (monitor-set 데이터)
Files:
- Create:
stock/app/trade_alerts.py(순수 조립 로직 — 테스트 쉬움) - Test:
stock/tests/test_trade_alerts_monitorset.py(Create)
Interfaces:
-
Consumes:
db.get_watchlist(),db.get_all_portfolio(), 최신 screener_results -
Produces:
def latest_screener_candidates(conn) -> list[dict]— 최신 성공 run의{ticker,name}def build_monitor_set(conn, session: str, exit_params: dict, buy_params: dict) -> dict— 계약 §5.1 응답 dict(buy_targets=watchlist∪screener 중복제거,sell_targets=보유+avg_price/qty/holding_high)def holding_high(conn, ticker: str, lookback_days: int = 60) -> float|None—krx_daily_prices최근 lookback 최고 high
-
Step 1: 실패 테스트 —
stock/tests/test_trade_alerts_monitorset.py
import sqlite3, pytest
@pytest.fixture
def conn(monkeypatch, tmp_path):
from app import db as _db
monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
_db.init_db()
c = sqlite3.connect(_db.DB_PATH); c.row_factory = sqlite3.Row
# 보유 1종목
_db.add_portfolio_item(ticker="000660", name="SK하이닉스", quantity=10,
avg_price=180000, broker="kis", market="KR")
# watchlist 1종목
_db.add_watchlist("005930", "삼성전자")
yield c
c.close()
def test_build_monitor_set_merges_sources(conn):
from app import trade_alerts as ta
ms = ta.build_monitor_set(conn, session="regular",
exit_params={"stop_pct": 0.08}, buy_params={"rsi_oversold": 30})
buy_tickers = {t["ticker"] for t in ms["buy_targets"]}
sell_tickers = {t["ticker"] for t in ms["sell_targets"]}
assert "005930" in buy_tickers # watchlist
assert "000660" in sell_tickers # 보유
assert ms["session"] == "regular"
assert ms["exit_params"]["stop_pct"] == 0.08
sell = next(t for t in ms["sell_targets"] if t["ticker"] == "000660")
assert sell["avg_price"] == 180000 and sell["qty"] == 10
(참고: add_portfolio_item 실제 시그니처는 stock/app/db.py:167에서 확인해 인자명 맞출 것.)
- Step 2: 실패 확인
Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_monitorset.py -q
Expected: FAIL (ModuleNotFoundError: app.trade_alerts)
- Step 3: 구현 —
stock/app/trade_alerts.py
"""매매 알람 — 감시대상(monitor-set) 조립. 순수 조립 로직(HTTP/텔레그램 없음)."""
from typing import Optional
from .db import get_watchlist, get_all_portfolio
def latest_screener_candidates(conn) -> list:
row = conn.execute(
"SELECT id FROM screener_runs WHERE status='success' ORDER BY asof DESC, id DESC LIMIT 1"
).fetchone()
if not row:
return []
run_id = row[0]
rows = conn.execute(
"SELECT ticker,name FROM screener_results WHERE run_id=? ORDER BY rank", (run_id,)
).fetchall()
return [{"ticker": r[0], "name": r[1]} for r in rows]
def holding_high(conn, ticker: str, lookback_days: int = 60) -> Optional[float]:
row = conn.execute(
"SELECT MAX(high) FROM krx_daily_prices WHERE ticker=? "
"AND date >= date('now', ?)",
(ticker, f"-{int(lookback_days)} days"),
).fetchone()
return row[0] if row and row[0] is not None else None
def build_monitor_set(conn, session: str, exit_params: dict, buy_params: dict) -> dict:
from datetime import datetime, timezone, timedelta
kst = timezone(timedelta(hours=9))
watch = {w["ticker"]: {"ticker": w["ticker"], "name": w["name"], "source": "watch",
"params": w.get("params", {})}
for w in get_watchlist()}
for c in latest_screener_candidates(conn):
if c["ticker"] not in watch:
watch[c["ticker"]] = {"ticker": c["ticker"], "name": c["name"],
"source": "screener", "params": {}}
sell_targets = []
for p in get_all_portfolio():
if (p.get("market") or "KR") != "KR":
continue # KRX만 실시간 TA 대상(미국주 skip — 스펙 §12/제약)
t = p["ticker"]
sell_targets.append({
"ticker": t, "name": p.get("name"),
"avg_price": p.get("avg_price"), "qty": p.get("quantity"),
"holding_high": holding_high(conn, t), "params": {},
})
return {
"session": session,
"as_of": datetime.now(kst).isoformat(),
"buy_targets": list(watch.values()),
"sell_targets": sell_targets,
"buy_params": buy_params,
"exit_params": exit_params,
}
(주의: get_all_portfolio() 반환 dict의 키명(avg_price/quantity/market/name)을 stock/app/db.py:182에서 확인해 맞출 것. 다르면 매핑 조정.)
- Step 4: 통과 확인
Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_monitorset.py -q
Expected: PASS
- Step 5: 커밋
git add stock/app/trade_alerts.py stock/tests/test_trade_alerts_monitorset.py
git commit -m "feat(stock): 감시대상(monitor-set) 조립 로직"
Task 4: stock — edge diff 로직 (신규/해제/재무장, 순수 함수)
Files:
- Modify:
stock/app/trade_alerts.py(diff_firing추가) - Test:
stock/tests/test_trade_alerts_edge.py(Create)
Interfaces:
-
Produces:
def diff_firing(reported: list[dict], prev: set[tuple]) -> dict— 반환{"new":[alert...], "cleared":[key...], "seen":[key...]}. reported 각 항목{ticker,kind,condition,price,detail,name?}. key=(ticker,kind,condition).
-
Step 1: 실패 테스트 —
stock/tests/test_trade_alerts_edge.py
def test_diff_new_and_cleared_and_rearm():
from app.trade_alerts import diff_firing
reported = [{"ticker": "005930", "kind": "buy", "condition": "buy_breakout",
"price": 71500, "detail": {}}]
# 최초: prev 비어있음 → 신규
d1 = diff_firing(reported, prev=set())
assert [a["condition"] for a in d1["new"]] == ["buy_breakout"]
assert d1["cleared"] == []
# 유지: prev에 이미 있음 → 신규 없음
prev = {("005930", "buy", "buy_breakout")}
d2 = diff_firing(reported, prev=prev)
assert d2["new"] == []
# 해제: reported 비었고 prev에 있음 → cleared
d3 = diff_firing([], prev=prev)
assert d3["cleared"] == [("005930", "buy", "buy_breakout")]
# 재무장 후 재발화: prev 다시 비면 신규
d4 = diff_firing(reported, prev=set())
assert len(d4["new"]) == 1
- Step 2: 실패 확인
Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_edge.py -q
Expected: FAIL (ImportError: cannot import name 'diff_firing')
- Step 3: 구현 —
stock/app/trade_alerts.py하단에 추가
def diff_firing(reported: list, prev: set) -> dict:
cur = {}
for a in reported:
key = (a["ticker"], a["kind"], a["condition"])
cur[key] = a
cur_keys = set(cur.keys())
new_keys = cur_keys - prev
cleared = sorted(prev - cur_keys)
return {
"new": [cur[k] for k in cur_keys if k in new_keys],
"cleared": cleared,
"seen": sorted(cur_keys),
}
- Step 4: 통과 확인
Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_edge.py -q
Expected: PASS
- Step 5: 커밋
git add stock/app/trade_alerts.py stock/tests/test_trade_alerts_edge.py
git commit -m "feat(stock): edge diff(신규/해제/재무장) 순수 함수"
Task 5: stock — session 판정 + webai monitor-set 엔드포인트
Files:
- Modify:
stock/app/trade_alerts.py(current_session) - Modify:
stock/app/main.py(GET /api/webai/trade-alert/monitor-set) - Test:
stock/tests/test_trade_alerts_monitorset_api.py(Create)
Interfaces:
-
Consumes: Task3
build_monitor_set, 기존is_market_open,verify_webai_key -
Produces:
def current_session(now_kst) -> str— pre/regular/after/closed (평일+휴장 반영은 호출부에서is_market_open으로 게이팅)GET /api/webai/trade-alert/monitor-set(webai 인증) → 계약 §5.1
-
Step 1: 실패 테스트 —
stock/tests/test_trade_alerts_monitorset_api.py
import datetime as dt, pytest
from fastapi.testclient import TestClient
def test_current_session_windows():
from app.trade_alerts import current_session
d = dt.date(2026, 7, 2)
assert current_session(dt.datetime.combine(d, dt.time(8, 40))) == "pre"
assert current_session(dt.datetime.combine(d, dt.time(10, 0))) == "regular"
assert current_session(dt.datetime.combine(d, dt.time(17, 0))) == "after"
assert current_session(dt.datetime.combine(d, dt.time(20, 0))) == "closed"
@pytest.fixture
def client(monkeypatch, tmp_path):
from app import db as _db
monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
_db.init_db()
monkeypatch.setenv("WEBAI_API_KEY", "k")
from app.main import app
return TestClient(app)
def test_monitor_set_requires_auth(client):
assert client.get("/api/webai/trade-alert/monitor-set").status_code == 401
def test_monitor_set_ok(client):
r = client.get("/api/webai/trade-alert/monitor-set", headers={"X-WebAI-Key": "k"})
assert r.status_code == 200
body = r.json()
assert body["session"] in ("pre", "regular", "after", "closed")
assert "buy_targets" in body and "sell_targets" in body
assert body["exit_params"]["trailing_pct"] == 0.10
- Step 2: 실패 확인
Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_monitorset_api.py -q
Expected: FAIL
- Step 3: 구현
stock/app/trade_alerts.py:
import datetime as _dt
# KST 세션 창(시:분)
_SESSIONS = [
("pre", (8, 30), (9, 0)),
("regular", (9, 0), (15, 30)),
("after", (16, 0), (18, 0)),
]
def current_session(now_kst) -> str:
t = now_kst.time()
for name, (sh, sm), (eh, em) in _SESSIONS:
if _dt.time(sh, sm) <= t < _dt.time(eh, em):
return name
return "closed"
DEFAULT_EXIT_PARAMS = {"stop_pct": 0.08, "take_pct": 0.25, "trailing_pct": 0.10}
DEFAULT_BUY_PARAMS = {"rsi_oversold": 30, "breakout_vol_mult": 1.5, "pullback_pct": 0.02}
stock/app/main.py (webai 섹션):
from .trade_alerts import (
build_monitor_set, current_session, DEFAULT_EXIT_PARAMS, DEFAULT_BUY_PARAMS,
)
@app.get("/api/webai/trade-alert/monitor-set", dependencies=[Depends(verify_webai_key)])
def get_trade_alert_monitor_set():
from datetime import datetime, timezone, timedelta, date as _date
kst = timezone(timedelta(hours=9))
now_kst = datetime.now(kst)
session = current_session(now_kst)
if not is_market_open(now_kst.date()):
session = "closed"
with _conn() as c:
return build_monitor_set(c, session, DEFAULT_EXIT_PARAMS, DEFAULT_BUY_PARAMS)
(주의: _conn은 stock main.py에서 db._conn 사용 형태 확인 — 기존 엔드포인트가 with _conn() as c: 쓰는지 보고 동일 패턴. 없으면 from .db import _conn.)
- Step 4: 통과 확인
Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_monitorset_api.py -q
Expected: PASS
- Step 5: 커밋
git add stock/app/trade_alerts.py stock/app/main.py stock/tests/test_trade_alerts_monitorset_api.py
git commit -m "feat(stock): session 판정 + webai monitor-set 엔드포인트"
Task 6: stock — webai report 엔드포인트 (edge diff → agent-office push → 상태/이력)
Files:
- Modify:
stock/app/main.py(POST /api/webai/trade-alert/report) - Modify:
stock/app/trade_alerts.py(notify_agent_officehttpx 헬퍼) - Test:
stock/tests/test_trade_alerts_report_api.py(Create)
Interfaces:
-
Consumes: Task1
get_alert_state_firing/set_alert_firing/touch_alert_seen/add_alert_history, Task4diff_firing -
Produces (계약 §5.2):
POST /api/webai/trade-alert/reportbody{as_of, firing:[...]}→{"new_alerts":N,"cleared":M}- agent-office로 신규 alert push (
AGENT_OFFICE_URLenv, 기본http://agent-office:8000), 전송 성공 시에만set_alert_firing(True)+add_alert_history
-
Step 1: 실패 테스트 —
stock/tests/test_trade_alerts_report_api.py
import pytest
from unittest.mock import patch
from fastapi.testclient import TestClient
@pytest.fixture
def client(monkeypatch, tmp_path):
from app import db as _db
monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
_db.init_db()
monkeypatch.setenv("WEBAI_API_KEY", "k")
from app.main import app
return TestClient(app)
def _report(client, firing):
return client.post("/api/webai/trade-alert/report",
headers={"X-WebAI-Key": "k"},
json={"as_of": "2026-07-02T09:01:00+09:00", "firing": firing})
def test_report_new_edge_sends_and_persists(client):
firing = [{"ticker": "005930", "name": "삼성전자", "kind": "buy",
"condition": "buy_breakout", "price": 71500, "detail": {"vol": 2.0}}]
with patch("app.trade_alerts.notify_agent_office", return_value=True) as m:
r1 = _report(client, firing)
assert r1.json()["new_alerts"] == 1
assert m.called
# 2번째 동일 firing → 유지, 신규 0
with patch("app.trade_alerts.notify_agent_office", return_value=True):
r2 = _report(client, firing)
assert r2.json()["new_alerts"] == 0
# 이력 1건
assert len(client.get("/api/stock/trade-alerts?days=1").json()["alerts"]) == 1
def test_report_send_failure_does_not_persist(client):
firing = [{"ticker": "005930", "name": "삼성전자", "kind": "buy",
"condition": "buy_breakout", "price": 71500, "detail": {}}]
with patch("app.trade_alerts.notify_agent_office", return_value=False):
r = _report(client, firing)
assert r.json()["new_alerts"] == 0 # 전송 실패 → 미채택
# 다음 사이클(전송 성공) 재시도되어 알림
with patch("app.trade_alerts.notify_agent_office", return_value=True):
r2 = _report(client, firing)
assert r2.json()["new_alerts"] == 1
def test_report_cleared_rearm(client):
firing = [{"ticker": "005930", "name": "삼성", "kind": "buy",
"condition": "buy_breakout", "price": 71500, "detail": {}}]
with patch("app.trade_alerts.notify_agent_office", return_value=True):
_report(client, firing)
_report(client, []) # 해제
r = _report(client, firing) # 재발화
assert r.json()["new_alerts"] == 1
- Step 2: 실패 확인
Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_report_api.py -q
Expected: FAIL (404)
- Step 3: 구현
stock/app/trade_alerts.py:
import os
import httpx
def notify_agent_office(alerts: list) -> bool:
"""신규 alert들을 agent-office로 push. 성공 True."""
url = os.getenv("AGENT_OFFICE_URL", "http://agent-office:8000") + "/api/agent-office/stock/trade-alert"
try:
with httpx.Client(timeout=10) as c:
resp = c.post(url, json={"alerts": alerts})
return resp.status_code == 200
except httpx.HTTPError:
return False
stock/app/main.py:
from .trade_alerts import diff_firing, notify_agent_office
from .db import (
get_alert_state_firing, set_alert_firing, touch_alert_seen, add_alert_history,
)
class TradeAlertReport(BaseModel):
as_of: str | None = None
firing: list[dict] = []
@app.post("/api/webai/trade-alert/report", dependencies=[Depends(verify_webai_key)])
def post_trade_alert_report(body: TradeAlertReport):
prev = get_alert_state_firing()
d = diff_firing(body.firing, prev)
sent = 0
for a in d["new"]:
ok = notify_agent_office([a])
if ok:
set_alert_firing(a["ticker"], a["kind"], a["condition"], firing=True, at_iso=body.as_of)
add_alert_history(a["ticker"], a.get("name"), a["kind"], a["condition"],
a.get("price"), a.get("detail", {}))
sent += 1
for key in d["cleared"]:
set_alert_firing(key[0], key[1], key[2], firing=False)
touch_alert_seen(d["seen"], body.as_of or "")
return {"new_alerts": sent, "cleared": len(d["cleared"])}
- Step 4: 통과 확인
Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/tests/test_trade_alerts_report_api.py -q
Expected: PASS (3 passed)
- Step 5: 커밋
git add stock/app/main.py stock/app/trade_alerts.py stock/tests/test_trade_alerts_report_api.py
git commit -m "feat(stock): webai report — edge diff→agent-office push→상태/이력(전송성공시만)"
Task 7: agent-office — trade-alert notify 엔드포인트 (텔레그램 너+아내)
Files:
- Modify:
agent-office/app/main.py(POST /api/agent-office/stock/trade-alert) - Create:
agent-office/app/notifiers/telegram_trade.py(포맷+전송) - Test:
agent-office/tests/test_trade_alert_notify.py(Create)
Interfaces:
-
Consumes:
telegram.messaging.send_raw,config.TELEGRAM_CHAT_ID,config.TELEGRAM_WIFE_CHAT_ID -
Produces:
format_trade_alert(alert: dict) -> str(HTML)async def send_trade_alerts(alerts: list) -> dict— 너+아내 각각 send_raw.{"sent": n, "ok": bool}POST /api/agent-office/stock/trade-alertbody{alerts:[...]}→ 위 호출 +add_log
-
Step 1: 실패 테스트 —
agent-office/tests/test_trade_alert_notify.py
(파일 상단은 기존 test_youtube_publisher_retry.py의 _TMP/_init_db 픽스처 패턴 복사 — monkeypatch.setattr(app.db, "DB_PATH", _TMP) + -wal/-shm 삭제)
import pytest
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_send_trade_alerts_to_user_and_wife():
from app.notifiers import telegram_trade
alerts = [{"ticker": "005930", "name": "삼성전자", "kind": "buy",
"condition": "buy_breakout", "price": 71500, "detail": {}}]
with patch("app.notifiers.telegram_trade.send_raw",
new=AsyncMock(return_value={"ok": True})) as m, \
patch("app.notifiers.telegram_trade.TELEGRAM_CHAT_ID", "U"), \
patch("app.notifiers.telegram_trade.TELEGRAM_WIFE_CHAT_ID", "W"):
res = await telegram_trade.send_trade_alerts(alerts)
assert res["ok"] is True
chat_ids = {c.kwargs.get("chat_id") for c in m.await_args_list}
assert chat_ids == {"U", "W"} # 둘 다 발송
@pytest.mark.asyncio
async def test_format_trade_alert_has_direction():
from app.notifiers.telegram_trade import format_trade_alert
txt = format_trade_alert({"ticker": "005930", "name": "삼성전자", "kind": "sell",
"condition": "sell_stop_loss", "price": 60000, "detail": {}})
assert "매도" in txt and "삼성전자" in txt
- Step 2: 실패 확인
Run: cd agent-office && python -m pytest tests/test_trade_alert_notify.py -q
Expected: FAIL (ModuleNotFoundError: app.notifiers.telegram_trade)
- Step 3: 구현 —
agent-office/app/notifiers/telegram_trade.py
"""매매 알람 텔레그램 포맷+전송 (너+아내)."""
from ..telegram.messaging import send_raw
from ..config import TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID
_KIND_LABEL = {"buy": "🟢 매수", "sell": "🔴 매도"}
_COND_LABEL = {
"buy_ma20_pullback": "지지선 되돌림", "buy_breakout": "돌파", "buy_rsi_bounce": "RSI 과매도 반등",
"sell_stop_loss": "손절", "sell_ma_break": "이평 이탈", "sell_take_profit": "익절",
"sell_climax": "급등 소진", "sell_trailing_stop": "트레일링 스톱",
}
def format_trade_alert(a: dict) -> str:
kind = _KIND_LABEL.get(a["kind"], a["kind"])
cond = _COND_LABEL.get(a["condition"], a["condition"])
name = a.get("name") or a["ticker"]
price = a.get("price")
price_s = f"{int(price):,}원" if price else "-"
return f"{kind} 알람\n<b>{name}</b> ({a['ticker']})\n조건: {cond}\n현재가: {price_s}"
async def send_trade_alerts(alerts: list) -> dict:
sent = 0
all_ok = True
chat_ids = [c for c in (TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID) if c]
for a in alerts:
text = format_trade_alert(a)
for cid in chat_ids:
r = await send_raw(text=text, chat_id=cid)
if r.get("ok"):
sent += 1
else:
all_ok = False
return {"sent": sent, "ok": all_ok}
agent-office/app/main.py:
class TradeAlertBody(BaseModel):
alerts: list[dict] = []
@app.post("/api/agent-office/stock/trade-alert")
async def stock_trade_alert(body: TradeAlertBody):
from .notifiers.telegram_trade import send_trade_alerts
res = await send_trade_alerts(body.alerts)
for a in body.alerts:
add_log("stock", f"매매알람 {a.get('kind')} {a.get('ticker')} {a.get('condition')}", "info")
return res
(add_log import 확인 — main.py 상단에 이미 있을 것. 없으면 from .db import add_log.)
- Step 4: 통과 확인
Run: cd agent-office && python -m pytest tests/test_trade_alert_notify.py -q
Expected: PASS
- Step 5: 커밋
git add agent-office/app/notifiers/telegram_trade.py agent-office/app/main.py agent-office/tests/test_trade_alert_notify.py
git commit -m "feat(agent-office): 매매알람 텔레그램 notify(너+아내) 엔드포인트"
Task 8: agent-office — 봇 명령 /watch /unwatch /watchlist
Files:
- Modify:
agent-office/app/service_proxy.py(stock watchlist CRUD 호출 헬퍼) - Modify:
agent-office/app/telegram/webhook.py(_handle_message에 /watch류 처리) - Test:
agent-office/tests/test_watch_commands.py(Create)
Interfaces:
-
Consumes: 기존
service_proxyHTTP 패턴, stockPOST/DELETE/GET /api/stock/watchlist -
Produces:
service_proxy.watchlist_add(ticker) / watchlist_remove(ticker) / watchlist_list()- webhook
_handle_message가/watch <t>,/unwatch <t>,/watchlist를 인식해 텔레그램 응답
-
Step 1: 실패 테스트 —
agent-office/tests/test_watch_commands.py
import pytest
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_watch_command_calls_add():
from app.telegram import webhook
msg = {"chat": {"id": 1}, "text": "/watch 005930"}
with patch("app.telegram.webhook.service_proxy.watchlist_add",
new=AsyncMock(return_value={"ok": True})) as m, \
patch("app.telegram.webhook.api_call", new=AsyncMock(return_value={"ok": True})):
handled = await webhook.handle_watch_command(msg)
assert handled is True
m.assert_awaited_once_with("005930")
@pytest.mark.asyncio
async def test_non_watch_text_ignored():
from app.telegram import webhook
msg = {"chat": {"id": 1}, "text": "안녕"}
assert await webhook.handle_watch_command(msg) is False
- Step 2: 실패 확인
Run: cd agent-office && python -m pytest tests/test_watch_commands.py -q
Expected: FAIL (AttributeError: handle_watch_command)
- Step 3: 구현
agent-office/app/service_proxy.py (기존 httpx 패턴 따라 — STOCK_URL 사용):
async def watchlist_add(ticker: str) -> dict:
return await _post(f"{STOCK_URL}/api/stock/watchlist", {"ticker": ticker})
async def watchlist_remove(ticker: str) -> dict:
return await _delete(f"{STOCK_URL}/api/stock/watchlist/{ticker}")
async def watchlist_list() -> dict:
return await _get(f"{STOCK_URL}/api/stock/watchlist")
(_post/_get/_delete·STOCK_URL은 service_proxy.py 기존 헬퍼 확인해 맞춤. 없으면 httpx로 직접.)
agent-office/app/telegram/webhook.py (상단 from .. import service_proxy, from .client import api_call):
async def handle_watch_command(message: dict) -> bool:
"""/watch /unwatch /watchlist 처리. 처리했으면 True."""
text = (message.get("text") or "").strip()
chat_id = message.get("chat", {}).get("id")
parts = text.split()
cmd = parts[0].lower() if parts else ""
if cmd == "/watch" and len(parts) >= 2:
await service_proxy.watchlist_add(parts[1])
reply = f"관심종목 추가: {parts[1]}"
elif cmd == "/unwatch" and len(parts) >= 2:
await service_proxy.watchlist_remove(parts[1])
reply = f"관심종목 삭제: {parts[1]}"
elif cmd == "/watchlist":
res = await service_proxy.watchlist_list()
items = res.get("watchlist", [])
reply = "관심종목:\n" + ("\n".join(f"- {w['name'] or ''} ({w['ticker']})" for w in items) or "(없음)")
else:
return False
await api_call("sendMessage", {"chat_id": chat_id, "text": reply})
return True
handle_webhook의 message 분기(현재 line 26 근처)에서 slash 명령보다 먼저 시도:
if message and message.get("text"):
if await handle_watch_command(message):
return None
if message and message.get("text") and agent_dispatcher is not None:
return await _handle_message(message, agent_dispatcher)
- Step 4: 통과 확인
Run: cd agent-office && python -m pytest tests/test_watch_commands.py -q
Expected: PASS
- Step 5: 커밋
git add agent-office/app/service_proxy.py agent-office/app/telegram/webhook.py agent-office/tests/test_watch_commands.py
git commit -m "feat(agent-office): /watch /unwatch /watchlist 봇 명령"
Task 9: 회귀 + 메모리/카탈로그 갱신 + 배포
Files:
-
Modify:
CLAUDE.md(§9 stock/agent-office 엔드포인트 표에 신규 항목) -
Modify: 메모리
service_stock.md,service_agent_office.md,infra_distributed_workers.md(trade-monitor 워커 계약) -
Step 1: stock 전체 회귀
Run: PYTHONPATH="$PWD:$PWD/stock" python -m pytest stock/ -q
Expected: 기존 149 + 신규 통과, 0 fail
- Step 2: agent-office 전체 회귀
Run: cd agent-office && python -m pytest -q
Expected: 기존 140 + 신규 통과, 0 fail
-
Step 3: CLAUDE.md + 메모리 갱신
-
CLAUDE.md §9 stock:
GET/POST/DELETE /api/stock/watchlist,GET /api/stock/trade-alerts,GET/POST /api/webai/trade-alert/{monitor-set,report}. -
CLAUDE.md §9 agent-office:
POST /api/agent-office/stock/trade-alert+ 봇 명령/watch류. -
service_stock.md: 신규 3테이블 + trade_alerts.py + AGENT_OFFICE_URL env. -
service_agent_office.md: trade-alert notify + 봇 명령 + telegram_trade.py. -
infra_distributed_workers.md:trade-monitor워커 계약(monitor-set/report/heartbeat) 추가. -
Step 4: 커밋 + 배포
git add CLAUDE.md
git commit -m "docs(CLAUDE.md): 매매알람 엔드포인트 카탈로그 등재"
# nas-deploy 락 획득 후 push (자동배포)
git push origin main
- Step 5: 계약 핸드오프
co-gahusb 팀버스로 AI 세션(web-ai trade-monitor 워커) + FE 세션(web-ui 관심종목 탭)에 스펙 §5 계약 전달. WEBAI_API_KEY·AGENT_OFFICE_URL env 운영 주입 확인.
Self-Review
Spec coverage:
- §2 요구사항 6종 → watchlist(T1-2), monitor-set 조립 buy=watchlist∪screener·sell=보유(T3), edge dedup(T4,6), session/시간외(T5), 텔레그램 너+아내(T7), 봇 명령(T8). ✓
- §4 DB 3테이블 → T1. §5 계약 4종 → monitor-set(T5)·report(T6)·notify(T7)·watchlist CRUD(T2)·heartbeat(워커측, BE 범위 밖 명시). ✓
- §6 조건 로직 = 워커(web-ai) 담당 → BE 계획 범위 밖(계약만). ✓
- §7 edge 흐름 → T6. §8 세션 게이팅 → T5. §9 에러(전송성공시만) → T6 test. §10 테스트 → 각 Task TDD. ✓
- 트레일링 스톱 holding_high → T3
holding_high(lookback 60d). ✓
Placeholder scan: 각 Step에 실제 코드/명령/기대출력 포함. "기존 확인" 주석은 시그니처 매칭 지시(placeholder 아님). ✓
Type consistency: 조건 키 문자열(buy_breakout 등) 전 Task 동일. diff_firing 반환 {new,cleared,seen} T4정의→T6소비 일치. notify_agent_office/send_trade_alerts 시그니처 T6/T7 일치. ✓
범위: BE(web-backend)만. 워커(web-ai)·탭(web-ui)은 계약 정의 후 각 세션(스펙 §11). 단일 실행 계획으로 적정.