ai-trade/services/trade-monitor/PLAN.md

trade-monitor Worker Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

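Goal: Implement the Windows-side trade-monitor worker from the real-time trade alert spec (§5, §6). On a 60-second loop it fetches the monitor-set from the NAS, evaluates eight buy/sell conditions against KIS quotes, reports the firing set, and emits a Redis heartbeat.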

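Architecture: A FastAPI + asyncio loop worker, following the convention of the sibling task-watcher/image-render services. The §6 condition logic is isolated in pure modules (indicators/conditions), and HTTP is isolated in boundary modules (kis_client/nas_client). monitor.py orchestrates, and _shared.heartbeat.heartbeat_loop is reused as an independent 15-second task.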
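
As a rough illustration of the "main loop plus independent heartbeat task" shape described above, here is a minimal sketch. The names run_periodically and demo are hypothetical, and the intervals are shortened so the demo finishes quickly; the real worker reuses _shared.heartbeat.heartbeat_loop, whose signature is not shown in this plan.

```python
import asyncio


async def run_periodically(interval: float, tick, stop: asyncio.Event) -> None:
    """Call `tick()` every `interval` seconds until `stop` is set (hypothetical helper)."""
    while not stop.is_set():
        try:
            await tick()
        except Exception as exc:
            # One failed cycle must not kill the loop.
            print(f"cycle failed: {exc!r}")
        try:
            # Sleep for the interval, but wake up early when shutdown is requested.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass


async def demo(cycle_interval: float = 0.05, beat_interval: float = 0.0125,
               run_for: float = 0.12) -> dict:
    """Run a monitor-like loop and a heartbeat-like loop side by side."""
    counts = {"cycles": 0, "beats": 0}
    stop = asyncio.Event()

    async def cycle() -> None:
        counts["cycles"] += 1   # stands in for one monitoring cycle

    async def beat() -> None:
        counts["beats"] += 1    # stands in for one heartbeat write

    tasks = [
        asyncio.create_task(run_periodically(cycle_interval, cycle, stop)),
        asyncio.create_task(run_periodically(beat_interval, beat, stop)),
    ]
    await asyncio.sleep(run_for)
    stop.set()
    await asyncio.gather(*tasks)
    return counts
```

Calling asyncio.run(demo()) returns the two counters. Because the heartbeat runs in its own task, a slow or failing monitoring cycle does not delay it, which is the point of keeping the two loops separate.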

Tech Stack: Python 3.12, FastAPI, httpx(async), redis.asyncio, pytest + respx. WSL2 Docker(services/docker-compose.yml).

Design source: services/trade-monitor/DESIGN.md · Authoritative contract: web-backend docs/superpowers/specs/2026-07-02-realtime-trade-alerts-design.md.

Global Constraints

  • Python 3.12 (Dockerfile python:3.12-slim-bookworm).
  • Stateless worker: no dedup; every cycle sends the full firing set (including an empty array).
  • Skip non-KRX tickers: only 6-digit numeric tickers are processed.
  • The session gate is delegated to monitor-set.session: the worker does not reimplement the KST calendar. When session=="closed", it makes zero KIS calls.
  • KIS token: issued by the worker itself with the dedicated TM_KIS_APP_KEY/SECRET (separate from ai_trade).
  • Heartbeat key worker:trade-monitor:heartbeat, TTL EX 45, kind trader, state ∈ {market_open, market_closed, idle}.
  • Running tests: C:\Users\jaeoh\AppData\Local\Programs\Python\Python312\python.exe -m pytest services/trade-monitor/tests -q (from the web-ai root). Below, python refers to this path.
  • Commit path: commit/push only in the web-ai repo.
  • Schema of every firing item: {"ticker","kind","condition","price","detail":{...}}.
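
The ticker filter and the session gate from the constraints above can be sketched as follows. This is a minimal illustration, not the plan's monitor.py: is_krx_ticker, keep_krx, and should_call_kis are hypothetical names, and the only session value assumed here is "closed", as stated in the constraint.

```python
import re

# Exactly six ASCII digits, for example "005930".
_KRX_TICKER = re.compile(r"[0-9]{6}")


def is_krx_ticker(ticker: str) -> bool:
    """True only for 6-digit numeric tickers (hypothetical helper)."""
    return bool(_KRX_TICKER.fullmatch(ticker))


def keep_krx(targets: list[dict]) -> list[dict]:
    """Drop targets whose ticker is not a 6-digit numeric code."""
    return [t for t in targets if is_krx_ticker(str(t.get("ticker", "")))]


def should_call_kis(session: str) -> bool:
    """The worker trusts monitor-set.session; a closed session means no KIS calls."""
    return session != "closed"
```

fullmatch is used instead of str.isdigit so that non-ASCII digit characters and codes of the wrong length are both rejected.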

File Structure

| File | Responsibility |
| --- | --- |
| services/trade-monitor/config.py | Settings dataclass + load_settings() (env) |
| services/trade-monitor/indicators.py | Pure: sma, rsi_series, highest_high |
| services/trade-monitor/conditions.py | Pure: evaluate_buy, evaluate_sell, _fire |
| services/trade-monitor/kis_client.py | KISClient: self-issued token + get_quote + get_daily_ohlcv + throttle |
| services/trade-monitor/nas_client.py | NASClient: get_monitor_set + post_report (X-WebAI-Key) |
| services/trade-monitor/monitor.py | MonitorState, filter_krx, _build_ctx, run_cycle, monitor_loop, make_state_fn |
| services/trade-monitor/main.py | FastAPI lifespan (spawns loop + heartbeat) + /health |
| services/trade-monitor/conftest.py | Adds the services/ root to sys.path |
| services/trade-monitor/tests/* | Unit tests |
| services/trade-monitor/Dockerfile | Mirrors the task-watcher convention (_shared COPY) |
| services/trade-monitor/requirements.txt, .env.example | Dependencies / env documentation |
| services/docker-compose.yml (modify) | trade-monitor service (port 18715) |

Task 1: Scaffolding + config

Files:

  • Create: services/trade-monitor/requirements.txt
  • Create: services/trade-monitor/conftest.py
  • Create: services/trade-monitor/tests/__init__.py
  • Create: services/trade-monitor/config.py
  • Test: services/trade-monitor/tests/test_config.py

Interfaces:

  • Produces: Settings dataclass — fields nas_base_url:str, webai_api_key:str, redis_url:str, kis_app_key:str, kis_app_secret:str, kis_account:str, kis_is_virtual:bool, loop_interval:int, climax_vol_mult:float. load_settings() -> Settings (reads env with defaults).

  • Step 1: Create the scaffolding files

services/trade-monitor/requirements.txt:

fastapi==0.115.6
uvicorn[standard]==0.34.0
redis>=5.0
httpx>=0.27
pytest>=8.0
respx>=0.21

services/trade-monitor/conftest.py:

"""Add the services/ root to sys.path so that `from _shared.heartbeat import ...` works."""
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

services/trade-monitor/tests/__init__.py: (empty file)

  • Step 2: Write the failing test: services/trade-monitor/tests/test_config.py
"""Settings env loading: defaults + override."""
from config import load_settings


def test_defaults(monkeypatch):
    for k in ("NAS_BASE_URL", "WEBAI_API_KEY", "REDIS_URL", "TM_KIS_APP_KEY",
              "TM_KIS_APP_SECRET", "TM_KIS_ACCOUNT", "TM_KIS_IS_VIRTUAL",
              "TM_LOOP_INTERVAL", "TM_CLIMAX_VOL_MULT"):
        monkeypatch.delenv(k, raising=False)
    s = load_settings()
    assert s.nas_base_url == "http://192.168.45.54:18500"
    assert s.redis_url == "redis://192.168.45.54:6379"
    assert s.kis_is_virtual is False
    assert s.loop_interval == 60
    assert s.climax_vol_mult == 3.0


def test_override(monkeypatch):
    monkeypatch.setenv("TM_KIS_IS_VIRTUAL", "1")
    monkeypatch.setenv("TM_LOOP_INTERVAL", "30")
    monkeypatch.setenv("TM_CLIMAX_VOL_MULT", "2.5")
    monkeypatch.setenv("WEBAI_API_KEY", "secret")
    s = load_settings()
    assert s.kis_is_virtual is True
    assert s.loop_interval == 30
    assert s.climax_vol_mult == 2.5
    assert s.webai_api_key == "secret"
  • Step 3: Confirm the failure

Run: python -m pytest services/trade-monitor/tests/test_config.py -q
Expected: FAIL (ModuleNotFoundError: No module named 'config')

  • Step 4: Implement config.py
"""Settings: load environment variables. The TM_ prefix keeps them separate from ai_trade."""
from __future__ import annotations

import os
from dataclasses import dataclass


@dataclass
class Settings:
    nas_base_url: str
    webai_api_key: str
    redis_url: str
    kis_app_key: str
    kis_app_secret: str
    kis_account: str
    kis_is_virtual: bool
    loop_interval: int
    climax_vol_mult: float


def load_settings() -> Settings:
    return Settings(
        nas_base_url=os.getenv("NAS_BASE_URL", "http://192.168.45.54:18500"),
        webai_api_key=os.getenv("WEBAI_API_KEY", ""),
        redis_url=os.getenv("REDIS_URL", "redis://192.168.45.54:6379"),
        kis_app_key=os.getenv("TM_KIS_APP_KEY", ""),
        kis_app_secret=os.getenv("TM_KIS_APP_SECRET", ""),
        kis_account=os.getenv("TM_KIS_ACCOUNT", ""),
        kis_is_virtual=os.getenv("TM_KIS_IS_VIRTUAL", "0") == "1",
        loop_interval=int(os.getenv("TM_LOOP_INTERVAL", "60")),
        climax_vol_mult=float(os.getenv("TM_CLIMAX_VOL_MULT", "3.0")),
    )
  • Step 5: Confirm it passes

Run: python -m pytest services/trade-monitor/tests/test_config.py -q
Expected: PASS (2 passed)

  • Step 6: Commit
git add services/trade-monitor/requirements.txt services/trade-monitor/conftest.py services/trade-monitor/tests/__init__.py services/trade-monitor/config.py services/trade-monitor/tests/test_config.py services/trade-monitor/DESIGN.md services/trade-monitor/PLAN.md
git commit -m "feat(trade-monitor): 스캐폴딩 + config"

Task 2: indicators (pure)

Files:

  • Create: services/trade-monitor/indicators.py
  • Test: services/trade-monitor/tests/test_indicators.py

Interfaces:

  • Produces:

    • sma(values: list[float], period: int) -> float | None: mean of the last period values, or None if there are too few
    • rsi_series(closes: list[float], period: int = 14) -> list[float]: Wilder RSI series (aligned with closes[period:]), or [] if there are too few closes
    • highest_high(highs: list[float], period: int) -> float | None: maximum of the last period values, or None if there are too few
  • Step 1: Write the failing test: services/trade-monitor/tests/test_indicators.py

"""indicators: pure numeric checks."""
from indicators import sma, rsi_series, highest_high


def test_sma_basic():
    assert sma([1, 2, 3, 4, 5], 5) == 3.0
    assert sma([1, 2, 3, 4, 5], 2) == 4.5


def test_sma_insufficient():
    assert sma([1, 2], 5) is None
    assert sma([], 3) is None


def test_highest_high():
    assert highest_high([1, 9, 3, 4], 3) == 9
    assert highest_high([1, 2, 3], 3) == 3
    assert highest_high([1, 2], 3) is None


def test_rsi_all_gains_is_100():
    # Monotonic increase → zero losses → RSI 100
    closes = [float(i) for i in range(1, 20)]
    rs = rsi_series(closes, 14)
    assert rs, "series should not be empty"
    assert rs[-1] == 100.0


def test_rsi_insufficient():
    assert rsi_series([1, 2, 3], 14) == []


def test_rsi_known_range():
    # Series with mixed ups and downs → RSI stays within 0~100
    closes = [10, 11, 10.5, 11.5, 11, 12, 11.8, 12.5, 12, 13,
              12.7, 13.2, 12.9, 13.5, 13.1, 13.8]
    rs = rsi_series(closes, 14)
    assert len(rs) == len(closes) - 14
    assert all(0.0 <= v <= 100.0 for v in rs)
  • Step 2: Confirm the failure

Run: python -m pytest services/trade-monitor/tests/test_indicators.py -q
Expected: FAIL (ModuleNotFoundError: No module named 'indicators')

  • Step 3: Implement indicators.py
"""Pure TA indicators: sma / rsi_series / highest_high."""
from __future__ import annotations


def sma(values: list[float], period: int) -> float | None:
    if period <= 0 or len(values) < period:
        return None
    return sum(values[-period:]) / period


def highest_high(highs: list[float], period: int) -> float | None:
    if period <= 0 or len(highs) < period:
        return None
    return max(highs[-period:])


def rsi_series(closes: list[float], period: int = 14) -> list[float]:
    """Wilder RSI. The returned list aligns 1:1 with closes[period:]. Returns [] if there are too few closes."""
    if len(closes) <= period:
        return []
    deltas = [closes[i] - closes[i - 1] for i in range(1, len(closes))]
    gains = [d if d > 0 else 0.0 for d in deltas]
    losses = [-d if d < 0 else 0.0 for d in deltas]

    def _rsi(ag: float, al: float) -> float:
        if al == 0:
            return 100.0
        rs = ag / al
        return 100.0 - 100.0 / (1.0 + rs)

    avg_gain = sum(gains[:period]) / period
    avg_loss = sum(losses[:period]) / period
    out = [_rsi(avg_gain, avg_loss)]
    for i in range(period, len(deltas)):
        avg_gain = (avg_gain * (period - 1) + gains[i]) / period
        avg_loss = (avg_loss * (period - 1) + losses[i]) / period
        out.append(_rsi(avg_gain, avg_loss))
    return out
  • Step 4: Confirm it passes

Run: python -m pytest services/trade-monitor/tests/test_indicators.py -q
Expected: PASS (6 passed)
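
For reference, the recurrence that rsi_series implements can be written out as below. The symbols are introduced here only for illustration: c_t is the t-th close (starting at c_0), n is the period, and g_t, l_t are the per-step gain and loss.

```latex
\begin{aligned}
g_t &= \max(c_t - c_{t-1},\, 0), \qquad l_t = \max(c_{t-1} - c_t,\, 0) \\
\bar g_n &= \frac{1}{n}\sum_{t=1}^{n} g_t, \qquad \bar l_n = \frac{1}{n}\sum_{t=1}^{n} l_t \\
\bar g_t &= \frac{(n-1)\,\bar g_{t-1} + g_t}{n}, \qquad
\bar l_t = \frac{(n-1)\,\bar l_{t-1} + l_t}{n} \qquad (t > n) \\
\mathrm{RSI}_t &=
\begin{cases}
100 & \text{if } \bar l_t = 0 \\
100 - \dfrac{100}{1 + \bar g_t / \bar l_t} & \text{otherwise}
\end{cases}
\end{aligned}
```

The first value is RSI_n, which is why the returned list lines up with closes[period:] and has length len(closes) - period.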

  • Step 5: Commit
git add services/trade-monitor/indicators.py services/trade-monitor/tests/test_indicators.py
git commit -m "feat(trade-monitor): 순수 지표 모듈 (sma/rsi/highest_high)"

Task 3: conditions, buy side (pure, §6)

Files:

  • Create: services/trade-monitor/conditions.py
  • Test: services/trade-monitor/tests/test_conditions_buy.py

Interfaces:

  • Consumes: indicators.sma, indicators.rsi_series, indicators.highest_high.

  • ctx contract (dict): ticker, name, price, day_open, today_volume, closes, highs, lows, volumes, avg_price, qty, holding_high, climax_vol_mult.

  • Produces:

    • _fire(ctx, kind, condition, price, detail) -> dict with keys {"ticker","kind","condition","price","detail"}
    • evaluate_buy(ctx: dict, params: dict) -> list[dict], with params {rsi_oversold, breakout_vol_mult, pullback_pct}
  • Step 1: Write the failing test: services/trade-monitor/tests/test_conditions_buy.py

"""evaluate_buy: boundaries of the three buy conditions."""
from conditions import evaluate_buy

BUY_PARAMS = {"rsi_oversold": 30, "breakout_vol_mult": 1.5, "pullback_pct": 0.02}


def _ctx(**over):
    base = dict(
        ticker="005930", name="삼성전자", price=100.0, day_open=99.0,
        today_volume=1000.0, closes=[], highs=[], lows=[], volumes=[],
        avg_price=None, qty=None, holding_high=None, climax_vol_mult=3.0,
    )
    base.update(over)
    return base


def _conditions(firing):
    return {f["condition"] for f in firing}


def test_ma20_pullback_fires():
    # Bullish alignment (ma20>ma50>ma200), recent lows near ma20, price rebounding above ma20
    closes = [90.0] * 200 + [100.0] * 20   # ma20=100, ma50=94, ma200=91 → bullish alignment
    lows = [90.0] * 217 + [100.5, 100.4, 100.3]  # lows of the last 3 bars are at or below ma20*1.02=102
    ctx = _ctx(price=101.0, closes=closes, highs=closes, lows=lows,
               volumes=[1.0] * len(closes))
    assert "buy_ma20_pullback" in _conditions(evaluate_buy(ctx, BUY_PARAMS))


def test_ma20_pullback_skips_when_not_aligned():
    closes = [100.0] * 200 + [90.0] * 20   # bearish alignment
    ctx = _ctx(price=91.0, closes=closes, highs=closes, lows=closes,
               volumes=[1.0] * len(closes))
    assert "buy_ma20_pullback" not in _conditions(evaluate_buy(ctx, BUY_PARAMS))


def test_breakout_fires():
    closes = [50.0] * 25
    highs = [60.0] * 25           # highest high of the last 20 bars is 60
    vols = [100.0] * 25           # avg20=100
    ctx = _ctx(price=61.0, today_volume=200.0, closes=closes, highs=highs,
               lows=closes, volumes=vols)   # 61>60, 200>1.5*100
    assert "buy_breakout" in _conditions(evaluate_buy(ctx, BUY_PARAMS))


def test_breakout_skips_on_low_volume():
    highs = [60.0] * 25
    ctx = _ctx(price=61.0, today_volume=120.0, closes=[50.0] * 25, highs=highs,
               lows=[50.0] * 25, volumes=[100.0] * 25)  # 120 < 1.5*100=150
    assert "buy_breakout" not in _conditions(evaluate_buy(ctx, BUY_PARAMS))


def test_rsi_bounce_fires():
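    # A sell-off pushes RSI below 30, then a rebound lifts it back above 30
    closes = [100.0]
    for _ in range(14):
        closes.append(closes[-1] * 0.97)   # decline → RSI falls to 0
    closes.append(closes[-1] * 1.15)        # rebound → RSI ≈ 23, still oversold
    closes.append(closes[-1] * 1.15)        # rebound → RSI ≈ 40, back above rsi_oversold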
    ctx = _ctx(price=closes[-1], closes=closes, highs=closes, lows=closes,
               volumes=[1.0] * len(closes))
    assert "buy_rsi_bounce" in _conditions(evaluate_buy(ctx, BUY_PARAMS))


def test_empty_series_no_fire():
    assert evaluate_buy(_ctx(), BUY_PARAMS) == []
  • Step 2: Confirm the failure

Run: python -m pytest services/trade-monitor/tests/test_conditions_buy.py -q
Expected: FAIL (ModuleNotFoundError: No module named 'conditions')

  • Step 3: Implement conditions.py (buy + _fire)
"""§6 condition logic (pure). ctx + params → list of firing items."""
from __future__ import annotations

from indicators import sma, rsi_series, highest_high


def _fire(ctx: dict, kind: str, condition: str, price: float, detail: dict) -> dict:
    return {
        "ticker": ctx["ticker"], "kind": kind,
        "condition": condition, "price": price, "detail": detail,
    }


def evaluate_buy(ctx: dict, params: dict) -> list[dict]:
    price = ctx["price"]
    closes, highs, lows, vols = ctx["closes"], ctx["highs"], ctx["lows"], ctx["volumes"]
    rsi_os = params.get("rsi_oversold", 30)
    vol_mult = params.get("breakout_vol_mult", 1.5)
    pullback = params.get("pullback_pct", 0.02)
    firing: list[dict] = []

    # buy_ma20_pullback: bullish alignment + recent low near ma20 + rebound back above ma20
    ma20, ma50, ma200 = sma(closes, 20), sma(closes, 50), sma(closes, 200)
    if ma20 and ma50 and ma200 and ma20 > ma50 > ma200 and len(lows) >= 3:
        recent_low = min(lows[-3:])
        if recent_low <= ma20 * (1 + pullback) and price > ma20:
            firing.append(_fire(ctx, "buy", "buy_ma20_pullback", price, {
                "ma20": round(ma20, 1), "ma50": round(ma50, 1),
                "ma200": round(ma200, 1), "recent_low": recent_low,
            }))

    # buy_breakout: break above the 20-bar high + volume multiple
    prior_high20 = highest_high(highs, 20)
    avg_vol20 = sma(vols, 20)
    if prior_high20 and avg_vol20 and price > prior_high20 \
            and ctx["today_volume"] > vol_mult * avg_vol20:
        firing.append(_fire(ctx, "buy", "buy_breakout", price, {
            "prior_high_20": prior_high20,
            "vol_mult": round(ctx["today_volume"] / avg_vol20, 2),
            "avg_vol_20": round(avg_vol20, 0),
        }))

    # buy_rsi_bounce: rebound after RSI oversold (recomputed statelessly)
    rs = rsi_series(closes, 14)
    if len(rs) >= 3 and min(rs[-3:]) < rsi_os and rs[-1] > rsi_os and rs[-1] > rs[-2]:
        firing.append(_fire(ctx, "buy", "buy_rsi_bounce", price, {
            "rsi": round(rs[-1], 1), "rsi_prev": round(rs[-2], 1),
            "rsi_oversold": rsi_os,
        }))

    return firing
  • Step 4: Confirm it passes

Run: python -m pytest services/trade-monitor/tests/test_conditions_buy.py -q
Expected: PASS (6 passed)

  • Step 5: Commit
git add services/trade-monitor/conditions.py services/trade-monitor/tests/test_conditions_buy.py
git commit -m "feat(trade-monitor): 매수 조건 (ma20_pullback/breakout/rsi_bounce)"

Task 4: conditions, sell side (pure, §6)

Files:

  • Modify: services/trade-monitor/conditions.py (add evaluate_sell)
  • Test: services/trade-monitor/tests/test_conditions_sell.py

Interfaces:

  • Produces: evaluate_sell(ctx: dict, params: dict) -> list[dict], with params {stop_pct, take_pct, trailing_pct}. The sell_climax threshold comes from ctx["climax_vol_mult"].

  • Step 1: Write the failing test: services/trade-monitor/tests/test_conditions_sell.py

"""evaluate_sell: boundaries of the five sell conditions."""
from conditions import evaluate_sell

EXIT = {"stop_pct": 0.08, "take_pct": 0.25, "trailing_pct": 0.10}


def _ctx(**over):
    base = dict(
        ticker="000660", name="SK하이닉스", price=100.0, day_open=100.0,
        today_volume=100.0, closes=[100.0] * 60, highs=[100.0] * 60,
        lows=[100.0] * 60, volumes=[100.0] * 60,
        avg_price=100.0, qty=10, holding_high=100.0, climax_vol_mult=3.0,
    )
    base.update(over)
    return base


def _c(firing):
    return {f["condition"] for f in firing}


def test_stop_loss_fires():
    ctx = _ctx(price=90.0, avg_price=100.0)   # -10% <= -8%
    assert "sell_stop_loss" in _c(evaluate_sell(ctx, EXIT))


def test_stop_loss_skips_above_threshold():
    ctx = _ctx(price=95.0, avg_price=100.0)   # -5% > -8%
    assert "sell_stop_loss" not in _c(evaluate_sell(ctx, EXIT))


def test_take_profit_fires():
    ctx = _ctx(price=130.0, avg_price=100.0)  # +30% >= 25%
    assert "sell_take_profit" in _c(evaluate_sell(ctx, EXIT))


def test_trailing_stop_fires():
    ctx = _ctx(price=89.0, holding_high=100.0)  # 89 <= 100*0.9=90
    assert "sell_trailing_stop" in _c(evaluate_sell(ctx, EXIT))


def test_ma_break_severity_high():
    # price below both ma50 and ma200 → severity high (200 closes are needed for ma200 to be defined)
    closes = [200.0] * 200
    ctx = _ctx(price=100.0, closes=closes, avg_price=100.0, holding_high=100.0)
    firing = evaluate_sell(ctx, EXIT)
    mb = [f for f in firing if f["condition"] == "sell_ma_break"]
    assert mb and mb[0]["detail"]["severity"] == "high"


def test_climax_fires():
    # Volume at least 3x the average + reversal with close (current price) < open
    ctx = _ctx(price=98.0, day_open=100.0, today_volume=400.0,
               volumes=[100.0] * 60)   # 400 >= 3*100, 98<100
    assert "sell_climax" in _c(evaluate_sell(ctx, EXIT))


def test_climax_skips_when_not_reversal():
    ctx = _ctx(price=101.0, day_open=100.0, today_volume=400.0,
               volumes=[100.0] * 60)   # closing up → not a reversal
    assert "sell_climax" not in _c(evaluate_sell(ctx, EXIT))


def test_no_avg_no_pnl_conditions():
    # avg_price None (no holding info) → stop/take do not fire
    ctx = _ctx(price=50.0, avg_price=None, holding_high=None,
               closes=[100.0] * 60)
    conds = _c(evaluate_sell(ctx, EXIT))
    assert "sell_stop_loss" not in conds and "sell_take_profit" not in conds
  • Step 2: Confirm the failure

Run: python -m pytest services/trade-monitor/tests/test_conditions_sell.py -q
Expected: FAIL (ImportError: cannot import name 'evaluate_sell')

  • Step 3: Add evaluate_sell to conditions.py
def evaluate_sell(ctx: dict, params: dict) -> list[dict]:
    price = ctx["price"]
    avg = ctx.get("avg_price")
    hh = ctx.get("holding_high")
    closes, vols = ctx["closes"], ctx["volumes"]
    stop = params.get("stop_pct", 0.08)
    take = params.get("take_pct", 0.25)
    trail = params.get("trailing_pct", 0.10)
    climax_mult = ctx.get("climax_vol_mult", 3.0)
    firing: list[dict] = []

    if avg:
        pnl = (price - avg) / avg
        if pnl <= -stop:
            firing.append(_fire(ctx, "sell", "sell_stop_loss", price, {
                "avg_price": avg, "pnl_pct": round(pnl, 4), "stop_pct": stop}))
        if pnl >= take:
            firing.append(_fire(ctx, "sell", "sell_take_profit", price, {
                "avg_price": avg, "pnl_pct": round(pnl, 4), "take_pct": take}))

    if hh and price <= hh * (1 - trail):
        firing.append(_fire(ctx, "sell", "sell_trailing_stop", price, {
            "holding_high": hh, "trailing_pct": trail,
            "drawdown_pct": round((price - hh) / hh, 4)}))

    ma50, ma200 = sma(closes, 50), sma(closes, 200)
    if ma50 and price < ma50:
        severity = "high" if (ma200 and price < ma200) else "normal"
        firing.append(_fire(ctx, "sell", "sell_ma_break", price, {
            "ma50": round(ma50, 1),
            "ma200": round(ma200, 1) if ma200 else None,
            "severity": severity}))

    # sell_climax: heuristic (to be aligned with holdings_intel later): volume spike + reversal candle
    avg_vol20 = sma(vols, 20)
    if avg_vol20 and ctx["today_volume"] >= climax_mult * avg_vol20 \
            and price < ctx["day_open"]:
        firing.append(_fire(ctx, "sell", "sell_climax", price, {
            "vol_mult": round(ctx["today_volume"] / avg_vol20, 2),
            "day_open": ctx["day_open"]}))  # TODO: cross-check with holdings_intel

    return firing
  • Step 4: Confirm it passes

Run: python -m pytest services/trade-monitor/tests/test_conditions_sell.py -q
Expected: PASS (8 passed)
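
The three percentage-based exits can also be read as price levels, which may help when choosing test inputs. The sketch below is only an illustration: exit_levels is a hypothetical helper, and because evaluate_sell compares the P&L ratio rather than these levels, results can differ by floating-point rounding exactly at a boundary.

```python
def exit_levels(avg_price: float, holding_high: float, *,
                stop_pct: float = 0.08, take_pct: float = 0.25,
                trailing_pct: float = 0.10) -> dict[str, float]:
    """Approximate price levels at which each exit condition starts to fire."""
    return {
        # sell_stop_loss fires at or below this price
        "stop_loss": avg_price * (1 - stop_pct),
        # sell_take_profit fires at or above this price
        "take_profit": avg_price * (1 + take_pct),
        # sell_trailing_stop fires at or below this price
        "trailing_stop": holding_high * (1 - trailing_pct),
    }


levels = exit_levels(avg_price=100.0, holding_high=120.0)
```

With an average price of 100 and a holding high of 120, the levels are roughly 92, 125, and 108. The trailing stop follows the holding high, so it can sit above the average price once the position has run up.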

  • Step 5: Commit
git add services/trade-monitor/conditions.py services/trade-monitor/tests/test_conditions_sell.py
git commit -m "feat(trade-monitor): 매도 조건 (stop/take/trailing/ma_break/climax)"

Task 5: kis_client (self-issued token)

Files:

  • Create: services/trade-monitor/kis_client.py
  • Test: services/trade-monitor/tests/test_kis_client.py

Interfaces:

  • Produces KISClient:

    • __init__(app_key, app_secret, account, is_virtual, timeout=10.0)
    • async _issue_token() -> str (in-memory cache, reissued 10 minutes before expiry)
    • async get_quote(ticker: str) -> dict: {"price":int,"day_open":int,"today_volume":int,"as_of":str}
    • async get_daily_ohlcv(ticker: str, days: int = 250) -> list[dict]: items {"datetime","open","high","low","close","volume"}, in ascending date order
    • async close()
  • Step 1: Write the failing test: services/trade-monitor/tests/test_kis_client.py

"""KISClient: token issuance/caching + quote/daily parsing (respx)."""
import httpx
import pytest
import respx

from kis_client import KISClient

BASE = "https://openapi.koreainvestment.com:9443"


def _client():
    return KISClient("APPKEY", "APPSECRET", "12345678-01", is_virtual=False)


@pytest.mark.asyncio
@respx.mock
async def test_issue_token_cached():
    route = respx.post(f"{BASE}/oauth2/tokenP").mock(
        return_value=httpx.Response(200, json={"access_token": "TKN", "expires_in": 86400}))
    c = _client()
    t1 = await c._issue_token()
    t2 = await c._issue_token()
    assert t1 == "TKN" and t2 == "TKN"
    assert route.call_count == 1   # cached → issued only once
    await c.close()


@pytest.mark.asyncio
@respx.mock
async def test_get_quote_parses():
    respx.post(f"{BASE}/oauth2/tokenP").mock(
        return_value=httpx.Response(200, json={"access_token": "TKN", "expires_in": 86400}))
    respx.get(f"{BASE}/uapi/domestic-stock/v1/quotations/inquire-price").mock(
        return_value=httpx.Response(200, json={"output": {
            "stck_prpr": "71500", "stck_oprc": "71000", "acml_vol": "1234567"}}))
    c = _client()
    q = await c.get_quote("005930")
    assert q["price"] == 71500 and q["day_open"] == 71000 and q["today_volume"] == 1234567
    await c.close()


@pytest.mark.asyncio
@respx.mock
async def test_get_daily_ascending():
    respx.post(f"{BASE}/oauth2/tokenP").mock(
        return_value=httpx.Response(200, json={"access_token": "TKN", "expires_in": 86400}))
    # KIS returns rows in descending order → they must be reversed into ascending order
    respx.get(f"{BASE}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice").mock(
        return_value=httpx.Response(200, json={"output2": [
            {"stck_bsop_date": "20260702", "stck_oprc": "100", "stck_hgpr": "110",
             "stck_lwpr": "90", "stck_clpr": "105", "acml_vol": "5"},
            {"stck_bsop_date": "20260701", "stck_oprc": "95", "stck_hgpr": "102",
             "stck_lwpr": "94", "stck_clpr": "100", "acml_vol": "4"}]}))
    c = _client()
    bars = await c.get_daily_ohlcv("005930", days=250)
    assert bars[0]["datetime"] == "2026-07-01"
    assert bars[-1]["close"] == 105
    await c.close()
  • Step 2: Confirm the failure

Run: python -m pytest services/trade-monitor/tests/test_kis_client.py -q
Expected: FAIL (ModuleNotFoundError: No module named 'kis_client')

  • Step 3: Implement kis_client.py
"""KIS REST client: self-issued OAuth token (TM_KIS_*) + quote + daily bars + throttle."""
from __future__ import annotations

import asyncio
import logging
import time
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

import httpx

logger = logging.getLogger(__name__)
KST = ZoneInfo("Asia/Seoul")

_MAX_ATTEMPTS = 3
_THROTTLE_INTERVAL = 0.5  # 2 requests per second
_TOKEN_MARGIN = 600       # reissue 10 minutes before expiry


class KISClient:
    def __init__(self, app_key, app_secret, account, is_virtual, timeout: float = 10.0):
        self._app_key = app_key
        self._app_secret = app_secret
        self._account = account
        self._base_url = (
            "https://openapivts.koreainvestment.com:29443" if is_virtual
            else "https://openapi.koreainvestment.com:9443"
        )
        self._client = httpx.AsyncClient(timeout=timeout)
        self._token: str | None = None
        self._token_exp: float = 0.0
        self._last_throttle_at = 0.0
        self._throttle_lock = asyncio.Lock()
        self._token_lock = asyncio.Lock()

    async def close(self) -> None:
        await self._client.aclose()

    async def _issue_token(self) -> str:
        async with self._token_lock:
            now = time.time()
            if self._token and now < self._token_exp - _TOKEN_MARGIN:
                return self._token
            r = await self._client.post(
                f"{self._base_url}/oauth2/tokenP",
                json={"grant_type": "client_credentials",
                      "appkey": self._app_key, "appsecret": self._app_secret},
            )
            r.raise_for_status()
            data = r.json()
            self._token = data["access_token"]
            self._token_exp = now + int(data.get("expires_in", 86400))
            return self._token

    async def _throttle(self) -> None:
        async with self._throttle_lock:
            elapsed = time.monotonic() - self._last_throttle_at
            if elapsed < _THROTTLE_INTERVAL:
                await asyncio.sleep(_THROTTLE_INTERVAL - elapsed)
            self._last_throttle_at = time.monotonic()

    async def _request(self, method: str, path: str, tr_id: str, **kwargs) -> dict:
        token = await self._issue_token()
        headers = {
            "authorization": f"Bearer {token}",
            "appkey": self._app_key, "appsecret": self._app_secret,
            "tr_id": tr_id, "custtype": "P",
        }
        url = f"{self._base_url}{path}"
        for attempt in range(_MAX_ATTEMPTS):
            await self._throttle()
            try:
                resp = await self._client.request(method, url, headers=headers, **kwargs)
                if resp.status_code == 429 and attempt < _MAX_ATTEMPTS - 1:
                    await asyncio.sleep(2 ** attempt)
                    continue
                resp.raise_for_status()
                return resp.json()
            except httpx.TimeoutException:
                if attempt < _MAX_ATTEMPTS - 1:
                    await asyncio.sleep(2 ** attempt)
                    continue
                raise
        raise RuntimeError("retry exhausted")

    async def get_quote(self, ticker: str) -> dict:
        raw = await self._request(
            "GET", "/uapi/domestic-stock/v1/quotations/inquire-price",
            tr_id="FHKST01010100",
            params={"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": ticker},
        )
        o = raw.get("output", {})
        return {
            "price": int(o["stck_prpr"]),
            "day_open": int(o["stck_oprc"]),
            "today_volume": int(o["acml_vol"]),
            "as_of": datetime.now(KST).isoformat(),
        }

    async def get_daily_ohlcv(self, ticker: str, days: int = 250) -> list[dict]:
        today = datetime.now(KST).strftime("%Y%m%d")
        start = (datetime.now(KST) - timedelta(days=days * 2)).strftime("%Y%m%d")
        raw = await self._request(
            "GET", "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice",
            tr_id="FHKST03010100",
            params={"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": ticker,
                    "FID_INPUT_DATE_1": start, "FID_INPUT_DATE_2": today,
                    "FID_PERIOD_DIV_CODE": "D", "FID_ORG_ADJ_PRC": "1"},
        )
        bars = []
        for row in raw.get("output2", []):
            try:
                d = row["stck_bsop_date"]
                bars.append({
                    "datetime": f"{d[:4]}-{d[4:6]}-{d[6:]}",
                    "open": int(row["stck_oprc"]), "high": int(row["stck_hgpr"]),
                    "low": int(row["stck_lwpr"]), "close": int(row["stck_clpr"]),
                    "volume": int(row["acml_vol"]),
                })
            except (KeyError, ValueError):
                continue
        bars.reverse()
        return bars[-days:]
  • Step 4: Confirm it passes

Run: python -m pytest services/trade-monitor/tests/test_kis_client.py -q
Expected: PASS (3 passed)

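Note on respx/pytest-asyncio: if asyncio_mode is needed, mark each test explicitly with @pytest.mark.asyncio (as in the tests above) rather than configuring it in pytest.ini. If pytest-asyncio is not installed, run pip install pytest-asyncio, add it to requirements.txt, and include that change in the commit.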
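
One further point to verify for get_daily_ohlcv in Step 3: it sends a single request for a window of days * 2 calendar days. If the daily chart endpoint caps the number of rows per response (an assumption here, to be checked against the KIS documentation, not something this plan states), one request would not return 250 bars and ma200 could never be computed. Under that assumption, the range could be split into smaller windows and the results concatenated. date_windows below is a hypothetical helper, and the window size is a parameter rather than a known limit.

```python
from datetime import date, timedelta


def date_windows(start: date, end: date, max_days: int) -> list[tuple[date, date]]:
    """Split [start, end] into inclusive windows of at most `max_days`
    calendar days, newest window first."""
    if max_days < 1:
        raise ValueError("max_days must be >= 1")
    windows: list[tuple[date, date]] = []
    cur_end = end
    while cur_end >= start:
        cur_start = max(start, cur_end - timedelta(days=max_days - 1))
        windows.append((cur_start, cur_end))
        cur_end = cur_start - timedelta(days=1)   # next window ends the day before
    return windows
```

Each (start, end) pair would map to FID_INPUT_DATE_1 and FID_INPUT_DATE_2 of one request, and every request would still pass through the existing throttle.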

  • Step 5: Commit
git add services/trade-monitor/kis_client.py services/trade-monitor/tests/test_kis_client.py services/trade-monitor/requirements.txt
git commit -m "feat(trade-monitor): KIS 자체 토큰 + quote + 일봉 클라이언트"

Task 6: nas_client (X-WebAI-Key)

Files:

  • Create: services/trade-monitor/nas_client.py
  • Test: services/trade-monitor/tests/test_nas_client.py

Interfaces:

  • Produces NASClient:

    • __init__(base_url, api_key, timeout=10.0)
    • async get_monitor_set() -> dict (GET /api/webai/trade-alert/monitor-set)
    • async post_report(as_of: str, firing: list[dict]) -> dict (POST /api/webai/trade-alert/report)
    • async close()
  • Step 1: Write the failing test: services/trade-monitor/tests/test_nas_client.py

"""NASClient: monitor-set/report + X-WebAI-Key (respx)."""
import httpx
import pytest
import respx

from nas_client import NASClient

BASE = "http://nas.test"


@pytest.mark.asyncio
@respx.mock
async def test_get_monitor_set_sends_key():
    route = respx.get(f"{BASE}/api/webai/trade-alert/monitor-set").mock(
        return_value=httpx.Response(200, json={"session": "regular", "buy_targets": []}))
    c = NASClient(BASE, "KEY")
    ms = await c.get_monitor_set()
    assert ms["session"] == "regular"
    assert route.calls.last.request.headers["X-WebAI-Key"] == "KEY"
    await c.close()


@pytest.mark.asyncio
@respx.mock
async def test_post_report_payload():
    captured = {}

    def _resp(request):
        import json as _j
        captured.update(_j.loads(request.content))
        return httpx.Response(200, json={"new_alerts": 1, "cleared": 0})

    respx.post(f"{BASE}/api/webai/trade-alert/report").mock(side_effect=_resp)
    c = NASClient(BASE, "KEY")
    firing = [{"ticker": "005930", "kind": "buy", "condition": "buy_breakout",
               "price": 71500, "detail": {}}]
    out = await c.post_report("2026-07-02T09:01:00+09:00", firing)
    assert out["new_alerts"] == 1
    assert captured["as_of"] == "2026-07-02T09:01:00+09:00"
    assert captured["firing"] == firing
    await c.close()
  • Step 2: Confirm the failure

Run: python -m pytest services/trade-monitor/tests/test_nas_client.py -q
Expected: FAIL (ModuleNotFoundError: No module named 'nas_client')

  • Step 3: Implement nas_client.py
"""NAS stock backend trade-alert contract: X-WebAI-Key + retry."""
from __future__ import annotations

import asyncio
import logging

import httpx

logger = logging.getLogger(__name__)

_MAX_ATTEMPTS = 3
_RETRY_STATUSES = {429, 500, 502, 503, 504}


class NASClient:
    def __init__(self, base_url: str, api_key: str, timeout: float = 10.0):
        self._base_url = base_url.rstrip("/")
        self._api_key = api_key
        self._client = httpx.AsyncClient(timeout=timeout)

    async def close(self) -> None:
        await self._client.aclose()

    async def get_monitor_set(self) -> dict:
        return await self._request("GET", "/api/webai/trade-alert/monitor-set")

    async def post_report(self, as_of: str, firing: list[dict]) -> dict:
        return await self._request(
            "POST", "/api/webai/trade-alert/report",
            json={"as_of": as_of, "firing": firing})

    async def _request(self, method: str, path: str, **kwargs) -> dict:
        url = f"{self._base_url}{path}"
        headers = {"X-WebAI-Key": self._api_key}
        for attempt in range(_MAX_ATTEMPTS):
            try:
                resp = await self._client.request(method, url, headers=headers, **kwargs)
                if resp.status_code in _RETRY_STATUSES and attempt < _MAX_ATTEMPTS - 1:
                    await asyncio.sleep(2 ** attempt)
                    continue
                resp.raise_for_status()
                return resp.json()
            except httpx.TimeoutException:
                if attempt < _MAX_ATTEMPTS - 1:
                    await asyncio.sleep(2 ** attempt)
                    continue
                raise
        raise RuntimeError("retry exhausted")
  • Step 4: Confirm the pass

Run: python -m pytest services/trade-monitor/tests/test_nas_client.py -q Expected: PASS (2 passed)

  • Step 5: Commit
git add services/trade-monitor/nas_client.py services/trade-monitor/tests/test_nas_client.py
git commit -m "feat(trade-monitor): NAS trade-alert client (monitor-set/report)"
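
The retry policy in `_request` can be read as a small schedule: at most `_MAX_ATTEMPTS = 3` tries, with a sleep of `2 ** attempt` seconds after each retryable failure. The sketch below only restates that arithmetic. `backoff_delays` and `worst_case_seconds` are hypothetical helpers that are not part of the plan, and the worst-case figure assumes the 10 s timeout bounds a whole attempt, which is a simplification because httpx applies its timeout per phase.

```python
"""Sketch: retry schedule implied by NASClient._request (illustrative only)."""

MAX_ATTEMPTS = 3  # mirrors _MAX_ATTEMPTS in nas_client.py


def backoff_delays(max_attempts: int = MAX_ATTEMPTS) -> list[int]:
    """Seconds slept after each retryable failure; nothing is slept after the last try."""
    return [2 ** attempt for attempt in range(max_attempts - 1)]


def worst_case_seconds(timeout: float, max_attempts: int = MAX_ATTEMPTS) -> float:
    """Rough upper bound if every attempt times out.

    Assumption: `timeout` bounds one whole attempt (a simplification).
    """
    return max_attempts * timeout + sum(backoff_delays(max_attempts))


if __name__ == "__main__":
    print(backoff_delays())          # delays between the three attempts
    print(worst_case_seconds(10.0))  # bound for the default 10 s timeout
```

Under that simplification a single call can take roughly half a minute, so a slow monitor-set fetch followed by a slow report may overrun one 60-second cycle. Whether that matters depends on behavior this plan does not specify.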

Task 7: monitor orchestration

Files:

  • Create: services/trade-monitor/monitor.py
  • Test: services/trade-monitor/tests/test_monitor.py

Interfaces:

  • Consumes: NASClient(get_monitor_set/post_report), KISClient(get_quote/get_daily_ohlcv), evaluate_buy/evaluate_sell, _shared.heartbeat.WorkerStats, config.Settings.

  • Produces:

    • class MonitorState.session_state:str="idle", .last_alert_at:str|None=None
    • filter_krx(targets: list[dict]) -> list[dict]: keeps 6-digit numeric tickers only
    • async _build_ctx(kis, target: dict, settings) -> dict: assembles ctx (quote + daily)
    • async run_cycle(nas, kis, state, stats, settings) -> None: runs one cycle
    • async monitor_loop(nas, kis, state, stats, settings) -> None
    • make_state_fn(state) -> callable: heartbeat state_fn (redis, stats) -> (state_str, {"last_alert_at":...})
  • Step 1: Write the failing test: services/trade-monitor/tests/test_monitor.py

"""monitor.run_cycle: gating, filtering, assembly, isolation."""
import pytest

import monitor
from monitor import MonitorState, filter_krx, run_cycle
from config import load_settings
from _shared.heartbeat import WorkerStats


def test_filter_krx_keeps_only_numeric6():
    targets = [{"ticker": "005930"}, {"ticker": "AAPL"}, {"ticker": "00660"},
               {"ticker": "000660"}, {"ticker": "0059301"}]
    kept = {t["ticker"] for t in filter_krx(targets)}
    assert kept == {"005930", "000660"}


class _FakeNAS:
    def __init__(self, ms):
        self._ms = ms
        self.reported = None

    async def get_monitor_set(self):
        return self._ms

    async def post_report(self, as_of, firing):
        self.reported = {"as_of": as_of, "firing": firing}
        return {"new_alerts": len(firing), "cleared": 0}


class _FakeKIS:
    def __init__(self, price=100, fail_on=None):
        self._price = price
        self._fail_on = fail_on or set()

    async def get_quote(self, ticker):
        if ticker in self._fail_on:
            raise RuntimeError("KIS down")
        return {"price": self._price, "day_open": 99, "today_volume": 1000,
                "as_of": "x"}

    async def get_daily_ohlcv(self, ticker, days=250):
        # Bullish MA alignment + price near the lows → trigger ma20_pullback
        return [{"open": 90, "high": 90, "low": 90, "close": 90, "volume": 1}] * 200 \
            + [{"open": 100, "high": 100, "low": 100, "close": 100, "volume": 1}] * 20


@pytest.mark.asyncio
async def test_closed_session_skips_kis(monkeypatch):
    nas = _FakeNAS({"session": "closed"})
    state, stats = MonitorState(), WorkerStats()
    await run_cycle(nas, _FakeKIS(), state, stats, load_settings())
    assert state.session_state == "market_closed"
    assert nas.reported is None   # no report is sent either


@pytest.mark.asyncio
async def test_non_krx_skipped_and_report_sent():
    nas = _FakeNAS({"session": "regular",
                    "buy_targets": [{"ticker": "AAPL", "name": "Apple"}],
                    "sell_targets": [], "buy_params": {}, "exit_params": {}})
    state, stats = MonitorState(), WorkerStats()
    await run_cycle(nas, _FakeKIS(), state, stats, load_settings())
    assert state.session_state == "market_open"
    assert nas.reported is not None
    assert nas.reported["firing"] == []   # alphabetic ticker skipped → empty firing


@pytest.mark.asyncio
async def test_firing_assembled_and_last_alert_set():
    nas = _FakeNAS({"session": "regular",
                    "buy_targets": [{"ticker": "005930", "name": "삼성전자"}],
                    "sell_targets": [], "buy_params": {"pullback_pct": 0.02},
                    "exit_params": {}})
    state, stats = MonitorState(), WorkerStats()
    await run_cycle(nas, _FakeKIS(price=101), state, stats, load_settings())
    conds = {f["condition"] for f in nas.reported["firing"]}
    assert "buy_ma20_pullback" in conds
    assert state.last_alert_at is not None


@pytest.mark.asyncio
async def test_per_ticker_failure_isolated():
    nas = _FakeNAS({"session": "regular",
                    "buy_targets": [{"ticker": "005930"}, {"ticker": "000660"}],
                    "sell_targets": [], "buy_params": {}, "exit_params": {}})
    state, stats = MonitorState(), WorkerStats()
    # 005930 fails, 000660 succeeds → the loop survives and the report is still sent
    await run_cycle(nas, _FakeKIS(fail_on={"005930"}), state, stats, load_settings())
    assert nas.reported is not None
    assert state.session_state == "market_open"


@pytest.mark.asyncio
async def test_monitor_set_failure_sets_idle():
    class _BadNAS(_FakeNAS):
        async def get_monitor_set(self):
            raise RuntimeError("NAS down")

    nas = _BadNAS({})
    state, stats = MonitorState(), WorkerStats()
    await run_cycle(nas, _FakeKIS(), state, stats, load_settings())
    assert state.session_state == "idle"
    assert nas.reported is None
  • Step 2: Confirm the failure

Run: python -m pytest services/trade-monitor/tests/test_monitor.py -q Expected: FAIL — ModuleNotFoundError: No module named 'monitor'

  • Step 3: Implement monitor.py
"""Orchestration: fetch monitor-set → evaluate conditions → report + heartbeat state."""
from __future__ import annotations

import asyncio
import logging
from datetime import datetime
from zoneinfo import ZoneInfo

from conditions import evaluate_buy, evaluate_sell

logger = logging.getLogger(__name__)
KST = ZoneInfo("Asia/Seoul")


class MonitorState:
    """Shared state: updated by monitor_loop, read by the heartbeat state_fn."""
    def __init__(self):
        self.session_state = "idle"          # market_open | market_closed | idle
        self.last_alert_at: str | None = None


def filter_krx(targets: list[dict]) -> list[dict]:
    """Keep only 6-digit numeric (KRX) tickers; alphabetic tickers are skipped."""
    out = []
    for t in targets:
        tk = str(t.get("ticker", ""))
        if tk.isdigit() and len(tk) == 6:
            out.append(t)
    return out


async def _build_ctx(kis, target: dict, settings) -> dict:
    ticker = target["ticker"]
    quote = await kis.get_quote(ticker)
    daily = await kis.get_daily_ohlcv(ticker, 250)
    return {
        "ticker": ticker, "name": target.get("name", ""),
        "price": quote["price"], "day_open": quote["day_open"],
        "today_volume": quote["today_volume"],
        "closes": [b["close"] for b in daily],
        "highs": [b["high"] for b in daily],
        "lows": [b["low"] for b in daily],
        "volumes": [b["volume"] for b in daily],
        "avg_price": target.get("avg_price"),
        "qty": target.get("qty"),
        "holding_high": target.get("holding_high"),
        "climax_vol_mult": settings.climax_vol_mult,
    }


async def run_cycle(nas, kis, state, stats, settings) -> None:
    try:
        ms = await nas.get_monitor_set()
    except Exception:
        logger.exception("monitor-set fetch failed")
        state.session_state = "idle"
        stats.jobs_failed += 1
        return

    session = ms.get("session", "closed")
    if session == "closed":
        state.session_state = "market_closed"
        return

    buy_targets = filter_krx(ms.get("buy_targets", []))
    sell_targets = filter_krx(ms.get("sell_targets", []))
    buy_params = ms.get("buy_params", {})
    exit_params = ms.get("exit_params", {})

    firing: list[dict] = []
    for t in buy_targets:
        try:
            firing += evaluate_buy(await _build_ctx(kis, t, settings), buy_params)
        except Exception:
            logger.exception("buy evaluation failed %s", t.get("ticker"))
    for t in sell_targets:
        try:
            firing += evaluate_sell(await _build_ctx(kis, t, settings), exit_params)
        except Exception:
            logger.exception("sell evaluation failed %s", t.get("ticker"))

    as_of = datetime.now(KST).isoformat(timespec="seconds")
    if firing:
        state.last_alert_at = as_of
        logger.info("firing %d item(s): %s", len(firing),
                    [f"{f['ticker']}:{f['condition']}" for f in firing])
    try:
        await nas.post_report(as_of, firing)   # send even an empty array (edge clear)
    except Exception:
        logger.exception("report send failed")

    state.session_state = "market_open"
    stats.jobs_done += 1
    stats.last_job_at = as_of


async def monitor_loop(nas, kis, state, stats, settings) -> None:
    logger.info("trade-monitor loop started interval=%ds", settings.loop_interval)
    while True:
        try:
            await run_cycle(nas, kis, state, stats, settings)
        except asyncio.CancelledError:
            logger.info("monitor_loop cancelled")
            raise
        except Exception:
            logger.exception("monitor_loop iteration failed")
        await asyncio.sleep(settings.loop_interval)


def make_state_fn(state):
    async def state_fn(redis, stats):
        return state.session_state, {"last_alert_at": state.last_alert_at}
    return state_fn
  • Step 4: Confirm the pass

Run: python -m pytest services/trade-monitor/tests/test_monitor.py -q Expected: PASS (6 passed)

  • Step 5: Commit
git add services/trade-monitor/monitor.py services/trade-monitor/tests/test_monitor.py
git commit -m "feat(trade-monitor): monitor orchestration (run_cycle/loop/state_fn)"
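
The tests in this task do not exercise `make_state_fn`, so here is a minimal sketch of how the heartbeat side would observe the shared state. `MonitorState` and `make_state_fn` are copied from the Step 3 code. How `_shared.heartbeat.heartbeat_loop` actually calls `state_fn` is not shown in this plan, so the direct call with `None` placeholders is an assumption based only on the `(redis, stats) -> (state_str, extra)` signature listed under Produces.

```python
"""Sketch: the heartbeat state_fn reads whatever monitor_loop last wrote."""
import asyncio


class MonitorState:
    def __init__(self):
        self.session_state = "idle"
        self.last_alert_at = None


def make_state_fn(state):
    async def state_fn(redis, stats):
        # redis and stats are unused here; they exist to match the signature.
        return state.session_state, {"last_alert_at": state.last_alert_at}
    return state_fn


async def demo():
    state = MonitorState()
    state_fn = make_state_fn(state)
    before = await state_fn(None, None)      # placeholders, not real clients
    # Simulate what run_cycle does after a cycle that produced alerts.
    state.session_state = "market_open"
    state.last_alert_at = "2026-07-02T09:01:00+09:00"
    after = await state_fn(None, None)
    return before, after


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the closure holds a reference to the same `MonitorState` object, the heartbeat task picks up new values without any message passing between the two tasks.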

Task 8: main.py (FastAPI lifespan + /health)

Files:

  • Create: services/trade-monitor/main.py
  • Test: services/trade-monitor/tests/test_health.py

Interfaces:

  • Consumes: config.load_settings, NASClient, KISClient, monitor.*, _shared.heartbeat.heartbeat_loop, _shared.heartbeat.WorkerStats.

  • Produces: FastAPI app with /health → {"ok": True, "service": "trade-monitor"}.

  • Step 1: Write the failing test: services/trade-monitor/tests/test_health.py

"""/health: call the route function directly (no TestClient), so the lifespan never starts."""
from main import health


def test_health():
    assert health() == {"ok": True, "service": "trade-monitor"}
  • Step 2: Confirm the failure

Run: python -m pytest services/trade-monitor/tests/test_health.py -q Expected: FAIL — ModuleNotFoundError: No module named 'main'

  • Step 3: Implement main.py
"""trade-monitor FastAPI entry: lifespan (monitor_loop + heartbeat_loop) + /health."""
from __future__ import annotations

import asyncio
import logging
from contextlib import asynccontextmanager

import redis.asyncio as aioredis
from fastapi import FastAPI

import monitor
from config import load_settings
from kis_client import KISClient
from nas_client import NASClient
from _shared.heartbeat import heartbeat_loop, WorkerStats

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(name)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

HEARTBEAT_INTERVAL = 15   # 60 s loop > 45 s TTL, so an independent 15 s beat closes the expiry gap
HEARTBEAT_TTL = 45


@asynccontextmanager
async def lifespan(app: FastAPI):
    settings = load_settings()
    nas = NASClient(settings.nas_base_url, settings.webai_api_key)
    kis = KISClient(settings.kis_app_key, settings.kis_app_secret,
                    settings.kis_account, settings.kis_is_virtual)
    state = monitor.MonitorState()
    stats = WorkerStats()
    redis = aioredis.from_url(settings.redis_url, decode_responses=False)

    mon_task = asyncio.create_task(
        monitor.monitor_loop(nas, kis, state, stats, settings))
    hb_task = asyncio.create_task(heartbeat_loop(
        redis, "trade-monitor", "trader", stats,
        interval=HEARTBEAT_INTERVAL, ttl=HEARTBEAT_TTL,
        state_fn=monitor.make_state_fn(state)))
    logger.info("trade-monitor lifespan started")
    try:
        yield
    finally:
        for t in (mon_task, hb_task):
            t.cancel()
            try:
                await t
            except asyncio.CancelledError:
                pass
        await kis.close()
        await nas.close()
        await redis.aclose()
        logger.info("trade-monitor lifespan stopped")


app = FastAPI(lifespan=lifespan)


@app.get("/health")
def health():
    return {"ok": True, "service": "trade-monitor"}
  • Step 4: Confirm the pass + run the full suite

Run: python -m pytest services/trade-monitor/tests -q Expected: PASS (everything passes: config 2 + indicators 6 + buy 6 + sell 8 + kis 3 + nas 2 + monitor 6 + health 1 = 34 tests)

  • Step 5: Commit
git add services/trade-monitor/main.py services/trade-monitor/tests/test_health.py
git commit -m "feat(trade-monitor): FastAPI lifespan + heartbeat wiring + /health"
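
The reasoning behind `HEARTBEAT_INTERVAL = 15` and `HEARTBEAT_TTL = 45` can be checked with integer arithmetic. The sketch below is illustrative only: both helper names are hypothetical, and it ignores scheduling jitter and Redis latency.

```python
"""Sketch: why a 15 s beat is used with a 45 s TTL (illustrative only)."""


def expiry_gap(refresh_interval: int, ttl: int) -> int:
    """Seconds per refresh period during which the key would be expired."""
    return max(0, refresh_interval - ttl)


def tolerated_missed_beats(refresh_interval: int, ttl: int) -> int:
    """Consecutive beats that may be lost while the next one still lands
    strictly before the key expires."""
    if refresh_interval <= 0 or ttl <= 0:
        raise ValueError("refresh_interval and ttl must be positive")
    # Number of beat times that fall strictly inside the TTL window.
    beats_inside_ttl = (ttl - 1) // refresh_interval
    return max(0, beats_inside_ttl - 1)


if __name__ == "__main__":
    print(expiry_gap(60, 45))              # refreshing only once per 60 s cycle
    print(expiry_gap(15, 45))              # independent 15 s heartbeat
    print(tolerated_missed_beats(15, 45))  # slack before the key disappears
```

Refreshing only from the 60-second loop would leave the key missing for part of every cycle, while the independent 15-second task leaves no gap and, under the strict-inequality convention used here, survives one lost beat.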

Task 9: Deployment (Dockerfile + compose + .env.example)

Files:

  • Create: services/trade-monitor/Dockerfile
  • Create: services/trade-monitor/.env.example
  • Modify: services/docker-compose.yml (add the trade-monitor service)

Interfaces: none (deployment artifacts). Validation is via docker compose config.

  • Step 1: Write the Dockerfile: services/trade-monitor/Dockerfile
FROM python:3.12-slim-bookworm
ENV PYTHONUNBUFFERED=1

WORKDIR /app

RUN apt-get update && apt-get install -y --no-install-recommends \
    ca-certificates tzdata \
 && rm -rf /var/lib/apt/lists/*

COPY trade-monitor/requirements.txt /app/
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt

# Shared heartbeat module (services/_shared): main.py does "from _shared.heartbeat import ..."
COPY _shared /app/_shared
COPY trade-monitor/. /app/
ENV PYTHONPATH=/app

EXPOSE 8000
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
  • Step 2: Write .env.example: services/trade-monitor/.env.example
# Plan-realtime-trade-alerts — trade-monitor

# NAS Redis (heartbeat)
REDIS_URL=redis://192.168.45.54:6379

# NAS stock backend (monitor-set / report)
NAS_BASE_URL=http://192.168.45.54:18500
WEBAI_API_KEY=

# KIS self-issued token (dedicated app_key, separate from ai_trade)
TM_KIS_APP_KEY=
TM_KIS_APP_SECRET=
TM_KIS_ACCOUNT=
TM_KIS_IS_VIRTUAL=0

# Loop interval (seconds) / sell_climax volume-multiple threshold
TM_LOOP_INTERVAL=60
TM_CLIMAX_VOL_MULT=3.0
  • Step 3: Add the service to docker-compose.yml

In services/docker-compose.yml, append the following after the image-render block (end of file, 2-space indentation):

  trade-monitor:
    build:
      context: .
      dockerfile: trade-monitor/Dockerfile
    container_name: trade-monitor
    restart: unless-stopped
    ports:
      - "18715:8000"
    environment:
      - TZ=Asia/Seoul
      - REDIS_URL=${REDIS_URL:-redis://192.168.45.54:6379}
      - NAS_BASE_URL=${NAS_BASE_URL:-http://192.168.45.54:18500}
      - WEBAI_API_KEY=${WEBAI_API_KEY:-}
      - TM_KIS_APP_KEY=${TM_KIS_APP_KEY:-}
      - TM_KIS_APP_SECRET=${TM_KIS_APP_SECRET:-}
      - TM_KIS_ACCOUNT=${TM_KIS_ACCOUNT:-}
      - TM_KIS_IS_VIRTUAL=${TM_KIS_IS_VIRTUAL:-0}
      - TM_LOOP_INTERVAL=${TM_LOOP_INTERVAL:-60}
      - TM_CLIMAX_VOL_MULT=${TM_CLIMAX_VOL_MULT:-3.0}
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 60s
      timeout: 5s
      retries: 3
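  • Step 4: Validate the compose file

Run: cd services && docker compose config (if Docker is not installed on the local Windows host, skip this and validate on NAS/WSL2; locally, only check the YAML indentation by eye) Expected: the trade-monitor service parses and the 18715 port mapping is shown.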

  • Step 5: Commit
git add services/trade-monitor/Dockerfile services/trade-monitor/.env.example services/docker-compose.yml
git commit -m "feat(trade-monitor): Dockerfile + compose service (18715) + .env.example"
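
Once the container is running, the compose healthcheck can be mirrored from the host as a quick smoke test. This is a sketch, not part of the plan: `is_healthy_payload` and `check_health` are hypothetical helpers, and the URL assumes the script runs on the Docker host where port 18715 is published.

```python
"""Sketch: host-side smoke check for the trade-monitor /health endpoint."""
import json
import urllib.request

EXPECTED = {"ok": True, "service": "trade-monitor"}  # body returned by health()


def is_healthy_payload(body: bytes) -> bool:
    """True only if the body is JSON equal to the expected /health response."""
    try:
        return json.loads(body) == EXPECTED
    except ValueError:  # covers invalid JSON and undecodable bytes
        return False


def check_health(url: str, timeout: float = 5.0) -> bool:
    """Fetch the URL and validate the payload; network errors propagate."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.status == 200 and is_healthy_payload(resp.read())


if __name__ == "__main__":
    # Assumed host and port: the compose mapping "18715:8000" on the local host.
    print(check_health("http://localhost:18715/health"))
```

Unlike the in-container healthcheck, which only checks that the request succeeds, this version also compares the response body, so a different service answering on the same port would be caught.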

Self-Review

1. Spec coverage:

  • §5.1 monitor-set (GET) → Task 6 get_monitor_set + Task 7 session gate.
  • §5.2 report (POST) → Task 6 post_report + Task 7 firing assembly (empty array included).
  • §5.4 heartbeat (EX45, kind trader, state) → Task 7 make_state_fn + Task 8 heartbeat_loop (15s/45s).
  • §6 three buy conditions → Task 3. / Five sell conditions → Task 4.
  • Non-KRX skip → Task 7 filter_krx. / Stateless → Task 7 (no local dedup).
  • KIS self-issued token → Task 5. / Deployment (_shared COPY, 18715) → Task 9.

2. Placeholder scan: all code, commands, and expected outputs are written out in full. The "TODO: holdings_intel" cross-check note is an intentional code comment (a flag).

3. Type consistency: the ctx keys (closes/highs/lows/volumes/today_volume/day_open/avg_price/holding_high/climax_vol_mult) match between the producer _build_ctx (Task 7) and the consumers evaluate_buy/evaluate_sell (Task 3/4). WorkerStats (jobs_done/jobs_failed/last_job_at) matches _shared. The heartbeat state_fn signature (redis, stats) -> (str, dict) matches the _shared.heartbeat_loop call site.
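
The ctx-key part of the type-consistency check could also be made mechanical. The sketch below is only an illustration: `REVIEWED_CTX_KEYS` and `missing_ctx_keys` are hypothetical names, the key set is copied from the review item above, and `sample_ctx` imitates the dict shape that `_build_ctx` returns using made-up values.

```python
"""Sketch: verify that a ctx dict carries every key named in the review."""

# Keys listed in the type-consistency review item.
REVIEWED_CTX_KEYS = frozenset({
    "closes", "highs", "lows", "volumes", "today_volume", "day_open",
    "avg_price", "holding_high", "climax_vol_mult",
})


def missing_ctx_keys(ctx: dict) -> list[str]:
    """Reviewed keys that are absent from ctx, in sorted order."""
    return sorted(REVIEWED_CTX_KEYS.difference(ctx))


# Same shape as the dict returned by _build_ctx; the values are placeholders.
sample_ctx = {
    "ticker": "005930", "name": "", "price": 101, "day_open": 99,
    "today_volume": 1000, "closes": [100], "highs": [100], "lows": [100],
    "volumes": [1], "avg_price": None, "qty": None, "holding_high": None,
    "climax_vol_mult": 3.0,
}

if __name__ == "__main__":
    print(missing_ctx_keys(sample_ctx))        # keys still missing, if any
    print(missing_ctx_keys({"closes": [100]}))  # a deliberately thin ctx
```

A check like this would only catch missing keys; it says nothing about whether evaluate_buy and evaluate_sell interpret the values the same way.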

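Unresolved flags (DESIGN.md §11): sell_climax heuristic approximation, live verification of KIS fields, 4-week IC recalibration of the buy interpretation, coexistence under the KIS rate limit, after-hours quotes for the after session.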