Compare commits
19 Commits
6774067505
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 9abca3eeab | |||
| 5dbb11ac83 | |||
| 8e1b20190d | |||
| fa6ef6c5c8 | |||
| 12aa55ed14 | |||
| ce8983c1b9 | |||
| 04aff34883 | |||
| d761716e00 | |||
| 241ce41a6a | |||
| 366a9160d5 | |||
| 141209ad42 | |||
| 03e50d2be1 | |||
| 54fca07d43 | |||
| 574b5712c3 | |||
| 2ff31b2e76 | |||
| d1b9ff570d | |||
| 4fb3d12244 | |||
| 789a807d50 | |||
| ad141a2887 |
9
.mcp.json
Normal file
9
.mcp.json
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
{
|
||||||
|
"mcpServers": {
|
||||||
|
"co-gahusb": {
|
||||||
|
"type": "http",
|
||||||
|
"url": "https://gahusb.synology.me/api/co/mcp",
|
||||||
|
"headers": { "Authorization": "Bearer ${CO_BUS_KEY}" }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
15
CLAUDE.md
15
CLAUDE.md
@@ -15,12 +15,14 @@ Windows AI 머신 (AMD 9800X3D + RTX 5070 Ti 16GB) 의 두 신호 파이프라
|
|||||||
| `ai_trade/` | 자동매매 메인 (구 `signal_v2` 2026-05-19 rename) — Chronos-bolt + 분봉 모멘텀 + KIS WebSocket + 신호 생성 | `:8001` | **Phase 4 완료 (2026-05-17)**, Phase 5 대기 |
|
| `ai_trade/` | 자동매매 메인 (구 `signal_v2` 2026-05-19 rename) — Chronos-bolt + 분봉 모멘텀 + KIS WebSocket + 신호 생성 | `:8001` | **Phase 4 완료 (2026-05-17)**, Phase 5 대기 |
|
||||||
| `legacy/start_v1.bat` | (deprecated) V1 진입점 — root `start.bat`에서 이동됨. 자동 실행 차단 | — | **OFF** |
|
| `legacy/start_v1.bat` | (deprecated) V1 진입점 — root `start.bat`에서 이동됨. 자동 실행 차단 | — | **OFF** |
|
||||||
| `ai_trade/start.bat` | 자동매매 진입점 | — | `ai_trade/main.py` uvicorn 실행 |
|
| `ai_trade/start.bat` | 자동매매 진입점 | — | `ai_trade/main.py` uvicorn 실행 |
|
||||||
| `services/` | (예정) NAS↔Windows 분산 worker — insta-render·music-render·video-render·task-watcher | 18710~ | **Plan-B-Insta 작업 중** |
|
| `services/` | NAS↔Windows 분산 worker — insta·music·video·image-render·task-watcher·**trade-monitor(:18715, 실시간 매매 알람)** | 18710~ | 운영 |
|
||||||
| `.env` | 환경변수 (`KIS_REAL_*`, `TELEGRAM_*`, `STOCK_API_URL`, `WEBAI_API_KEY`, `LOG_LEVEL`) | — | |
|
| `.env` | 환경변수 (`KIS_REAL_*`, `TELEGRAM_*`, `STOCK_API_URL`, `WEBAI_API_KEY`, `LOG_LEVEL`) | — | |
|
||||||
| `requirements.txt` | 공용 의존성 | — | torch, chronos-forecasting, fastapi, httpx, websockets 등 |
|
| `requirements.txt` | 공용 의존성 | — | torch, chronos-forecasting, fastapi, httpx, websockets 등 |
|
||||||
|
|
||||||
`.venv` 는 **구조적으로 깨짐**: `pyvenv.cfg` 가 한글 사용자 경로(`C:\Users\박재오\...`) 를 포함하여 콘솔 코드페이지가 roundtrip 못함. 테스트는 시스템 Python 으로 실행: `C:\Users\jaeoh\AppData\Local\Programs\Python\Python312\python.exe -m pytest ai_trade/tests -q`.
|
`.venv` 는 **구조적으로 깨짐**: `pyvenv.cfg` 가 한글 사용자 경로(`C:\Users\박재오\...`) 를 포함하여 콘솔 코드페이지가 roundtrip 못함. 테스트는 시스템 Python 으로 실행: `C:\Users\jaeoh\AppData\Local\Programs\Python\Python312\python.exe -m pytest ai_trade/tests -q`.
|
||||||
|
|
||||||
|
> **분산 워커 /infra 관측 규칙 (팀 규칙, BE 제정)**: 모든 WSL docker 워커는 ① Redis `worker:<name>:heartbeat`(EX45) 발신 ② BE `node_monitor.WORKER_REGISTRY` 등재 ③ `/api/agent-office/nodes`·web-ui `/infra` 노출이 필수. trade-monitor는 kind=trader로 등재됨.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## 서버 시작 방식
|
## 서버 시작 방식
|
||||||
@@ -137,3 +139,14 @@ cd C:\Users\jaeoh\Desktop\workspace\web-ai\ai_trade
|
|||||||
- **spec amendment 발생 시**: 코드는 `web-ai` 에 commit, spec 갱신은 `web-ui/docs/superpowers/specs/` 에 commit (Phase 4 spread formula 변경 사례 = web-ui commit `534ded5`)
|
- **spec amendment 발생 시**: 코드는 `web-ai` 에 commit, spec 갱신은 `web-ui/docs/superpowers/specs/` 에 commit (Phase 4 spread formula 변경 사례 = web-ui commit `534ded5`)
|
||||||
|
|
||||||
자세한 V1 가이드는 `signal_v1/CLAUDE.md` 참조 (있다면).
|
자세한 V1 가이드는 `signal_v1/CLAUDE.md` 참조 (있다면).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 협업 팀 버스 (co-gahusb) — 이 세션의 역할: **AI**
|
||||||
|
|
||||||
|
이 세션은 AI 리서치(AI) 역할이다. co-gahusb MCP 툴로 다른 세션(FE/BE/Producer)과 협업한다.
|
||||||
|
- **소유권**: 이 세션은 `web-ai` repo만 쓴다(FE=web-ui, BE=web-backend).
|
||||||
|
- **공유 리소스 변경 전 반드시 `acquire_lock(resource, "AI")`**: 대상 = `nas-deploy`, `stock-db-schema`, `lotto-db-schema`, `memory-mirror`, `nginx-conf`, `compose`. 점유 중이면 대기, 긴 작업은 `heartbeat_lock`, 끝나면 `release_lock`.
|
||||||
|
- **모든 툴 호출에 `role="AI"`** (또는 `from_role`/`created_by`에 AI).
|
||||||
|
- **수신**: `/loop`로 주기적으로 `read_inbox("AI", after_id=<last>)` + `list_tasks(assignee_role="AI")` 확인.
|
||||||
|
- 키 `CO_BUS_KEY`는 환경변수로 주입(커밋 금지). `.mcp.json`의 `${CO_BUS_KEY}`가 프로세스 환경변수에서 치환됨 → `setx CO_BUS_KEY "..."` 후 새 터미널에서 `claude` 실행.
|
||||||
|
|||||||
60
README.md
60
README.md
@@ -3,7 +3,7 @@
|
|||||||
Windows AI 머신(AMD 9800X3D + RTX 5070 Ti 16GB)에서 동작하는 두 영역의 서비스:
|
Windows AI 머신(AMD 9800X3D + RTX 5070 Ti 16GB)에서 동작하는 두 영역의 서비스:
|
||||||
|
|
||||||
1. **ai_trade** — Confidence Signal Pipeline V2. NAS stock 백엔드와 KIS Open API를 결합해 매수/매도 신호를 생성하는 FastAPI 워커.
|
1. **ai_trade** — Confidence Signal Pipeline V2. NAS stock 백엔드와 KIS Open API를 결합해 매수/매도 신호를 생성하는 FastAPI 워커.
|
||||||
2. **services** — NAS↔Windows 분산 렌더링 워커(인스타 카드 / 음악 / 영상 / 이미지) + task-watcher.
|
2. **services** — NAS↔Windows 분산 워커: 렌더링(인스타 카드 / 음악 / 영상 / 이미지) + task-watcher + **trade-monitor**(실시간 매매 알람).
|
||||||
|
|
||||||
상위 워크스페이스 컨텍스트는 `../CLAUDE.md`, 본 디렉토리 상세는 `CLAUDE.md`, 운영 체크포인트는 `CHECK_POINT.md` 참조.
|
상위 워크스페이스 컨텍스트는 `../CLAUDE.md`, 본 디렉토리 상세는 `CLAUDE.md`, 운영 체크포인트는 `CHECK_POINT.md` 참조.
|
||||||
|
|
||||||
@@ -20,6 +20,7 @@ Windows AI 머신(AMD 9800X3D + RTX 5070 Ti 16GB)에서 동작하는 두 영역
|
|||||||
| `services/video-render/` | sora / veo / kling / seedance 4 provider 영상 생성 게이트웨이. `queue:video-render` 소비. | `:18712` |
|
| `services/video-render/` | sora / veo / kling / seedance 4 provider 영상 생성 게이트웨이. `queue:video-render` 소비. | `:18712` |
|
||||||
| `services/image-render/` | gpt_image / nano_banana / flux(ComfyUI 로컬) 3 provider. `queue:image-render` 소비. | `:18714` |
|
| `services/image-render/` | gpt_image / nano_banana / flux(ComfyUI 로컬) 3 provider. `queue:image-render` 소비. | `:18714` |
|
||||||
| `services/task-watcher/` | 박재오 작업 시간대에 `queue:paused` 토글 → 워커 일시 정지. | `:18713` |
|
| `services/task-watcher/` | 박재오 작업 시간대에 `queue:paused` 토글 → 워커 일시 정지. | `:18713` |
|
||||||
|
| `services/trade-monitor/` | 실시간 매매 알람. monitor-set pull → KIS 시세 + TA 조건(§6 8종) → report + heartbeat(kind=trader). 무상태. | `:18715` |
|
||||||
| `legacy/signal_v1/` | ⚠ **DEPRECATED** (2026-05-19). LSTM 봇. 자동 실행 차단됨. | OFF |
|
| `legacy/signal_v1/` | ⚠ **DEPRECATED** (2026-05-19). LSTM 봇. 자동 실행 차단됨. | OFF |
|
||||||
|
|
||||||
---
|
---
|
||||||
@@ -130,6 +131,60 @@ redis-cli -h 192.168.45.54 KEYS 'processing:*'
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
## trade-monitor — 실시간 매매 알람 워커 (신규 2026-07-03)
|
||||||
|
|
||||||
|
NAS stock 백엔드(`:18500`)에서 `monitor-set`을 60초마다 pull하고, KIS 실시간/일봉 시세로 TA 조건을 평가해 발화집합을 `report`로 전송. NAS가 edge diff로 신규 알림만 텔레그램/프론트에 노출. **워커 무상태**(dedup은 NAS 영속). 포트 `:18715`, WSL2 Docker. 상세 설계·조건 규칙은 `services/trade-monitor/DESIGN.md`.
|
||||||
|
|
||||||
|
### 루프 (1분)
|
||||||
|
|
||||||
|
1. `GET /api/webai/trade-alert/monitor-set` (X-WebAI-Key) → buy_targets(watch∪screener) + sell_targets(보유 avg_price/qty/holding_high) + buy_params/exit_params + session.
|
||||||
|
2. `session=="closed"`면 KIS 호출 0, idle. **비-KRX(알파벳) 티커 skip**(워커 책임).
|
||||||
|
3. KIS quote + 일봉 250봉 → 지표 계산 → 조건 평가(종목 단위 실패 격리).
|
||||||
|
4. `POST /api/webai/trade-alert/report {as_of, firing:[...]}` — 빈 배열도 전송(edge clear).
|
||||||
|
5. heartbeat `worker:trade-monitor:heartbeat` EX45 (kind=trader, state=market_open|market_closed|idle + last_alert_at). 60초 루프 > TTL45 만료갭 회피 위해 **15초 독립 태스크**.
|
||||||
|
|
||||||
|
### 조건 (§6 — `condition` 문자열이 FE 라벨/뱃지로 그대로 매핑됨)
|
||||||
|
|
||||||
|
- **매수**: `buy_ma20_pullback`(정배열 + ma20 근접 반등), `buy_breakout`(20봉 고점 돌파 + 거래량 배수), `buy_rsi_bounce`(RSI 과매도 반등, 무상태).
|
||||||
|
- **매도**: `sell_stop_loss`, `sell_take_profit`, `sell_trailing_stop`, `sell_ma_break`(ma50/ma200 severity), `sell_climax`(거래량 급증 + `price<day_high×0.97` 윗꼬리 — holdings_intel 정합됨, `exit_params` 파라미터화).
|
||||||
|
|
||||||
|
### 핵심 파일
|
||||||
|
|
||||||
|
| 파일 | 책임 |
|
||||||
|
|------|------|
|
||||||
|
| `config.py` | Settings (`TM_` 접두사, ai_trade와 분리) |
|
||||||
|
| `indicators.py` | 순수: `sma` / `rsi_series`(Wilder) / `highest_high` |
|
||||||
|
| `conditions.py` | 순수 §6: `evaluate_buy` / `evaluate_sell` |
|
||||||
|
| `kis_client.py` | KIS **자체 OAuth 토큰** + `get_quote` + `get_daily_ohlcv` + 0.5s throttle |
|
||||||
|
| `nas_client.py` | monitor-set / report (X-WebAI-Key + retry) |
|
||||||
|
| `monitor.py` | `run_cycle` / `monitor_loop` / `make_state_fn` |
|
||||||
|
| `main.py` | FastAPI lifespan + `_shared.heartbeat_loop` 배선 + `/health` |
|
||||||
|
|
||||||
|
### 환경 변수
|
||||||
|
|
||||||
|
| 변수 | 기본 | 설명 |
|
||||||
|
|------|------|------|
|
||||||
|
| `NAS_BASE_URL` | `http://192.168.45.54:18500` | stock 백엔드 |
|
||||||
|
| `WEBAI_API_KEY` | (필수) | X-WebAI-Key |
|
||||||
|
| `REDIS_URL` | `redis://192.168.45.54:6379` | heartbeat |
|
||||||
|
| `TM_KIS_APP_KEY` / `TM_KIS_APP_SECRET` / `TM_KIS_ACCOUNT` | (필수) | KIS **전용** 자체 토큰(ai_trade와 분리 발급 → 토큰 상호 무효화·EGW00201 회피) |
|
||||||
|
| `TM_KIS_IS_VIRTUAL` | `0` | 실전/모의 |
|
||||||
|
| `TM_LOOP_INTERVAL` | `60` | 루프 주기(초) |
|
||||||
|
| `TM_CLIMAX_VOL_MULT` | `3.0` | sell_climax 거래량 배수 fallback (우선순위: monitor-set `exit_params.climax_vol_x` > 이 값) |
|
||||||
|
|
||||||
|
### 상태
|
||||||
|
|
||||||
|
⏳ 구현·머지 완료(테스트 36/36, sell_climax holdings_intel 정합 포함), **미배포**. 배포 전: ① 전용 KIS 앱키 발급·주입(박재오 진행 중) ② 첫 운영 KIS 필드 검증(stck_hgpr 등). BE가 `node_monitor.WORKER_REGISTRY`에 등재 완료 → 배포 시 `/api/agent-office/nodes`·web-ui `/infra`에 trader 노드 자동 노출(미배포 동안 down, 무경보).
|
||||||
|
|
||||||
|
### 시작 (NAS, WSL2 Docker)
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd services
|
||||||
|
docker compose up -d trade-monitor
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
## 환경 변수 (ai_trade)
|
## 환경 변수 (ai_trade)
|
||||||
|
|
||||||
| 변수 | 기본 | 설명 |
|
| 변수 | 기본 | 설명 |
|
||||||
@@ -168,6 +223,7 @@ cd services/insta-render && python -m pytest tests/ -q
|
|||||||
cd services/music-render && python -m pytest tests/ -q
|
cd services/music-render && python -m pytest tests/ -q
|
||||||
cd services/video-render && python -m pytest tests/ -q
|
cd services/video-render && python -m pytest tests/ -q
|
||||||
cd services/image-render && python -m pytest tests/ -q
|
cd services/image-render && python -m pytest tests/ -q
|
||||||
|
cd services/trade-monitor && python -m pytest tests/ -q # 36 tests
|
||||||
```
|
```
|
||||||
|
|
||||||
**`.venv` 한글 사용자 경로 깨짐**으로 시스템 Python(`C:\Users\jaeoh\AppData\Local\Programs\Python\Python312\python.exe`) 사용 권장. 또는 `py -3.12 -m pytest …`.
|
**`.venv` 한글 사용자 경로 깨짐**으로 시스템 Python(`C:\Users\jaeoh\AppData\Local\Programs\Python\Python312\python.exe`) 사용 권장. 또는 `py -3.12 -m pytest …`.
|
||||||
@@ -183,6 +239,8 @@ cd services/image-render && python -m pytest tests/ -q
|
|||||||
5. **services worker_id** — env로 명시 권장. hostname 기반 default는 컨테이너 재기동 시 바뀌어 orphan 분실 위험.
|
5. **services worker_id** — env로 명시 권장. hostname 기반 default는 컨테이너 재기동 시 바뀌어 orphan 분실 위험.
|
||||||
6. **dead-letter 누적** — `redis-cli LLEN dead_letter:*` 정기 점검 필요.
|
6. **dead-letter 누적** — `redis-cli LLEN dead_letter:*` 정기 점검 필요.
|
||||||
7. **Dockerfile build context** — `services/` 루트 (각 worker 디렉토리 아님). compose 변경 동반.
|
7. **Dockerfile build context** — `services/` 루트 (각 worker 디렉토리 아님). compose 변경 동반.
|
||||||
|
8. **분산 워커 /infra 관측 필수 (팀 규칙)** — 모든 WSL docker 워커는 heartbeat(`worker:<name>:heartbeat` EX45) + BE `node_monitor.WORKER_REGISTRY` 등재 + `/infra` 노출이 필수. trade-monitor는 kind=trader로 등재됨.
|
||||||
|
9. **trade-monitor KIS 앱키 분리** — ai_trade와 **다른 전용 app_key**(`TM_KIS_*`) 사용. 같은 app_key 공유 시 토큰 상호 무효화 + EGW00201. 실전 최대 89앱 발급 가능.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
|||||||
57
ai_trade/heartbeat.py
Normal file
57
ai_trade/heartbeat.py
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
"""ai_trade heartbeat — NAS Redis로 worker:ai_trade:heartbeat SET.
|
||||||
|
|
||||||
|
Global Constraints 계약 1: kind=trader, state=market_open|market_closed.
|
||||||
|
ai_trade는 Windows 호스트 실행이라 _shared import 경로가 달라 자체 미니 헬퍼로 둔다.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import datetime as dt
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
|
||||||
|
import redis.asyncio as aioredis
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
|
||||||
|
KEY = "worker:ai_trade:heartbeat"
|
||||||
|
INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "15"))
|
||||||
|
TTL = int(os.getenv("HEARTBEAT_TTL", "45"))
|
||||||
|
|
||||||
|
|
||||||
|
def build_trader_payload(state: str, signals: int = 0) -> str:
|
||||||
|
"""JSON 문자열 반환. state: 'market_open' | 'market_closed'."""
|
||||||
|
return json.dumps({
|
||||||
|
"name": "ai_trade",
|
||||||
|
"kind": "trader",
|
||||||
|
"state": state,
|
||||||
|
"ts": dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
|
||||||
|
"last_job_at": None,
|
||||||
|
"jobs_done": signals,
|
||||||
|
"jobs_failed": 0,
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
async def heartbeat_loop(state_fn) -> None:
|
||||||
|
"""Redis에 HEARTBEAT_INTERVAL마다 SET EX TTL.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
state_fn: () -> (state: str, signals: int). 호출자가 폴링 윈도우 판정 주입.
|
||||||
|
"""
|
||||||
|
redis = aioredis.from_url(REDIS_URL, decode_responses=False)
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
state, signals = state_fn()
|
||||||
|
payload = build_trader_payload(state, signals)
|
||||||
|
await redis.set(KEY, payload, ex=TTL)
|
||||||
|
logger.debug("ai_trade heartbeat sent: state=%s signals=%d", state, signals)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
except Exception:
|
||||||
|
logger.exception("ai_trade heartbeat 실패 — 다음 주기에 재시도")
|
||||||
|
await asyncio.sleep(INTERVAL)
|
||||||
|
finally:
|
||||||
|
await redis.aclose()
|
||||||
@@ -3,9 +3,12 @@ from __future__ import annotations
|
|||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
from datetime import datetime
|
||||||
|
from zoneinfo import ZoneInfo
|
||||||
|
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
|
|
||||||
|
from ai_trade import heartbeat as _hb
|
||||||
from ai_trade import state as state_mod
|
from ai_trade import state as state_mod
|
||||||
from ai_trade.chronos_predictor import ChronosPredictor
|
from ai_trade.chronos_predictor import ChronosPredictor
|
||||||
from ai_trade.config import get_settings
|
from ai_trade.config import get_settings
|
||||||
@@ -13,8 +16,11 @@ from ai_trade.kis_client import KISClient
|
|||||||
from ai_trade.kis_websocket import KISWebSocket
|
from ai_trade.kis_websocket import KISWebSocket
|
||||||
from ai_trade.pull_worker import poll_loop, make_asking_price_callback
|
from ai_trade.pull_worker import poll_loop, make_asking_price_callback
|
||||||
from ai_trade.rate_limit import SignalDedup
|
from ai_trade.rate_limit import SignalDedup
|
||||||
|
from ai_trade.scheduler import _is_polling_window, _is_market_day
|
||||||
from ai_trade.stock_client import StockClient
|
from ai_trade.stock_client import StockClient
|
||||||
|
|
||||||
|
_KST = ZoneInfo("Asia/Seoul")
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@@ -23,6 +29,7 @@ class AppContext:
|
|||||||
dedup: SignalDedup | None = None
|
dedup: SignalDedup | None = None
|
||||||
shutdown: asyncio.Event | None = None
|
shutdown: asyncio.Event | None = None
|
||||||
poll_task: asyncio.Task | None = None
|
poll_task: asyncio.Task | None = None
|
||||||
|
hb_task: asyncio.Task | None = None
|
||||||
kis_client: KISClient | None = None
|
kis_client: KISClient | None = None
|
||||||
kis_ws: KISWebSocket | None = None
|
kis_ws: KISWebSocket | None = None
|
||||||
chronos: ChronosPredictor | None = None
|
chronos: ChronosPredictor | None = None
|
||||||
@@ -87,9 +94,27 @@ async def lifespan(app: FastAPI):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _trader_state() -> tuple[str, int]:
|
||||||
|
"""scheduler의 실제 폴링 윈도우 판정으로 market_open/market_closed 결정."""
|
||||||
|
now = datetime.now(_KST)
|
||||||
|
is_open = _is_market_day(now) and _is_polling_window(now)
|
||||||
|
state_str = "market_open" if is_open else "market_closed"
|
||||||
|
signals = len(state_mod.state.signals)
|
||||||
|
return state_str, signals
|
||||||
|
|
||||||
|
_ctx.hb_task = asyncio.create_task(_hb.heartbeat_loop(_trader_state))
|
||||||
|
|
||||||
yield
|
yield
|
||||||
|
|
||||||
# Shutdown
|
# Shutdown heartbeat task
|
||||||
|
if _ctx.hb_task is not None:
|
||||||
|
_ctx.hb_task.cancel()
|
||||||
|
try:
|
||||||
|
await _ctx.hb_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Shutdown poll task
|
||||||
if _ctx.shutdown is not None:
|
if _ctx.shutdown is not None:
|
||||||
_ctx.shutdown.set()
|
_ctx.shutdown.set()
|
||||||
if _ctx.poll_task is not None:
|
if _ctx.poll_task is not None:
|
||||||
|
|||||||
38
ai_trade/tests/test_heartbeat.py
Normal file
38
ai_trade/tests/test_heartbeat.py
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
"""Tests for ai_trade heartbeat payload builder."""
|
||||||
|
import json
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
def test_trader_payload_market_open():
|
||||||
|
from ai_trade.heartbeat import build_trader_payload
|
||||||
|
|
||||||
|
p = json.loads(build_trader_payload("market_open", signals=2))
|
||||||
|
assert p["name"] == "ai_trade"
|
||||||
|
assert p["kind"] == "trader"
|
||||||
|
assert p["state"] == "market_open"
|
||||||
|
assert p["ts"].endswith("Z")
|
||||||
|
assert p["jobs_done"] == 2
|
||||||
|
|
||||||
|
|
||||||
|
def test_trader_payload_market_closed():
|
||||||
|
from ai_trade.heartbeat import build_trader_payload
|
||||||
|
|
||||||
|
p = json.loads(build_trader_payload("market_closed"))
|
||||||
|
assert p["name"] == "ai_trade"
|
||||||
|
assert p["kind"] == "trader"
|
||||||
|
assert p["state"] == "market_closed"
|
||||||
|
assert p["jobs_done"] == 0
|
||||||
|
assert p["jobs_failed"] == 0
|
||||||
|
assert p["last_job_at"] is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_trader_payload_ts_format():
|
||||||
|
"""ts 필드가 ISO 8601 UTC 형식 (YYYY-MM-DDTHH:MM:SSZ)인지 확인."""
|
||||||
|
from ai_trade.heartbeat import build_trader_payload
|
||||||
|
import re
|
||||||
|
|
||||||
|
p = json.loads(build_trader_payload("market_open"))
|
||||||
|
assert re.match(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", p["ts"]), (
|
||||||
|
f"ts={p['ts']!r} does not match expected UTC format"
|
||||||
|
)
|
||||||
@@ -7,6 +7,7 @@ pytest>=8.0
|
|||||||
pytest-asyncio>=0.23
|
pytest-asyncio>=0.23
|
||||||
respx>=0.21
|
respx>=0.21
|
||||||
websockets>=12
|
websockets>=12
|
||||||
|
redis>=5.0
|
||||||
# Phase 3b dependencies (Chronos-2 + ML)
|
# Phase 3b dependencies (Chronos-2 + ML)
|
||||||
transformers>=4.40
|
transformers>=4.40
|
||||||
chronos-forecasting>=1.4
|
chronos-forecasting>=1.4
|
||||||
|
|||||||
55
services/_shared/heartbeat.py
Normal file
55
services/_shared/heartbeat.py
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
"""분산 워커 heartbeat — worker:<name>:heartbeat SET (TTL). Global Constraints 계약 1."""
|
||||||
|
from __future__ import annotations
|
||||||
|
import asyncio, datetime as dt, json, logging, os
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
DEFAULT_INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "15"))
|
||||||
|
DEFAULT_TTL = int(os.getenv("HEARTBEAT_TTL", "45"))
|
||||||
|
|
||||||
|
|
||||||
|
class WorkerStats:
|
||||||
|
"""worker_loop가 갱신, heartbeat_loop가 읽는 가변 카운터."""
|
||||||
|
def __init__(self):
|
||||||
|
self.busy = False
|
||||||
|
self.jobs_done = 0
|
||||||
|
self.jobs_failed = 0
|
||||||
|
self.last_job_at = None # ISO str | None
|
||||||
|
|
||||||
|
|
||||||
|
def utc_now_iso() -> str:
|
||||||
|
return dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||||
|
|
||||||
|
|
||||||
|
def build_payload(name: str, kind: str, state: str, stats: WorkerStats, extra: dict | None = None) -> str:
|
||||||
|
payload = {
|
||||||
|
"name": name, "kind": kind, "state": state, "ts": utc_now_iso(),
|
||||||
|
"last_job_at": stats.last_job_at,
|
||||||
|
"jobs_done": stats.jobs_done, "jobs_failed": stats.jobs_failed,
|
||||||
|
}
|
||||||
|
if extra:
|
||||||
|
payload.update(extra)
|
||||||
|
return json.dumps(payload)
|
||||||
|
|
||||||
|
|
||||||
|
async def render_state(redis, stats: WorkerStats, paused_key: str = "queue:paused") -> str:
|
||||||
|
if await redis.get(paused_key) == b"1":
|
||||||
|
return "paused"
|
||||||
|
return "busy" if stats.busy else "idle"
|
||||||
|
|
||||||
|
|
||||||
|
async def heartbeat_loop(redis, name, kind, stats, *, interval=DEFAULT_INTERVAL,
|
||||||
|
ttl=DEFAULT_TTL, paused_key="queue:paused", state_fn=None):
|
||||||
|
key = f"worker:{name}:heartbeat"
|
||||||
|
logger.info("heartbeat 시작 name=%s ttl=%ds", name, ttl)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
if state_fn is not None:
|
||||||
|
state, extra = await state_fn(redis, stats)
|
||||||
|
else:
|
||||||
|
state, extra = await render_state(redis, stats, paused_key), None
|
||||||
|
await redis.set(key, build_payload(name, kind, state, stats, extra), ex=ttl)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
except Exception:
|
||||||
|
logger.exception("heartbeat 발신 실패 name=%s", name)
|
||||||
|
await asyncio.sleep(interval)
|
||||||
46
services/_shared/tests/test_heartbeat.py
Normal file
46
services/_shared/tests/test_heartbeat.py
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
"""Tests for _shared.heartbeat — Task A1."""
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# Make `_shared` importable (same pattern as test_reliable_queue.py)
|
||||||
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))
|
||||||
|
|
||||||
|
from _shared.heartbeat import WorkerStats, build_payload, render_state
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_payload_has_contract_fields():
|
||||||
|
s = WorkerStats(); s.jobs_done = 3; s.last_job_at = "2026-06-29T00:00:00Z"
|
||||||
|
payload = json.loads(build_payload("image-render", "render", "idle", s))
|
||||||
|
assert payload["name"] == "image-render"
|
||||||
|
assert payload["kind"] == "render"
|
||||||
|
assert payload["state"] == "idle"
|
||||||
|
assert payload["jobs_done"] == 3
|
||||||
|
assert payload["last_job_at"] == "2026-06-29T00:00:00Z"
|
||||||
|
assert payload["ts"].endswith("Z")
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_payload_merges_extra():
|
||||||
|
payload = json.loads(build_payload("task-watcher", "watcher", "free", WorkerStats(), extra={"mode": "free"}))
|
||||||
|
assert payload["mode"] == "free"
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeRedis:
|
||||||
|
def __init__(self, paused): self._paused = paused
|
||||||
|
async def get(self, key): return b"1" if self._paused else None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_render_state_paused_overrides_busy():
|
||||||
|
s = WorkerStats(); s.busy = True
|
||||||
|
assert await render_state(_FakeRedis(paused=True), s) == "paused"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_render_state_busy_then_idle():
|
||||||
|
s = WorkerStats(); s.busy = True
|
||||||
|
assert await render_state(_FakeRedis(paused=False), s) == "busy"
|
||||||
|
s.busy = False
|
||||||
|
assert await render_state(_FakeRedis(paused=False), s) == "idle"
|
||||||
@@ -14,7 +14,7 @@ services:
|
|||||||
- REDIS_URL=${REDIS_URL:-redis://192.168.45.54:6379}
|
- REDIS_URL=${REDIS_URL:-redis://192.168.45.54:6379}
|
||||||
- NAS_BASE_URL=${NAS_BASE_URL:-http://192.168.45.54:18700}
|
- NAS_BASE_URL=${NAS_BASE_URL:-http://192.168.45.54:18700}
|
||||||
- INTERNAL_API_KEY=${INTERNAL_API_KEY:-}
|
- INTERNAL_API_KEY=${INTERNAL_API_KEY:-}
|
||||||
- INSTA_MEDIA_ROOT=${INSTA_MEDIA_ROOT:-/mnt/nas/webpage/data/insta}
|
- INSTA_MEDIA_ROOT=${INSTA_MEDIA_ROOT:-/mnt/nas/webpage/data/insta/insta_cards}
|
||||||
- INSTA_MEDIA_URL_PREFIX=${INSTA_MEDIA_URL_PREFIX:-/media/insta}
|
- INSTA_MEDIA_URL_PREFIX=${INSTA_MEDIA_URL_PREFIX:-/media/insta}
|
||||||
- CARD_TEMPLATE_DIR=/app/templates
|
- CARD_TEMPLATE_DIR=/app/templates
|
||||||
volumes:
|
volumes:
|
||||||
@@ -82,7 +82,8 @@ services:
|
|||||||
|
|
||||||
task-watcher:
|
task-watcher:
|
||||||
build:
|
build:
|
||||||
context: ./task-watcher
|
context: .
|
||||||
|
dockerfile: task-watcher/Dockerfile
|
||||||
container_name: task-watcher
|
container_name: task-watcher
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
ports:
|
ports:
|
||||||
@@ -126,3 +127,28 @@ services:
|
|||||||
interval: 60s
|
interval: 60s
|
||||||
timeout: 5s
|
timeout: 5s
|
||||||
retries: 3
|
retries: 3
|
||||||
|
|
||||||
|
trade-monitor:
|
||||||
|
build:
|
||||||
|
context: .
|
||||||
|
dockerfile: trade-monitor/Dockerfile
|
||||||
|
container_name: trade-monitor
|
||||||
|
restart: unless-stopped
|
||||||
|
ports:
|
||||||
|
- "18715:8000"
|
||||||
|
environment:
|
||||||
|
- TZ=Asia/Seoul
|
||||||
|
- REDIS_URL=${REDIS_URL:-redis://192.168.45.54:6379}
|
||||||
|
- NAS_BASE_URL=${NAS_BASE_URL:-http://192.168.45.54:18500}
|
||||||
|
- WEBAI_API_KEY=${WEBAI_API_KEY:-}
|
||||||
|
- TM_KIS_APP_KEY=${TM_KIS_APP_KEY:-}
|
||||||
|
- TM_KIS_APP_SECRET=${TM_KIS_APP_SECRET:-}
|
||||||
|
- TM_KIS_ACCOUNT=${TM_KIS_ACCOUNT:-}
|
||||||
|
- TM_KIS_IS_VIRTUAL=${TM_KIS_IS_VIRTUAL:-0}
|
||||||
|
- TM_LOOP_INTERVAL=${TM_LOOP_INTERVAL:-60}
|
||||||
|
- TM_CLIMAX_VOL_MULT=${TM_CLIMAX_VOL_MULT:-3.0}
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
|
||||||
|
interval: 60s
|
||||||
|
timeout: 5s
|
||||||
|
retries: 3
|
||||||
|
|||||||
@@ -3,11 +3,14 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
|
import redis.asyncio as aioredis
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
|
|
||||||
import worker
|
import worker
|
||||||
|
from _shared.heartbeat import heartbeat_loop
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -16,15 +19,19 @@ logger = logging.getLogger(__name__)
|
|||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
worker_task = asyncio.create_task(worker.worker_loop())
|
worker_task = asyncio.create_task(worker.worker_loop())
|
||||||
|
hb_redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://192.168.45.54:6379"), decode_responses=False)
|
||||||
|
hb_task = asyncio.create_task(heartbeat_loop(hb_redis, "image-render", "render", worker.stats))
|
||||||
logger.info("image-render lifespan 시작")
|
logger.info("image-render lifespan 시작")
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
finally:
|
finally:
|
||||||
worker_task.cancel()
|
for t in (worker_task, hb_task):
|
||||||
try:
|
t.cancel()
|
||||||
await worker_task
|
try:
|
||||||
except asyncio.CancelledError:
|
await t
|
||||||
pass
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
await hb_redis.aclose()
|
||||||
logger.info("image-render lifespan 종료")
|
logger.info("image-render lifespan 종료")
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -67,3 +67,25 @@ async def test_poll_once_returns_false_on_timeout(monkeypatch):
|
|||||||
assert handled is False
|
assert handled is False
|
||||||
fake_queue.ack.assert_not_awaited()
|
fake_queue.ack.assert_not_awaited()
|
||||||
fake_queue.fail.assert_not_awaited()
|
fake_queue.fail.assert_not_awaited()
|
||||||
|
|
||||||
|
|
||||||
|
# ----- heartbeat stats 카운터 -----
|
||||||
|
|
||||||
|
class _OneJobQueue:
|
||||||
|
def __init__(self): self.acked = False
|
||||||
|
async def dequeue(self, timeout=5):
|
||||||
|
if self.acked: return None
|
||||||
|
return ({"job_type": "flux_generation", "task_id": "t1", "params": {}}, b"raw")
|
||||||
|
async def ack(self, raw): self.acked = True
|
||||||
|
async def fail(self, raw, payload): pass
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_poll_once_increments_jobs_done(monkeypatch):
|
||||||
|
worker.stats.jobs_done = 0
|
||||||
|
monkeypatch.setattr(worker, "run_flux_generation", lambda task_id, params: None)
|
||||||
|
handled = await worker.poll_once(_OneJobQueue())
|
||||||
|
assert handled is True
|
||||||
|
assert worker.stats.jobs_done == 1
|
||||||
|
assert worker.stats.busy is False
|
||||||
|
assert worker.stats.last_job_at is not None
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ from providers.gpt_image import run_gpt_image_generation
|
|||||||
from providers.nano_banana import run_nano_banana_generation
|
from providers.nano_banana import run_nano_banana_generation
|
||||||
from providers.flux import run_flux_generation
|
from providers.flux import run_flux_generation
|
||||||
from _shared.reliable_queue import ReliableQueue
|
from _shared.reliable_queue import ReliableQueue
|
||||||
|
from _shared.heartbeat import WorkerStats, utc_now_iso
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -25,6 +26,8 @@ REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
|
|||||||
QUEUE_KEY = "queue:image-render"
|
QUEUE_KEY = "queue:image-render"
|
||||||
PAUSED_KEY = "queue:paused"
|
PAUSED_KEY = "queue:paused"
|
||||||
|
|
||||||
|
stats = WorkerStats()
|
||||||
|
|
||||||
# string names so `unittest.mock.patch` / `monkeypatch.setattr` on `worker.<name>`
|
# string names so `unittest.mock.patch` / `monkeypatch.setattr` on `worker.<name>`
|
||||||
# is correctly intercepted by getattr(sys.modules[__name__], ...)
|
# is correctly intercepted by getattr(sys.modules[__name__], ...)
|
||||||
_DISPATCH_TABLE = {
|
_DISPATCH_TABLE = {
|
||||||
@@ -59,14 +62,21 @@ async def poll_once(queue: ReliableQueue) -> bool:
|
|||||||
if result is None:
|
if result is None:
|
||||||
return False
|
return False
|
||||||
payload, raw = result
|
payload, raw = result
|
||||||
|
stats.busy = True
|
||||||
try:
|
try:
|
||||||
await asyncio.to_thread(_dispatch, payload)
|
await asyncio.to_thread(_dispatch, payload)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("dispatch unhandled exception task_id=%s",
|
logger.exception("dispatch unhandled exception task_id=%s",
|
||||||
payload.get("task_id"))
|
payload.get("task_id"))
|
||||||
await queue.fail(raw, payload)
|
await queue.fail(raw, payload)
|
||||||
|
stats.jobs_failed += 1
|
||||||
|
stats.last_job_at = utc_now_iso()
|
||||||
|
stats.busy = False
|
||||||
return True
|
return True
|
||||||
await queue.ack(raw)
|
await queue.ack(raw)
|
||||||
|
stats.jobs_done += 1
|
||||||
|
stats.last_job_at = utc_now_iso()
|
||||||
|
stats.busy = False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -7,8 +7,9 @@ REDIS_URL=redis://192.168.45.54:6379
|
|||||||
NAS_BASE_URL=http://192.168.45.54:18700
|
NAS_BASE_URL=http://192.168.45.54:18700
|
||||||
INTERNAL_API_KEY=__copy_from_nas_dotenv__
|
INTERNAL_API_KEY=__copy_from_nas_dotenv__
|
||||||
|
|
||||||
# NAS SMB mount 안의 미디어 디렉토리 (/mnt/nas/webpage/data/insta/)
|
# NAS SMB mount 안의 미디어 디렉토리.
|
||||||
INSTA_MEDIA_ROOT=/mnt/nas/webpage/data/insta
|
# ⚠️ nginx가 /media/insta를 data/insta/insta_cards/로 서빙하므로 반드시 insta_cards까지 포함.
|
||||||
|
INSTA_MEDIA_ROOT=/mnt/nas/webpage/data/insta/insta_cards
|
||||||
|
|
||||||
# nginx 서빙 prefix (NAS webhook payload에 보낼 result_path 만들 때)
|
# nginx 서빙 prefix (NAS webhook payload에 보낼 result_path 만들 때)
|
||||||
INSTA_MEDIA_URL_PREFIX=/media/insta
|
INSTA_MEDIA_URL_PREFIX=/media/insta
|
||||||
|
|||||||
@@ -3,12 +3,15 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
|
import redis.asyncio as aioredis
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
|
|
||||||
import card_renderer
|
import card_renderer
|
||||||
import worker
|
import worker
|
||||||
|
from _shared.heartbeat import heartbeat_loop
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -20,15 +23,19 @@ async def lifespan(app: FastAPI):
|
|||||||
await card_renderer.init_browser()
|
await card_renderer.init_browser()
|
||||||
# 큐 워커 백그라운드 시작
|
# 큐 워커 백그라운드 시작
|
||||||
worker_task = asyncio.create_task(worker.worker_loop())
|
worker_task = asyncio.create_task(worker.worker_loop())
|
||||||
|
hb_redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://192.168.45.54:6379"), decode_responses=False)
|
||||||
|
hb_task = asyncio.create_task(heartbeat_loop(hb_redis, "insta-render", "render", worker.stats))
|
||||||
logger.info("insta-render lifespan 시작")
|
logger.info("insta-render lifespan 시작")
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
finally:
|
finally:
|
||||||
worker_task.cancel()
|
for t in (worker_task, hb_task):
|
||||||
try:
|
t.cancel()
|
||||||
await worker_task
|
try:
|
||||||
except asyncio.CancelledError:
|
await t
|
||||||
pass
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
await hb_redis.aclose()
|
||||||
await card_renderer.shutdown_browser()
|
await card_renderer.shutdown_browser()
|
||||||
logger.info("insta-render lifespan 종료")
|
logger.info("insta-render lifespan 종료")
|
||||||
|
|
||||||
|
|||||||
@@ -230,3 +230,27 @@ def test_make_queue_redis_socket_timeout_exceeds_block():
|
|||||||
c = worker.make_queue_redis()
|
c = worker.make_queue_redis()
|
||||||
st = c.connection_pool.connection_kwargs.get("socket_timeout")
|
st = c.connection_pool.connection_kwargs.get("socket_timeout")
|
||||||
assert st is not None and st > 5 # blmove 블록(5s)보다 커야 안정
|
assert st is not None and st > 5 # blmove 블록(5s)보다 커야 안정
|
||||||
|
|
||||||
|
|
||||||
|
# ----- heartbeat stats 카운터 -----
|
||||||
|
|
||||||
|
class _OneJobQueueInsta:
|
||||||
|
def __init__(self): self.acked = False
|
||||||
|
async def dequeue(self, timeout=5):
|
||||||
|
if self.acked: return None
|
||||||
|
return ({"task_id": "t1", "params": {"slate_id": 1, "theme": "default"}}, b"raw")
|
||||||
|
async def ack(self, raw): self.acked = True
|
||||||
|
async def fail(self, raw, payload): pass
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_poll_once_increments_jobs_done(monkeypatch):
|
||||||
|
worker.stats.jobs_done = 0
|
||||||
|
async def fake_process(client, payload): pass
|
||||||
|
monkeypatch.setattr(worker, "_process_one", fake_process)
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
handled = await worker.poll_once(_OneJobQueueInsta(), client)
|
||||||
|
assert handled is True
|
||||||
|
assert worker.stats.jobs_done == 1
|
||||||
|
assert worker.stats.busy is False
|
||||||
|
assert worker.stats.last_job_at is not None
|
||||||
|
|||||||
@@ -14,9 +14,11 @@ import redis.asyncio as aioredis
|
|||||||
|
|
||||||
from card_renderer import render_slate
|
from card_renderer import render_slate
|
||||||
from _shared.reliable_queue import ReliableQueue
|
from _shared.reliable_queue import ReliableQueue
|
||||||
|
from _shared.heartbeat import WorkerStats, utc_now_iso
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
stats = WorkerStats()
|
||||||
|
|
||||||
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
|
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
|
||||||
NAS_BASE_URL = os.getenv("NAS_BASE_URL", "http://192.168.45.54:18700")
|
NAS_BASE_URL = os.getenv("NAS_BASE_URL", "http://192.168.45.54:18700")
|
||||||
@@ -89,12 +91,19 @@ async def poll_once(queue: ReliableQueue, client: httpx.AsyncClient) -> bool:
|
|||||||
if result is None:
|
if result is None:
|
||||||
return False
|
return False
|
||||||
payload, raw = result
|
payload, raw = result
|
||||||
|
stats.busy = True
|
||||||
try:
|
try:
|
||||||
await _process_one(client, payload)
|
await _process_one(client, payload)
|
||||||
except Exception:
|
except Exception:
|
||||||
await queue.fail(raw, payload)
|
await queue.fail(raw, payload)
|
||||||
|
stats.jobs_failed += 1
|
||||||
|
stats.last_job_at = utc_now_iso()
|
||||||
|
stats.busy = False
|
||||||
return True
|
return True
|
||||||
await queue.ack(raw)
|
await queue.ack(raw)
|
||||||
|
stats.jobs_done += 1
|
||||||
|
stats.last_job_at = utc_now_iso()
|
||||||
|
stats.busy = False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -7,12 +7,15 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
|
import redis.asyncio as aioredis
|
||||||
from fastapi import FastAPI, HTTPException
|
from fastapi import FastAPI, HTTPException
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
import worker
|
import worker
|
||||||
|
from _shared.heartbeat import heartbeat_loop
|
||||||
from providers.sync_ops import (
|
from providers.sync_ops import (
|
||||||
generate_lyrics, get_credits,
|
generate_lyrics, get_credits,
|
||||||
get_timestamped_lyrics, generate_style_boost,
|
get_timestamped_lyrics, generate_style_boost,
|
||||||
@@ -25,15 +28,19 @@ logger = logging.getLogger(__name__)
|
|||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
worker_task = asyncio.create_task(worker.worker_loop())
|
worker_task = asyncio.create_task(worker.worker_loop())
|
||||||
|
hb_redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://192.168.45.54:6379"), decode_responses=False)
|
||||||
|
hb_task = asyncio.create_task(heartbeat_loop(hb_redis, "music-render", "render", worker.stats))
|
||||||
logger.info("music-render lifespan 시작")
|
logger.info("music-render lifespan 시작")
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
finally:
|
finally:
|
||||||
worker_task.cancel()
|
for t in (worker_task, hb_task):
|
||||||
try:
|
t.cancel()
|
||||||
await worker_task
|
try:
|
||||||
except asyncio.CancelledError:
|
await t
|
||||||
pass
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
await hb_redis.aclose()
|
||||||
logger.info("music-render lifespan 종료")
|
logger.info("music-render lifespan 종료")
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -167,3 +167,25 @@ async def test_poll_once_returns_false_on_timeout(monkeypatch):
|
|||||||
dispatch_mock.assert_not_called()
|
dispatch_mock.assert_not_called()
|
||||||
fake_queue.ack.assert_not_awaited()
|
fake_queue.ack.assert_not_awaited()
|
||||||
fake_queue.fail.assert_not_awaited()
|
fake_queue.fail.assert_not_awaited()
|
||||||
|
|
||||||
|
|
||||||
|
# ----- heartbeat stats 카운터 -----
|
||||||
|
|
||||||
|
class _OneJobQueue:
|
||||||
|
def __init__(self): self.acked = False
|
||||||
|
async def dequeue(self, timeout=5):
|
||||||
|
if self.acked: return None
|
||||||
|
return ({"job_type": "suno_generation", "task_id": "t1", "params": {}}, b"raw")
|
||||||
|
async def ack(self, raw): self.acked = True
|
||||||
|
async def fail(self, raw, payload): pass
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_poll_once_increments_jobs_done(monkeypatch):
|
||||||
|
worker.stats.jobs_done = 0
|
||||||
|
monkeypatch.setattr(worker, "run_suno_generation", lambda task_id, params: None)
|
||||||
|
handled = await worker.poll_once(_OneJobQueue())
|
||||||
|
assert handled is True
|
||||||
|
assert worker.stats.jobs_done == 1
|
||||||
|
assert worker.stats.busy is False
|
||||||
|
assert worker.stats.last_job_at is not None
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ from providers.suno import (
|
|||||||
)
|
)
|
||||||
from providers.local import run_local_generation
|
from providers.local import run_local_generation
|
||||||
from _shared.reliable_queue import ReliableQueue
|
from _shared.reliable_queue import ReliableQueue
|
||||||
|
from _shared.heartbeat import WorkerStats, utc_now_iso
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -28,6 +29,8 @@ REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
|
|||||||
QUEUE_KEY = "queue:music-render"
|
QUEUE_KEY = "queue:music-render"
|
||||||
PAUSED_KEY = "queue:paused"
|
PAUSED_KEY = "queue:paused"
|
||||||
|
|
||||||
|
stats = WorkerStats()
|
||||||
|
|
||||||
# Maps job_type → module-level function name (string).
|
# Maps job_type → module-level function name (string).
|
||||||
# _dispatch resolves the name via globals() at call time so unittest.mock.patch
|
# _dispatch resolves the name via globals() at call time so unittest.mock.patch
|
||||||
# on "worker.<name>" is correctly intercepted.
|
# on "worker.<name>" is correctly intercepted.
|
||||||
@@ -74,6 +77,7 @@ async def poll_once(queue: ReliableQueue) -> bool:
|
|||||||
if result is None:
|
if result is None:
|
||||||
return False
|
return False
|
||||||
payload, raw = result
|
payload, raw = result
|
||||||
|
stats.busy = True
|
||||||
try:
|
try:
|
||||||
# sync provider 함수 — thread로 실행해서 이벤트 루프 블로킹 방지
|
# sync provider 함수 — thread로 실행해서 이벤트 루프 블로킹 방지
|
||||||
await asyncio.to_thread(_dispatch, payload)
|
await asyncio.to_thread(_dispatch, payload)
|
||||||
@@ -81,8 +85,14 @@ async def poll_once(queue: ReliableQueue) -> bool:
|
|||||||
logger.exception("dispatch unhandled exception task_id=%s",
|
logger.exception("dispatch unhandled exception task_id=%s",
|
||||||
payload.get("task_id"))
|
payload.get("task_id"))
|
||||||
await queue.fail(raw, payload)
|
await queue.fail(raw, payload)
|
||||||
|
stats.jobs_failed += 1
|
||||||
|
stats.last_job_at = utc_now_iso()
|
||||||
|
stats.busy = False
|
||||||
return True
|
return True
|
||||||
await queue.ack(raw)
|
await queue.ack(raw)
|
||||||
|
stats.jobs_done += 1
|
||||||
|
stats.last_job_at = utc_now_iso()
|
||||||
|
stats.busy = False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -7,10 +7,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
|||||||
ca-certificates tzdata \
|
ca-certificates tzdata \
|
||||||
&& rm -rf /var/lib/apt/lists/*
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
COPY requirements.txt .
|
COPY task-watcher/requirements.txt /app/
|
||||||
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt
|
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt
|
||||||
|
|
||||||
COPY . .
|
# 공통 heartbeat 모듈 (services/_shared) — watcher.py가 from _shared.heartbeat import
|
||||||
|
COPY _shared /app/_shared
|
||||||
|
COPY task-watcher/. /app/
|
||||||
|
ENV PYTHONPATH=/app
|
||||||
|
|
||||||
EXPOSE 8000
|
EXPOSE 8000
|
||||||
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
|
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
|
||||||
|
|||||||
5
services/task-watcher/conftest.py
Normal file
5
services/task-watcher/conftest.py
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
"""Make services/ root importable so `from _shared.heartbeat import ...` works during tests."""
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||||
16
services/task-watcher/tests/test_watcher.py
Normal file
16
services/task-watcher/tests/test_watcher.py
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
"""task-watcher heartbeat payload — state=mode + mode 필드 검증."""
|
||||||
|
import json
|
||||||
|
|
||||||
|
from _shared.heartbeat import build_payload, WorkerStats
|
||||||
|
|
||||||
|
|
||||||
|
def test_watcher_heartbeat_payload_carries_mode():
|
||||||
|
payload = json.loads(
|
||||||
|
build_payload(
|
||||||
|
"task-watcher", "watcher", "trading",
|
||||||
|
WorkerStats(), extra={"mode": "trading"},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
assert payload["kind"] == "watcher"
|
||||||
|
assert payload["state"] == "trading"
|
||||||
|
assert payload["mode"] == "trading"
|
||||||
@@ -15,6 +15,7 @@ from zoneinfo import ZoneInfo
|
|||||||
import redis.asyncio as aioredis
|
import redis.asyncio as aioredis
|
||||||
|
|
||||||
from mode import current_mode, fetch_holidays, KST
|
from mode import current_mode, fetch_holidays, KST
|
||||||
|
from _shared.heartbeat import build_payload, WorkerStats
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -23,6 +24,10 @@ PAUSED_KEY = "queue:paused"
|
|||||||
LOOP_INTERVAL = 30 # 초
|
LOOP_INTERVAL = 30 # 초
|
||||||
HOLIDAYS_REFRESH = 3600 # 1시간
|
HOLIDAYS_REFRESH = 3600 # 1시간
|
||||||
PAUSED_TTL = 600 # 10분 (watcher 죽어도 자동 해제)
|
PAUSED_TTL = 600 # 10분 (watcher 죽어도 자동 해제)
|
||||||
|
HEARTBEAT_KEY = "worker:task-watcher:heartbeat"
|
||||||
|
HEARTBEAT_TTL = 45 # LOOP_INTERVAL 30s < TTL 45s → 만료 전 갱신
|
||||||
|
|
||||||
|
_HB_STATS = WorkerStats()
|
||||||
|
|
||||||
|
|
||||||
async def watcher_loop():
|
async def watcher_loop():
|
||||||
@@ -46,6 +51,13 @@ async def watcher_loop():
|
|||||||
else:
|
else:
|
||||||
await redis.delete(PAUSED_KEY)
|
await redis.delete(PAUSED_KEY)
|
||||||
|
|
||||||
|
# heartbeat (LOOP_INTERVAL=30s < TTL 45s → 만료 전 갱신)
|
||||||
|
await redis.set(
|
||||||
|
HEARTBEAT_KEY,
|
||||||
|
build_payload("task-watcher", "watcher", mode, _HB_STATS, extra={"mode": mode}),
|
||||||
|
ex=HEARTBEAT_TTL,
|
||||||
|
)
|
||||||
|
|
||||||
if mode != last_mode:
|
if mode != last_mode:
|
||||||
logger.info("mode 전환: %s → %s (paused=%s)", last_mode, mode, mode == "trading")
|
logger.info("mode 전환: %s → %s (paused=%s)", last_mode, mode, mode == "trading")
|
||||||
last_mode = mode
|
last_mode = mode
|
||||||
|
|||||||
18
services/trade-monitor/.env.example
Normal file
18
services/trade-monitor/.env.example
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
# Plan-realtime-trade-alerts — trade-monitor
|
||||||
|
|
||||||
|
# NAS Redis (heartbeat)
|
||||||
|
REDIS_URL=redis://192.168.45.54:6379
|
||||||
|
|
||||||
|
# NAS stock 백엔드 (monitor-set / report)
|
||||||
|
NAS_BASE_URL=http://192.168.45.54:18500
|
||||||
|
WEBAI_API_KEY=
|
||||||
|
|
||||||
|
# KIS 자체 토큰 (ai_trade와 분리된 전용 app_key)
|
||||||
|
TM_KIS_APP_KEY=
|
||||||
|
TM_KIS_APP_SECRET=
|
||||||
|
TM_KIS_ACCOUNT=
|
||||||
|
TM_KIS_IS_VIRTUAL=0
|
||||||
|
|
||||||
|
# 루프 주기(초) / sell_climax 거래량 배수 임계
|
||||||
|
TM_LOOP_INTERVAL=60
|
||||||
|
TM_CLIMAX_VOL_MULT=3.0
|
||||||
166
services/trade-monitor/DESIGN.md
Normal file
166
services/trade-monitor/DESIGN.md
Normal file
@@ -0,0 +1,166 @@
|
|||||||
|
# trade-monitor 워커 — 구현 설계
|
||||||
|
|
||||||
|
> **2026-07-03 · web-ai 소유.** 실시간 매매 알람 파이프라인의 Windows-side 워커.
|
||||||
|
> **권위 계약(원본 스펙)**: web-backend repo `docs/superpowers/specs/2026-07-02-realtime-trade-alerts-design.md` §5(계약)·§6(조건).
|
||||||
|
> 이 문서는 그 계약을 Windows Docker 워커로 구현하기 위한 **구현 설계**(모듈 분해·조건 해석·배포)만 다룬다.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 1. 역할 · 경계
|
||||||
|
|
||||||
|
`services/trade-monitor/` 는 형제 워커(`image-render`, `task-watcher`)와 동일한 관례를 따르는 **FastAPI + asyncio 루프** 워커다. WSL2 Docker(`services/docker-compose.yml`)에서 구동.
|
||||||
|
|
||||||
|
**책임**
|
||||||
|
1. 60초 루프로 NAS `monitor-set` 조회 → 세션 게이트.
|
||||||
|
2. 비-KRX(알파벳) 티커 skip.
|
||||||
|
3. KIS 실시간 현재가 + 일봉 OHLCV 조회 → TA 지표 계산.
|
||||||
|
4. 매수/매도 조건 평가 → 발화집합 F 구성.
|
||||||
|
5. `POST report`로 F 전체 전송(무상태 — dedup은 NAS 영속).
|
||||||
|
6. Redis heartbeat 발신(`worker:trade-monitor:heartbeat` EX45).
|
||||||
|
7. KIS 오류는 사이클/종목 단위 격리(다음 분 재시도).
|
||||||
|
|
||||||
|
**경계 밖(안 함)**: dedup 상태 보관, 텔레그램 전송(NAS 담당), KST 세션/휴장 캘린더 재구현(NAS가 `session` 판정), 주문 실행.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 2. 모듈 분해
|
||||||
|
|
||||||
|
| 파일 | 책임 | 인터페이스(순수/부작용) |
|
||||||
|
|------|------|------|
|
||||||
|
| `main.py` | FastAPI app + lifespan. `monitor_loop` + `heartbeat_loop` 스폰, `/health` | 부작용(태스크 스폰) |
|
||||||
|
| `monitor.py` | 오케스트레이션 루프. monitor-set→게이트→종목순회→firing→report. 공유 `MonitorState` 갱신 | 부작용 |
|
||||||
|
| `nas_client.py` | `get_monitor_set()` / `post_report(as_of, firing)` — `X-WebAI-Key` + retry | 부작용(HTTP) |
|
||||||
|
| `kis_client.py` | KIS REST: `_issue_token()`(OAuth 자체 발급, 24h 캐시) + `get_quote()` + `get_daily_ohlcv()` + 0.5s throttle | 부작용(HTTP) |
|
||||||
|
| `indicators.py` | `sma`, `rsi`, `avg_volume`, `highest_high` | **순수** |
|
||||||
|
| `conditions.py` | `evaluate_buy(ctx, buy_params)` / `evaluate_sell(ctx, exit_params)` → `list[firing]` | **순수** |
|
||||||
|
| `config.py` | `Settings` — env 로드 | 순수 |
|
||||||
|
|
||||||
|
순수 모듈(`indicators`, `conditions`)에 조건 로직을 격리해 테이블 기반 단위 테스트로 검증 가능하게 한다. HTTP·시간·Redis는 경계 모듈에만.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 3. 데이터 흐름 (monitor_loop, 60초)
|
||||||
|
|
||||||
|
```
|
||||||
|
매 사이클:
|
||||||
|
ms = nas.get_monitor_set() # §5.1
|
||||||
|
state.session = ms.session
|
||||||
|
if ms.session == "closed":
|
||||||
|
state.hb = "market_closed"; sleep; continue # KIS 호출 0
|
||||||
|
targets = filter_krx(ms.buy_targets, ms.sell_targets) # 알파벳 티커 skip
|
||||||
|
firing = []
|
||||||
|
for t in targets: # 종목 단위 try/except 격리
|
||||||
|
quote = kis.get_quote(t.ticker) # 현재가 + 당일 누적 거래량
|
||||||
|
daily = kis.get_daily_ohlcv(t.ticker, 250) # MA200·52주 고점용
|
||||||
|
ctx = build_ctx(t, quote, daily)
|
||||||
|
if t is buy_target: firing += evaluate_buy(ctx, ms.buy_params)
|
||||||
|
if t is sell_target: firing += evaluate_sell(ctx, ms.exit_params)
|
||||||
|
if firing: state.last_alert_at = now
|
||||||
|
nas.post_report(as_of=now_kst_iso, firing=firing) # §5.2 — 빈 배열도 전송(edge clear 위해)
|
||||||
|
state.hb = "market_open"
|
||||||
|
stats.jobs_done += 1
|
||||||
|
```
|
||||||
|
|
||||||
|
`heartbeat_loop`(별도 15초 태스크)이 `state`를 읽어 §5.4 페이로드 발신.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 4. 조건 로직 해석 (§6)
|
||||||
|
|
||||||
|
지표는 **일봉 시계열**(최신 last, 오름차순) + **실시간 현재가**(`price`) 기준. 데이터 부족(예: MA200용 200봉 미만)이면 해당 조건은 **미발화**(graceful skip). 각 firing은 `{ticker, kind, condition, price, detail}`.
|
||||||
|
|
||||||
|
### 매수 (`buy_targets`, `buy_params={rsi_oversold, breakout_vol_mult, pullback_pct}`)
|
||||||
|
|
||||||
|
| condition | 발화 규칙(해석) | detail |
|
||||||
|
|-----------|----------------|--------|
|
||||||
|
| `buy_ma20_pullback` | `ma20>ma50>ma200`(정배열) **AND** 최근 3봉 최저가가 `ma20*(1+pullback_pct)` 이하로 접근 **AND** `price>ma20`(반등 복귀) | `ma20, ma50, ma200, recent_low` |
|
||||||
|
| `buy_breakout` | `price > 직전 20봉 최고가`(당일 제외) **AND** `today_volume > breakout_vol_mult × avg_volume(20)` | `prior_high_20, vol_mult, avg_vol_20` |
|
||||||
|
| `buy_rsi_bounce` | RSI(14) 시계열에서 `min(rsi[-3:]) < rsi_oversold` **AND** `rsi[-1] > rsi_oversold` **AND** `rsi[-1] > rsi[-2]`(반등). 사이클마다 재계산·무상태 | `rsi, rsi_prev, rsi_oversold` |
|
||||||
|
|
||||||
|
종합: 각 조건 독립 발화(신뢰도 가중합은 NAS/텔레그램 단계 책임 아님 — 워커는 조건 발화만).
|
||||||
|
|
||||||
|
### 매도 (`sell_targets`, `exit_params={stop_pct, take_pct, trailing_pct}`, target에 `avg_price, qty, holding_high`)
|
||||||
|
|
||||||
|
| condition | 발화 규칙 | detail |
|
||||||
|
|-----------|----------|--------|
|
||||||
|
| `sell_stop_loss` | `(price-avg)/avg ≤ -stop_pct` | `avg_price, pnl_pct, stop_pct` |
|
||||||
|
| `sell_take_profit` | `(price-avg)/avg ≥ take_pct` | `avg_price, pnl_pct, take_pct` |
|
||||||
|
| `sell_trailing_stop` | `price ≤ holding_high × (1-trailing_pct)` (기본 0.10) | `holding_high, trailing_pct, drawdown_pct` |
|
||||||
|
| `sell_ma_break` | `price < ma50` (추가 `price<ma200`이면 detail.severity="high") | `ma50, ma200, severity` |
|
||||||
|
| `sell_climax` | **holdings_intel 정합**: `today_volume ≥ climax_vol_x × avg_volume(20)` **AND** `price < day_high × climax_close_pct`(윗꼬리) | `vol_mult, day_high, climax_close_pct` |
|
||||||
|
|
||||||
|
`climax_vol_x`(기본 3.0)·`climax_close_pct`(기본 0.97)는 monitor-set `exit_params`에서 읽음(BE 중앙화, main ed17193). 없으면 env `TM_CLIMAX_VOL_MULT` fallback. `day_high`는 KIS quote `stck_hgpr`(당일 세션 누적 고가).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 5. KIS 클라이언트 (자체 토큰)
|
||||||
|
|
||||||
|
- `_issue_token()`: `POST {base}/oauth2/tokenP {grant_type, appkey, appsecret}` → `access_token`(만료 24h). 메모리 캐시, 만료 10분 전 재발급. **ai_trade와 분리된 `TM_KIS_APP_KEY/SECRET`** 사용(같은 app_key 공유 시 토큰 상호 무효화 + EGW00201).
|
||||||
|
- `get_quote(ticker)`: `inquire-price`(FHKST01010100) → `stck_prpr`(현재가), `acml_vol`(당일 누적 거래량), `stck_oprc`(당일 시가).
|
||||||
|
- `get_daily_ohlcv(ticker, days=250)`: `inquire-daily-itemchartprice`(FHKST03010100) — ai_trade `kis_client.py` 로직 복제, 오름차순.
|
||||||
|
- throttle 0.5s(초당 2회) + `_throttle_lock` 직렬화 + 429/timeout 지수 backoff(ai_trade 패턴 재사용).
|
||||||
|
|
||||||
|
> ⚠️ **운영 함정**: ai_trade와 KIS를 동시 호출하면 전용 app_key라도 KIS 계정 전체 rate limit을 공유할 수 있음. 별도 app_key로 무효화는 회피되나, 운영 시 동시 부하 모니터링 필요(Phase 7 백로그 연계).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 6. heartbeat (§5.4)
|
||||||
|
|
||||||
|
`_shared.heartbeat.heartbeat_loop(redis, "trade-monitor", "trader", stats, interval=15, ttl=45, state_fn=...)`.
|
||||||
|
- `state_fn`이 `MonitorState`를 읽어 `state ∈ {market_open, market_closed, idle}` + `extra={"last_alert_at": ...}` 반환.
|
||||||
|
- **디커플링 이유**: 루프 60초 > TTL 45초 → 인라인 발신 시 만료 갭. 15초 독립 태스크로 해소(형제 워커와 동일 구조). §5.4 필수 필드(name/kind/state/ts/last_alert_at) 충족, `jobs_done/jobs_failed`는 형제 워커처럼 superset 유지.
|
||||||
|
- 초기 상태 `idle`(첫 monitor-set 조회 전).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 7. 설정 (env) — `TM_` 접두사로 ai_trade와 분리
|
||||||
|
|
||||||
|
| env | 기본값 | 용도 |
|
||||||
|
|-----|--------|------|
|
||||||
|
| `NAS_BASE_URL` | `http://192.168.45.54:18500` | stock 백엔드 |
|
||||||
|
| `WEBAI_API_KEY` | (필수) | `X-WebAI-Key` |
|
||||||
|
| `REDIS_URL` | `redis://192.168.45.54:6379` | heartbeat |
|
||||||
|
| `TM_KIS_APP_KEY` / `TM_KIS_APP_SECRET` | (필수) | KIS 자체 토큰 |
|
||||||
|
| `TM_KIS_ACCOUNT` | (필수) | KIS 계좌 |
|
||||||
|
| `TM_KIS_IS_VIRTUAL` | `0` | 실전/모의 |
|
||||||
|
| `TM_LOOP_INTERVAL` | `60` | 루프 주기(초) |
|
||||||
|
| `TM_CLIMAX_VOL_MULT` | `3.0` | sell_climax 임계 |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 8. 에러 처리
|
||||||
|
|
||||||
|
- **monitor-set 실패**: 사이클 skip(report 안 함), heartbeat=`idle`, 다음 분 재시도.
|
||||||
|
- **KIS 종목 실패**: 해당 종목만 skip(로그 warning), 나머지 종목 계속.
|
||||||
|
- **report 실패**: 로그 error, 다음 사이클 신선 firing 재전송(무상태라 손실 허용).
|
||||||
|
- 루프 최상위 `try/except` — 어떤 예외도 루프를 죽이지 않음(task-watcher 패턴).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 9. 테스트 전략 (pytest, 시스템 Python)
|
||||||
|
|
||||||
|
| 파일 | 검증 |
|
||||||
|
|------|------|
|
||||||
|
| `test_indicators.py` | sma/rsi/avg_volume/highest_high 수치(알려진 시계열), 데이터 부족 시 None |
|
||||||
|
| `test_conditions.py` | 8개 조건 테이블 기반(발화/미발화 경계), detail 필드 |
|
||||||
|
| `test_nas_client.py` | respx — monitor-set 파싱, report 페이로드, X-WebAI-Key 헤더, retry |
|
||||||
|
| `test_kis_client.py` | respx — 토큰 발급/캐시, quote/daily 파싱, throttle |
|
||||||
|
| `test_monitor.py` | 루프 1회(mock): closed skip, 비-KRX skip, firing 조립, last_alert_at 갱신, 종목 실패 격리 |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 10. 배포
|
||||||
|
|
||||||
|
- `services/trade-monitor/Dockerfile`: task-watcher 관례 복제 — `COPY _shared /app/_shared` **필수**(빌드 컨텍스트 `.` 에서), `COPY trade-monitor/. /app/`, `PYTHONPATH=/app`, uvicorn `:8000`.
|
||||||
|
- `services/docker-compose.yml`: `trade-monitor` 서비스 추가, 포트 **18715**(image-render 18714 다음), `TZ=Asia/Seoul`, KIS/WEBAI/REDIS env, healthcheck `/health`.
|
||||||
|
- `services/.env`(비커밋): `TM_KIS_*`, `WEBAI_API_KEY` 실값. `.env.example`에 키만 기재.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 11. 미해결 플래그 / 후속
|
||||||
|
|
||||||
|
1. **sell_climax** — ✅ 2026-07-03 holdings_intel 정합 완료(`price < day_high × climax_close_pct` + `exit_params` 파라미터화). BE 회신 기준.
|
||||||
|
2. **KIS 지표 필드 실검증** — quote의 `acml_vol`/`stck_oprc`, daily TR 응답 필드는 첫 운영 raw 캡처로 대조.
|
||||||
|
3. **`buy_ma20_pullback`·`buy_rsi_bounce` 해석** — "current candle series" 문구를 일봉 시계열로 해석. 첫 운영 4주 IC 검증 시 재조정 가능.
|
||||||
|
4. **KIS rate limit 공존** — ai_trade와 동시 부하. 전용 app_key로 토큰 무효화는 회피, 초당 호출 총량은 운영 모니터링.
|
||||||
|
5. **after 세션 시간외 시세** — `inquire-price`가 시간외 단일가를 반영하는지 첫 운영 대조.
|
||||||
19
services/trade-monitor/Dockerfile
Normal file
19
services/trade-monitor/Dockerfile
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
FROM python:3.12-slim-bookworm
|
||||||
|
ENV PYTHONUNBUFFERED=1
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||||
|
ca-certificates tzdata \
|
||||||
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
COPY trade-monitor/requirements.txt /app/
|
||||||
|
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt
|
||||||
|
|
||||||
|
# 공통 heartbeat 모듈 (services/_shared) — main.py가 from _shared.heartbeat import
|
||||||
|
COPY _shared /app/_shared
|
||||||
|
COPY trade-monitor/. /app/
|
||||||
|
ENV PYTHONPATH=/app
|
||||||
|
|
||||||
|
EXPOSE 8000
|
||||||
|
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
|
||||||
1437
services/trade-monitor/PLAN.md
Normal file
1437
services/trade-monitor/PLAN.md
Normal file
File diff suppressed because it is too large
Load Diff
99
services/trade-monitor/conditions.py
Normal file
99
services/trade-monitor/conditions.py
Normal file
@@ -0,0 +1,99 @@
|
|||||||
|
"""§6 조건 로직 (순수). ctx + params → firing 리스트."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from indicators import sma, rsi_series, highest_high
|
||||||
|
|
||||||
|
|
||||||
|
def _fire(ctx: dict, kind: str, condition: str, price: float, detail: dict) -> dict:
|
||||||
|
return {
|
||||||
|
"ticker": ctx["ticker"], "kind": kind,
|
||||||
|
"condition": condition, "price": price, "detail": detail,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def evaluate_buy(ctx: dict, params: dict) -> list[dict]:
|
||||||
|
price = ctx["price"]
|
||||||
|
closes, highs, lows, vols = ctx["closes"], ctx["highs"], ctx["lows"], ctx["volumes"]
|
||||||
|
rsi_os = params.get("rsi_oversold", 30)
|
||||||
|
vol_mult = params.get("breakout_vol_mult", 1.5)
|
||||||
|
pullback = params.get("pullback_pct", 0.02)
|
||||||
|
firing: list[dict] = []
|
||||||
|
|
||||||
|
# buy_ma20_pullback — 정배열 + ma20 근접 저가 + 반등 복귀
|
||||||
|
ma20, ma50, ma200 = sma(closes, 20), sma(closes, 50), sma(closes, 200)
|
||||||
|
if ma20 and ma50 and ma200 and ma20 > ma50 > ma200 and len(lows) >= 3:
|
||||||
|
recent_low = min(lows[-3:])
|
||||||
|
if recent_low <= ma20 * (1 + pullback) and price > ma20:
|
||||||
|
firing.append(_fire(ctx, "buy", "buy_ma20_pullback", price, {
|
||||||
|
"ma20": round(ma20, 1), "ma50": round(ma50, 1),
|
||||||
|
"ma200": round(ma200, 1), "recent_low": recent_low,
|
||||||
|
}))
|
||||||
|
|
||||||
|
# buy_breakout — 직전 20봉 고점 돌파 + 거래량 배수
|
||||||
|
prior_high20 = highest_high(highs, 20)
|
||||||
|
avg_vol20 = sma(vols, 20)
|
||||||
|
if prior_high20 and avg_vol20 and price > prior_high20 \
|
||||||
|
and ctx["today_volume"] > vol_mult * avg_vol20:
|
||||||
|
firing.append(_fire(ctx, "buy", "buy_breakout", price, {
|
||||||
|
"prior_high_20": prior_high20,
|
||||||
|
"vol_mult": round(ctx["today_volume"] / avg_vol20, 2),
|
||||||
|
"avg_vol_20": round(avg_vol20, 0),
|
||||||
|
}))
|
||||||
|
|
||||||
|
# buy_rsi_bounce — RSI 과매도 후 반등 (무상태 재계산)
|
||||||
|
rs = rsi_series(closes, 14)
|
||||||
|
if len(rs) >= 3 and min(rs[-3:]) < rsi_os and rs[-1] > rsi_os and rs[-1] > rs[-2]:
|
||||||
|
firing.append(_fire(ctx, "buy", "buy_rsi_bounce", price, {
|
||||||
|
"rsi": round(rs[-1], 1), "rsi_prev": round(rs[-2], 1),
|
||||||
|
"rsi_oversold": rsi_os,
|
||||||
|
}))
|
||||||
|
|
||||||
|
return firing
|
||||||
|
|
||||||
|
|
||||||
|
def evaluate_sell(ctx: dict, params: dict) -> list[dict]:
|
||||||
|
price = ctx["price"]
|
||||||
|
avg = ctx.get("avg_price")
|
||||||
|
hh = ctx.get("holding_high")
|
||||||
|
closes, vols = ctx["closes"], ctx["volumes"]
|
||||||
|
stop = params.get("stop_pct", 0.08)
|
||||||
|
take = params.get("take_pct", 0.25)
|
||||||
|
trail = params.get("trailing_pct", 0.10)
|
||||||
|
firing: list[dict] = []
|
||||||
|
|
||||||
|
if avg:
|
||||||
|
pnl = (price - avg) / avg
|
||||||
|
if pnl <= -stop:
|
||||||
|
firing.append(_fire(ctx, "sell", "sell_stop_loss", price, {
|
||||||
|
"avg_price": avg, "pnl_pct": round(pnl, 4), "stop_pct": stop}))
|
||||||
|
if pnl >= take:
|
||||||
|
firing.append(_fire(ctx, "sell", "sell_take_profit", price, {
|
||||||
|
"avg_price": avg, "pnl_pct": round(pnl, 4), "take_pct": take}))
|
||||||
|
|
||||||
|
if hh and price <= hh * (1 - trail):
|
||||||
|
firing.append(_fire(ctx, "sell", "sell_trailing_stop", price, {
|
||||||
|
"holding_high": hh, "trailing_pct": trail,
|
||||||
|
"drawdown_pct": round((price - hh) / hh, 4)}))
|
||||||
|
|
||||||
|
ma50, ma200 = sma(closes, 50), sma(closes, 200)
|
||||||
|
if ma50 and price < ma50:
|
||||||
|
severity = "high" if (ma200 and price < ma200) else "normal"
|
||||||
|
firing.append(_fire(ctx, "sell", "sell_ma_break", price, {
|
||||||
|
"ma50": round(ma50, 1),
|
||||||
|
"ma200": round(ma200, 1) if ma200 else None,
|
||||||
|
"severity": severity}))
|
||||||
|
|
||||||
|
# sell_climax — holdings_intel 정합(stock/app/holdings_intel.py:109-118):
|
||||||
|
# 거래량 ≥ 20일평균 × climax_vol_x AND 종가 < 당일고가 × climax_close_pct (윗꼬리)
|
||||||
|
# 실시간이므로 day_high = 당일 세션 누적 고가(최신 1분봉 고가 아님).
|
||||||
|
climax_vol_x = params.get("climax_vol_x", ctx.get("climax_vol_mult", 3.0))
|
||||||
|
climax_close_pct = params.get("climax_close_pct", 0.97)
|
||||||
|
avg_vol20 = sma(vols, 20)
|
||||||
|
day_high = ctx.get("day_high")
|
||||||
|
if avg_vol20 and day_high and ctx["today_volume"] >= climax_vol_x * avg_vol20 \
|
||||||
|
and price < day_high * climax_close_pct:
|
||||||
|
firing.append(_fire(ctx, "sell", "sell_climax", price, {
|
||||||
|
"vol_mult": round(ctx["today_volume"] / avg_vol20, 2),
|
||||||
|
"day_high": day_high, "climax_close_pct": climax_close_pct}))
|
||||||
|
|
||||||
|
return firing
|
||||||
32
services/trade-monitor/config.py
Normal file
32
services/trade-monitor/config.py
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
"""Settings — 환경변수 로드. TM_ 접두사로 ai_trade와 분리."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Settings:
|
||||||
|
nas_base_url: str
|
||||||
|
webai_api_key: str
|
||||||
|
redis_url: str
|
||||||
|
kis_app_key: str
|
||||||
|
kis_app_secret: str
|
||||||
|
kis_account: str
|
||||||
|
kis_is_virtual: bool
|
||||||
|
loop_interval: int
|
||||||
|
climax_vol_mult: float
|
||||||
|
|
||||||
|
|
||||||
|
def load_settings() -> Settings:
|
||||||
|
return Settings(
|
||||||
|
nas_base_url=os.getenv("NAS_BASE_URL", "http://192.168.45.54:18500"),
|
||||||
|
webai_api_key=os.getenv("WEBAI_API_KEY", ""),
|
||||||
|
redis_url=os.getenv("REDIS_URL", "redis://192.168.45.54:6379"),
|
||||||
|
kis_app_key=os.getenv("TM_KIS_APP_KEY", ""),
|
||||||
|
kis_app_secret=os.getenv("TM_KIS_APP_SECRET", ""),
|
||||||
|
kis_account=os.getenv("TM_KIS_ACCOUNT", ""),
|
||||||
|
kis_is_virtual=os.getenv("TM_KIS_IS_VIRTUAL", "0") == "1",
|
||||||
|
loop_interval=int(os.getenv("TM_LOOP_INTERVAL", "60")),
|
||||||
|
climax_vol_mult=float(os.getenv("TM_CLIMAX_VOL_MULT", "3.0")),
|
||||||
|
)
|
||||||
5
services/trade-monitor/conftest.py
Normal file
5
services/trade-monitor/conftest.py
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
"""services/ 루트를 sys.path에 추가 — from _shared.heartbeat import 가능하게."""
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||||
38
services/trade-monitor/indicators.py
Normal file
38
services/trade-monitor/indicators.py
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
"""순수 TA 지표 — sma / rsi_series / highest_high."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
|
||||||
|
def sma(values: list[float], period: int) -> float | None:
|
||||||
|
if period <= 0 or len(values) < period:
|
||||||
|
return None
|
||||||
|
return sum(values[-period:]) / period
|
||||||
|
|
||||||
|
|
||||||
|
def highest_high(highs: list[float], period: int) -> float | None:
|
||||||
|
if period <= 0 or len(highs) < period:
|
||||||
|
return None
|
||||||
|
return max(highs[-period:])
|
||||||
|
|
||||||
|
|
||||||
|
def rsi_series(closes: list[float], period: int = 14) -> list[float]:
|
||||||
|
"""Wilder RSI. 반환 리스트는 closes[period:]에 1:1 정렬. 부족하면 []."""
|
||||||
|
if len(closes) <= period:
|
||||||
|
return []
|
||||||
|
deltas = [closes[i] - closes[i - 1] for i in range(1, len(closes))]
|
||||||
|
gains = [d if d > 0 else 0.0 for d in deltas]
|
||||||
|
losses = [-d if d < 0 else 0.0 for d in deltas]
|
||||||
|
|
||||||
|
def _rsi(ag: float, al: float) -> float:
|
||||||
|
if al == 0:
|
||||||
|
return 100.0
|
||||||
|
rs = ag / al
|
||||||
|
return 100.0 - 100.0 / (1.0 + rs)
|
||||||
|
|
||||||
|
avg_gain = sum(gains[:period]) / period
|
||||||
|
avg_loss = sum(losses[:period]) / period
|
||||||
|
out = [_rsi(avg_gain, avg_loss)]
|
||||||
|
for i in range(period, len(deltas)):
|
||||||
|
avg_gain = (avg_gain * (period - 1) + gains[i]) / period
|
||||||
|
avg_loss = (avg_loss * (period - 1) + losses[i]) / period
|
||||||
|
out.append(_rsi(avg_gain, avg_loss))
|
||||||
|
return out
|
||||||
124
services/trade-monitor/kis_client.py
Normal file
124
services/trade-monitor/kis_client.py
Normal file
@@ -0,0 +1,124 @@
|
|||||||
|
"""KIS REST client — 자체 OAuth 토큰(TM_KIS_*) + quote + 일봉 + throttle."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from zoneinfo import ZoneInfo
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
KST = ZoneInfo("Asia/Seoul")
|
||||||
|
|
||||||
|
_MAX_ATTEMPTS = 3
|
||||||
|
_THROTTLE_INTERVAL = 0.5 # 초당 2회
|
||||||
|
_TOKEN_MARGIN = 600 # 만료 10분 전 재발급
|
||||||
|
|
||||||
|
|
||||||
|
class KISClient:
|
||||||
|
def __init__(self, app_key, app_secret, account, is_virtual, timeout: float = 10.0):
|
||||||
|
self._app_key = app_key
|
||||||
|
self._app_secret = app_secret
|
||||||
|
self._account = account
|
||||||
|
self._base_url = (
|
||||||
|
"https://openapivts.koreainvestment.com:29443" if is_virtual
|
||||||
|
else "https://openapi.koreainvestment.com:9443"
|
||||||
|
)
|
||||||
|
self._client = httpx.AsyncClient(timeout=timeout)
|
||||||
|
self._token: str | None = None
|
||||||
|
self._token_exp: float = 0.0
|
||||||
|
self._last_throttle_at = 0.0
|
||||||
|
self._throttle_lock = asyncio.Lock()
|
||||||
|
self._token_lock = asyncio.Lock()
|
||||||
|
|
||||||
|
async def close(self) -> None:
|
||||||
|
await self._client.aclose()
|
||||||
|
|
||||||
|
async def _issue_token(self) -> str:
|
||||||
|
async with self._token_lock:
|
||||||
|
now = time.time()
|
||||||
|
if self._token and now < self._token_exp - _TOKEN_MARGIN:
|
||||||
|
return self._token
|
||||||
|
r = await self._client.post(
|
||||||
|
f"{self._base_url}/oauth2/tokenP",
|
||||||
|
json={"grant_type": "client_credentials",
|
||||||
|
"appkey": self._app_key, "appsecret": self._app_secret},
|
||||||
|
)
|
||||||
|
r.raise_for_status()
|
||||||
|
data = r.json()
|
||||||
|
self._token = data["access_token"]
|
||||||
|
self._token_exp = now + int(data.get("expires_in", 86400))
|
||||||
|
return self._token
|
||||||
|
|
||||||
|
async def _throttle(self) -> None:
|
||||||
|
async with self._throttle_lock:
|
||||||
|
elapsed = time.monotonic() - self._last_throttle_at
|
||||||
|
if elapsed < _THROTTLE_INTERVAL:
|
||||||
|
await asyncio.sleep(_THROTTLE_INTERVAL - elapsed)
|
||||||
|
self._last_throttle_at = time.monotonic()
|
||||||
|
|
||||||
|
async def _request(self, method: str, path: str, tr_id: str, **kwargs) -> dict:
|
||||||
|
token = await self._issue_token()
|
||||||
|
headers = {
|
||||||
|
"authorization": f"Bearer {token}",
|
||||||
|
"appkey": self._app_key, "appsecret": self._app_secret,
|
||||||
|
"tr_id": tr_id, "custtype": "P",
|
||||||
|
}
|
||||||
|
url = f"{self._base_url}{path}"
|
||||||
|
for attempt in range(_MAX_ATTEMPTS):
|
||||||
|
await self._throttle()
|
||||||
|
try:
|
||||||
|
resp = await self._client.request(method, url, headers=headers, **kwargs)
|
||||||
|
if resp.status_code == 429 and attempt < _MAX_ATTEMPTS - 1:
|
||||||
|
await asyncio.sleep(2 ** attempt)
|
||||||
|
continue
|
||||||
|
resp.raise_for_status()
|
||||||
|
return resp.json()
|
||||||
|
except httpx.TimeoutException:
|
||||||
|
if attempt < _MAX_ATTEMPTS - 1:
|
||||||
|
await asyncio.sleep(2 ** attempt)
|
||||||
|
continue
|
||||||
|
raise
|
||||||
|
raise RuntimeError("retry exhausted")
|
||||||
|
|
||||||
|
async def get_quote(self, ticker: str) -> dict:
|
||||||
|
raw = await self._request(
|
||||||
|
"GET", "/uapi/domestic-stock/v1/quotations/inquire-price",
|
||||||
|
tr_id="FHKST01010100",
|
||||||
|
params={"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": ticker},
|
||||||
|
)
|
||||||
|
o = raw.get("output", {})
|
||||||
|
return {
|
||||||
|
"price": int(o["stck_prpr"]),
|
||||||
|
"day_open": int(o["stck_oprc"]),
|
||||||
|
"day_high": int(o["stck_hgpr"]),
|
||||||
|
"today_volume": int(o["acml_vol"]),
|
||||||
|
"as_of": datetime.now(KST).isoformat(),
|
||||||
|
}
|
||||||
|
|
||||||
|
async def get_daily_ohlcv(self, ticker: str, days: int = 250) -> list[dict]:
|
||||||
|
today = datetime.now(KST).strftime("%Y%m%d")
|
||||||
|
start = (datetime.now(KST) - timedelta(days=days * 2)).strftime("%Y%m%d")
|
||||||
|
raw = await self._request(
|
||||||
|
"GET", "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice",
|
||||||
|
tr_id="FHKST03010100",
|
||||||
|
params={"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": ticker,
|
||||||
|
"FID_INPUT_DATE_1": start, "FID_INPUT_DATE_2": today,
|
||||||
|
"FID_PERIOD_DIV_CODE": "D", "FID_ORG_ADJ_PRC": "1"},
|
||||||
|
)
|
||||||
|
bars = []
|
||||||
|
for row in raw.get("output2", []):
|
||||||
|
try:
|
||||||
|
d = row["stck_bsop_date"]
|
||||||
|
bars.append({
|
||||||
|
"datetime": f"{d[:4]}-{d[4:6]}-{d[6:]}",
|
||||||
|
"open": int(row["stck_oprc"]), "high": int(row["stck_hgpr"]),
|
||||||
|
"low": int(row["stck_lwpr"]), "close": int(row["stck_clpr"]),
|
||||||
|
"volume": int(row["acml_vol"]),
|
||||||
|
})
|
||||||
|
except (KeyError, ValueError):
|
||||||
|
continue
|
||||||
|
bars.reverse()
|
||||||
|
return bars[-days:]
|
||||||
62
services/trade-monitor/main.py
Normal file
62
services/trade-monitor/main.py
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
"""trade-monitor FastAPI entry — lifespan(monitor_loop + heartbeat_loop) + /health."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
|
import redis.asyncio as aioredis
|
||||||
|
from fastapi import FastAPI
|
||||||
|
|
||||||
|
import monitor
|
||||||
|
from config import load_settings
|
||||||
|
from kis_client import KISClient
|
||||||
|
from nas_client import NASClient
|
||||||
|
from _shared.heartbeat import heartbeat_loop, WorkerStats
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO,
|
||||||
|
format="%(asctime)s %(name)s %(levelname)s %(message)s")
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
HEARTBEAT_INTERVAL = 15 # 60초 루프 > TTL 45초 → 독립 15초 발신으로 만료갭 해소
|
||||||
|
HEARTBEAT_TTL = 45
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def lifespan(app: FastAPI):
|
||||||
|
settings = load_settings()
|
||||||
|
nas = NASClient(settings.nas_base_url, settings.webai_api_key)
|
||||||
|
kis = KISClient(settings.kis_app_key, settings.kis_app_secret,
|
||||||
|
settings.kis_account, settings.kis_is_virtual)
|
||||||
|
state = monitor.MonitorState()
|
||||||
|
stats = WorkerStats()
|
||||||
|
redis = aioredis.from_url(settings.redis_url, decode_responses=False)
|
||||||
|
|
||||||
|
mon_task = asyncio.create_task(
|
||||||
|
monitor.monitor_loop(nas, kis, state, stats, settings))
|
||||||
|
hb_task = asyncio.create_task(heartbeat_loop(
|
||||||
|
redis, "trade-monitor", "trader", stats,
|
||||||
|
interval=HEARTBEAT_INTERVAL, ttl=HEARTBEAT_TTL,
|
||||||
|
state_fn=monitor.make_state_fn(state)))
|
||||||
|
logger.info("trade-monitor lifespan 시작")
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
for t in (mon_task, hb_task):
|
||||||
|
t.cancel()
|
||||||
|
try:
|
||||||
|
await t
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
await kis.close()
|
||||||
|
await nas.close()
|
||||||
|
await redis.aclose()
|
||||||
|
logger.info("trade-monitor lifespan 종료")
|
||||||
|
|
||||||
|
|
||||||
|
app = FastAPI(lifespan=lifespan)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/health")
|
||||||
|
def health():
|
||||||
|
return {"ok": True, "service": "trade-monitor"}
|
||||||
114
services/trade-monitor/monitor.py
Normal file
114
services/trade-monitor/monitor.py
Normal file
@@ -0,0 +1,114 @@
|
|||||||
|
"""오케스트레이션 — monitor-set 조회 → 조건 평가 → report + heartbeat state."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from datetime import datetime
|
||||||
|
from zoneinfo import ZoneInfo
|
||||||
|
|
||||||
|
from conditions import evaluate_buy, evaluate_sell
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
KST = ZoneInfo("Asia/Seoul")
|
||||||
|
|
||||||
|
|
||||||
|
class MonitorState:
|
||||||
|
"""monitor_loop가 갱신, heartbeat state_fn이 읽는 공유 상태."""
|
||||||
|
def __init__(self):
|
||||||
|
self.session_state = "idle" # market_open | market_closed | idle
|
||||||
|
self.last_alert_at: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def filter_krx(targets: list[dict]) -> list[dict]:
|
||||||
|
"""6자리 숫자 티커(KRX)만. 알파벳 티커 skip."""
|
||||||
|
out = []
|
||||||
|
for t in targets:
|
||||||
|
tk = str(t.get("ticker", ""))
|
||||||
|
if tk.isdigit() and len(tk) == 6:
|
||||||
|
out.append(t)
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
async def _build_ctx(kis, target: dict, settings) -> dict:
|
||||||
|
ticker = target["ticker"]
|
||||||
|
quote = await kis.get_quote(ticker)
|
||||||
|
daily = await kis.get_daily_ohlcv(ticker, 250)
|
||||||
|
return {
|
||||||
|
"ticker": ticker, "name": target.get("name", ""),
|
||||||
|
"price": quote["price"], "day_open": quote["day_open"],
|
||||||
|
"day_high": quote["day_high"],
|
||||||
|
"today_volume": quote["today_volume"],
|
||||||
|
"closes": [b["close"] for b in daily],
|
||||||
|
"highs": [b["high"] for b in daily],
|
||||||
|
"lows": [b["low"] for b in daily],
|
||||||
|
"volumes": [b["volume"] for b in daily],
|
||||||
|
"avg_price": target.get("avg_price"),
|
||||||
|
"qty": target.get("qty"),
|
||||||
|
"holding_high": target.get("holding_high"),
|
||||||
|
"climax_vol_mult": settings.climax_vol_mult,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def run_cycle(nas, kis, state, stats, settings) -> None:
|
||||||
|
try:
|
||||||
|
ms = await nas.get_monitor_set()
|
||||||
|
except Exception:
|
||||||
|
logger.exception("monitor-set 조회 실패")
|
||||||
|
state.session_state = "idle"
|
||||||
|
stats.jobs_failed += 1
|
||||||
|
return
|
||||||
|
|
||||||
|
session = ms.get("session", "closed")
|
||||||
|
if session == "closed":
|
||||||
|
state.session_state = "market_closed"
|
||||||
|
return
|
||||||
|
|
||||||
|
buy_targets = filter_krx(ms.get("buy_targets", []))
|
||||||
|
sell_targets = filter_krx(ms.get("sell_targets", []))
|
||||||
|
buy_params = ms.get("buy_params", {})
|
||||||
|
exit_params = ms.get("exit_params", {})
|
||||||
|
|
||||||
|
firing: list[dict] = []
|
||||||
|
for t in buy_targets:
|
||||||
|
try:
|
||||||
|
firing += evaluate_buy(await _build_ctx(kis, t, settings), buy_params)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("buy 평가 실패 %s", t.get("ticker"))
|
||||||
|
for t in sell_targets:
|
||||||
|
try:
|
||||||
|
firing += evaluate_sell(await _build_ctx(kis, t, settings), exit_params)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("sell 평가 실패 %s", t.get("ticker"))
|
||||||
|
|
||||||
|
as_of = datetime.now(KST).isoformat(timespec="seconds")
|
||||||
|
if firing:
|
||||||
|
state.last_alert_at = as_of
|
||||||
|
logger.info("firing %d개: %s", len(firing),
|
||||||
|
[f"{f['ticker']}:{f['condition']}" for f in firing])
|
||||||
|
try:
|
||||||
|
await nas.post_report(as_of, firing) # 빈 배열도 전송(edge clear)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("report 전송 실패")
|
||||||
|
|
||||||
|
state.session_state = "market_open"
|
||||||
|
stats.jobs_done += 1
|
||||||
|
stats.last_job_at = as_of
|
||||||
|
|
||||||
|
|
||||||
|
async def monitor_loop(nas, kis, state, stats, settings) -> None:
|
||||||
|
logger.info("trade-monitor loop 시작 interval=%ds", settings.loop_interval)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
await run_cycle(nas, kis, state, stats, settings)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
logger.info("monitor_loop cancelled")
|
||||||
|
raise
|
||||||
|
except Exception:
|
||||||
|
logger.exception("monitor_loop iteration 실패")
|
||||||
|
await asyncio.sleep(settings.loop_interval)
|
||||||
|
|
||||||
|
|
||||||
|
def make_state_fn(state):
|
||||||
|
async def state_fn(redis, stats):
|
||||||
|
return state.session_state, {"last_alert_at": state.last_alert_at}
|
||||||
|
return state_fn
|
||||||
48
services/trade-monitor/nas_client.py
Normal file
48
services/trade-monitor/nas_client.py
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
"""NAS stock 백엔드 trade-alert 계약 — X-WebAI-Key + retry."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_MAX_ATTEMPTS = 3
|
||||||
|
_RETRY_STATUSES = {429, 500, 502, 503, 504}
|
||||||
|
|
||||||
|
|
||||||
|
class NASClient:
|
||||||
|
def __init__(self, base_url: str, api_key: str, timeout: float = 10.0):
|
||||||
|
self._base_url = base_url.rstrip("/")
|
||||||
|
self._api_key = api_key
|
||||||
|
self._client = httpx.AsyncClient(timeout=timeout)
|
||||||
|
|
||||||
|
async def close(self) -> None:
|
||||||
|
await self._client.aclose()
|
||||||
|
|
||||||
|
async def get_monitor_set(self) -> dict:
|
||||||
|
return await self._request("GET", "/api/webai/trade-alert/monitor-set")
|
||||||
|
|
||||||
|
async def post_report(self, as_of: str, firing: list[dict]) -> dict:
|
||||||
|
return await self._request(
|
||||||
|
"POST", "/api/webai/trade-alert/report",
|
||||||
|
json={"as_of": as_of, "firing": firing})
|
||||||
|
|
||||||
|
async def _request(self, method: str, path: str, **kwargs) -> dict:
|
||||||
|
url = f"{self._base_url}{path}"
|
||||||
|
headers = {"X-WebAI-Key": self._api_key}
|
||||||
|
for attempt in range(_MAX_ATTEMPTS):
|
||||||
|
try:
|
||||||
|
resp = await self._client.request(method, url, headers=headers, **kwargs)
|
||||||
|
if resp.status_code in _RETRY_STATUSES and attempt < _MAX_ATTEMPTS - 1:
|
||||||
|
await asyncio.sleep(2 ** attempt)
|
||||||
|
continue
|
||||||
|
resp.raise_for_status()
|
||||||
|
return resp.json()
|
||||||
|
except httpx.TimeoutException:
|
||||||
|
if attempt < _MAX_ATTEMPTS - 1:
|
||||||
|
await asyncio.sleep(2 ** attempt)
|
||||||
|
continue
|
||||||
|
raise
|
||||||
|
raise RuntimeError("retry exhausted")
|
||||||
2
services/trade-monitor/pytest.ini
Normal file
2
services/trade-monitor/pytest.ini
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
[pytest]
|
||||||
|
asyncio_mode = auto
|
||||||
7
services/trade-monitor/requirements.txt
Normal file
7
services/trade-monitor/requirements.txt
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
fastapi==0.115.6
|
||||||
|
uvicorn[standard]==0.34.0
|
||||||
|
redis>=5.0
|
||||||
|
httpx>=0.27
|
||||||
|
pytest>=8.0
|
||||||
|
pytest-asyncio>=0.24
|
||||||
|
respx>=0.21
|
||||||
0
services/trade-monitor/tests/__init__.py
Normal file
0
services/trade-monitor/tests/__init__.py
Normal file
66
services/trade-monitor/tests/test_conditions_buy.py
Normal file
66
services/trade-monitor/tests/test_conditions_buy.py
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
"""evaluate_buy — 3개 매수 조건 경계."""
|
||||||
|
from conditions import evaluate_buy
|
||||||
|
|
||||||
|
BUY_PARAMS = {"rsi_oversold": 30, "breakout_vol_mult": 1.5, "pullback_pct": 0.02}
|
||||||
|
|
||||||
|
|
||||||
|
def _ctx(**over):
|
||||||
|
base = dict(
|
||||||
|
ticker="005930", name="삼성전자", price=100.0, day_open=99.0,
|
||||||
|
today_volume=1000.0, closes=[], highs=[], lows=[], volumes=[],
|
||||||
|
avg_price=None, qty=None, holding_high=None, climax_vol_mult=3.0,
|
||||||
|
)
|
||||||
|
base.update(over)
|
||||||
|
return base
|
||||||
|
|
||||||
|
|
||||||
|
def _conditions(firing):
|
||||||
|
return {f["condition"] for f in firing}
|
||||||
|
|
||||||
|
|
||||||
|
def test_ma20_pullback_fires():
|
||||||
|
# 정배열(ma20>ma50>ma200), 최근 저가가 ma20 근처, price가 ma20 위로 반등
|
||||||
|
closes = [90.0] * 200 + [100.0] * 20 # ma20=100, ma50/ma200 낮음 → 정배열
|
||||||
|
lows = [90.0] * 217 + [100.5, 100.4, 100.3] # 최근 3봉 저가 ~ma20*(1.02)=102 이하
|
||||||
|
ctx = _ctx(price=101.0, closes=closes, highs=closes, lows=lows,
|
||||||
|
volumes=[1.0] * len(closes))
|
||||||
|
assert "buy_ma20_pullback" in _conditions(evaluate_buy(ctx, BUY_PARAMS))
|
||||||
|
|
||||||
|
|
||||||
|
def test_ma20_pullback_skips_when_not_aligned():
|
||||||
|
closes = [100.0] * 200 + [90.0] * 20 # 역배열
|
||||||
|
ctx = _ctx(price=91.0, closes=closes, highs=closes, lows=closes,
|
||||||
|
volumes=[1.0] * len(closes))
|
||||||
|
assert "buy_ma20_pullback" not in _conditions(evaluate_buy(ctx, BUY_PARAMS))
|
||||||
|
|
||||||
|
|
||||||
|
def test_breakout_fires():
|
||||||
|
closes = [50.0] * 25
|
||||||
|
highs = [60.0] * 25 # 직전 20봉 최고 60
|
||||||
|
vols = [100.0] * 25 # avg20=100
|
||||||
|
ctx = _ctx(price=61.0, today_volume=200.0, closes=closes, highs=highs,
|
||||||
|
lows=closes, volumes=vols) # 61>60, 200>1.5*100
|
||||||
|
assert "buy_breakout" in _conditions(evaluate_buy(ctx, BUY_PARAMS))
|
||||||
|
|
||||||
|
|
||||||
|
def test_breakout_skips_on_low_volume():
|
||||||
|
highs = [60.0] * 25
|
||||||
|
ctx = _ctx(price=61.0, today_volume=120.0, closes=[50.0] * 25, highs=highs,
|
||||||
|
lows=[50.0] * 25, volumes=[100.0] * 25) # 120 < 1.5*100=150
|
||||||
|
assert "buy_breakout" not in _conditions(evaluate_buy(ctx, BUY_PARAMS))
|
||||||
|
|
||||||
|
|
||||||
|
def test_rsi_bounce_fires():
|
||||||
|
# 14봉 급락으로 RSI<30 찍고 5봉 반등하여 30 위로 복귀
|
||||||
|
closes = [100.0]
|
||||||
|
for _ in range(14):
|
||||||
|
closes.append(closes[-1] * 0.97) # 하락 → RSI 저하
|
||||||
|
for _ in range(5):
|
||||||
|
closes.append(closes[-1] * 1.05) # 반등 → RSI 30 위로
|
||||||
|
ctx = _ctx(price=closes[-1], closes=closes, highs=closes, lows=closes,
|
||||||
|
volumes=[1.0] * len(closes))
|
||||||
|
assert "buy_rsi_bounce" in _conditions(evaluate_buy(ctx, BUY_PARAMS))
|
||||||
|
|
||||||
|
|
||||||
|
def test_empty_series_no_fire():
|
||||||
|
assert evaluate_buy(_ctx(), BUY_PARAMS) == []
|
||||||
89
services/trade-monitor/tests/test_conditions_sell.py
Normal file
89
services/trade-monitor/tests/test_conditions_sell.py
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
"""evaluate_sell — 5개 매도 조건 경계."""
|
||||||
|
from conditions import evaluate_sell
|
||||||
|
|
||||||
|
EXIT = {"stop_pct": 0.08, "take_pct": 0.25, "trailing_pct": 0.10,
|
||||||
|
"climax_vol_x": 3.0, "climax_close_pct": 0.97}
|
||||||
|
|
||||||
|
|
||||||
|
def _ctx(**over):
|
||||||
|
base = dict(
|
||||||
|
ticker="000660", name="SK하이닉스", price=100.0, day_open=100.0,
|
||||||
|
day_high=100.0, today_volume=100.0, closes=[100.0] * 60,
|
||||||
|
highs=[100.0] * 60, lows=[100.0] * 60, volumes=[100.0] * 60,
|
||||||
|
avg_price=100.0, qty=10, holding_high=100.0, climax_vol_mult=3.0,
|
||||||
|
)
|
||||||
|
base.update(over)
|
||||||
|
return base
|
||||||
|
|
||||||
|
|
||||||
|
def _c(firing):
|
||||||
|
return {f["condition"] for f in firing}
|
||||||
|
|
||||||
|
|
||||||
|
def test_stop_loss_fires():
|
||||||
|
ctx = _ctx(price=90.0, avg_price=100.0) # -10% <= -8%
|
||||||
|
assert "sell_stop_loss" in _c(evaluate_sell(ctx, EXIT))
|
||||||
|
|
||||||
|
|
||||||
|
def test_stop_loss_skips_above_threshold():
|
||||||
|
ctx = _ctx(price=95.0, avg_price=100.0) # -5% > -8%
|
||||||
|
assert "sell_stop_loss" not in _c(evaluate_sell(ctx, EXIT))
|
||||||
|
|
||||||
|
|
||||||
|
def test_take_profit_fires():
|
||||||
|
ctx = _ctx(price=130.0, avg_price=100.0) # +30% >= 25%
|
||||||
|
assert "sell_take_profit" in _c(evaluate_sell(ctx, EXIT))
|
||||||
|
|
||||||
|
|
||||||
|
def test_trailing_stop_fires():
|
||||||
|
ctx = _ctx(price=89.0, holding_high=100.0) # 89 <= 100*0.9=90
|
||||||
|
assert "sell_trailing_stop" in _c(evaluate_sell(ctx, EXIT))
|
||||||
|
|
||||||
|
|
||||||
|
def test_ma_break_severity_high():
|
||||||
|
# price가 ma50/ma200 아래 → severity high (ma200 계산 위해 200봉 필요)
|
||||||
|
closes = [200.0] * 200
|
||||||
|
ctx = _ctx(price=100.0, closes=closes, avg_price=100.0, holding_high=100.0)
|
||||||
|
firing = evaluate_sell(ctx, EXIT)
|
||||||
|
mb = [f for f in firing if f["condition"] == "sell_ma_break"]
|
||||||
|
assert mb and mb[0]["detail"]["severity"] == "high"
|
||||||
|
|
||||||
|
|
||||||
|
def test_climax_fires():
|
||||||
|
# holdings_intel 정합: 거래량 3배 이상 + 종가 < 당일고가×0.97 (윗꼬리)
|
||||||
|
ctx = _ctx(price=96.0, day_high=100.0, today_volume=400.0,
|
||||||
|
volumes=[100.0] * 60) # 400>=3*100, 96 < 100*0.97=97
|
||||||
|
assert "sell_climax" in _c(evaluate_sell(ctx, EXIT))
|
||||||
|
|
||||||
|
|
||||||
|
def test_climax_skips_when_not_reversal():
|
||||||
|
# 종가가 당일고가의 97% 이상 → 윗꼬리 아님
|
||||||
|
ctx = _ctx(price=99.0, day_high=100.0, today_volume=400.0,
|
||||||
|
volumes=[100.0] * 60) # 99 >= 100*0.97=97 → 반전 아님
|
||||||
|
assert "sell_climax" not in _c(evaluate_sell(ctx, EXIT))
|
||||||
|
|
||||||
|
|
||||||
|
def test_climax_uses_exit_params_vol_x():
|
||||||
|
# exit_params.climax_vol_x=5.0 → 400 < 5*100=500 → 미발화
|
||||||
|
exit5 = {**EXIT, "climax_vol_x": 5.0}
|
||||||
|
ctx = _ctx(price=96.0, day_high=100.0, today_volume=400.0,
|
||||||
|
volumes=[100.0] * 60)
|
||||||
|
assert "sell_climax" not in _c(evaluate_sell(ctx, exit5))
|
||||||
|
|
||||||
|
|
||||||
|
def test_climax_uses_exit_params_close_pct():
|
||||||
|
# climax_close_pct=0.90 → 임계 90, price=95 → 95<90? No → 미발화
|
||||||
|
exit90 = {**EXIT, "climax_close_pct": 0.90}
|
||||||
|
ctx = _ctx(price=95.0, day_high=100.0, today_volume=400.0,
|
||||||
|
volumes=[100.0] * 60)
|
||||||
|
assert "sell_climax" not in _c(evaluate_sell(ctx, exit90))
|
||||||
|
# 기본 0.97이면 95 < 97 → 발화
|
||||||
|
assert "sell_climax" in _c(evaluate_sell(ctx, EXIT))
|
||||||
|
|
||||||
|
|
||||||
|
def test_no_avg_no_pnl_conditions():
|
||||||
|
# avg_price None(보유정보 없음) → stop/take 미발화
|
||||||
|
ctx = _ctx(price=50.0, avg_price=None, holding_high=None,
|
||||||
|
closes=[100.0] * 60)
|
||||||
|
conds = _c(evaluate_sell(ctx, EXIT))
|
||||||
|
assert "sell_stop_loss" not in conds and "sell_take_profit" not in conds
|
||||||
27
services/trade-monitor/tests/test_config.py
Normal file
27
services/trade-monitor/tests/test_config.py
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
"""Settings env 로드 — 기본값 + override."""
|
||||||
|
from config import load_settings
|
||||||
|
|
||||||
|
|
||||||
|
def test_defaults(monkeypatch):
|
||||||
|
for k in ("NAS_BASE_URL", "WEBAI_API_KEY", "REDIS_URL", "TM_KIS_APP_KEY",
|
||||||
|
"TM_KIS_APP_SECRET", "TM_KIS_ACCOUNT", "TM_KIS_IS_VIRTUAL",
|
||||||
|
"TM_LOOP_INTERVAL", "TM_CLIMAX_VOL_MULT"):
|
||||||
|
monkeypatch.delenv(k, raising=False)
|
||||||
|
s = load_settings()
|
||||||
|
assert s.nas_base_url == "http://192.168.45.54:18500"
|
||||||
|
assert s.redis_url == "redis://192.168.45.54:6379"
|
||||||
|
assert s.kis_is_virtual is False
|
||||||
|
assert s.loop_interval == 60
|
||||||
|
assert s.climax_vol_mult == 3.0
|
||||||
|
|
||||||
|
|
||||||
|
def test_override(monkeypatch):
|
||||||
|
monkeypatch.setenv("TM_KIS_IS_VIRTUAL", "1")
|
||||||
|
monkeypatch.setenv("TM_LOOP_INTERVAL", "30")
|
||||||
|
monkeypatch.setenv("TM_CLIMAX_VOL_MULT", "2.5")
|
||||||
|
monkeypatch.setenv("WEBAI_API_KEY", "secret")
|
||||||
|
s = load_settings()
|
||||||
|
assert s.kis_is_virtual is True
|
||||||
|
assert s.loop_interval == 30
|
||||||
|
assert s.climax_vol_mult == 2.5
|
||||||
|
assert s.webai_api_key == "secret"
|
||||||
6
services/trade-monitor/tests/test_health.py
Normal file
6
services/trade-monitor/tests/test_health.py
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
"""/health — 라우트 핸들러 직접 검증."""
|
||||||
|
from main import health
|
||||||
|
|
||||||
|
|
||||||
|
def test_health():
|
||||||
|
assert health() == {"ok": True, "service": "trade-monitor"}
|
||||||
39
services/trade-monitor/tests/test_indicators.py
Normal file
39
services/trade-monitor/tests/test_indicators.py
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
"""indicators — 순수 수치 검증."""
|
||||||
|
from indicators import sma, rsi_series, highest_high
|
||||||
|
|
||||||
|
|
||||||
|
def test_sma_basic():
|
||||||
|
assert sma([1, 2, 3, 4, 5], 5) == 3.0
|
||||||
|
assert sma([1, 2, 3, 4, 5], 2) == 4.5
|
||||||
|
|
||||||
|
|
||||||
|
def test_sma_insufficient():
|
||||||
|
assert sma([1, 2], 5) is None
|
||||||
|
assert sma([], 3) is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_highest_high():
|
||||||
|
assert highest_high([1, 9, 3, 4], 3) == 9
|
||||||
|
assert highest_high([1, 2, 3], 3) == 3
|
||||||
|
assert highest_high([1, 2], 3) is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_rsi_all_gains_is_100():
|
||||||
|
# 단조 증가 → 손실 0 → RSI 100
|
||||||
|
closes = [float(i) for i in range(1, 20)]
|
||||||
|
rs = rsi_series(closes, 14)
|
||||||
|
assert rs, "series should not be empty"
|
||||||
|
assert rs[-1] == 100.0
|
||||||
|
|
||||||
|
|
||||||
|
def test_rsi_insufficient():
|
||||||
|
assert rsi_series([1, 2, 3], 14) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_rsi_known_range():
|
||||||
|
# 등락 섞인 시계열 → RSI는 0~100 사이
|
||||||
|
closes = [10, 11, 10.5, 11.5, 11, 12, 11.8, 12.5, 12, 13,
|
||||||
|
12.7, 13.2, 12.9, 13.5, 13.1, 13.8]
|
||||||
|
rs = rsi_series(closes, 14)
|
||||||
|
assert len(rs) == len(closes) - 14
|
||||||
|
assert all(0.0 <= v <= 100.0 for v in rs)
|
||||||
56
services/trade-monitor/tests/test_kis_client.py
Normal file
56
services/trade-monitor/tests/test_kis_client.py
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
"""KISClient — 토큰 발급/캐시 + quote/daily 파싱 (respx)."""
|
||||||
|
import httpx
|
||||||
|
import respx
|
||||||
|
|
||||||
|
from kis_client import KISClient
|
||||||
|
|
||||||
|
BASE = "https://openapi.koreainvestment.com:9443"
|
||||||
|
|
||||||
|
|
||||||
|
def _client():
|
||||||
|
return KISClient("APPKEY", "APPSECRET", "12345678-01", is_virtual=False)
|
||||||
|
|
||||||
|
|
||||||
|
@respx.mock
|
||||||
|
async def test_issue_token_cached():
|
||||||
|
route = respx.post(f"{BASE}/oauth2/tokenP").mock(
|
||||||
|
return_value=httpx.Response(200, json={"access_token": "TKN", "expires_in": 86400}))
|
||||||
|
c = _client()
|
||||||
|
t1 = await c._issue_token()
|
||||||
|
t2 = await c._issue_token()
|
||||||
|
assert t1 == "TKN" and t2 == "TKN"
|
||||||
|
assert route.call_count == 1 # 캐시 → 1회만 발급
|
||||||
|
await c.close()
|
||||||
|
|
||||||
|
|
||||||
|
@respx.mock
|
||||||
|
async def test_get_quote_parses():
|
||||||
|
respx.post(f"{BASE}/oauth2/tokenP").mock(
|
||||||
|
return_value=httpx.Response(200, json={"access_token": "TKN", "expires_in": 86400}))
|
||||||
|
respx.get(f"{BASE}/uapi/domestic-stock/v1/quotations/inquire-price").mock(
|
||||||
|
return_value=httpx.Response(200, json={"output": {
|
||||||
|
"stck_prpr": "71500", "stck_oprc": "71000", "stck_hgpr": "72000",
|
||||||
|
"acml_vol": "1234567"}}))
|
||||||
|
c = _client()
|
||||||
|
q = await c.get_quote("005930")
|
||||||
|
assert q["price"] == 71500 and q["day_open"] == 71000 and q["today_volume"] == 1234567
|
||||||
|
assert q["day_high"] == 72000
|
||||||
|
await c.close()
|
||||||
|
|
||||||
|
|
||||||
|
@respx.mock
|
||||||
|
async def test_get_daily_ascending():
|
||||||
|
respx.post(f"{BASE}/oauth2/tokenP").mock(
|
||||||
|
return_value=httpx.Response(200, json={"access_token": "TKN", "expires_in": 86400}))
|
||||||
|
# KIS는 내림차순 반환 → 오름차순으로 뒤집혀야 함
|
||||||
|
respx.get(f"{BASE}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice").mock(
|
||||||
|
return_value=httpx.Response(200, json={"output2": [
|
||||||
|
{"stck_bsop_date": "20260702", "stck_oprc": "100", "stck_hgpr": "110",
|
||||||
|
"stck_lwpr": "90", "stck_clpr": "105", "acml_vol": "5"},
|
||||||
|
{"stck_bsop_date": "20260701", "stck_oprc": "95", "stck_hgpr": "102",
|
||||||
|
"stck_lwpr": "94", "stck_clpr": "100", "acml_vol": "4"}]}))
|
||||||
|
c = _client()
|
||||||
|
bars = await c.get_daily_ohlcv("005930", days=250)
|
||||||
|
assert bars[0]["datetime"] == "2026-07-01"
|
||||||
|
assert bars[-1]["close"] == 105
|
||||||
|
await c.close()
|
||||||
95
services/trade-monitor/tests/test_monitor.py
Normal file
95
services/trade-monitor/tests/test_monitor.py
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
"""monitor.run_cycle — 게이트/필터/조립/격리."""
|
||||||
|
from monitor import MonitorState, filter_krx, run_cycle
|
||||||
|
from config import load_settings
|
||||||
|
from _shared.heartbeat import WorkerStats
|
||||||
|
|
||||||
|
|
||||||
|
def test_filter_krx_keeps_only_numeric6():
|
||||||
|
targets = [{"ticker": "005930"}, {"ticker": "AAPL"}, {"ticker": "00660"},
|
||||||
|
{"ticker": "000660"}, {"ticker": "0059301"}]
|
||||||
|
kept = {t["ticker"] for t in filter_krx(targets)}
|
||||||
|
assert kept == {"005930", "000660"}
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeNAS:
|
||||||
|
def __init__(self, ms):
|
||||||
|
self._ms = ms
|
||||||
|
self.reported = None
|
||||||
|
|
||||||
|
async def get_monitor_set(self):
|
||||||
|
return self._ms
|
||||||
|
|
||||||
|
async def post_report(self, as_of, firing):
|
||||||
|
self.reported = {"as_of": as_of, "firing": firing}
|
||||||
|
return {"new_alerts": len(firing), "cleared": 0}
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeKIS:
|
||||||
|
def __init__(self, price=100, fail_on=None):
|
||||||
|
self._price = price
|
||||||
|
self._fail_on = fail_on or set()
|
||||||
|
|
||||||
|
async def get_quote(self, ticker):
|
||||||
|
if ticker in self._fail_on:
|
||||||
|
raise RuntimeError("KIS down")
|
||||||
|
return {"price": self._price, "day_open": 99, "day_high": 100,
|
||||||
|
"today_volume": 1000, "as_of": "x"}
|
||||||
|
|
||||||
|
async def get_daily_ohlcv(self, ticker, days=250):
|
||||||
|
# 정배열 + 저가 근접 → ma20_pullback 발화 유도
|
||||||
|
return [{"open": 90, "high": 90, "low": 90, "close": 90, "volume": 1}] * 200 \
|
||||||
|
+ [{"open": 100, "high": 100, "low": 100, "close": 100, "volume": 1}] * 20
|
||||||
|
|
||||||
|
|
||||||
|
async def test_closed_session_skips_kis():
|
||||||
|
nas = _FakeNAS({"session": "closed"})
|
||||||
|
state, stats = MonitorState(), WorkerStats()
|
||||||
|
await run_cycle(nas, _FakeKIS(), state, stats, load_settings())
|
||||||
|
assert state.session_state == "market_closed"
|
||||||
|
assert nas.reported is None # report도 안 함
|
||||||
|
|
||||||
|
|
||||||
|
async def test_non_krx_skipped_and_report_sent():
|
||||||
|
nas = _FakeNAS({"session": "regular",
|
||||||
|
"buy_targets": [{"ticker": "AAPL", "name": "Apple"}],
|
||||||
|
"sell_targets": [], "buy_params": {}, "exit_params": {}})
|
||||||
|
state, stats = MonitorState(), WorkerStats()
|
||||||
|
await run_cycle(nas, _FakeKIS(), state, stats, load_settings())
|
||||||
|
assert state.session_state == "market_open"
|
||||||
|
assert nas.reported is not None
|
||||||
|
assert nas.reported["firing"] == [] # 알파벳 티커 skip → 빈 발화
|
||||||
|
|
||||||
|
|
||||||
|
async def test_firing_assembled_and_last_alert_set():
|
||||||
|
nas = _FakeNAS({"session": "regular",
|
||||||
|
"buy_targets": [{"ticker": "005930", "name": "삼성전자"}],
|
||||||
|
"sell_targets": [], "buy_params": {"pullback_pct": 0.02},
|
||||||
|
"exit_params": {}})
|
||||||
|
state, stats = MonitorState(), WorkerStats()
|
||||||
|
await run_cycle(nas, _FakeKIS(price=101), state, stats, load_settings())
|
||||||
|
conds = {f["condition"] for f in nas.reported["firing"]}
|
||||||
|
assert "buy_ma20_pullback" in conds
|
||||||
|
assert state.last_alert_at is not None
|
||||||
|
|
||||||
|
|
||||||
|
async def test_per_ticker_failure_isolated():
|
||||||
|
nas = _FakeNAS({"session": "regular",
|
||||||
|
"buy_targets": [{"ticker": "005930"}, {"ticker": "000660"}],
|
||||||
|
"sell_targets": [], "buy_params": {}, "exit_params": {}})
|
||||||
|
state, stats = MonitorState(), WorkerStats()
|
||||||
|
# 005930은 실패, 000660은 성공 → 루프가 죽지 않고 report 전송
|
||||||
|
await run_cycle(nas, _FakeKIS(fail_on={"005930"}), state, stats, load_settings())
|
||||||
|
assert nas.reported is not None
|
||||||
|
assert state.session_state == "market_open"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_monitor_set_failure_sets_idle():
|
||||||
|
class _BadNAS(_FakeNAS):
|
||||||
|
async def get_monitor_set(self):
|
||||||
|
raise RuntimeError("NAS down")
|
||||||
|
|
||||||
|
nas = _BadNAS({})
|
||||||
|
state, stats = MonitorState(), WorkerStats()
|
||||||
|
await run_cycle(nas, _FakeKIS(), state, stats, load_settings())
|
||||||
|
assert state.session_state == "idle"
|
||||||
|
assert nas.reported is None
|
||||||
39
services/trade-monitor/tests/test_nas_client.py
Normal file
39
services/trade-monitor/tests/test_nas_client.py
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
"""NASClient — monitor-set/report + X-WebAI-Key (respx)."""
|
||||||
|
import json as _json
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
import respx
|
||||||
|
|
||||||
|
from nas_client import NASClient
|
||||||
|
|
||||||
|
BASE = "http://nas.test"
|
||||||
|
|
||||||
|
|
||||||
|
@respx.mock
|
||||||
|
async def test_get_monitor_set_sends_key():
|
||||||
|
route = respx.get(f"{BASE}/api/webai/trade-alert/monitor-set").mock(
|
||||||
|
return_value=httpx.Response(200, json={"session": "regular", "buy_targets": []}))
|
||||||
|
c = NASClient(BASE, "KEY")
|
||||||
|
ms = await c.get_monitor_set()
|
||||||
|
assert ms["session"] == "regular"
|
||||||
|
assert route.calls.last.request.headers["X-WebAI-Key"] == "KEY"
|
||||||
|
await c.close()
|
||||||
|
|
||||||
|
|
||||||
|
@respx.mock
|
||||||
|
async def test_post_report_payload():
|
||||||
|
captured = {}
|
||||||
|
|
||||||
|
def _resp(request):
|
||||||
|
captured.update(_json.loads(request.content))
|
||||||
|
return httpx.Response(200, json={"new_alerts": 1, "cleared": 0})
|
||||||
|
|
||||||
|
respx.post(f"{BASE}/api/webai/trade-alert/report").mock(side_effect=_resp)
|
||||||
|
c = NASClient(BASE, "KEY")
|
||||||
|
firing = [{"ticker": "005930", "kind": "buy", "condition": "buy_breakout",
|
||||||
|
"price": 71500, "detail": {}}]
|
||||||
|
out = await c.post_report("2026-07-02T09:01:00+09:00", firing)
|
||||||
|
assert out["new_alerts"] == 1
|
||||||
|
assert captured["as_of"] == "2026-07-02T09:01:00+09:00"
|
||||||
|
assert captured["firing"] == firing
|
||||||
|
await c.close()
|
||||||
@@ -3,11 +3,14 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
|
import redis.asyncio as aioredis
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
|
|
||||||
import worker
|
import worker
|
||||||
|
from _shared.heartbeat import heartbeat_loop
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -16,15 +19,19 @@ logger = logging.getLogger(__name__)
|
|||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
worker_task = asyncio.create_task(worker.worker_loop())
|
worker_task = asyncio.create_task(worker.worker_loop())
|
||||||
|
hb_redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://192.168.45.54:6379"), decode_responses=False)
|
||||||
|
hb_task = asyncio.create_task(heartbeat_loop(hb_redis, "video-render", "render", worker.stats))
|
||||||
logger.info("video-render lifespan 시작")
|
logger.info("video-render lifespan 시작")
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
finally:
|
finally:
|
||||||
worker_task.cancel()
|
for t in (worker_task, hb_task):
|
||||||
try:
|
t.cancel()
|
||||||
await worker_task
|
try:
|
||||||
except asyncio.CancelledError:
|
await t
|
||||||
pass
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
await hb_redis.aclose()
|
||||||
logger.info("video-render lifespan 종료")
|
logger.info("video-render lifespan 종료")
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -94,3 +94,25 @@ async def test_poll_once_returns_false_on_timeout(monkeypatch):
|
|||||||
assert handled is False
|
assert handled is False
|
||||||
fake_queue.ack.assert_not_awaited()
|
fake_queue.ack.assert_not_awaited()
|
||||||
fake_queue.fail.assert_not_awaited()
|
fake_queue.fail.assert_not_awaited()
|
||||||
|
|
||||||
|
|
||||||
|
# ----- heartbeat stats 카운터 -----
|
||||||
|
|
||||||
|
class _OneJobQueue:
|
||||||
|
def __init__(self): self.acked = False
|
||||||
|
async def dequeue(self, timeout=5):
|
||||||
|
if self.acked: return None
|
||||||
|
return ({"job_type": "sora_generation", "task_id": "t1", "params": {}}, b"raw")
|
||||||
|
async def ack(self, raw): self.acked = True
|
||||||
|
async def fail(self, raw, payload): pass
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_poll_once_increments_jobs_done(monkeypatch):
|
||||||
|
worker.stats.jobs_done = 0
|
||||||
|
monkeypatch.setattr(worker, "run_sora_generation", lambda task_id, params: None)
|
||||||
|
handled = await worker.poll_once(_OneJobQueue())
|
||||||
|
assert handled is True
|
||||||
|
assert worker.stats.jobs_done == 1
|
||||||
|
assert worker.stats.busy is False
|
||||||
|
assert worker.stats.last_job_at is not None
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ from providers.veo import run_veo_generation
|
|||||||
from providers.kling import run_kling_generation
|
from providers.kling import run_kling_generation
|
||||||
from providers.seedance import run_seedance_generation
|
from providers.seedance import run_seedance_generation
|
||||||
from _shared.reliable_queue import ReliableQueue
|
from _shared.reliable_queue import ReliableQueue
|
||||||
|
from _shared.heartbeat import WorkerStats, utc_now_iso
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -26,6 +27,8 @@ REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
|
|||||||
QUEUE_KEY = "queue:video-render"
|
QUEUE_KEY = "queue:video-render"
|
||||||
PAUSED_KEY = "queue:paused"
|
PAUSED_KEY = "queue:paused"
|
||||||
|
|
||||||
|
stats = WorkerStats()
|
||||||
|
|
||||||
# string names so `unittest.mock.patch` on `worker.<name>` is correctly intercepted
|
# string names so `unittest.mock.patch` on `worker.<name>` is correctly intercepted
|
||||||
_DISPATCH_TABLE = {
|
_DISPATCH_TABLE = {
|
||||||
"sora_generation": "run_sora_generation",
|
"sora_generation": "run_sora_generation",
|
||||||
@@ -60,14 +63,21 @@ async def poll_once(queue: ReliableQueue) -> bool:
|
|||||||
if result is None:
|
if result is None:
|
||||||
return False
|
return False
|
||||||
payload, raw = result
|
payload, raw = result
|
||||||
|
stats.busy = True
|
||||||
try:
|
try:
|
||||||
await asyncio.to_thread(_dispatch, payload)
|
await asyncio.to_thread(_dispatch, payload)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("dispatch unhandled exception task_id=%s",
|
logger.exception("dispatch unhandled exception task_id=%s",
|
||||||
payload.get("task_id"))
|
payload.get("task_id"))
|
||||||
await queue.fail(raw, payload)
|
await queue.fail(raw, payload)
|
||||||
|
stats.jobs_failed += 1
|
||||||
|
stats.last_job_at = utc_now_iso()
|
||||||
|
stats.busy = False
|
||||||
return True
|
return True
|
||||||
await queue.ack(raw)
|
await queue.ack(raw)
|
||||||
|
stats.jobs_done += 1
|
||||||
|
stats.last_job_at = utc_now_iso()
|
||||||
|
stats.busy = False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user