From 141209ad42ef9e78695994093e3ef9e2b37ae78e Mon Sep 17 00:00:00 2001 From: gahusb Date: Fri, 3 Jul 2026 01:44:25 +0900 Subject: [PATCH] =?UTF-8?q?feat(trade-monitor):=20=EC=8A=A4=EC=BA=90?= =?UTF-8?q?=ED=8F=B4=EB=94=A9=20+=20config?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/trade-monitor/DESIGN.md | 166 +++ services/trade-monitor/PLAN.md | 1437 +++++++++++++++++++ services/trade-monitor/config.py | 32 + services/trade-monitor/conftest.py | 5 + services/trade-monitor/pytest.ini | 2 + services/trade-monitor/requirements.txt | 7 + services/trade-monitor/tests/__init__.py | 0 services/trade-monitor/tests/test_config.py | 27 + 8 files changed, 1676 insertions(+) create mode 100644 services/trade-monitor/DESIGN.md create mode 100644 services/trade-monitor/PLAN.md create mode 100644 services/trade-monitor/config.py create mode 100644 services/trade-monitor/conftest.py create mode 100644 services/trade-monitor/pytest.ini create mode 100644 services/trade-monitor/requirements.txt create mode 100644 services/trade-monitor/tests/__init__.py create mode 100644 services/trade-monitor/tests/test_config.py diff --git a/services/trade-monitor/DESIGN.md b/services/trade-monitor/DESIGN.md new file mode 100644 index 0000000..cd35a07 --- /dev/null +++ b/services/trade-monitor/DESIGN.md @@ -0,0 +1,166 @@ +# trade-monitor 워커 — 구현 설계 + +> **2026-07-03 · web-ai 소유.** 실시간 매매 알람 파이프라인의 Windows-side 워커. +> **권위 계약(원본 스펙)**: web-backend repo `docs/superpowers/specs/2026-07-02-realtime-trade-alerts-design.md` §5(계약)·§6(조건). +> 이 문서는 그 계약을 Windows Docker 워커로 구현하기 위한 **구현 설계**(모듈 분해·조건 해석·배포)만 다룬다. + +--- + +## 1. 역할 · 경계 + +`services/trade-monitor/` 는 형제 워커(`image-render`, `task-watcher`)와 동일한 관례를 따르는 **FastAPI + asyncio 루프** 워커다. WSL2 Docker(`services/docker-compose.yml`)에서 구동. + +**책임** +1. 60초 루프로 NAS `monitor-set` 조회 → 세션 게이트. +2. 비-KRX(알파벳) 티커 skip. +3. KIS 실시간 현재가 + 일봉 OHLCV 조회 → TA 지표 계산. +4. 매수/매도 조건 평가 → 발화집합 F 구성. +5. `POST report`로 F 전체 전송(무상태 — dedup은 NAS 영속). +6. Redis heartbeat 발신(`worker:trade-monitor:heartbeat` EX45). +7. KIS 오류는 사이클/종목 단위 격리(다음 분 재시도). + +**경계 밖(안 함)**: dedup 상태 보관, 텔레그램 전송(NAS 담당), KST 세션/휴장 캘린더 재구현(NAS가 `session` 판정), 주문 실행. + +--- + +## 2. 모듈 분해 + +| 파일 | 책임 | 인터페이스(순수/부작용) | +|------|------|------| +| `main.py` | FastAPI app + lifespan. `monitor_loop` + `heartbeat_loop` 스폰, `/health` | 부작용(태스크 스폰) | +| `monitor.py` | 오케스트레이션 루프. monitor-set→게이트→종목순회→firing→report. 공유 `MonitorState` 갱신 | 부작용 | +| `nas_client.py` | `get_monitor_set()` / `post_report(as_of, firing)` — `X-WebAI-Key` + retry | 부작용(HTTP) | +| `kis_client.py` | KIS REST: `_issue_token()`(OAuth 자체 발급, 24h 캐시) + `get_quote()` + `get_daily_ohlcv()` + 0.5s throttle | 부작용(HTTP) | +| `indicators.py` | `sma`, `rsi`, `avg_volume`, `highest_high` | **순수** | +| `conditions.py` | `evaluate_buy(ctx, buy_params)` / `evaluate_sell(ctx, exit_params)` → `list[firing]` | **순수** | +| `config.py` | `Settings` — env 로드 | 순수 | + +순수 모듈(`indicators`, `conditions`)에 조건 로직을 격리해 테이블 기반 단위 테스트로 검증 가능하게 한다. HTTP·시간·Redis는 경계 모듈에만. + +--- + +## 3. 데이터 흐름 (monitor_loop, 60초) + +``` +매 사이클: + ms = nas.get_monitor_set() # §5.1 + state.session = ms.session + if ms.session == "closed": + state.hb = "market_closed"; sleep; continue # KIS 호출 0 + targets = filter_krx(ms.buy_targets, ms.sell_targets) # 알파벳 티커 skip + firing = [] + for t in targets: # 종목 단위 try/except 격리 + quote = kis.get_quote(t.ticker) # 현재가 + 당일 누적 거래량 + daily = kis.get_daily_ohlcv(t.ticker, 250) # MA200·52주 고점용 + ctx = build_ctx(t, quote, daily) + if t is buy_target: firing += evaluate_buy(ctx, ms.buy_params) + if t is sell_target: firing += evaluate_sell(ctx, ms.exit_params) + if firing: state.last_alert_at = now + nas.post_report(as_of=now_kst_iso, firing=firing) # §5.2 — 빈 배열도 전송(edge clear 위해) + state.hb = "market_open" + stats.jobs_done += 1 +``` + +`heartbeat_loop`(별도 15초 태스크)이 `state`를 읽어 §5.4 페이로드 발신. + +--- + +## 4. 조건 로직 해석 (§6) + +지표는 **일봉 시계열**(최신 last, 오름차순) + **실시간 현재가**(`price`) 기준. 데이터 부족(예: MA200용 200봉 미만)이면 해당 조건은 **미발화**(graceful skip). 각 firing은 `{ticker, kind, condition, price, detail}`. + +### 매수 (`buy_targets`, `buy_params={rsi_oversold, breakout_vol_mult, pullback_pct}`) + +| condition | 발화 규칙(해석) | detail | +|-----------|----------------|--------| +| `buy_ma20_pullback` | `ma20>ma50>ma200`(정배열) **AND** 최근 3봉 최저가가 `ma20*(1+pullback_pct)` 이하로 접근 **AND** `price>ma20`(반등 복귀) | `ma20, ma50, ma200, recent_low` | +| `buy_breakout` | `price > 직전 20봉 최고가`(당일 제외) **AND** `today_volume > breakout_vol_mult × avg_volume(20)` | `prior_high_20, vol_mult, avg_vol_20` | +| `buy_rsi_bounce` | RSI(14) 시계열에서 `min(rsi[-3:]) < rsi_oversold` **AND** `rsi[-1] > rsi_oversold` **AND** `rsi[-1] > rsi[-2]`(반등). 사이클마다 재계산·무상태 | `rsi, rsi_prev, rsi_oversold` | + +종합: 각 조건 독립 발화(신뢰도 가중합은 NAS/텔레그램 단계 책임 아님 — 워커는 조건 발화만). + +### 매도 (`sell_targets`, `exit_params={stop_pct, take_pct, trailing_pct}`, target에 `avg_price, qty, holding_high`) + +| condition | 발화 규칙 | detail | +|-----------|----------|--------| +| `sell_stop_loss` | `(price-avg)/avg ≤ -stop_pct` | `avg_price, pnl_pct, stop_pct` | +| `sell_take_profit` | `(price-avg)/avg ≥ take_pct` | `avg_price, pnl_pct, take_pct` | +| `sell_trailing_stop` | `price ≤ holding_high × (1-trailing_pct)` (기본 0.10) | `holding_high, trailing_pct, drawdown_pct` | +| `sell_ma_break` | `price < ma50` (추가 `price ⚠️ **운영 함정**: ai_trade와 KIS를 동시 호출하면 전용 app_key라도 KIS 계정 전체 rate limit을 공유할 수 있음. 별도 app_key로 무효화는 회피되나, 운영 시 동시 부하 모니터링 필요(Phase 7 백로그 연계). + +--- + +## 6. heartbeat (§5.4) + +`_shared.heartbeat.heartbeat_loop(redis, "trade-monitor", "trader", stats, interval=15, ttl=45, state_fn=...)`. +- `state_fn`이 `MonitorState`를 읽어 `state ∈ {market_open, market_closed, idle}` + `extra={"last_alert_at": ...}` 반환. +- **디커플링 이유**: 루프 60초 > TTL 45초 → 인라인 발신 시 만료 갭. 15초 독립 태스크로 해소(형제 워커와 동일 구조). §5.4 필수 필드(name/kind/state/ts/last_alert_at) 충족, `jobs_done/jobs_failed`는 형제 워커처럼 superset 유지. +- 초기 상태 `idle`(첫 monitor-set 조회 전). + +--- + +## 7. 설정 (env) — `TM_` 접두사로 ai_trade와 분리 + +| env | 기본값 | 용도 | +|-----|--------|------| +| `NAS_BASE_URL` | `http://192.168.45.54:18500` | stock 백엔드 | +| `WEBAI_API_KEY` | (필수) | `X-WebAI-Key` | +| `REDIS_URL` | `redis://192.168.45.54:6379` | heartbeat | +| `TM_KIS_APP_KEY` / `TM_KIS_APP_SECRET` | (필수) | KIS 자체 토큰 | +| `TM_KIS_ACCOUNT` | (필수) | KIS 계좌 | +| `TM_KIS_IS_VIRTUAL` | `0` | 실전/모의 | +| `TM_LOOP_INTERVAL` | `60` | 루프 주기(초) | +| `TM_CLIMAX_VOL_MULT` | `3.0` | sell_climax 임계 | + +--- + +## 8. 에러 처리 + +- **monitor-set 실패**: 사이클 skip(report 안 함), heartbeat=`idle`, 다음 분 재시도. +- **KIS 종목 실패**: 해당 종목만 skip(로그 warning), 나머지 종목 계속. +- **report 실패**: 로그 error, 다음 사이클 신선 firing 재전송(무상태라 손실 허용). +- 루프 최상위 `try/except` — 어떤 예외도 루프를 죽이지 않음(task-watcher 패턴). + +--- + +## 9. 테스트 전략 (pytest, 시스템 Python) + +| 파일 | 검증 | +|------|------| +| `test_indicators.py` | sma/rsi/avg_volume/highest_high 수치(알려진 시계열), 데이터 부족 시 None | +| `test_conditions.py` | 8개 조건 테이블 기반(발화/미발화 경계), detail 필드 | +| `test_nas_client.py` | respx — monitor-set 파싱, report 페이로드, X-WebAI-Key 헤더, retry | +| `test_kis_client.py` | respx — 토큰 발급/캐시, quote/daily 파싱, throttle | +| `test_monitor.py` | 루프 1회(mock): closed skip, 비-KRX skip, firing 조립, last_alert_at 갱신, 종목 실패 격리 | + +--- + +## 10. 배포 + +- `services/trade-monitor/Dockerfile`: task-watcher 관례 복제 — `COPY _shared /app/_shared` **필수**(빌드 컨텍스트 `.` 에서), `COPY trade-monitor/. /app/`, `PYTHONPATH=/app`, uvicorn `:8000`. +- `services/docker-compose.yml`: `trade-monitor` 서비스 추가, 포트 **18715**(image-render 18714 다음), `TZ=Asia/Seoul`, KIS/WEBAI/REDIS env, healthcheck `/health`. +- `services/.env`(비커밋): `TM_KIS_*`, `WEBAI_API_KEY` 실값. `.env.example`에 키만 기재. + +--- + +## 11. 미해결 플래그 / 후속 + +1. **sell_climax** 휴리스틱은 근사 — holdings_intel 원본 확보 후 정합(BE에 원본 요청). +2. **KIS 지표 필드 실검증** — quote의 `acml_vol`/`stck_oprc`, daily TR 응답 필드는 첫 운영 raw 캡처로 대조. +3. **`buy_ma20_pullback`·`buy_rsi_bounce` 해석** — "current candle series" 문구를 일봉 시계열로 해석. 첫 운영 4주 IC 검증 시 재조정 가능. +4. **KIS rate limit 공존** — ai_trade와 동시 부하. 전용 app_key로 토큰 무효화는 회피, 초당 호출 총량은 운영 모니터링. +5. **after 세션 시간외 시세** — `inquire-price`가 시간외 단일가를 반영하는지 첫 운영 대조. diff --git a/services/trade-monitor/PLAN.md b/services/trade-monitor/PLAN.md new file mode 100644 index 0000000..43367cc --- /dev/null +++ b/services/trade-monitor/PLAN.md @@ -0,0 +1,1437 @@ +# trade-monitor 워커 구현 계획 + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** 실시간 매매 알람 스펙(§5·§6)의 Windows-side `trade-monitor` 워커를 구현한다 — 60초 루프로 NAS monitor-set을 조회하고 KIS 시세로 8개 매수/매도 조건을 평가해 firing 집합을 report하며 Redis heartbeat를 발신. + +**Architecture:** FastAPI + asyncio 루프 워커(형제 `task-watcher`/`image-render` 관례). 순수 모듈(`indicators`/`conditions`)에 §6 조건 로직을 격리하고, 경계 모듈(`kis_client`/`nas_client`)에 HTTP를 격리한다. `monitor.py`가 오케스트레이션하고 `_shared.heartbeat.heartbeat_loop`을 15초 독립 태스크로 재사용한다. + +**Tech Stack:** Python 3.12, FastAPI, httpx(async), redis.asyncio, pytest + respx. WSL2 Docker(`services/docker-compose.yml`). + +**설계 원본:** `services/trade-monitor/DESIGN.md` · 권위 계약: web-backend `docs/superpowers/specs/2026-07-02-realtime-trade-alerts-design.md`. + +## Global Constraints + +- Python `3.12` (Dockerfile `python:3.12-slim-bookworm`). +- 워커 **무상태** — dedup 없음, 매 사이클 firing 전체 집합 전송(빈 배열 포함). +- 비-KRX 티커 skip — 6자리 숫자 티커만 처리. +- 세션 게이트는 `monitor-set.session`에 위임 — 워커는 KST 캘린더 재구현 안 함. `session=="closed"`면 KIS 호출 0. +- KIS 토큰: **전용** `TM_KIS_APP_KEY/SECRET`로 자체 발급(ai_trade와 분리). +- heartbeat 키 `worker:trade-monitor:heartbeat`, TTL **EX45**, kind `trader`, state ∈ `{market_open, market_closed, idle}`. +- 테스트 실행: `C:\Users\jaeoh\AppData\Local\Programs\Python\Python312\python.exe -m pytest services/trade-monitor/tests -q` (web-ai 루트에서). 이하 `python`은 이 경로를 뜻함. +- 커밋 경로: **web-ai repo에서만** commit/push. +- 모든 firing 항목 스키마: `{"ticker","kind","condition","price","detail":{...}}`. + +--- + +## File Structure + +| 파일 | 책임 | +|------|------| +| `services/trade-monitor/config.py` | `Settings` dataclass + `load_settings()` (env) | +| `services/trade-monitor/indicators.py` | 순수: `sma`, `rsi_series`, `highest_high` | +| `services/trade-monitor/conditions.py` | 순수: `evaluate_buy`, `evaluate_sell`, `_fire` | +| `services/trade-monitor/kis_client.py` | `KISClient`: 자체 토큰 + `get_quote` + `get_daily_ohlcv` + throttle | +| `services/trade-monitor/nas_client.py` | `NASClient`: `get_monitor_set` + `post_report` (X-WebAI-Key) | +| `services/trade-monitor/monitor.py` | `MonitorState`, `filter_krx`, `_build_ctx`, `run_cycle`, `monitor_loop`, `make_state_fn` | +| `services/trade-monitor/main.py` | FastAPI lifespan(루프+heartbeat 스폰) + `/health` | +| `services/trade-monitor/conftest.py` | services/ 루트를 sys.path에 추가 | +| `services/trade-monitor/tests/*` | 단위 테스트 | +| `services/trade-monitor/Dockerfile` | task-watcher 관례 복제(`_shared` COPY) | +| `services/trade-monitor/requirements.txt`, `.env.example` | 의존성 / env 문서 | +| `services/docker-compose.yml` (수정) | `trade-monitor` 서비스(포트 18715) | + +--- + +### Task 1: 스캐폴딩 + config + +**Files:** +- Create: `services/trade-monitor/requirements.txt` +- Create: `services/trade-monitor/conftest.py` +- Create: `services/trade-monitor/tests/__init__.py` +- Create: `services/trade-monitor/config.py` +- Test: `services/trade-monitor/tests/test_config.py` + +**Interfaces:** +- Produces: `Settings` dataclass — fields `nas_base_url:str, webai_api_key:str, redis_url:str, kis_app_key:str, kis_app_secret:str, kis_account:str, kis_is_virtual:bool, loop_interval:int, climax_vol_mult:float`. `load_settings() -> Settings` (reads env with defaults). + +- [ ] **Step 1: 스캐폴딩 파일 생성** + +`services/trade-monitor/requirements.txt`: +``` +fastapi==0.115.6 +uvicorn[standard]==0.34.0 +redis>=5.0 +httpx>=0.27 +pytest>=8.0 +respx>=0.21 +``` + +`services/trade-monitor/conftest.py`: +```python +"""services/ 루트를 sys.path에 추가 — from _shared.heartbeat import 가능하게.""" +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) +``` + +`services/trade-monitor/tests/__init__.py`: (빈 파일) +```python +``` + +- [ ] **Step 2: 실패 테스트 작성** — `services/trade-monitor/tests/test_config.py` + +```python +"""Settings env 로드 — 기본값 + override.""" +from config import load_settings + + +def test_defaults(monkeypatch): + for k in ("NAS_BASE_URL", "WEBAI_API_KEY", "REDIS_URL", "TM_KIS_APP_KEY", + "TM_KIS_APP_SECRET", "TM_KIS_ACCOUNT", "TM_KIS_IS_VIRTUAL", + "TM_LOOP_INTERVAL", "TM_CLIMAX_VOL_MULT"): + monkeypatch.delenv(k, raising=False) + s = load_settings() + assert s.nas_base_url == "http://192.168.45.54:18500" + assert s.redis_url == "redis://192.168.45.54:6379" + assert s.kis_is_virtual is False + assert s.loop_interval == 60 + assert s.climax_vol_mult == 3.0 + + +def test_override(monkeypatch): + monkeypatch.setenv("TM_KIS_IS_VIRTUAL", "1") + monkeypatch.setenv("TM_LOOP_INTERVAL", "30") + monkeypatch.setenv("TM_CLIMAX_VOL_MULT", "2.5") + monkeypatch.setenv("WEBAI_API_KEY", "secret") + s = load_settings() + assert s.kis_is_virtual is True + assert s.loop_interval == 30 + assert s.climax_vol_mult == 2.5 + assert s.webai_api_key == "secret" +``` + +- [ ] **Step 3: 실패 확인** + +Run: `python -m pytest services/trade-monitor/tests/test_config.py -q` +Expected: FAIL — `ModuleNotFoundError: No module named 'config'` + +- [ ] **Step 4: config.py 구현** + +```python +"""Settings — 환경변수 로드. TM_ 접두사로 ai_trade와 분리.""" +from __future__ import annotations + +import os +from dataclasses import dataclass + + +@dataclass +class Settings: + nas_base_url: str + webai_api_key: str + redis_url: str + kis_app_key: str + kis_app_secret: str + kis_account: str + kis_is_virtual: bool + loop_interval: int + climax_vol_mult: float + + +def load_settings() -> Settings: + return Settings( + nas_base_url=os.getenv("NAS_BASE_URL", "http://192.168.45.54:18500"), + webai_api_key=os.getenv("WEBAI_API_KEY", ""), + redis_url=os.getenv("REDIS_URL", "redis://192.168.45.54:6379"), + kis_app_key=os.getenv("TM_KIS_APP_KEY", ""), + kis_app_secret=os.getenv("TM_KIS_APP_SECRET", ""), + kis_account=os.getenv("TM_KIS_ACCOUNT", ""), + kis_is_virtual=os.getenv("TM_KIS_IS_VIRTUAL", "0") == "1", + loop_interval=int(os.getenv("TM_LOOP_INTERVAL", "60")), + climax_vol_mult=float(os.getenv("TM_CLIMAX_VOL_MULT", "3.0")), + ) +``` + +- [ ] **Step 5: 통과 확인** + +Run: `python -m pytest services/trade-monitor/tests/test_config.py -q` +Expected: PASS (2 passed) + +- [ ] **Step 6: 커밋** + +```bash +git add services/trade-monitor/requirements.txt services/trade-monitor/conftest.py services/trade-monitor/tests/__init__.py services/trade-monitor/config.py services/trade-monitor/tests/test_config.py services/trade-monitor/DESIGN.md services/trade-monitor/PLAN.md +git commit -m "feat(trade-monitor): 스캐폴딩 + config" +``` + +--- + +### Task 2: indicators (순수) + +**Files:** +- Create: `services/trade-monitor/indicators.py` +- Test: `services/trade-monitor/tests/test_indicators.py` + +**Interfaces:** +- Produces: + - `sma(values: list[float], period: int) -> float | None` — 최근 `period`개 평균, 부족하면 None + - `rsi_series(closes: list[float], period: int = 14) -> list[float]` — Wilder RSI 시계열(closes[period:]에 정렬). 부족하면 `[]` + - `highest_high(highs: list[float], period: int) -> float | None` — 최근 `period`개 최댓값, 부족하면 None + +- [ ] **Step 1: 실패 테스트 작성** — `services/trade-monitor/tests/test_indicators.py` + +```python +"""indicators — 순수 수치 검증.""" +from indicators import sma, rsi_series, highest_high + + +def test_sma_basic(): + assert sma([1, 2, 3, 4, 5], 5) == 3.0 + assert sma([1, 2, 3, 4, 5], 2) == 4.5 + + +def test_sma_insufficient(): + assert sma([1, 2], 5) is None + assert sma([], 3) is None + + +def test_highest_high(): + assert highest_high([1, 9, 3, 4], 3) == 9 + assert highest_high([1, 2, 3], 3) == 3 + assert highest_high([1, 2], 3) is None + + +def test_rsi_all_gains_is_100(): + # 단조 증가 → 손실 0 → RSI 100 + closes = [float(i) for i in range(1, 20)] + rs = rsi_series(closes, 14) + assert rs, "series should not be empty" + assert rs[-1] == 100.0 + + +def test_rsi_insufficient(): + assert rsi_series([1, 2, 3], 14) == [] + + +def test_rsi_known_range(): + # 등락 섞인 시계열 → RSI는 0~100 사이 + closes = [10, 11, 10.5, 11.5, 11, 12, 11.8, 12.5, 12, 13, + 12.7, 13.2, 12.9, 13.5, 13.1, 13.8] + rs = rsi_series(closes, 14) + assert len(rs) == len(closes) - 14 + assert all(0.0 <= v <= 100.0 for v in rs) +``` + +- [ ] **Step 2: 실패 확인** + +Run: `python -m pytest services/trade-monitor/tests/test_indicators.py -q` +Expected: FAIL — `ModuleNotFoundError: No module named 'indicators'` + +- [ ] **Step 3: indicators.py 구현** + +```python +"""순수 TA 지표 — sma / rsi_series / highest_high.""" +from __future__ import annotations + + +def sma(values: list[float], period: int) -> float | None: + if period <= 0 or len(values) < period: + return None + return sum(values[-period:]) / period + + +def highest_high(highs: list[float], period: int) -> float | None: + if period <= 0 or len(highs) < period: + return None + return max(highs[-period:]) + + +def rsi_series(closes: list[float], period: int = 14) -> list[float]: + """Wilder RSI. 반환 리스트는 closes[period:]에 1:1 정렬. 부족하면 [].""" + if len(closes) <= period: + return [] + deltas = [closes[i] - closes[i - 1] for i in range(1, len(closes))] + gains = [d if d > 0 else 0.0 for d in deltas] + losses = [-d if d < 0 else 0.0 for d in deltas] + + def _rsi(ag: float, al: float) -> float: + if al == 0: + return 100.0 + rs = ag / al + return 100.0 - 100.0 / (1.0 + rs) + + avg_gain = sum(gains[:period]) / period + avg_loss = sum(losses[:period]) / period + out = [_rsi(avg_gain, avg_loss)] + for i in range(period, len(deltas)): + avg_gain = (avg_gain * (period - 1) + gains[i]) / period + avg_loss = (avg_loss * (period - 1) + losses[i]) / period + out.append(_rsi(avg_gain, avg_loss)) + return out +``` + +- [ ] **Step 4: 통과 확인** + +Run: `python -m pytest services/trade-monitor/tests/test_indicators.py -q` +Expected: PASS (6 passed) + +- [ ] **Step 5: 커밋** + +```bash +git add services/trade-monitor/indicators.py services/trade-monitor/tests/test_indicators.py +git commit -m "feat(trade-monitor): 순수 지표 모듈 (sma/rsi/highest_high)" +``` + +--- + +### Task 3: conditions — 매수 (순수, §6) + +**Files:** +- Create: `services/trade-monitor/conditions.py` +- Test: `services/trade-monitor/tests/test_conditions_buy.py` + +**Interfaces:** +- Consumes: `indicators.sma`, `indicators.rsi_series`, `indicators.highest_high`. +- ctx 계약(dict): `ticker,name,price,day_open,today_volume,closes,highs,lows,volumes,avg_price,qty,holding_high,climax_vol_mult`. +- Produces: + - `_fire(ctx, kind, condition, price, detail) -> dict` → `{"ticker","kind","condition","price","detail"}` + - `evaluate_buy(ctx: dict, params: dict) -> list[dict]` — params `{rsi_oversold, breakout_vol_mult, pullback_pct}` + +- [ ] **Step 1: 실패 테스트 작성** — `services/trade-monitor/tests/test_conditions_buy.py` + +```python +"""evaluate_buy — 3개 매수 조건 경계.""" +from conditions import evaluate_buy + +BUY_PARAMS = {"rsi_oversold": 30, "breakout_vol_mult": 1.5, "pullback_pct": 0.02} + + +def _ctx(**over): + base = dict( + ticker="005930", name="삼성전자", price=100.0, day_open=99.0, + today_volume=1000.0, closes=[], highs=[], lows=[], volumes=[], + avg_price=None, qty=None, holding_high=None, climax_vol_mult=3.0, + ) + base.update(over) + return base + + +def _conditions(firing): + return {f["condition"] for f in firing} + + +def test_ma20_pullback_fires(): + # 정배열(ma20>ma50>ma200), 최근 저가가 ma20 근처, price가 ma20 위로 반등 + closes = [90.0] * 200 + [100.0] * 20 # ma20=100, ma50/ma200 낮음 → 정배열 + lows = [90.0] * 217 + [100.5, 100.4, 100.3] # 최근 3봉 저가 ~ma20*(1.02)=102 이하 + ctx = _ctx(price=101.0, closes=closes, highs=closes, lows=lows, + volumes=[1.0] * len(closes)) + assert "buy_ma20_pullback" in _conditions(evaluate_buy(ctx, BUY_PARAMS)) + + +def test_ma20_pullback_skips_when_not_aligned(): + closes = [100.0] * 200 + [90.0] * 20 # 역배열 + ctx = _ctx(price=91.0, closes=closes, highs=closes, lows=closes, + volumes=[1.0] * len(closes)) + assert "buy_ma20_pullback" not in _conditions(evaluate_buy(ctx, BUY_PARAMS)) + + +def test_breakout_fires(): + closes = [50.0] * 25 + highs = [60.0] * 25 # 직전 20봉 최고 60 + vols = [100.0] * 25 # avg20=100 + ctx = _ctx(price=61.0, today_volume=200.0, closes=closes, highs=highs, + lows=closes, volumes=vols) # 61>60, 200>1.5*100 + assert "buy_breakout" in _conditions(evaluate_buy(ctx, BUY_PARAMS)) + + +def test_breakout_skips_on_low_volume(): + highs = [60.0] * 25 + ctx = _ctx(price=61.0, today_volume=120.0, closes=[50.0] * 25, highs=highs, + lows=[50.0] * 25, volumes=[100.0] * 25) # 120 < 1.5*100=150 + assert "buy_breakout" not in _conditions(evaluate_buy(ctx, BUY_PARAMS)) + + +def test_rsi_bounce_fires(): + # 급락으로 RSI<30 찍고 반등하는 시계열 + closes = [100.0] + for _ in range(14): + closes.append(closes[-1] * 0.97) # 하락 → RSI 저하 + closes.append(closes[-1] * 1.05) # 마지막 반등 + closes.append(closes[-1] * 1.05) + ctx = _ctx(price=closes[-1], closes=closes, highs=closes, lows=closes, + volumes=[1.0] * len(closes)) + assert "buy_rsi_bounce" in _conditions(evaluate_buy(ctx, BUY_PARAMS)) + + +def test_empty_series_no_fire(): + assert evaluate_buy(_ctx(), BUY_PARAMS) == [] +``` + +- [ ] **Step 2: 실패 확인** + +Run: `python -m pytest services/trade-monitor/tests/test_conditions_buy.py -q` +Expected: FAIL — `ModuleNotFoundError: No module named 'conditions'` + +- [ ] **Step 3: conditions.py 구현 (매수 + _fire)** + +```python +"""§6 조건 로직 (순수). ctx + params → firing 리스트.""" +from __future__ import annotations + +from indicators import sma, rsi_series, highest_high + + +def _fire(ctx: dict, kind: str, condition: str, price: float, detail: dict) -> dict: + return { + "ticker": ctx["ticker"], "kind": kind, + "condition": condition, "price": price, "detail": detail, + } + + +def evaluate_buy(ctx: dict, params: dict) -> list[dict]: + price = ctx["price"] + closes, highs, lows, vols = ctx["closes"], ctx["highs"], ctx["lows"], ctx["volumes"] + rsi_os = params.get("rsi_oversold", 30) + vol_mult = params.get("breakout_vol_mult", 1.5) + pullback = params.get("pullback_pct", 0.02) + firing: list[dict] = [] + + # buy_ma20_pullback — 정배열 + ma20 근접 저가 + 반등 복귀 + ma20, ma50, ma200 = sma(closes, 20), sma(closes, 50), sma(closes, 200) + if ma20 and ma50 and ma200 and ma20 > ma50 > ma200 and len(lows) >= 3: + recent_low = min(lows[-3:]) + if recent_low <= ma20 * (1 + pullback) and price > ma20: + firing.append(_fire(ctx, "buy", "buy_ma20_pullback", price, { + "ma20": round(ma20, 1), "ma50": round(ma50, 1), + "ma200": round(ma200, 1), "recent_low": recent_low, + })) + + # buy_breakout — 직전 20봉 고점 돌파 + 거래량 배수 + prior_high20 = highest_high(highs, 20) + avg_vol20 = sma(vols, 20) + if prior_high20 and avg_vol20 and price > prior_high20 \ + and ctx["today_volume"] > vol_mult * avg_vol20: + firing.append(_fire(ctx, "buy", "buy_breakout", price, { + "prior_high_20": prior_high20, + "vol_mult": round(ctx["today_volume"] / avg_vol20, 2), + "avg_vol_20": round(avg_vol20, 0), + })) + + # buy_rsi_bounce — RSI 과매도 후 반등 (무상태 재계산) + rs = rsi_series(closes, 14) + if len(rs) >= 3 and min(rs[-3:]) < rsi_os and rs[-1] > rsi_os and rs[-1] > rs[-2]: + firing.append(_fire(ctx, "buy", "buy_rsi_bounce", price, { + "rsi": round(rs[-1], 1), "rsi_prev": round(rs[-2], 1), + "rsi_oversold": rsi_os, + })) + + return firing +``` + +- [ ] **Step 4: 통과 확인** + +Run: `python -m pytest services/trade-monitor/tests/test_conditions_buy.py -q` +Expected: PASS (6 passed) + +- [ ] **Step 5: 커밋** + +```bash +git add services/trade-monitor/conditions.py services/trade-monitor/tests/test_conditions_buy.py +git commit -m "feat(trade-monitor): 매수 조건 (ma20_pullback/breakout/rsi_bounce)" +``` + +--- + +### Task 4: conditions — 매도 (순수, §6) + +**Files:** +- Modify: `services/trade-monitor/conditions.py` (add `evaluate_sell`) +- Test: `services/trade-monitor/tests/test_conditions_sell.py` + +**Interfaces:** +- Produces: `evaluate_sell(ctx: dict, params: dict) -> list[dict]` — params `{stop_pct, take_pct, trailing_pct}`. `sell_climax` 임계는 `ctx["climax_vol_mult"]`. + +- [ ] **Step 1: 실패 테스트 작성** — `services/trade-monitor/tests/test_conditions_sell.py` + +```python +"""evaluate_sell — 5개 매도 조건 경계.""" +from conditions import evaluate_sell + +EXIT = {"stop_pct": 0.08, "take_pct": 0.25, "trailing_pct": 0.10} + + +def _ctx(**over): + base = dict( + ticker="000660", name="SK하이닉스", price=100.0, day_open=100.0, + today_volume=100.0, closes=[100.0] * 60, highs=[100.0] * 60, + lows=[100.0] * 60, volumes=[100.0] * 60, + avg_price=100.0, qty=10, holding_high=100.0, climax_vol_mult=3.0, + ) + base.update(over) + return base + + +def _c(firing): + return {f["condition"] for f in firing} + + +def test_stop_loss_fires(): + ctx = _ctx(price=90.0, avg_price=100.0) # -10% <= -8% + assert "sell_stop_loss" in _c(evaluate_sell(ctx, EXIT)) + + +def test_stop_loss_skips_above_threshold(): + ctx = _ctx(price=95.0, avg_price=100.0) # -5% > -8% + assert "sell_stop_loss" not in _c(evaluate_sell(ctx, EXIT)) + + +def test_take_profit_fires(): + ctx = _ctx(price=130.0, avg_price=100.0) # +30% >= 25% + assert "sell_take_profit" in _c(evaluate_sell(ctx, EXIT)) + + +def test_trailing_stop_fires(): + ctx = _ctx(price=89.0, holding_high=100.0) # 89 <= 100*0.9=90 + assert "sell_trailing_stop" in _c(evaluate_sell(ctx, EXIT)) + + +def test_ma_break_severity_high(): + # price가 ma50/ma200 아래 → severity high + closes = [200.0] * 60 + ctx = _ctx(price=100.0, closes=closes, avg_price=100.0, holding_high=100.0) + firing = evaluate_sell(ctx, EXIT) + mb = [f for f in firing if f["condition"] == "sell_ma_break"] + assert mb and mb[0]["detail"]["severity"] == "high" + + +def test_climax_fires(): + # 거래량 3배 이상 + 종가(현재가)<시가 반전 + ctx = _ctx(price=98.0, day_open=100.0, today_volume=400.0, + volumes=[100.0] * 60) # 400 >= 3*100, 98<100 + assert "sell_climax" in _c(evaluate_sell(ctx, EXIT)) + + +def test_climax_skips_when_not_reversal(): + ctx = _ctx(price=101.0, day_open=100.0, today_volume=400.0, + volumes=[100.0] * 60) # 상승 마감 → 반전 아님 + assert "sell_climax" not in _c(evaluate_sell(ctx, EXIT)) + + +def test_no_avg_no_pnl_conditions(): + # avg_price None(보유정보 없음) → stop/take 미발화 + ctx = _ctx(price=50.0, avg_price=None, holding_high=None, + closes=[100.0] * 60) + conds = _c(evaluate_sell(ctx, EXIT)) + assert "sell_stop_loss" not in conds and "sell_take_profit" not in conds +``` + +- [ ] **Step 2: 실패 확인** + +Run: `python -m pytest services/trade-monitor/tests/test_conditions_sell.py -q` +Expected: FAIL — `ImportError: cannot import name 'evaluate_sell'` + +- [ ] **Step 3: conditions.py에 evaluate_sell 추가** + +```python +def evaluate_sell(ctx: dict, params: dict) -> list[dict]: + price = ctx["price"] + avg = ctx.get("avg_price") + hh = ctx.get("holding_high") + closes, vols = ctx["closes"], ctx["volumes"] + stop = params.get("stop_pct", 0.08) + take = params.get("take_pct", 0.25) + trail = params.get("trailing_pct", 0.10) + climax_mult = ctx.get("climax_vol_mult", 3.0) + firing: list[dict] = [] + + if avg: + pnl = (price - avg) / avg + if pnl <= -stop: + firing.append(_fire(ctx, "sell", "sell_stop_loss", price, { + "avg_price": avg, "pnl_pct": round(pnl, 4), "stop_pct": stop})) + if pnl >= take: + firing.append(_fire(ctx, "sell", "sell_take_profit", price, { + "avg_price": avg, "pnl_pct": round(pnl, 4), "take_pct": take})) + + if hh and price <= hh * (1 - trail): + firing.append(_fire(ctx, "sell", "sell_trailing_stop", price, { + "holding_high": hh, "trailing_pct": trail, + "drawdown_pct": round((price - hh) / hh, 4)})) + + ma50, ma200 = sma(closes, 50), sma(closes, 200) + if ma50 and price < ma50: + severity = "high" if (ma200 and price < ma200) else "normal" + firing.append(_fire(ctx, "sell", "sell_ma_break", price, { + "ma50": round(ma50, 1), + "ma200": round(ma200, 1) if ma200 else None, + "severity": severity})) + + # sell_climax — 휴리스틱(추후 holdings_intel 정합): 거래량 급증 + 반전 캔들 + avg_vol20 = sma(vols, 20) + if avg_vol20 and ctx["today_volume"] >= climax_mult * avg_vol20 \ + and price < ctx["day_open"]: + firing.append(_fire(ctx, "sell", "sell_climax", price, { + "vol_mult": round(ctx["today_volume"] / avg_vol20, 2), + "day_open": ctx["day_open"]})) # TODO: holdings_intel 대조 + + return firing +``` + +- [ ] **Step 4: 통과 확인** + +Run: `python -m pytest services/trade-monitor/tests/test_conditions_sell.py -q` +Expected: PASS (8 passed) + +- [ ] **Step 5: 커밋** + +```bash +git add services/trade-monitor/conditions.py services/trade-monitor/tests/test_conditions_sell.py +git commit -m "feat(trade-monitor): 매도 조건 (stop/take/trailing/ma_break/climax)" +``` + +--- + +### Task 5: kis_client (자체 토큰) + +**Files:** +- Create: `services/trade-monitor/kis_client.py` +- Test: `services/trade-monitor/tests/test_kis_client.py` + +**Interfaces:** +- Produces `KISClient`: + - `__init__(app_key, app_secret, account, is_virtual, timeout=10.0)` + - `async _issue_token() -> str` (메모리 캐시, 만료 10분 전 재발급) + - `async get_quote(ticker: str) -> dict` → `{"price":int,"day_open":int,"today_volume":int,"as_of":str}` + - `async get_daily_ohlcv(ticker: str, days: int = 250) -> list[dict]` → `[{"datetime","open","high","low","close","volume"}]` 오름차순 + - `async close()` + +- [ ] **Step 1: 실패 테스트 작성** — `services/trade-monitor/tests/test_kis_client.py` + +```python +"""KISClient — 토큰 발급/캐시 + quote/daily 파싱 (respx).""" +import httpx +import pytest +import respx + +from kis_client import KISClient + +BASE = "https://openapi.koreainvestment.com:9443" + + +def _client(): + return KISClient("APPKEY", "APPSECRET", "12345678-01", is_virtual=False) + + +@pytest.mark.asyncio +@respx.mock +async def test_issue_token_cached(): + route = respx.post(f"{BASE}/oauth2/tokenP").mock( + return_value=httpx.Response(200, json={"access_token": "TKN", "expires_in": 86400})) + c = _client() + t1 = await c._issue_token() + t2 = await c._issue_token() + assert t1 == "TKN" and t2 == "TKN" + assert route.call_count == 1 # 캐시 → 1회만 발급 + await c.close() + + +@pytest.mark.asyncio +@respx.mock +async def test_get_quote_parses(): + respx.post(f"{BASE}/oauth2/tokenP").mock( + return_value=httpx.Response(200, json={"access_token": "TKN", "expires_in": 86400})) + respx.get(f"{BASE}/uapi/domestic-stock/v1/quotations/inquire-price").mock( + return_value=httpx.Response(200, json={"output": { + "stck_prpr": "71500", "stck_oprc": "71000", "acml_vol": "1234567"}})) + c = _client() + q = await c.get_quote("005930") + assert q["price"] == 71500 and q["day_open"] == 71000 and q["today_volume"] == 1234567 + await c.close() + + +@pytest.mark.asyncio +@respx.mock +async def test_get_daily_ascending(): + respx.post(f"{BASE}/oauth2/tokenP").mock( + return_value=httpx.Response(200, json={"access_token": "TKN", "expires_in": 86400})) + # KIS는 내림차순 반환 → 오름차순으로 뒤집혀야 함 + respx.get(f"{BASE}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice").mock( + return_value=httpx.Response(200, json={"output2": [ + {"stck_bsop_date": "20260702", "stck_oprc": "100", "stck_hgpr": "110", + "stck_lwpr": "90", "stck_clpr": "105", "acml_vol": "5"}, + {"stck_bsop_date": "20260701", "stck_oprc": "95", "stck_hgpr": "102", + "stck_lwpr": "94", "stck_clpr": "100", "acml_vol": "4"}]})) + c = _client() + bars = await c.get_daily_ohlcv("005930", days=250) + assert bars[0]["datetime"] == "2026-07-01" + assert bars[-1]["close"] == 105 + await c.close() +``` + +- [ ] **Step 2: 실패 확인** + +Run: `python -m pytest services/trade-monitor/tests/test_kis_client.py -q` +Expected: FAIL — `ModuleNotFoundError: No module named 'kis_client'` + +- [ ] **Step 3: kis_client.py 구현** + +```python +"""KIS REST client — 자체 OAuth 토큰(TM_KIS_*) + quote + 일봉 + throttle.""" +from __future__ import annotations + +import asyncio +import logging +import time +from datetime import datetime, timedelta +from zoneinfo import ZoneInfo + +import httpx + +logger = logging.getLogger(__name__) +KST = ZoneInfo("Asia/Seoul") + +_MAX_ATTEMPTS = 3 +_THROTTLE_INTERVAL = 0.5 # 초당 2회 +_TOKEN_MARGIN = 600 # 만료 10분 전 재발급 + + +class KISClient: + def __init__(self, app_key, app_secret, account, is_virtual, timeout: float = 10.0): + self._app_key = app_key + self._app_secret = app_secret + self._account = account + self._base_url = ( + "https://openapivts.koreainvestment.com:29443" if is_virtual + else "https://openapi.koreainvestment.com:9443" + ) + self._client = httpx.AsyncClient(timeout=timeout) + self._token: str | None = None + self._token_exp: float = 0.0 + self._last_throttle_at = 0.0 + self._throttle_lock = asyncio.Lock() + self._token_lock = asyncio.Lock() + + async def close(self) -> None: + await self._client.aclose() + + async def _issue_token(self) -> str: + async with self._token_lock: + now = time.time() + if self._token and now < self._token_exp - _TOKEN_MARGIN: + return self._token + r = await self._client.post( + f"{self._base_url}/oauth2/tokenP", + json={"grant_type": "client_credentials", + "appkey": self._app_key, "appsecret": self._app_secret}, + ) + r.raise_for_status() + data = r.json() + self._token = data["access_token"] + self._token_exp = now + int(data.get("expires_in", 86400)) + return self._token + + async def _throttle(self) -> None: + async with self._throttle_lock: + elapsed = time.monotonic() - self._last_throttle_at + if elapsed < _THROTTLE_INTERVAL: + await asyncio.sleep(_THROTTLE_INTERVAL - elapsed) + self._last_throttle_at = time.monotonic() + + async def _request(self, method: str, path: str, tr_id: str, **kwargs) -> dict: + token = await self._issue_token() + headers = { + "authorization": f"Bearer {token}", + "appkey": self._app_key, "appsecret": self._app_secret, + "tr_id": tr_id, "custtype": "P", + } + url = f"{self._base_url}{path}" + for attempt in range(_MAX_ATTEMPTS): + await self._throttle() + try: + resp = await self._client.request(method, url, headers=headers, **kwargs) + if resp.status_code == 429 and attempt < _MAX_ATTEMPTS - 1: + await asyncio.sleep(2 ** attempt) + continue + resp.raise_for_status() + return resp.json() + except httpx.TimeoutException: + if attempt < _MAX_ATTEMPTS - 1: + await asyncio.sleep(2 ** attempt) + continue + raise + raise RuntimeError("retry exhausted") + + async def get_quote(self, ticker: str) -> dict: + raw = await self._request( + "GET", "/uapi/domestic-stock/v1/quotations/inquire-price", + tr_id="FHKST01010100", + params={"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": ticker}, + ) + o = raw.get("output", {}) + return { + "price": int(o["stck_prpr"]), + "day_open": int(o["stck_oprc"]), + "today_volume": int(o["acml_vol"]), + "as_of": datetime.now(KST).isoformat(), + } + + async def get_daily_ohlcv(self, ticker: str, days: int = 250) -> list[dict]: + today = datetime.now(KST).strftime("%Y%m%d") + start = (datetime.now(KST) - timedelta(days=days * 2)).strftime("%Y%m%d") + raw = await self._request( + "GET", "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice", + tr_id="FHKST03010100", + params={"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": ticker, + "FID_INPUT_DATE_1": start, "FID_INPUT_DATE_2": today, + "FID_PERIOD_DIV_CODE": "D", "FID_ORG_ADJ_PRC": "1"}, + ) + bars = [] + for row in raw.get("output2", []): + try: + d = row["stck_bsop_date"] + bars.append({ + "datetime": f"{d[:4]}-{d[4:6]}-{d[6:]}", + "open": int(row["stck_oprc"]), "high": int(row["stck_hgpr"]), + "low": int(row["stck_lwpr"]), "close": int(row["stck_clpr"]), + "volume": int(row["acml_vol"]), + }) + except (KeyError, ValueError): + continue + bars.reverse() + return bars[-days:] +``` + +- [ ] **Step 4: 통과 확인** + +Run: `python -m pytest services/trade-monitor/tests/test_kis_client.py -q` +Expected: PASS (3 passed) + +> respx/pytest-asyncio 관련: `asyncio_mode` 필요 시 `pytest.ini`가 아니라 각 테스트에 `@pytest.mark.asyncio`를 명시(위 테스트대로). pytest-asyncio 미설치면 `pip install pytest-asyncio` 후 `requirements.txt`에 추가하고 커밋에 포함. + +- [ ] **Step 5: 커밋** + +```bash +git add services/trade-monitor/kis_client.py services/trade-monitor/tests/test_kis_client.py services/trade-monitor/requirements.txt +git commit -m "feat(trade-monitor): KIS 자체 토큰 + quote + 일봉 클라이언트" +``` + +--- + +### Task 6: nas_client (X-WebAI-Key) + +**Files:** +- Create: `services/trade-monitor/nas_client.py` +- Test: `services/trade-monitor/tests/test_nas_client.py` + +**Interfaces:** +- Produces `NASClient`: + - `__init__(base_url, api_key, timeout=10.0)` + - `async get_monitor_set() -> dict` (GET `/api/webai/trade-alert/monitor-set`) + - `async post_report(as_of: str, firing: list[dict]) -> dict` (POST `/api/webai/trade-alert/report`) + - `async close()` + +- [ ] **Step 1: 실패 테스트 작성** — `services/trade-monitor/tests/test_nas_client.py` + +```python +"""NASClient — monitor-set/report + X-WebAI-Key (respx).""" +import httpx +import pytest +import respx + +from nas_client import NASClient + +BASE = "http://nas.test" + + +@pytest.mark.asyncio +@respx.mock +async def test_get_monitor_set_sends_key(): + route = respx.get(f"{BASE}/api/webai/trade-alert/monitor-set").mock( + return_value=httpx.Response(200, json={"session": "regular", "buy_targets": []})) + c = NASClient(BASE, "KEY") + ms = await c.get_monitor_set() + assert ms["session"] == "regular" + assert route.calls.last.request.headers["X-WebAI-Key"] == "KEY" + await c.close() + + +@pytest.mark.asyncio +@respx.mock +async def test_post_report_payload(): + captured = {} + + def _resp(request): + import json as _j + captured.update(_j.loads(request.content)) + return httpx.Response(200, json={"new_alerts": 1, "cleared": 0}) + + respx.post(f"{BASE}/api/webai/trade-alert/report").mock(side_effect=_resp) + c = NASClient(BASE, "KEY") + firing = [{"ticker": "005930", "kind": "buy", "condition": "buy_breakout", + "price": 71500, "detail": {}}] + out = await c.post_report("2026-07-02T09:01:00+09:00", firing) + assert out["new_alerts"] == 1 + assert captured["as_of"] == "2026-07-02T09:01:00+09:00" + assert captured["firing"] == firing + await c.close() +``` + +- [ ] **Step 2: 실패 확인** + +Run: `python -m pytest services/trade-monitor/tests/test_nas_client.py -q` +Expected: FAIL — `ModuleNotFoundError: No module named 'nas_client'` + +- [ ] **Step 3: nas_client.py 구현** + +```python +"""NAS stock 백엔드 trade-alert 계약 — X-WebAI-Key + retry.""" +from __future__ import annotations + +import asyncio +import logging + +import httpx + +logger = logging.getLogger(__name__) + +_MAX_ATTEMPTS = 3 +_RETRY_STATUSES = {429, 500, 502, 503, 504} + + +class NASClient: + def __init__(self, base_url: str, api_key: str, timeout: float = 10.0): + self._base_url = base_url.rstrip("/") + self._api_key = api_key + self._client = httpx.AsyncClient(timeout=timeout) + + async def close(self) -> None: + await self._client.aclose() + + async def get_monitor_set(self) -> dict: + return await self._request("GET", "/api/webai/trade-alert/monitor-set") + + async def post_report(self, as_of: str, firing: list[dict]) -> dict: + return await self._request( + "POST", "/api/webai/trade-alert/report", + json={"as_of": as_of, "firing": firing}) + + async def _request(self, method: str, path: str, **kwargs) -> dict: + url = f"{self._base_url}{path}" + headers = {"X-WebAI-Key": self._api_key} + for attempt in range(_MAX_ATTEMPTS): + try: + resp = await self._client.request(method, url, headers=headers, **kwargs) + if resp.status_code in _RETRY_STATUSES and attempt < _MAX_ATTEMPTS - 1: + await asyncio.sleep(2 ** attempt) + continue + resp.raise_for_status() + return resp.json() + except httpx.TimeoutException: + if attempt < _MAX_ATTEMPTS - 1: + await asyncio.sleep(2 ** attempt) + continue + raise + raise RuntimeError("retry exhausted") +``` + +- [ ] **Step 4: 통과 확인** + +Run: `python -m pytest services/trade-monitor/tests/test_nas_client.py -q` +Expected: PASS (2 passed) + +- [ ] **Step 5: 커밋** + +```bash +git add services/trade-monitor/nas_client.py services/trade-monitor/tests/test_nas_client.py +git commit -m "feat(trade-monitor): NAS trade-alert 클라이언트 (monitor-set/report)" +``` + +--- + +### Task 7: monitor 오케스트레이션 + +**Files:** +- Create: `services/trade-monitor/monitor.py` +- Test: `services/trade-monitor/tests/test_monitor.py` + +**Interfaces:** +- Consumes: `NASClient`(get_monitor_set/post_report), `KISClient`(get_quote/get_daily_ohlcv), `evaluate_buy`/`evaluate_sell`, `_shared.heartbeat.WorkerStats`, `config.Settings`. +- Produces: + - `class MonitorState` — `.session_state:str="idle"`, `.last_alert_at:str|None=None` + - `filter_krx(targets: list[dict]) -> list[dict]` — 6자리 숫자 티커만 + - `async _build_ctx(kis, target: dict, settings) -> dict` — ctx 조립(quote+daily) + - `async run_cycle(nas, kis, state, stats, settings) -> None` — 1 사이클 + - `async monitor_loop(nas, kis, state, stats, settings) -> None` + - `make_state_fn(state) -> callable` — heartbeat state_fn `(redis, stats) -> (state_str, {"last_alert_at":...})` + +- [ ] **Step 1: 실패 테스트 작성** — `services/trade-monitor/tests/test_monitor.py` + +```python +"""monitor.run_cycle — 게이트/필터/조립/격리.""" +import pytest + +import monitor +from monitor import MonitorState, filter_krx, run_cycle +from config import load_settings +from _shared.heartbeat import WorkerStats + + +def test_filter_krx_keeps_only_numeric6(): + targets = [{"ticker": "005930"}, {"ticker": "AAPL"}, {"ticker": "00660"}, + {"ticker": "000660"}, {"ticker": "0059301"}] + kept = {t["ticker"] for t in filter_krx(targets)} + assert kept == {"005930", "000660"} + + +class _FakeNAS: + def __init__(self, ms): + self._ms = ms + self.reported = None + + async def get_monitor_set(self): + return self._ms + + async def post_report(self, as_of, firing): + self.reported = {"as_of": as_of, "firing": firing} + return {"new_alerts": len(firing), "cleared": 0} + + +class _FakeKIS: + def __init__(self, price=100, fail_on=None): + self._price = price + self._fail_on = fail_on or set() + + async def get_quote(self, ticker): + if ticker in self._fail_on: + raise RuntimeError("KIS down") + return {"price": self._price, "day_open": 99, "today_volume": 1000, + "as_of": "x"} + + async def get_daily_ohlcv(self, ticker, days=250): + # 정배열 + 저가 근접 → ma20_pullback 발화 유도 + return [{"open": 90, "high": 90, "low": 90, "close": 90, "volume": 1}] * 200 \ + + [{"open": 100, "high": 100, "low": 100, "close": 100, "volume": 1}] * 20 + + +@pytest.mark.asyncio +async def test_closed_session_skips_kis(monkeypatch): + nas = _FakeNAS({"session": "closed"}) + state, stats = MonitorState(), WorkerStats() + await run_cycle(nas, _FakeKIS(), state, stats, load_settings()) + assert state.session_state == "market_closed" + assert nas.reported is None # report도 안 함 + + +@pytest.mark.asyncio +async def test_non_krx_skipped_and_report_sent(): + nas = _FakeNAS({"session": "regular", + "buy_targets": [{"ticker": "AAPL", "name": "Apple"}], + "sell_targets": [], "buy_params": {}, "exit_params": {}}) + state, stats = MonitorState(), WorkerStats() + await run_cycle(nas, _FakeKIS(), state, stats, load_settings()) + assert state.session_state == "market_open" + assert nas.reported is not None + assert nas.reported["firing"] == [] # 알파벳 티커 skip → 빈 발화 + + +@pytest.mark.asyncio +async def test_firing_assembled_and_last_alert_set(): + nas = _FakeNAS({"session": "regular", + "buy_targets": [{"ticker": "005930", "name": "삼성전자"}], + "sell_targets": [], "buy_params": {"pullback_pct": 0.02}, + "exit_params": {}}) + state, stats = MonitorState(), WorkerStats() + await run_cycle(nas, _FakeKIS(price=101), state, stats, load_settings()) + conds = {f["condition"] for f in nas.reported["firing"]} + assert "buy_ma20_pullback" in conds + assert state.last_alert_at is not None + + +@pytest.mark.asyncio +async def test_per_ticker_failure_isolated(): + nas = _FakeNAS({"session": "regular", + "buy_targets": [{"ticker": "005930"}, {"ticker": "000660"}], + "sell_targets": [], "buy_params": {}, "exit_params": {}}) + state, stats = MonitorState(), WorkerStats() + # 005930은 실패, 000660은 성공 → 루프가 죽지 않고 report 전송 + await run_cycle(nas, _FakeKIS(fail_on={"005930"}), state, stats, load_settings()) + assert nas.reported is not None + assert state.session_state == "market_open" + + +@pytest.mark.asyncio +async def test_monitor_set_failure_sets_idle(): + class _BadNAS(_FakeNAS): + async def get_monitor_set(self): + raise RuntimeError("NAS down") + + nas = _BadNAS({}) + state, stats = MonitorState(), WorkerStats() + await run_cycle(nas, _FakeKIS(), state, stats, load_settings()) + assert state.session_state == "idle" + assert nas.reported is None +``` + +- [ ] **Step 2: 실패 확인** + +Run: `python -m pytest services/trade-monitor/tests/test_monitor.py -q` +Expected: FAIL — `ModuleNotFoundError: No module named 'monitor'` + +- [ ] **Step 3: monitor.py 구현** + +```python +"""오케스트레이션 — monitor-set 조회 → 조건 평가 → report + heartbeat state.""" +from __future__ import annotations + +import asyncio +import logging +from datetime import datetime +from zoneinfo import ZoneInfo + +from conditions import evaluate_buy, evaluate_sell + +logger = logging.getLogger(__name__) +KST = ZoneInfo("Asia/Seoul") + + +class MonitorState: + """monitor_loop가 갱신, heartbeat state_fn이 읽는 공유 상태.""" + def __init__(self): + self.session_state = "idle" # market_open | market_closed | idle + self.last_alert_at: str | None = None + + +def filter_krx(targets: list[dict]) -> list[dict]: + """6자리 숫자 티커(KRX)만. 알파벳 티커 skip.""" + out = [] + for t in targets: + tk = str(t.get("ticker", "")) + if tk.isdigit() and len(tk) == 6: + out.append(t) + return out + + +async def _build_ctx(kis, target: dict, settings) -> dict: + ticker = target["ticker"] + quote = await kis.get_quote(ticker) + daily = await kis.get_daily_ohlcv(ticker, 250) + return { + "ticker": ticker, "name": target.get("name", ""), + "price": quote["price"], "day_open": quote["day_open"], + "today_volume": quote["today_volume"], + "closes": [b["close"] for b in daily], + "highs": [b["high"] for b in daily], + "lows": [b["low"] for b in daily], + "volumes": [b["volume"] for b in daily], + "avg_price": target.get("avg_price"), + "qty": target.get("qty"), + "holding_high": target.get("holding_high"), + "climax_vol_mult": settings.climax_vol_mult, + } + + +async def run_cycle(nas, kis, state, stats, settings) -> None: + try: + ms = await nas.get_monitor_set() + except Exception: + logger.exception("monitor-set 조회 실패") + state.session_state = "idle" + stats.jobs_failed += 1 + return + + session = ms.get("session", "closed") + if session == "closed": + state.session_state = "market_closed" + return + + buy_targets = filter_krx(ms.get("buy_targets", [])) + sell_targets = filter_krx(ms.get("sell_targets", [])) + buy_params = ms.get("buy_params", {}) + exit_params = ms.get("exit_params", {}) + + firing: list[dict] = [] + for t in buy_targets: + try: + firing += evaluate_buy(await _build_ctx(kis, t, settings), buy_params) + except Exception: + logger.exception("buy 평가 실패 %s", t.get("ticker")) + for t in sell_targets: + try: + firing += evaluate_sell(await _build_ctx(kis, t, settings), exit_params) + except Exception: + logger.exception("sell 평가 실패 %s", t.get("ticker")) + + as_of = datetime.now(KST).isoformat(timespec="seconds") + if firing: + state.last_alert_at = as_of + logger.info("firing %d개: %s", len(firing), + [f"{f['ticker']}:{f['condition']}" for f in firing]) + try: + await nas.post_report(as_of, firing) # 빈 배열도 전송(edge clear) + except Exception: + logger.exception("report 전송 실패") + + state.session_state = "market_open" + stats.jobs_done += 1 + stats.last_job_at = as_of + + +async def monitor_loop(nas, kis, state, stats, settings) -> None: + logger.info("trade-monitor loop 시작 interval=%ds", settings.loop_interval) + while True: + try: + await run_cycle(nas, kis, state, stats, settings) + except asyncio.CancelledError: + logger.info("monitor_loop cancelled") + raise + except Exception: + logger.exception("monitor_loop iteration 실패") + await asyncio.sleep(settings.loop_interval) + + +def make_state_fn(state): + async def state_fn(redis, stats): + return state.session_state, {"last_alert_at": state.last_alert_at} + return state_fn +``` + +- [ ] **Step 4: 통과 확인** + +Run: `python -m pytest services/trade-monitor/tests/test_monitor.py -q` +Expected: PASS (6 passed) + +- [ ] **Step 5: 커밋** + +```bash +git add services/trade-monitor/monitor.py services/trade-monitor/tests/test_monitor.py +git commit -m "feat(trade-monitor): monitor 오케스트레이션 (run_cycle/loop/state_fn)" +``` + +--- + +### Task 8: main.py (FastAPI lifespan + /health) + +**Files:** +- Create: `services/trade-monitor/main.py` +- Test: `services/trade-monitor/tests/test_health.py` + +**Interfaces:** +- Consumes: `config.load_settings`, `NASClient`, `KISClient`, `monitor.*`, `_shared.heartbeat.heartbeat_loop`, `_shared.heartbeat.WorkerStats`. +- Produces: FastAPI `app` with `/health` → `{"ok": True, "service": "trade-monitor"}`. + +- [ ] **Step 1: 실패 테스트 작성** — `services/trade-monitor/tests/test_health.py` + +```python +"""/health — lifespan 미기동(직접 라우트 호출) 대신 TestClient 없이 함수 검증.""" +from main import health + + +def test_health(): + assert health() == {"ok": True, "service": "trade-monitor"} +``` + +- [ ] **Step 2: 실패 확인** + +Run: `python -m pytest services/trade-monitor/tests/test_health.py -q` +Expected: FAIL — `ModuleNotFoundError: No module named 'main'` + +- [ ] **Step 3: main.py 구현** + +```python +"""trade-monitor FastAPI entry — lifespan(monitor_loop + heartbeat_loop) + /health.""" +from __future__ import annotations + +import asyncio +import logging +from contextlib import asynccontextmanager + +import redis.asyncio as aioredis +from fastapi import FastAPI + +import monitor +from config import load_settings +from kis_client import KISClient +from nas_client import NASClient +from _shared.heartbeat import heartbeat_loop, WorkerStats + +logging.basicConfig(level=logging.INFO, + format="%(asctime)s %(name)s %(levelname)s %(message)s") +logger = logging.getLogger(__name__) + +HEARTBEAT_INTERVAL = 15 # 60초 루프 > TTL 45초 → 독립 15초 발신으로 만료갭 해소 +HEARTBEAT_TTL = 45 + + +@asynccontextmanager +async def lifespan(app: FastAPI): + settings = load_settings() + nas = NASClient(settings.nas_base_url, settings.webai_api_key) + kis = KISClient(settings.kis_app_key, settings.kis_app_secret, + settings.kis_account, settings.kis_is_virtual) + state = monitor.MonitorState() + stats = WorkerStats() + redis = aioredis.from_url(settings.redis_url, decode_responses=False) + + mon_task = asyncio.create_task( + monitor.monitor_loop(nas, kis, state, stats, settings)) + hb_task = asyncio.create_task(heartbeat_loop( + redis, "trade-monitor", "trader", stats, + interval=HEARTBEAT_INTERVAL, ttl=HEARTBEAT_TTL, + state_fn=monitor.make_state_fn(state))) + logger.info("trade-monitor lifespan 시작") + try: + yield + finally: + for t in (mon_task, hb_task): + t.cancel() + try: + await t + except asyncio.CancelledError: + pass + await kis.close() + await nas.close() + await redis.aclose() + logger.info("trade-monitor lifespan 종료") + + +app = FastAPI(lifespan=lifespan) + + +@app.get("/health") +def health(): + return {"ok": True, "service": "trade-monitor"} +``` + +- [ ] **Step 4: 통과 확인 + 전체 스위트** + +Run: `python -m pytest services/trade-monitor/tests -q` +Expected: PASS (전체 통과 — config 2 + indicators 6 + buy 6 + sell 8 + kis 3 + nas 2 + monitor 6 + health 1) + +- [ ] **Step 5: 커밋** + +```bash +git add services/trade-monitor/main.py services/trade-monitor/tests/test_health.py +git commit -m "feat(trade-monitor): FastAPI lifespan + heartbeat 배선 + /health" +``` + +--- + +### Task 9: 배포 (Dockerfile + compose + .env.example) + +**Files:** +- Create: `services/trade-monitor/Dockerfile` +- Create: `services/trade-monitor/.env.example` +- Modify: `services/docker-compose.yml` (add `trade-monitor` 서비스) + +**Interfaces:** 없음(배포 산출물). 검증은 `docker compose config`. + +- [ ] **Step 1: Dockerfile 작성** — `services/trade-monitor/Dockerfile` + +```dockerfile +FROM python:3.12-slim-bookworm +ENV PYTHONUNBUFFERED=1 + +WORKDIR /app + +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates tzdata \ + && rm -rf /var/lib/apt/lists/* + +COPY trade-monitor/requirements.txt /app/ +RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt + +# 공통 heartbeat 모듈 (services/_shared) — main.py가 from _shared.heartbeat import +COPY _shared /app/_shared +COPY trade-monitor/. /app/ +ENV PYTHONPATH=/app + +EXPOSE 8000 +CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"] +``` + +- [ ] **Step 2: .env.example 작성** — `services/trade-monitor/.env.example` + +``` +# Plan-realtime-trade-alerts — trade-monitor + +# NAS Redis (heartbeat) +REDIS_URL=redis://192.168.45.54:6379 + +# NAS stock 백엔드 (monitor-set / report) +NAS_BASE_URL=http://192.168.45.54:18500 +WEBAI_API_KEY= + +# KIS 자체 토큰 (ai_trade와 분리된 전용 app_key) +TM_KIS_APP_KEY= +TM_KIS_APP_SECRET= +TM_KIS_ACCOUNT= +TM_KIS_IS_VIRTUAL=0 + +# 루프 주기(초) / sell_climax 거래량 배수 임계 +TM_LOOP_INTERVAL=60 +TM_CLIMAX_VOL_MULT=3.0 +``` + +- [ ] **Step 3: docker-compose.yml에 서비스 추가** + +`services/docker-compose.yml`의 `image-render` 블록 뒤(파일 끝, 들여쓰기 2칸)에 추가: + +```yaml + trade-monitor: + build: + context: . + dockerfile: trade-monitor/Dockerfile + container_name: trade-monitor + restart: unless-stopped + ports: + - "18715:8000" + environment: + - TZ=Asia/Seoul + - REDIS_URL=${REDIS_URL:-redis://192.168.45.54:6379} + - NAS_BASE_URL=${NAS_BASE_URL:-http://192.168.45.54:18500} + - WEBAI_API_KEY=${WEBAI_API_KEY:-} + - TM_KIS_APP_KEY=${TM_KIS_APP_KEY:-} + - TM_KIS_APP_SECRET=${TM_KIS_APP_SECRET:-} + - TM_KIS_ACCOUNT=${TM_KIS_ACCOUNT:-} + - TM_KIS_IS_VIRTUAL=${TM_KIS_IS_VIRTUAL:-0} + - TM_LOOP_INTERVAL=${TM_LOOP_INTERVAL:-60} + - TM_CLIMAX_VOL_MULT=${TM_CLIMAX_VOL_MULT:-3.0} + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"] + interval: 60s + timeout: 5s + retries: 3 +``` + +- [ ] **Step 4: compose 검증** + +Run: `cd services && docker compose config` (Windows 로컬에 docker 없으면 skip — NAS/WSL2에서 검증. YAML 들여쓰기만 육안 확인) +Expected: `trade-monitor` 서비스가 파싱되고 포트 18715 매핑 표시. + +- [ ] **Step 5: 커밋** + +```bash +git add services/trade-monitor/Dockerfile services/trade-monitor/.env.example services/docker-compose.yml +git commit -m "feat(trade-monitor): Dockerfile + compose 서비스(18715) + .env.example" +``` + +--- + +## Self-Review + +**1. Spec coverage:** +- §5.1 monitor-set(GET) → Task 6 `get_monitor_set` + Task 7 세션 게이트. ✅ +- §5.2 report(POST) → Task 6 `post_report` + Task 7 firing 조립(빈 배열 포함). ✅ +- §5.4 heartbeat(EX45, kind trader, state) → Task 7 `make_state_fn` + Task 8 `heartbeat_loop(15s/45s)`. ✅ +- §6 매수 3조건 → Task 3. ✅ / 매도 5조건 → Task 4. ✅ +- 비-KRX skip → Task 7 `filter_krx`. ✅ / 무상태 → Task 7(로컬 dedup 없음). ✅ +- KIS 자체 토큰 → Task 5. ✅ / 배포(_shared COPY, 18715) → Task 9. ✅ + +**2. Placeholder scan:** 실제 코드/명령/기대출력 모두 기재. `TODO: holdings_intel 대조`는 의도된 코드 주석(플래그). ✅ + +**3. Type consistency:** ctx 키(`closes/highs/lows/volumes/today_volume/day_open/avg_price/holding_high/climax_vol_mult`)가 `_build_ctx`(Task 7) 생성 ↔ `evaluate_buy/sell`(Task 3/4) 소비 일치. `WorkerStats`(jobs_done/jobs_failed/last_job_at) `_shared`와 일치. heartbeat `state_fn` 시그니처 `(redis, stats)->(str, dict)`가 `_shared.heartbeat_loop` 호출부와 일치. ✅ + +**미해결 플래그(DESIGN.md §11):** sell_climax 휴리스틱 근사, KIS 필드 실검증, 매수 해석 4주 IC 재조정, KIS rate limit 공존, after 시간외 시세. diff --git a/services/trade-monitor/config.py b/services/trade-monitor/config.py new file mode 100644 index 0000000..421176d --- /dev/null +++ b/services/trade-monitor/config.py @@ -0,0 +1,32 @@ +"""Settings — 환경변수 로드. TM_ 접두사로 ai_trade와 분리.""" +from __future__ import annotations + +import os +from dataclasses import dataclass + + +@dataclass +class Settings: + nas_base_url: str + webai_api_key: str + redis_url: str + kis_app_key: str + kis_app_secret: str + kis_account: str + kis_is_virtual: bool + loop_interval: int + climax_vol_mult: float + + +def load_settings() -> Settings: + return Settings( + nas_base_url=os.getenv("NAS_BASE_URL", "http://192.168.45.54:18500"), + webai_api_key=os.getenv("WEBAI_API_KEY", ""), + redis_url=os.getenv("REDIS_URL", "redis://192.168.45.54:6379"), + kis_app_key=os.getenv("TM_KIS_APP_KEY", ""), + kis_app_secret=os.getenv("TM_KIS_APP_SECRET", ""), + kis_account=os.getenv("TM_KIS_ACCOUNT", ""), + kis_is_virtual=os.getenv("TM_KIS_IS_VIRTUAL", "0") == "1", + loop_interval=int(os.getenv("TM_LOOP_INTERVAL", "60")), + climax_vol_mult=float(os.getenv("TM_CLIMAX_VOL_MULT", "3.0")), + ) diff --git a/services/trade-monitor/conftest.py b/services/trade-monitor/conftest.py new file mode 100644 index 0000000..5be9bf8 --- /dev/null +++ b/services/trade-monitor/conftest.py @@ -0,0 +1,5 @@ +"""services/ 루트를 sys.path에 추가 — from _shared.heartbeat import 가능하게.""" +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) diff --git a/services/trade-monitor/pytest.ini b/services/trade-monitor/pytest.ini new file mode 100644 index 0000000..2f4c80e --- /dev/null +++ b/services/trade-monitor/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +asyncio_mode = auto diff --git a/services/trade-monitor/requirements.txt b/services/trade-monitor/requirements.txt new file mode 100644 index 0000000..a69ad6a --- /dev/null +++ b/services/trade-monitor/requirements.txt @@ -0,0 +1,7 @@ +fastapi==0.115.6 +uvicorn[standard]==0.34.0 +redis>=5.0 +httpx>=0.27 +pytest>=8.0 +pytest-asyncio>=0.24 +respx>=0.21 diff --git a/services/trade-monitor/tests/__init__.py b/services/trade-monitor/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/trade-monitor/tests/test_config.py b/services/trade-monitor/tests/test_config.py new file mode 100644 index 0000000..10ac1d1 --- /dev/null +++ b/services/trade-monitor/tests/test_config.py @@ -0,0 +1,27 @@ +"""Settings env 로드 — 기본값 + override.""" +from config import load_settings + + +def test_defaults(monkeypatch): + for k in ("NAS_BASE_URL", "WEBAI_API_KEY", "REDIS_URL", "TM_KIS_APP_KEY", + "TM_KIS_APP_SECRET", "TM_KIS_ACCOUNT", "TM_KIS_IS_VIRTUAL", + "TM_LOOP_INTERVAL", "TM_CLIMAX_VOL_MULT"): + monkeypatch.delenv(k, raising=False) + s = load_settings() + assert s.nas_base_url == "http://192.168.45.54:18500" + assert s.redis_url == "redis://192.168.45.54:6379" + assert s.kis_is_virtual is False + assert s.loop_interval == 60 + assert s.climax_vol_mult == 3.0 + + +def test_override(monkeypatch): + monkeypatch.setenv("TM_KIS_IS_VIRTUAL", "1") + monkeypatch.setenv("TM_LOOP_INTERVAL", "30") + monkeypatch.setenv("TM_CLIMAX_VOL_MULT", "2.5") + monkeypatch.setenv("WEBAI_API_KEY", "secret") + s = load_settings() + assert s.kis_is_virtual is True + assert s.loop_interval == 30 + assert s.climax_vol_mult == 2.5 + assert s.webai_api_key == "secret"