Compare commits

..

19 Commits

Author SHA1 Message Date
9abca3eeab docs: /infra 워커 관측 규칙 + trade-monitor climax 정합 반영
- CLAUDE.md: 모든 WSL docker 워커 /infra 관측 필수 규칙(BE 팀규칙) +
  services 행에 trade-monitor(:18715) 반영
- README.md: sell_climax 정합·36 tests·env 우선순위 문구 갱신

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01N83vbXEA8h83GMXQcg8fxD
2026-07-03 11:15:36 +09:00
5dbb11ac83 fix(trade-monitor): sell_climax holdings_intel 정합
BE 회신(holdings_intel.py:109-118)에 맞춰 반전 기준을
price<day_open → price<day_high×climax_close_pct(윗꼬리)로 변경.
- kis_client.get_quote에 day_high(stck_hgpr) 추가
- monitor._build_ctx가 day_high를 ctx로 전달
- climax_vol_x·climax_close_pct를 monitor-set exit_params에서 읽기
  (fallback: TM_CLIMAX_VOL_MULT/0.97)
- 테스트 36/36 (climax exit_params 2건 추가)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01N83vbXEA8h83GMXQcg8fxD
2026-07-03 11:15:27 +09:00
8e1b20190d docs(readme): trade-monitor 워커 섹션 + /infra 관측 규칙 반영 2026-07-03 11:05:57 +09:00
fa6ef6c5c8 feat(trade-monitor): Dockerfile + compose 서비스(18715) + .env.example 2026-07-03 01:48:14 +09:00
12aa55ed14 feat(trade-monitor): FastAPI lifespan + heartbeat 배선 + /health 2026-07-03 01:47:34 +09:00
ce8983c1b9 feat(trade-monitor): monitor 오케스트레이션 (run_cycle/loop/state_fn) 2026-07-03 01:47:34 +09:00
04aff34883 feat(trade-monitor): NAS trade-alert 클라이언트 (monitor-set/report) 2026-07-03 01:46:37 +09:00
d761716e00 feat(trade-monitor): KIS 자체 토큰 + quote + 일봉 클라이언트 2026-07-03 01:46:37 +09:00
241ce41a6a feat(trade-monitor): 매수/매도 조건 로직 (§6 8개 조건) 2026-07-03 01:45:41 +09:00
366a9160d5 feat(trade-monitor): 순수 지표 모듈 (sma/rsi/highest_high) 2026-07-03 01:45:41 +09:00
141209ad42 feat(trade-monitor): 스캐폴딩 + config 2026-07-03 01:44:25 +09:00
03e50d2be1 fix(task-watcher): _shared를 빌드 컨텍스트에 포함 (heartbeat import 크래시 수정)
task-watcher는 build context가 ./task-watcher라 services/_shared가 이미지에
없었음 → A3가 추가한 `from _shared.heartbeat import` ModuleNotFoundError로
컨테이너 즉시 크래시(재시작 후 alive=false). render 워커 패턴대로
context=. + COPY _shared /app/_shared + PYTHONPATH=/app 로 수정.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-07-01 02:26:57 +09:00
54fca07d43 feat(ai_trade): NAS Redis heartbeat (trader market_open/closed)
- ai_trade/heartbeat.py: build_trader_payload() + heartbeat_loop() 자체 미니 헬퍼
  (Windows 호스트 실행이라 _shared import 경로 달라 독립 구현, 계약은 동일)
- ai_trade/main.py: lifespan에 hb_task spawn + shutdown 시 cancel
  state_fn = scheduler._is_market_day & _is_polling_window(KST now) 조합
  signals = len(state.signals) 실시간 주입
- requirements.txt: redis>=5.0 추가
- ai_trade/tests/test_heartbeat.py: build_trader_payload 3케이스 TDD 검증

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-07-01 01:07:00 +09:00
574b5712c3 feat(task-watcher): heartbeat 발신 (state=mode, paused 이유 노출)
- watcher_loop 에서 mode 판정 직후 worker:task-watcher:heartbeat SET EX 45
- payload: build_payload(state=mode, extra={"mode": mode})
- LOOP_INTERVAL 30s < TTL 45s → 만료 전 주기적 갱신
- conftest.py 추가: services/ 를 sys.path에 주입해 _shared import 가능
- tests/test_watcher.py: payload kind/state/mode 필드 검증 (1 passed)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-07-01 00:59:28 +09:00
2ff31b2e76 feat(render-workers): 4 render 워커 heartbeat 배선 + poll_once 카운터
- services/_shared/heartbeat.py (A1) WorkerStats/utc_now_iso/heartbeat_loop 소비
- image-render / video-render / music-render / insta-render 각 worker.py:
  stats = WorkerStats() 모듈 레벨 추가, poll_once에서 dispatch 전 busy=True,
  ack 후 jobs_done+1 / fail 후 jobs_failed+1 + last_job_at + busy=False
- 각 main.py: lifespan에 aioredis(decode_responses=False) + heartbeat_loop 태스크 spawn,
  종료 시 cancel + aclose
- 각 tests/test_worker.py: test_poll_once_increments_jobs_done 추가
  (image:flux / video:sora / music:suno / insta:_process_one mock)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-07-01 00:52:57 +09:00
d1b9ff570d feat(_shared): 워커 heartbeat 모듈 (worker:<name>:heartbeat TTL SET)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-07-01 00:43:01 +09:00
4fb3d12244 merge: co-gahusb AI 클라이언트 배선 2026-06-12 23:46:35 +09:00
789a807d50 feat(co-gahusb): AI 클라이언트 배선 (.mcp.json + 역할 블록)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 23:46:34 +09:00
ad141a2887 fix(insta-render): INSTA_MEDIA_ROOT를 insta_cards 하위로 정렬 (nginx 서빙 경로 일치)
워커가 INSTA_MEDIA_ROOT/{slate_id}에 PNG를 쓰는데 기본값 /mnt/nas/webpage/data/insta가 insta_cards 서브디렉토리를 누락 → data/insta/{id}에 저장. 그러나 nginx(/media/insta→/data/insta_cards), insta-lab CARDS_DIR, frontend 마운트, 구 렌더는 전부 data/insta/insta_cards/{id}를 기대 → /media/insta/{id}/NN.png 404.

INSTA_MEDIA_ROOT을 /mnt/nas/webpage/data/insta/insta_cards로 정정(.env + compose 기본값 + .env.example). 코드 무변경 → 컨테이너 recreate만으로 적용(rebuild 불요). SMB 볼륨 마운트는 상위 디렉토리라 그대로 유효.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-11 01:18:09 +09:00
50 changed files with 3139 additions and 29 deletions

9
.mcp.json Normal file
View File

@@ -0,0 +1,9 @@
{
"mcpServers": {
"co-gahusb": {
"type": "http",
"url": "https://gahusb.synology.me/api/co/mcp",
"headers": { "Authorization": "Bearer ${CO_BUS_KEY}" }
}
}
}

View File

@@ -15,12 +15,14 @@ Windows AI 머신 (AMD 9800X3D + RTX 5070 Ti 16GB) 의 두 신호 파이프라
| `ai_trade/` | 자동매매 메인 (구 `signal_v2` 2026-05-19 rename) — Chronos-bolt + 분봉 모멘텀 + KIS WebSocket + 신호 생성 | `:8001` | **Phase 4 완료 (2026-05-17)**, Phase 5 대기 | | `ai_trade/` | 자동매매 메인 (구 `signal_v2` 2026-05-19 rename) — Chronos-bolt + 분봉 모멘텀 + KIS WebSocket + 신호 생성 | `:8001` | **Phase 4 완료 (2026-05-17)**, Phase 5 대기 |
| `legacy/start_v1.bat` | (deprecated) V1 진입점 — root `start.bat`에서 이동됨. 자동 실행 차단 | — | **OFF** | | `legacy/start_v1.bat` | (deprecated) V1 진입점 — root `start.bat`에서 이동됨. 자동 실행 차단 | — | **OFF** |
| `ai_trade/start.bat` | 자동매매 진입점 | — | `ai_trade/main.py` uvicorn 실행 | | `ai_trade/start.bat` | 자동매매 진입점 | — | `ai_trade/main.py` uvicorn 실행 |
| `services/` | (예정) NAS↔Windows 분산 worker — insta-render·music-render·video-render·task-watcher | 18710~ | **Plan-B-Insta 작업 중** | | `services/` | NAS↔Windows 분산 worker — insta·music·video·image-render·task-watcher·**trade-monitor(:18715, 실시간 매매 알람)** | 18710~ | 운영 |
| `.env` | 환경변수 (`KIS_REAL_*`, `TELEGRAM_*`, `STOCK_API_URL`, `WEBAI_API_KEY`, `LOG_LEVEL`) | — | | | `.env` | 환경변수 (`KIS_REAL_*`, `TELEGRAM_*`, `STOCK_API_URL`, `WEBAI_API_KEY`, `LOG_LEVEL`) | — | |
| `requirements.txt` | 공용 의존성 | — | torch, chronos-forecasting, fastapi, httpx, websockets 등 | | `requirements.txt` | 공용 의존성 | — | torch, chronos-forecasting, fastapi, httpx, websockets 등 |
`.venv`**구조적으로 깨짐**: `pyvenv.cfg` 가 한글 사용자 경로(`C:\Users\박재오\...`) 를 포함하여 콘솔 코드페이지가 roundtrip 못함. 테스트는 시스템 Python 으로 실행: `C:\Users\jaeoh\AppData\Local\Programs\Python\Python312\python.exe -m pytest ai_trade/tests -q`. `.venv`**구조적으로 깨짐**: `pyvenv.cfg` 가 한글 사용자 경로(`C:\Users\박재오\...`) 를 포함하여 콘솔 코드페이지가 roundtrip 못함. 테스트는 시스템 Python 으로 실행: `C:\Users\jaeoh\AppData\Local\Programs\Python\Python312\python.exe -m pytest ai_trade/tests -q`.
> **분산 워커 /infra 관측 규칙 (팀 규칙, BE 제정)**: 모든 WSL docker 워커는 ① Redis `worker:<name>:heartbeat`(EX45) 발신 ② BE `node_monitor.WORKER_REGISTRY` 등재 ③ `/api/agent-office/nodes`·web-ui `/infra` 노출이 필수. trade-monitor는 kind=trader로 등재됨.
--- ---
## 서버 시작 방식 ## 서버 시작 방식
@@ -137,3 +139,14 @@ cd C:\Users\jaeoh\Desktop\workspace\web-ai\ai_trade
- **spec amendment 발생 시**: 코드는 `web-ai` 에 commit, spec 갱신은 `web-ui/docs/superpowers/specs/` 에 commit (Phase 4 spread formula 변경 사례 = web-ui commit `534ded5`) - **spec amendment 발생 시**: 코드는 `web-ai` 에 commit, spec 갱신은 `web-ui/docs/superpowers/specs/` 에 commit (Phase 4 spread formula 변경 사례 = web-ui commit `534ded5`)
자세한 V1 가이드는 `signal_v1/CLAUDE.md` 참조 (있다면). 자세한 V1 가이드는 `signal_v1/CLAUDE.md` 참조 (있다면).
---
## 협업 팀 버스 (co-gahusb) — 이 세션의 역할: **AI**
이 세션은 AI 리서치(AI) 역할이다. co-gahusb MCP 툴로 다른 세션(FE/BE/Producer)과 협업한다.
- **소유권**: 이 세션은 `web-ai` repo만 쓴다(FE=web-ui, BE=web-backend).
- **공유 리소스 변경 전 반드시 `acquire_lock(resource, "AI")`**: 대상 = `nas-deploy`, `stock-db-schema`, `lotto-db-schema`, `memory-mirror`, `nginx-conf`, `compose`. 점유 중이면 대기, 긴 작업은 `heartbeat_lock`, 끝나면 `release_lock`.
- **모든 툴 호출에 `role="AI"`** (또는 `from_role`/`created_by`에 AI).
- **수신**: `/loop`로 주기적으로 `read_inbox("AI", after_id=<last>)` + `list_tasks(assignee_role="AI")` 확인.
-`CO_BUS_KEY`는 환경변수로 주입(커밋 금지). `.mcp.json``${CO_BUS_KEY}`가 프로세스 환경변수에서 치환됨 → `setx CO_BUS_KEY "..."` 후 새 터미널에서 `claude` 실행.

View File

@@ -3,7 +3,7 @@
Windows AI 머신(AMD 9800X3D + RTX 5070 Ti 16GB)에서 동작하는 두 영역의 서비스: Windows AI 머신(AMD 9800X3D + RTX 5070 Ti 16GB)에서 동작하는 두 영역의 서비스:
1. **ai_trade** — Confidence Signal Pipeline V2. NAS stock 백엔드와 KIS Open API를 결합해 매수/매도 신호를 생성하는 FastAPI 워커. 1. **ai_trade** — Confidence Signal Pipeline V2. NAS stock 백엔드와 KIS Open API를 결합해 매수/매도 신호를 생성하는 FastAPI 워커.
2. **services** — NAS↔Windows 분산 렌더링 워커(인스타 카드 / 음악 / 영상 / 이미지) + task-watcher. 2. **services** — NAS↔Windows 분산 워커: 렌더링(인스타 카드 / 음악 / 영상 / 이미지) + task-watcher + **trade-monitor**(실시간 매매 알람).
상위 워크스페이스 컨텍스트는 `../CLAUDE.md`, 본 디렉토리 상세는 `CLAUDE.md`, 운영 체크포인트는 `CHECK_POINT.md` 참조. 상위 워크스페이스 컨텍스트는 `../CLAUDE.md`, 본 디렉토리 상세는 `CLAUDE.md`, 운영 체크포인트는 `CHECK_POINT.md` 참조.
@@ -20,6 +20,7 @@ Windows AI 머신(AMD 9800X3D + RTX 5070 Ti 16GB)에서 동작하는 두 영역
| `services/video-render/` | sora / veo / kling / seedance 4 provider 영상 생성 게이트웨이. `queue:video-render` 소비. | `:18712` | | `services/video-render/` | sora / veo / kling / seedance 4 provider 영상 생성 게이트웨이. `queue:video-render` 소비. | `:18712` |
| `services/image-render/` | gpt_image / nano_banana / flux(ComfyUI 로컬) 3 provider. `queue:image-render` 소비. | `:18714` | | `services/image-render/` | gpt_image / nano_banana / flux(ComfyUI 로컬) 3 provider. `queue:image-render` 소비. | `:18714` |
| `services/task-watcher/` | 박재오 작업 시간대에 `queue:paused` 토글 → 워커 일시 정지. | `:18713` | | `services/task-watcher/` | 박재오 작업 시간대에 `queue:paused` 토글 → 워커 일시 정지. | `:18713` |
| `services/trade-monitor/` | 실시간 매매 알람. monitor-set pull → KIS 시세 + TA 조건(§6 8종) → report + heartbeat(kind=trader). 무상태. | `:18715` |
| `legacy/signal_v1/` | ⚠ **DEPRECATED** (2026-05-19). LSTM 봇. 자동 실행 차단됨. | OFF | | `legacy/signal_v1/` | ⚠ **DEPRECATED** (2026-05-19). LSTM 봇. 자동 실행 차단됨. | OFF |
--- ---
@@ -130,6 +131,60 @@ redis-cli -h 192.168.45.54 KEYS 'processing:*'
--- ---
## trade-monitor — 실시간 매매 알람 워커 (신규 2026-07-03)
NAS stock 백엔드(`:18500`)에서 `monitor-set`을 60초마다 pull하고, KIS 실시간/일봉 시세로 TA 조건을 평가해 발화집합을 `report`로 전송. NAS가 edge diff로 신규 알림만 텔레그램/프론트에 노출. **워커 무상태**(dedup은 NAS 영속). 포트 `:18715`, WSL2 Docker. 상세 설계·조건 규칙은 `services/trade-monitor/DESIGN.md`.
### 루프 (1분)
1. `GET /api/webai/trade-alert/monitor-set` (X-WebAI-Key) → buy_targets(watchscreener) + sell_targets(보유 avg_price/qty/holding_high) + buy_params/exit_params + session.
2. `session=="closed"`면 KIS 호출 0, idle. **비-KRX(알파벳) 티커 skip**(워커 책임).
3. KIS quote + 일봉 250봉 → 지표 계산 → 조건 평가(종목 단위 실패 격리).
4. `POST /api/webai/trade-alert/report {as_of, firing:[...]}` — 빈 배열도 전송(edge clear).
5. heartbeat `worker:trade-monitor:heartbeat` EX45 (kind=trader, state=market_open|market_closed|idle + last_alert_at). 60초 루프 > TTL45 만료갭 회피 위해 **15초 독립 태스크**.
### 조건 (§6 — `condition` 문자열이 FE 라벨/뱃지로 그대로 매핑됨)
- **매수**: `buy_ma20_pullback`(정배열 + ma20 근접 반등), `buy_breakout`(20봉 고점 돌파 + 거래량 배수), `buy_rsi_bounce`(RSI 과매도 반등, 무상태).
- **매도**: `sell_stop_loss`, `sell_take_profit`, `sell_trailing_stop`, `sell_ma_break`(ma50/ma200 severity), `sell_climax`(거래량 급증 + `price<day_high×0.97` 윗꼬리 — holdings_intel 정합됨, `exit_params` 파라미터화).
### 핵심 파일
| 파일 | 책임 |
|------|------|
| `config.py` | Settings (`TM_` 접두사, ai_trade와 분리) |
| `indicators.py` | 순수: `sma` / `rsi_series`(Wilder) / `highest_high` |
| `conditions.py` | 순수 §6: `evaluate_buy` / `evaluate_sell` |
| `kis_client.py` | KIS **자체 OAuth 토큰** + `get_quote` + `get_daily_ohlcv` + 0.5s throttle |
| `nas_client.py` | monitor-set / report (X-WebAI-Key + retry) |
| `monitor.py` | `run_cycle` / `monitor_loop` / `make_state_fn` |
| `main.py` | FastAPI lifespan + `_shared.heartbeat_loop` 배선 + `/health` |
### 환경 변수
| 변수 | 기본 | 설명 |
|------|------|------|
| `NAS_BASE_URL` | `http://192.168.45.54:18500` | stock 백엔드 |
| `WEBAI_API_KEY` | (필수) | X-WebAI-Key |
| `REDIS_URL` | `redis://192.168.45.54:6379` | heartbeat |
| `TM_KIS_APP_KEY` / `TM_KIS_APP_SECRET` / `TM_KIS_ACCOUNT` | (필수) | KIS **전용** 자체 토큰(ai_trade와 분리 발급 → 토큰 상호 무효화·EGW00201 회피) |
| `TM_KIS_IS_VIRTUAL` | `0` | 실전/모의 |
| `TM_LOOP_INTERVAL` | `60` | 루프 주기(초) |
| `TM_CLIMAX_VOL_MULT` | `3.0` | sell_climax 거래량 배수 fallback (우선순위: monitor-set `exit_params.climax_vol_x` > 이 값) |
### 상태
⏳ 구현·머지 완료(테스트 36/36, sell_climax holdings_intel 정합 포함), **미배포**. 배포 전: ① 전용 KIS 앱키 발급·주입(박재오 진행 중) ② 첫 운영 KIS 필드 검증(stck_hgpr 등). BE가 `node_monitor.WORKER_REGISTRY`에 등재 완료 → 배포 시 `/api/agent-office/nodes`·web-ui `/infra`에 trader 노드 자동 노출(미배포 동안 down, 무경보).
### 시작 (NAS, WSL2 Docker)
```bash
cd services
docker compose up -d trade-monitor
```
---
## 환경 변수 (ai_trade) ## 환경 변수 (ai_trade)
| 변수 | 기본 | 설명 | | 변수 | 기본 | 설명 |
@@ -168,6 +223,7 @@ cd services/insta-render && python -m pytest tests/ -q
cd services/music-render && python -m pytest tests/ -q cd services/music-render && python -m pytest tests/ -q
cd services/video-render && python -m pytest tests/ -q cd services/video-render && python -m pytest tests/ -q
cd services/image-render && python -m pytest tests/ -q cd services/image-render && python -m pytest tests/ -q
cd services/trade-monitor && python -m pytest tests/ -q # 36 tests
``` ```
**`.venv` 한글 사용자 경로 깨짐**으로 시스템 Python(`C:\Users\jaeoh\AppData\Local\Programs\Python\Python312\python.exe`) 사용 권장. 또는 `py -3.12 -m pytest …`. **`.venv` 한글 사용자 경로 깨짐**으로 시스템 Python(`C:\Users\jaeoh\AppData\Local\Programs\Python\Python312\python.exe`) 사용 권장. 또는 `py -3.12 -m pytest …`.
@@ -183,6 +239,8 @@ cd services/image-render && python -m pytest tests/ -q
5. **services worker_id** — env로 명시 권장. hostname 기반 default는 컨테이너 재기동 시 바뀌어 orphan 분실 위험. 5. **services worker_id** — env로 명시 권장. hostname 기반 default는 컨테이너 재기동 시 바뀌어 orphan 분실 위험.
6. **dead-letter 누적**`redis-cli LLEN dead_letter:*` 정기 점검 필요. 6. **dead-letter 누적**`redis-cli LLEN dead_letter:*` 정기 점검 필요.
7. **Dockerfile build context**`services/` 루트 (각 worker 디렉토리 아님). compose 변경 동반. 7. **Dockerfile build context**`services/` 루트 (각 worker 디렉토리 아님). compose 변경 동반.
8. **분산 워커 /infra 관측 필수 (팀 규칙)** — 모든 WSL docker 워커는 heartbeat(`worker:<name>:heartbeat` EX45) + BE `node_monitor.WORKER_REGISTRY` 등재 + `/infra` 노출이 필수. trade-monitor는 kind=trader로 등재됨.
9. **trade-monitor KIS 앱키 분리** — ai_trade와 **다른 전용 app_key**(`TM_KIS_*`) 사용. 같은 app_key 공유 시 토큰 상호 무효화 + EGW00201. 실전 최대 89앱 발급 가능.
--- ---

57
ai_trade/heartbeat.py Normal file
View File

@@ -0,0 +1,57 @@
"""ai_trade heartbeat — NAS Redis로 worker:ai_trade:heartbeat SET.
Global Constraints 계약 1: kind=trader, state=market_open|market_closed.
ai_trade는 Windows 호스트 실행이라 _shared import 경로가 달라 자체 미니 헬퍼로 둔다.
"""
from __future__ import annotations
import asyncio
import datetime as dt
import json
import logging
import os
import redis.asyncio as aioredis
logger = logging.getLogger(__name__)
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
KEY = "worker:ai_trade:heartbeat"
INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "15"))
TTL = int(os.getenv("HEARTBEAT_TTL", "45"))
def build_trader_payload(state: str, signals: int = 0) -> str:
"""JSON 문자열 반환. state: 'market_open' | 'market_closed'."""
return json.dumps({
"name": "ai_trade",
"kind": "trader",
"state": state,
"ts": dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"last_job_at": None,
"jobs_done": signals,
"jobs_failed": 0,
})
async def heartbeat_loop(state_fn) -> None:
"""Redis에 HEARTBEAT_INTERVAL마다 SET EX TTL.
Args:
state_fn: () -> (state: str, signals: int). 호출자가 폴링 윈도우 판정 주입.
"""
redis = aioredis.from_url(REDIS_URL, decode_responses=False)
try:
while True:
try:
state, signals = state_fn()
payload = build_trader_payload(state, signals)
await redis.set(KEY, payload, ex=TTL)
logger.debug("ai_trade heartbeat sent: state=%s signals=%d", state, signals)
except asyncio.CancelledError:
raise
except Exception:
logger.exception("ai_trade heartbeat 실패 — 다음 주기에 재시도")
await asyncio.sleep(INTERVAL)
finally:
await redis.aclose()

View File

@@ -3,9 +3,12 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from datetime import datetime
from zoneinfo import ZoneInfo
from fastapi import FastAPI from fastapi import FastAPI
from ai_trade import heartbeat as _hb
from ai_trade import state as state_mod from ai_trade import state as state_mod
from ai_trade.chronos_predictor import ChronosPredictor from ai_trade.chronos_predictor import ChronosPredictor
from ai_trade.config import get_settings from ai_trade.config import get_settings
@@ -13,8 +16,11 @@ from ai_trade.kis_client import KISClient
from ai_trade.kis_websocket import KISWebSocket from ai_trade.kis_websocket import KISWebSocket
from ai_trade.pull_worker import poll_loop, make_asking_price_callback from ai_trade.pull_worker import poll_loop, make_asking_price_callback
from ai_trade.rate_limit import SignalDedup from ai_trade.rate_limit import SignalDedup
from ai_trade.scheduler import _is_polling_window, _is_market_day
from ai_trade.stock_client import StockClient from ai_trade.stock_client import StockClient
_KST = ZoneInfo("Asia/Seoul")
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -23,6 +29,7 @@ class AppContext:
dedup: SignalDedup | None = None dedup: SignalDedup | None = None
shutdown: asyncio.Event | None = None shutdown: asyncio.Event | None = None
poll_task: asyncio.Task | None = None poll_task: asyncio.Task | None = None
hb_task: asyncio.Task | None = None
kis_client: KISClient | None = None kis_client: KISClient | None = None
kis_ws: KISWebSocket | None = None kis_ws: KISWebSocket | None = None
chronos: ChronosPredictor | None = None chronos: ChronosPredictor | None = None
@@ -87,9 +94,27 @@ async def lifespan(app: FastAPI):
) )
) )
def _trader_state() -> tuple[str, int]:
"""scheduler의 실제 폴링 윈도우 판정으로 market_open/market_closed 결정."""
now = datetime.now(_KST)
is_open = _is_market_day(now) and _is_polling_window(now)
state_str = "market_open" if is_open else "market_closed"
signals = len(state_mod.state.signals)
return state_str, signals
_ctx.hb_task = asyncio.create_task(_hb.heartbeat_loop(_trader_state))
yield yield
# Shutdown # Shutdown heartbeat task
if _ctx.hb_task is not None:
_ctx.hb_task.cancel()
try:
await _ctx.hb_task
except asyncio.CancelledError:
pass
# Shutdown poll task
if _ctx.shutdown is not None: if _ctx.shutdown is not None:
_ctx.shutdown.set() _ctx.shutdown.set()
if _ctx.poll_task is not None: if _ctx.poll_task is not None:

View File

@@ -0,0 +1,38 @@
"""Tests for ai_trade heartbeat payload builder."""
import json
import pytest
def test_trader_payload_market_open():
from ai_trade.heartbeat import build_trader_payload
p = json.loads(build_trader_payload("market_open", signals=2))
assert p["name"] == "ai_trade"
assert p["kind"] == "trader"
assert p["state"] == "market_open"
assert p["ts"].endswith("Z")
assert p["jobs_done"] == 2
def test_trader_payload_market_closed():
from ai_trade.heartbeat import build_trader_payload
p = json.loads(build_trader_payload("market_closed"))
assert p["name"] == "ai_trade"
assert p["kind"] == "trader"
assert p["state"] == "market_closed"
assert p["jobs_done"] == 0
assert p["jobs_failed"] == 0
assert p["last_job_at"] is None
def test_trader_payload_ts_format():
"""ts 필드가 ISO 8601 UTC 형식 (YYYY-MM-DDTHH:MM:SSZ)인지 확인."""
from ai_trade.heartbeat import build_trader_payload
import re
p = json.loads(build_trader_payload("market_open"))
assert re.match(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", p["ts"]), (
f"ts={p['ts']!r} does not match expected UTC format"
)

View File

@@ -7,6 +7,7 @@ pytest>=8.0
pytest-asyncio>=0.23 pytest-asyncio>=0.23
respx>=0.21 respx>=0.21
websockets>=12 websockets>=12
redis>=5.0
# Phase 3b dependencies (Chronos-2 + ML) # Phase 3b dependencies (Chronos-2 + ML)
transformers>=4.40 transformers>=4.40
chronos-forecasting>=1.4 chronos-forecasting>=1.4

View File

@@ -0,0 +1,55 @@
"""분산 워커 heartbeat — worker:<name>:heartbeat SET (TTL). Global Constraints 계약 1."""
from __future__ import annotations
import asyncio, datetime as dt, json, logging, os
logger = logging.getLogger(__name__)
DEFAULT_INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "15"))
DEFAULT_TTL = int(os.getenv("HEARTBEAT_TTL", "45"))
class WorkerStats:
"""worker_loop가 갱신, heartbeat_loop가 읽는 가변 카운터."""
def __init__(self):
self.busy = False
self.jobs_done = 0
self.jobs_failed = 0
self.last_job_at = None # ISO str | None
def utc_now_iso() -> str:
return dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def build_payload(name: str, kind: str, state: str, stats: WorkerStats, extra: dict | None = None) -> str:
payload = {
"name": name, "kind": kind, "state": state, "ts": utc_now_iso(),
"last_job_at": stats.last_job_at,
"jobs_done": stats.jobs_done, "jobs_failed": stats.jobs_failed,
}
if extra:
payload.update(extra)
return json.dumps(payload)
async def render_state(redis, stats: WorkerStats, paused_key: str = "queue:paused") -> str:
if await redis.get(paused_key) == b"1":
return "paused"
return "busy" if stats.busy else "idle"
async def heartbeat_loop(redis, name, kind, stats, *, interval=DEFAULT_INTERVAL,
ttl=DEFAULT_TTL, paused_key="queue:paused", state_fn=None):
key = f"worker:{name}:heartbeat"
logger.info("heartbeat 시작 name=%s ttl=%ds", name, ttl)
while True:
try:
if state_fn is not None:
state, extra = await state_fn(redis, stats)
else:
state, extra = await render_state(redis, stats, paused_key), None
await redis.set(key, build_payload(name, kind, state, stats, extra), ex=ttl)
except asyncio.CancelledError:
raise
except Exception:
logger.exception("heartbeat 발신 실패 name=%s", name)
await asyncio.sleep(interval)

View File

@@ -0,0 +1,46 @@
"""Tests for _shared.heartbeat — Task A1."""
import json
import sys
from pathlib import Path
import pytest
# Make `_shared` importable (same pattern as test_reliable_queue.py)
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))
from _shared.heartbeat import WorkerStats, build_payload, render_state
def test_build_payload_has_contract_fields():
s = WorkerStats(); s.jobs_done = 3; s.last_job_at = "2026-06-29T00:00:00Z"
payload = json.loads(build_payload("image-render", "render", "idle", s))
assert payload["name"] == "image-render"
assert payload["kind"] == "render"
assert payload["state"] == "idle"
assert payload["jobs_done"] == 3
assert payload["last_job_at"] == "2026-06-29T00:00:00Z"
assert payload["ts"].endswith("Z")
def test_build_payload_merges_extra():
payload = json.loads(build_payload("task-watcher", "watcher", "free", WorkerStats(), extra={"mode": "free"}))
assert payload["mode"] == "free"
class _FakeRedis:
def __init__(self, paused): self._paused = paused
async def get(self, key): return b"1" if self._paused else None
@pytest.mark.asyncio
async def test_render_state_paused_overrides_busy():
s = WorkerStats(); s.busy = True
assert await render_state(_FakeRedis(paused=True), s) == "paused"
@pytest.mark.asyncio
async def test_render_state_busy_then_idle():
s = WorkerStats(); s.busy = True
assert await render_state(_FakeRedis(paused=False), s) == "busy"
s.busy = False
assert await render_state(_FakeRedis(paused=False), s) == "idle"

View File

@@ -14,7 +14,7 @@ services:
- REDIS_URL=${REDIS_URL:-redis://192.168.45.54:6379} - REDIS_URL=${REDIS_URL:-redis://192.168.45.54:6379}
- NAS_BASE_URL=${NAS_BASE_URL:-http://192.168.45.54:18700} - NAS_BASE_URL=${NAS_BASE_URL:-http://192.168.45.54:18700}
- INTERNAL_API_KEY=${INTERNAL_API_KEY:-} - INTERNAL_API_KEY=${INTERNAL_API_KEY:-}
- INSTA_MEDIA_ROOT=${INSTA_MEDIA_ROOT:-/mnt/nas/webpage/data/insta} - INSTA_MEDIA_ROOT=${INSTA_MEDIA_ROOT:-/mnt/nas/webpage/data/insta/insta_cards}
- INSTA_MEDIA_URL_PREFIX=${INSTA_MEDIA_URL_PREFIX:-/media/insta} - INSTA_MEDIA_URL_PREFIX=${INSTA_MEDIA_URL_PREFIX:-/media/insta}
- CARD_TEMPLATE_DIR=/app/templates - CARD_TEMPLATE_DIR=/app/templates
volumes: volumes:
@@ -82,7 +82,8 @@ services:
task-watcher: task-watcher:
build: build:
context: ./task-watcher context: .
dockerfile: task-watcher/Dockerfile
container_name: task-watcher container_name: task-watcher
restart: unless-stopped restart: unless-stopped
ports: ports:
@@ -126,3 +127,28 @@ services:
interval: 60s interval: 60s
timeout: 5s timeout: 5s
retries: 3 retries: 3
trade-monitor:
build:
context: .
dockerfile: trade-monitor/Dockerfile
container_name: trade-monitor
restart: unless-stopped
ports:
- "18715:8000"
environment:
- TZ=Asia/Seoul
- REDIS_URL=${REDIS_URL:-redis://192.168.45.54:6379}
- NAS_BASE_URL=${NAS_BASE_URL:-http://192.168.45.54:18500}
- WEBAI_API_KEY=${WEBAI_API_KEY:-}
- TM_KIS_APP_KEY=${TM_KIS_APP_KEY:-}
- TM_KIS_APP_SECRET=${TM_KIS_APP_SECRET:-}
- TM_KIS_ACCOUNT=${TM_KIS_ACCOUNT:-}
- TM_KIS_IS_VIRTUAL=${TM_KIS_IS_VIRTUAL:-0}
- TM_LOOP_INTERVAL=${TM_LOOP_INTERVAL:-60}
- TM_CLIMAX_VOL_MULT=${TM_CLIMAX_VOL_MULT:-3.0}
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 60s
timeout: 5s
retries: 3

View File

@@ -3,11 +3,14 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
import os
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
import redis.asyncio as aioredis
from fastapi import FastAPI from fastapi import FastAPI
import worker import worker
from _shared.heartbeat import heartbeat_loop
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s") logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -16,15 +19,19 @@ logger = logging.getLogger(__name__)
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
worker_task = asyncio.create_task(worker.worker_loop()) worker_task = asyncio.create_task(worker.worker_loop())
hb_redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://192.168.45.54:6379"), decode_responses=False)
hb_task = asyncio.create_task(heartbeat_loop(hb_redis, "image-render", "render", worker.stats))
logger.info("image-render lifespan 시작") logger.info("image-render lifespan 시작")
try: try:
yield yield
finally: finally:
worker_task.cancel() for t in (worker_task, hb_task):
try: t.cancel()
await worker_task try:
except asyncio.CancelledError: await t
pass except asyncio.CancelledError:
pass
await hb_redis.aclose()
logger.info("image-render lifespan 종료") logger.info("image-render lifespan 종료")

View File

@@ -67,3 +67,25 @@ async def test_poll_once_returns_false_on_timeout(monkeypatch):
assert handled is False assert handled is False
fake_queue.ack.assert_not_awaited() fake_queue.ack.assert_not_awaited()
fake_queue.fail.assert_not_awaited() fake_queue.fail.assert_not_awaited()
# ----- heartbeat stats 카운터 -----
class _OneJobQueue:
def __init__(self): self.acked = False
async def dequeue(self, timeout=5):
if self.acked: return None
return ({"job_type": "flux_generation", "task_id": "t1", "params": {}}, b"raw")
async def ack(self, raw): self.acked = True
async def fail(self, raw, payload): pass
@pytest.mark.asyncio
async def test_poll_once_increments_jobs_done(monkeypatch):
worker.stats.jobs_done = 0
monkeypatch.setattr(worker, "run_flux_generation", lambda task_id, params: None)
handled = await worker.poll_once(_OneJobQueue())
assert handled is True
assert worker.stats.jobs_done == 1
assert worker.stats.busy is False
assert worker.stats.last_job_at is not None

View File

@@ -18,6 +18,7 @@ from providers.gpt_image import run_gpt_image_generation
from providers.nano_banana import run_nano_banana_generation from providers.nano_banana import run_nano_banana_generation
from providers.flux import run_flux_generation from providers.flux import run_flux_generation
from _shared.reliable_queue import ReliableQueue from _shared.reliable_queue import ReliableQueue
from _shared.heartbeat import WorkerStats, utc_now_iso
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -25,6 +26,8 @@ REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
QUEUE_KEY = "queue:image-render" QUEUE_KEY = "queue:image-render"
PAUSED_KEY = "queue:paused" PAUSED_KEY = "queue:paused"
stats = WorkerStats()
# string names so `unittest.mock.patch` / `monkeypatch.setattr` on `worker.<name>` # string names so `unittest.mock.patch` / `monkeypatch.setattr` on `worker.<name>`
# is correctly intercepted by getattr(sys.modules[__name__], ...) # is correctly intercepted by getattr(sys.modules[__name__], ...)
_DISPATCH_TABLE = { _DISPATCH_TABLE = {
@@ -59,14 +62,21 @@ async def poll_once(queue: ReliableQueue) -> bool:
if result is None: if result is None:
return False return False
payload, raw = result payload, raw = result
stats.busy = True
try: try:
await asyncio.to_thread(_dispatch, payload) await asyncio.to_thread(_dispatch, payload)
except Exception: except Exception:
logger.exception("dispatch unhandled exception task_id=%s", logger.exception("dispatch unhandled exception task_id=%s",
payload.get("task_id")) payload.get("task_id"))
await queue.fail(raw, payload) await queue.fail(raw, payload)
stats.jobs_failed += 1
stats.last_job_at = utc_now_iso()
stats.busy = False
return True return True
await queue.ack(raw) await queue.ack(raw)
stats.jobs_done += 1
stats.last_job_at = utc_now_iso()
stats.busy = False
return True return True

View File

@@ -7,8 +7,9 @@ REDIS_URL=redis://192.168.45.54:6379
NAS_BASE_URL=http://192.168.45.54:18700 NAS_BASE_URL=http://192.168.45.54:18700
INTERNAL_API_KEY=__copy_from_nas_dotenv__ INTERNAL_API_KEY=__copy_from_nas_dotenv__
# NAS SMB mount 안의 미디어 디렉토리 (/mnt/nas/webpage/data/insta/) # NAS SMB mount 안의 미디어 디렉토리.
INSTA_MEDIA_ROOT=/mnt/nas/webpage/data/insta # ⚠️ nginx가 /media/insta를 data/insta/insta_cards/로 서빙하므로 반드시 insta_cards까지 포함.
INSTA_MEDIA_ROOT=/mnt/nas/webpage/data/insta/insta_cards
# nginx 서빙 prefix (NAS webhook payload에 보낼 result_path 만들 때) # nginx 서빙 prefix (NAS webhook payload에 보낼 result_path 만들 때)
INSTA_MEDIA_URL_PREFIX=/media/insta INSTA_MEDIA_URL_PREFIX=/media/insta

View File

@@ -3,12 +3,15 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
import os
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
import redis.asyncio as aioredis
from fastapi import FastAPI from fastapi import FastAPI
import card_renderer import card_renderer
import worker import worker
from _shared.heartbeat import heartbeat_loop
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s") logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -20,15 +23,19 @@ async def lifespan(app: FastAPI):
await card_renderer.init_browser() await card_renderer.init_browser()
# 큐 워커 백그라운드 시작 # 큐 워커 백그라운드 시작
worker_task = asyncio.create_task(worker.worker_loop()) worker_task = asyncio.create_task(worker.worker_loop())
hb_redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://192.168.45.54:6379"), decode_responses=False)
hb_task = asyncio.create_task(heartbeat_loop(hb_redis, "insta-render", "render", worker.stats))
logger.info("insta-render lifespan 시작") logger.info("insta-render lifespan 시작")
try: try:
yield yield
finally: finally:
worker_task.cancel() for t in (worker_task, hb_task):
try: t.cancel()
await worker_task try:
except asyncio.CancelledError: await t
pass except asyncio.CancelledError:
pass
await hb_redis.aclose()
await card_renderer.shutdown_browser() await card_renderer.shutdown_browser()
logger.info("insta-render lifespan 종료") logger.info("insta-render lifespan 종료")

View File

@@ -230,3 +230,27 @@ def test_make_queue_redis_socket_timeout_exceeds_block():
c = worker.make_queue_redis() c = worker.make_queue_redis()
st = c.connection_pool.connection_kwargs.get("socket_timeout") st = c.connection_pool.connection_kwargs.get("socket_timeout")
assert st is not None and st > 5 # blmove 블록(5s)보다 커야 안정 assert st is not None and st > 5 # blmove 블록(5s)보다 커야 안정
# ----- heartbeat stats 카운터 -----
class _OneJobQueueInsta:
def __init__(self): self.acked = False
async def dequeue(self, timeout=5):
if self.acked: return None
return ({"task_id": "t1", "params": {"slate_id": 1, "theme": "default"}}, b"raw")
async def ack(self, raw): self.acked = True
async def fail(self, raw, payload): pass
@pytest.mark.asyncio
async def test_poll_once_increments_jobs_done(monkeypatch):
worker.stats.jobs_done = 0
async def fake_process(client, payload): pass
monkeypatch.setattr(worker, "_process_one", fake_process)
async with httpx.AsyncClient() as client:
handled = await worker.poll_once(_OneJobQueueInsta(), client)
assert handled is True
assert worker.stats.jobs_done == 1
assert worker.stats.busy is False
assert worker.stats.last_job_at is not None

View File

@@ -14,9 +14,11 @@ import redis.asyncio as aioredis
from card_renderer import render_slate from card_renderer import render_slate
from _shared.reliable_queue import ReliableQueue from _shared.reliable_queue import ReliableQueue
from _shared.heartbeat import WorkerStats, utc_now_iso
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
stats = WorkerStats()
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379") REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
NAS_BASE_URL = os.getenv("NAS_BASE_URL", "http://192.168.45.54:18700") NAS_BASE_URL = os.getenv("NAS_BASE_URL", "http://192.168.45.54:18700")
@@ -89,12 +91,19 @@ async def poll_once(queue: ReliableQueue, client: httpx.AsyncClient) -> bool:
if result is None: if result is None:
return False return False
payload, raw = result payload, raw = result
stats.busy = True
try: try:
await _process_one(client, payload) await _process_one(client, payload)
except Exception: except Exception:
await queue.fail(raw, payload) await queue.fail(raw, payload)
stats.jobs_failed += 1
stats.last_job_at = utc_now_iso()
stats.busy = False
return True return True
await queue.ack(raw) await queue.ack(raw)
stats.jobs_done += 1
stats.last_job_at = utc_now_iso()
stats.busy = False
return True return True

View File

@@ -7,12 +7,15 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
import os
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
import redis.asyncio as aioredis
from fastapi import FastAPI, HTTPException from fastapi import FastAPI, HTTPException
from pydantic import BaseModel from pydantic import BaseModel
import worker import worker
from _shared.heartbeat import heartbeat_loop
from providers.sync_ops import ( from providers.sync_ops import (
generate_lyrics, get_credits, generate_lyrics, get_credits,
get_timestamped_lyrics, generate_style_boost, get_timestamped_lyrics, generate_style_boost,
@@ -25,15 +28,19 @@ logger = logging.getLogger(__name__)
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
worker_task = asyncio.create_task(worker.worker_loop()) worker_task = asyncio.create_task(worker.worker_loop())
hb_redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://192.168.45.54:6379"), decode_responses=False)
hb_task = asyncio.create_task(heartbeat_loop(hb_redis, "music-render", "render", worker.stats))
logger.info("music-render lifespan 시작") logger.info("music-render lifespan 시작")
try: try:
yield yield
finally: finally:
worker_task.cancel() for t in (worker_task, hb_task):
try: t.cancel()
await worker_task try:
except asyncio.CancelledError: await t
pass except asyncio.CancelledError:
pass
await hb_redis.aclose()
logger.info("music-render lifespan 종료") logger.info("music-render lifespan 종료")

View File

@@ -167,3 +167,25 @@ async def test_poll_once_returns_false_on_timeout(monkeypatch):
dispatch_mock.assert_not_called() dispatch_mock.assert_not_called()
fake_queue.ack.assert_not_awaited() fake_queue.ack.assert_not_awaited()
fake_queue.fail.assert_not_awaited() fake_queue.fail.assert_not_awaited()
# ----- heartbeat stats 카운터 -----
class _OneJobQueue:
def __init__(self): self.acked = False
async def dequeue(self, timeout=5):
if self.acked: return None
return ({"job_type": "suno_generation", "task_id": "t1", "params": {}}, b"raw")
async def ack(self, raw): self.acked = True
async def fail(self, raw, payload): pass
@pytest.mark.asyncio
async def test_poll_once_increments_jobs_done(monkeypatch):
worker.stats.jobs_done = 0
monkeypatch.setattr(worker, "run_suno_generation", lambda task_id, params: None)
handled = await worker.poll_once(_OneJobQueue())
assert handled is True
assert worker.stats.jobs_done == 1
assert worker.stats.busy is False
assert worker.stats.last_job_at is not None

View File

@@ -21,6 +21,7 @@ from providers.suno import (
) )
from providers.local import run_local_generation from providers.local import run_local_generation
from _shared.reliable_queue import ReliableQueue from _shared.reliable_queue import ReliableQueue
from _shared.heartbeat import WorkerStats, utc_now_iso
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -28,6 +29,8 @@ REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
QUEUE_KEY = "queue:music-render" QUEUE_KEY = "queue:music-render"
PAUSED_KEY = "queue:paused" PAUSED_KEY = "queue:paused"
stats = WorkerStats()
# Maps job_type → module-level function name (string). # Maps job_type → module-level function name (string).
# _dispatch resolves the name via globals() at call time so unittest.mock.patch # _dispatch resolves the name via globals() at call time so unittest.mock.patch
# on "worker.<name>" is correctly intercepted. # on "worker.<name>" is correctly intercepted.
@@ -74,6 +77,7 @@ async def poll_once(queue: ReliableQueue) -> bool:
if result is None: if result is None:
return False return False
payload, raw = result payload, raw = result
stats.busy = True
try: try:
# sync provider 함수 — thread로 실행해서 이벤트 루프 블로킹 방지 # sync provider 함수 — thread로 실행해서 이벤트 루프 블로킹 방지
await asyncio.to_thread(_dispatch, payload) await asyncio.to_thread(_dispatch, payload)
@@ -81,8 +85,14 @@ async def poll_once(queue: ReliableQueue) -> bool:
logger.exception("dispatch unhandled exception task_id=%s", logger.exception("dispatch unhandled exception task_id=%s",
payload.get("task_id")) payload.get("task_id"))
await queue.fail(raw, payload) await queue.fail(raw, payload)
stats.jobs_failed += 1
stats.last_job_at = utc_now_iso()
stats.busy = False
return True return True
await queue.ack(raw) await queue.ack(raw)
stats.jobs_done += 1
stats.last_job_at = utc_now_iso()
stats.busy = False
return True return True

View File

@@ -7,10 +7,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates tzdata \ ca-certificates tzdata \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
COPY requirements.txt . COPY task-watcher/requirements.txt /app/
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt
COPY . . # 공통 heartbeat 모듈 (services/_shared) — watcher.py가 from _shared.heartbeat import
COPY _shared /app/_shared
COPY task-watcher/. /app/
ENV PYTHONPATH=/app
EXPOSE 8000 EXPOSE 8000
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"] CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]

View File

@@ -0,0 +1,5 @@
"""Make services/ root importable so `from _shared.heartbeat import ...` works during tests."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

View File

@@ -0,0 +1,16 @@
"""task-watcher heartbeat payload — state=mode + mode 필드 검증."""
import json
from _shared.heartbeat import build_payload, WorkerStats
def test_watcher_heartbeat_payload_carries_mode():
payload = json.loads(
build_payload(
"task-watcher", "watcher", "trading",
WorkerStats(), extra={"mode": "trading"},
)
)
assert payload["kind"] == "watcher"
assert payload["state"] == "trading"
assert payload["mode"] == "trading"

View File

@@ -15,6 +15,7 @@ from zoneinfo import ZoneInfo
import redis.asyncio as aioredis import redis.asyncio as aioredis
from mode import current_mode, fetch_holidays, KST from mode import current_mode, fetch_holidays, KST
from _shared.heartbeat import build_payload, WorkerStats
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -23,6 +24,10 @@ PAUSED_KEY = "queue:paused"
LOOP_INTERVAL = 30 # 초 LOOP_INTERVAL = 30 # 초
HOLIDAYS_REFRESH = 3600 # 1시간 HOLIDAYS_REFRESH = 3600 # 1시간
PAUSED_TTL = 600 # 10분 (watcher 죽어도 자동 해제) PAUSED_TTL = 600 # 10분 (watcher 죽어도 자동 해제)
HEARTBEAT_KEY = "worker:task-watcher:heartbeat"
HEARTBEAT_TTL = 45 # LOOP_INTERVAL 30s < TTL 45s → 만료 전 갱신
_HB_STATS = WorkerStats()
async def watcher_loop(): async def watcher_loop():
@@ -46,6 +51,13 @@ async def watcher_loop():
else: else:
await redis.delete(PAUSED_KEY) await redis.delete(PAUSED_KEY)
# heartbeat (LOOP_INTERVAL=30s < TTL 45s → 만료 전 갱신)
await redis.set(
HEARTBEAT_KEY,
build_payload("task-watcher", "watcher", mode, _HB_STATS, extra={"mode": mode}),
ex=HEARTBEAT_TTL,
)
if mode != last_mode: if mode != last_mode:
logger.info("mode 전환: %s%s (paused=%s)", last_mode, mode, mode == "trading") logger.info("mode 전환: %s%s (paused=%s)", last_mode, mode, mode == "trading")
last_mode = mode last_mode = mode

View File

@@ -0,0 +1,18 @@
# Plan-realtime-trade-alerts — trade-monitor
# NAS Redis (heartbeat)
REDIS_URL=redis://192.168.45.54:6379
# NAS stock 백엔드 (monitor-set / report)
NAS_BASE_URL=http://192.168.45.54:18500
WEBAI_API_KEY=
# KIS 자체 토큰 (ai_trade와 분리된 전용 app_key)
TM_KIS_APP_KEY=
TM_KIS_APP_SECRET=
TM_KIS_ACCOUNT=
TM_KIS_IS_VIRTUAL=0
# 루프 주기(초) / sell_climax 거래량 배수 임계
TM_LOOP_INTERVAL=60
TM_CLIMAX_VOL_MULT=3.0

View File

@@ -0,0 +1,166 @@
# trade-monitor 워커 — 구현 설계
> **2026-07-03 · web-ai 소유.** 실시간 매매 알람 파이프라인의 Windows-side 워커.
> **권위 계약(원본 스펙)**: web-backend repo `docs/superpowers/specs/2026-07-02-realtime-trade-alerts-design.md` §5(계약)·§6(조건).
> 이 문서는 그 계약을 Windows Docker 워커로 구현하기 위한 **구현 설계**(모듈 분해·조건 해석·배포)만 다룬다.
---
## 1. 역할 · 경계
`services/trade-monitor/` 는 형제 워커(`image-render`, `task-watcher`)와 동일한 관례를 따르는 **FastAPI + asyncio 루프** 워커다. WSL2 Docker(`services/docker-compose.yml`)에서 구동.
**책임**
1. 60초 루프로 NAS `monitor-set` 조회 → 세션 게이트.
2. 비-KRX(알파벳) 티커 skip.
3. KIS 실시간 현재가 + 일봉 OHLCV 조회 → TA 지표 계산.
4. 매수/매도 조건 평가 → 발화집합 F 구성.
5. `POST report`로 F 전체 전송(무상태 — dedup은 NAS 영속).
6. Redis heartbeat 발신(`worker:trade-monitor:heartbeat` EX45).
7. KIS 오류는 사이클/종목 단위 격리(다음 분 재시도).
**경계 밖(안 함)**: dedup 상태 보관, 텔레그램 전송(NAS 담당), KST 세션/휴장 캘린더 재구현(NAS가 `session` 판정), 주문 실행.
---
## 2. 모듈 분해
| 파일 | 책임 | 인터페이스(순수/부작용) |
|------|------|------|
| `main.py` | FastAPI app + lifespan. `monitor_loop` + `heartbeat_loop` 스폰, `/health` | 부작용(태스크 스폰) |
| `monitor.py` | 오케스트레이션 루프. monitor-set→게이트→종목순회→firing→report. 공유 `MonitorState` 갱신 | 부작용 |
| `nas_client.py` | `get_monitor_set()` / `post_report(as_of, firing)``X-WebAI-Key` + retry | 부작용(HTTP) |
| `kis_client.py` | KIS REST: `_issue_token()`(OAuth 자체 발급, 24h 캐시) + `get_quote()` + `get_daily_ohlcv()` + 0.5s throttle | 부작용(HTTP) |
| `indicators.py` | `sma`, `rsi`, `avg_volume`, `highest_high` | **순수** |
| `conditions.py` | `evaluate_buy(ctx, buy_params)` / `evaluate_sell(ctx, exit_params)``list[firing]` | **순수** |
| `config.py` | `Settings` — env 로드 | 순수 |
순수 모듈(`indicators`, `conditions`)에 조건 로직을 격리해 테이블 기반 단위 테스트로 검증 가능하게 한다. HTTP·시간·Redis는 경계 모듈에만.
---
## 3. 데이터 흐름 (monitor_loop, 60초)
```
매 사이클:
ms = nas.get_monitor_set() # §5.1
state.session = ms.session
if ms.session == "closed":
state.hb = "market_closed"; sleep; continue # KIS 호출 0
targets = filter_krx(ms.buy_targets, ms.sell_targets) # 알파벳 티커 skip
firing = []
for t in targets: # 종목 단위 try/except 격리
quote = kis.get_quote(t.ticker) # 현재가 + 당일 누적 거래량
daily = kis.get_daily_ohlcv(t.ticker, 250) # MA200·52주 고점용
ctx = build_ctx(t, quote, daily)
if t is buy_target: firing += evaluate_buy(ctx, ms.buy_params)
if t is sell_target: firing += evaluate_sell(ctx, ms.exit_params)
if firing: state.last_alert_at = now
nas.post_report(as_of=now_kst_iso, firing=firing) # §5.2 — 빈 배열도 전송(edge clear 위해)
state.hb = "market_open"
stats.jobs_done += 1
```
`heartbeat_loop`(별도 15초 태스크)이 `state`를 읽어 §5.4 페이로드 발신.
---
## 4. 조건 로직 해석 (§6)
지표는 **일봉 시계열**(최신 last, 오름차순) + **실시간 현재가**(`price`) 기준. 데이터 부족(예: MA200용 200봉 미만)이면 해당 조건은 **미발화**(graceful skip). 각 firing은 `{ticker, kind, condition, price, detail}`.
### 매수 (`buy_targets`, `buy_params={rsi_oversold, breakout_vol_mult, pullback_pct}`)
| condition | 발화 규칙(해석) | detail |
|-----------|----------------|--------|
| `buy_ma20_pullback` | `ma20>ma50>ma200`(정배열) **AND** 최근 3봉 최저가가 `ma20*(1+pullback_pct)` 이하로 접근 **AND** `price>ma20`(반등 복귀) | `ma20, ma50, ma200, recent_low` |
| `buy_breakout` | `price > 직전 20봉 최고가`(당일 제외) **AND** `today_volume > breakout_vol_mult × avg_volume(20)` | `prior_high_20, vol_mult, avg_vol_20` |
| `buy_rsi_bounce` | RSI(14) 시계열에서 `min(rsi[-3:]) < rsi_oversold` **AND** `rsi[-1] > rsi_oversold` **AND** `rsi[-1] > rsi[-2]`(반등). 사이클마다 재계산·무상태 | `rsi, rsi_prev, rsi_oversold` |
종합: 각 조건 독립 발화(신뢰도 가중합은 NAS/텔레그램 단계 책임 아님 — 워커는 조건 발화만).
### 매도 (`sell_targets`, `exit_params={stop_pct, take_pct, trailing_pct}`, target에 `avg_price, qty, holding_high`)
| condition | 발화 규칙 | detail |
|-----------|----------|--------|
| `sell_stop_loss` | `(price-avg)/avg ≤ -stop_pct` | `avg_price, pnl_pct, stop_pct` |
| `sell_take_profit` | `(price-avg)/avg ≥ take_pct` | `avg_price, pnl_pct, take_pct` |
| `sell_trailing_stop` | `price ≤ holding_high × (1-trailing_pct)` (기본 0.10) | `holding_high, trailing_pct, drawdown_pct` |
| `sell_ma_break` | `price < ma50` (추가 `price<ma200`이면 detail.severity="high") | `ma50, ma200, severity` |
| `sell_climax` | **holdings_intel 정합**: `today_volume ≥ climax_vol_x × avg_volume(20)` **AND** `price < day_high × climax_close_pct`(윗꼬리) | `vol_mult, day_high, climax_close_pct` |
`climax_vol_x`(기본 3.0)·`climax_close_pct`(기본 0.97)는 monitor-set `exit_params`에서 읽음(BE 중앙화, main ed17193). 없으면 env `TM_CLIMAX_VOL_MULT` fallback. `day_high`는 KIS quote `stck_hgpr`(당일 세션 누적 고가).
---
## 5. KIS 클라이언트 (자체 토큰)
- `_issue_token()`: `POST {base}/oauth2/tokenP {grant_type, appkey, appsecret}``access_token`(만료 24h). 메모리 캐시, 만료 10분 전 재발급. **ai_trade와 분리된 `TM_KIS_APP_KEY/SECRET`** 사용(같은 app_key 공유 시 토큰 상호 무효화 + EGW00201).
- `get_quote(ticker)`: `inquire-price`(FHKST01010100) → `stck_prpr`(현재가), `acml_vol`(당일 누적 거래량), `stck_oprc`(당일 시가).
- `get_daily_ohlcv(ticker, days=250)`: `inquire-daily-itemchartprice`(FHKST03010100) — ai_trade `kis_client.py` 로직 복제, 오름차순.
- throttle 0.5s(초당 2회) + `_throttle_lock` 직렬화 + 429/timeout 지수 backoff(ai_trade 패턴 재사용).
> ⚠️ **운영 함정**: ai_trade와 KIS를 동시 호출하면 전용 app_key라도 KIS 계정 전체 rate limit을 공유할 수 있음. 별도 app_key로 무효화는 회피되나, 운영 시 동시 부하 모니터링 필요(Phase 7 백로그 연계).
---
## 6. heartbeat (§5.4)
`_shared.heartbeat.heartbeat_loop(redis, "trade-monitor", "trader", stats, interval=15, ttl=45, state_fn=...)`.
- `state_fn``MonitorState`를 읽어 `state ∈ {market_open, market_closed, idle}` + `extra={"last_alert_at": ...}` 반환.
- **디커플링 이유**: 루프 60초 > TTL 45초 → 인라인 발신 시 만료 갭. 15초 독립 태스크로 해소(형제 워커와 동일 구조). §5.4 필수 필드(name/kind/state/ts/last_alert_at) 충족, `jobs_done/jobs_failed`는 형제 워커처럼 superset 유지.
- 초기 상태 `idle`(첫 monitor-set 조회 전).
---
## 7. 설정 (env) — `TM_` 접두사로 ai_trade와 분리
| env | 기본값 | 용도 |
|-----|--------|------|
| `NAS_BASE_URL` | `http://192.168.45.54:18500` | stock 백엔드 |
| `WEBAI_API_KEY` | (필수) | `X-WebAI-Key` |
| `REDIS_URL` | `redis://192.168.45.54:6379` | heartbeat |
| `TM_KIS_APP_KEY` / `TM_KIS_APP_SECRET` | (필수) | KIS 자체 토큰 |
| `TM_KIS_ACCOUNT` | (필수) | KIS 계좌 |
| `TM_KIS_IS_VIRTUAL` | `0` | 실전/모의 |
| `TM_LOOP_INTERVAL` | `60` | 루프 주기(초) |
| `TM_CLIMAX_VOL_MULT` | `3.0` | sell_climax 임계 |
---
## 8. 에러 처리
- **monitor-set 실패**: 사이클 skip(report 안 함), heartbeat=`idle`, 다음 분 재시도.
- **KIS 종목 실패**: 해당 종목만 skip(로그 warning), 나머지 종목 계속.
- **report 실패**: 로그 error, 다음 사이클 신선 firing 재전송(무상태라 손실 허용).
- 루프 최상위 `try/except` — 어떤 예외도 루프를 죽이지 않음(task-watcher 패턴).
---
## 9. 테스트 전략 (pytest, 시스템 Python)
| 파일 | 검증 |
|------|------|
| `test_indicators.py` | sma/rsi/avg_volume/highest_high 수치(알려진 시계열), 데이터 부족 시 None |
| `test_conditions.py` | 8개 조건 테이블 기반(발화/미발화 경계), detail 필드 |
| `test_nas_client.py` | respx — monitor-set 파싱, report 페이로드, X-WebAI-Key 헤더, retry |
| `test_kis_client.py` | respx — 토큰 발급/캐시, quote/daily 파싱, throttle |
| `test_monitor.py` | 루프 1회(mock): closed skip, 비-KRX skip, firing 조립, last_alert_at 갱신, 종목 실패 격리 |
---
## 10. 배포
- `services/trade-monitor/Dockerfile`: task-watcher 관례 복제 — `COPY _shared /app/_shared` **필수**(빌드 컨텍스트 `.` 에서), `COPY trade-monitor/. /app/`, `PYTHONPATH=/app`, uvicorn `:8000`.
- `services/docker-compose.yml`: `trade-monitor` 서비스 추가, 포트 **18715**(image-render 18714 다음), `TZ=Asia/Seoul`, KIS/WEBAI/REDIS env, healthcheck `/health`.
- `services/.env`(비커밋): `TM_KIS_*`, `WEBAI_API_KEY` 실값. `.env.example`에 키만 기재.
---
## 11. 미해결 플래그 / 후속
1. **sell_climax** — ✅ 2026-07-03 holdings_intel 정합 완료(`price < day_high × climax_close_pct` + `exit_params` 파라미터화). BE 회신 기준.
2. **KIS 지표 필드 실검증** — quote의 `acml_vol`/`stck_oprc`, daily TR 응답 필드는 첫 운영 raw 캡처로 대조.
3. **`buy_ma20_pullback`·`buy_rsi_bounce` 해석** — "current candle series" 문구를 일봉 시계열로 해석. 첫 운영 4주 IC 검증 시 재조정 가능.
4. **KIS rate limit 공존** — ai_trade와 동시 부하. 전용 app_key로 토큰 무효화는 회피, 초당 호출 총량은 운영 모니터링.
5. **after 세션 시간외 시세**`inquire-price`가 시간외 단일가를 반영하는지 첫 운영 대조.

View File

@@ -0,0 +1,19 @@
FROM python:3.12-slim-bookworm
ENV PYTHONUNBUFFERED=1
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates tzdata \
&& rm -rf /var/lib/apt/lists/*
COPY trade-monitor/requirements.txt /app/
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt
# 공통 heartbeat 모듈 (services/_shared) — main.py가 from _shared.heartbeat import
COPY _shared /app/_shared
COPY trade-monitor/. /app/
ENV PYTHONPATH=/app
EXPOSE 8000
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,99 @@
"""§6 조건 로직 (순수). ctx + params → firing 리스트."""
from __future__ import annotations
from indicators import sma, rsi_series, highest_high
def _fire(ctx: dict, kind: str, condition: str, price: float, detail: dict) -> dict:
return {
"ticker": ctx["ticker"], "kind": kind,
"condition": condition, "price": price, "detail": detail,
}
def evaluate_buy(ctx: dict, params: dict) -> list[dict]:
price = ctx["price"]
closes, highs, lows, vols = ctx["closes"], ctx["highs"], ctx["lows"], ctx["volumes"]
rsi_os = params.get("rsi_oversold", 30)
vol_mult = params.get("breakout_vol_mult", 1.5)
pullback = params.get("pullback_pct", 0.02)
firing: list[dict] = []
# buy_ma20_pullback — 정배열 + ma20 근접 저가 + 반등 복귀
ma20, ma50, ma200 = sma(closes, 20), sma(closes, 50), sma(closes, 200)
if ma20 and ma50 and ma200 and ma20 > ma50 > ma200 and len(lows) >= 3:
recent_low = min(lows[-3:])
if recent_low <= ma20 * (1 + pullback) and price > ma20:
firing.append(_fire(ctx, "buy", "buy_ma20_pullback", price, {
"ma20": round(ma20, 1), "ma50": round(ma50, 1),
"ma200": round(ma200, 1), "recent_low": recent_low,
}))
# buy_breakout — 직전 20봉 고점 돌파 + 거래량 배수
prior_high20 = highest_high(highs, 20)
avg_vol20 = sma(vols, 20)
if prior_high20 and avg_vol20 and price > prior_high20 \
and ctx["today_volume"] > vol_mult * avg_vol20:
firing.append(_fire(ctx, "buy", "buy_breakout", price, {
"prior_high_20": prior_high20,
"vol_mult": round(ctx["today_volume"] / avg_vol20, 2),
"avg_vol_20": round(avg_vol20, 0),
}))
# buy_rsi_bounce — RSI 과매도 후 반등 (무상태 재계산)
rs = rsi_series(closes, 14)
if len(rs) >= 3 and min(rs[-3:]) < rsi_os and rs[-1] > rsi_os and rs[-1] > rs[-2]:
firing.append(_fire(ctx, "buy", "buy_rsi_bounce", price, {
"rsi": round(rs[-1], 1), "rsi_prev": round(rs[-2], 1),
"rsi_oversold": rsi_os,
}))
return firing
def evaluate_sell(ctx: dict, params: dict) -> list[dict]:
price = ctx["price"]
avg = ctx.get("avg_price")
hh = ctx.get("holding_high")
closes, vols = ctx["closes"], ctx["volumes"]
stop = params.get("stop_pct", 0.08)
take = params.get("take_pct", 0.25)
trail = params.get("trailing_pct", 0.10)
firing: list[dict] = []
if avg:
pnl = (price - avg) / avg
if pnl <= -stop:
firing.append(_fire(ctx, "sell", "sell_stop_loss", price, {
"avg_price": avg, "pnl_pct": round(pnl, 4), "stop_pct": stop}))
if pnl >= take:
firing.append(_fire(ctx, "sell", "sell_take_profit", price, {
"avg_price": avg, "pnl_pct": round(pnl, 4), "take_pct": take}))
if hh and price <= hh * (1 - trail):
firing.append(_fire(ctx, "sell", "sell_trailing_stop", price, {
"holding_high": hh, "trailing_pct": trail,
"drawdown_pct": round((price - hh) / hh, 4)}))
ma50, ma200 = sma(closes, 50), sma(closes, 200)
if ma50 and price < ma50:
severity = "high" if (ma200 and price < ma200) else "normal"
firing.append(_fire(ctx, "sell", "sell_ma_break", price, {
"ma50": round(ma50, 1),
"ma200": round(ma200, 1) if ma200 else None,
"severity": severity}))
# sell_climax — holdings_intel 정합(stock/app/holdings_intel.py:109-118):
# 거래량 ≥ 20일평균 × climax_vol_x AND 종가 < 당일고가 × climax_close_pct (윗꼬리)
# 실시간이므로 day_high = 당일 세션 누적 고가(최신 1분봉 고가 아님).
climax_vol_x = params.get("climax_vol_x", ctx.get("climax_vol_mult", 3.0))
climax_close_pct = params.get("climax_close_pct", 0.97)
avg_vol20 = sma(vols, 20)
day_high = ctx.get("day_high")
if avg_vol20 and day_high and ctx["today_volume"] >= climax_vol_x * avg_vol20 \
and price < day_high * climax_close_pct:
firing.append(_fire(ctx, "sell", "sell_climax", price, {
"vol_mult": round(ctx["today_volume"] / avg_vol20, 2),
"day_high": day_high, "climax_close_pct": climax_close_pct}))
return firing

View File

@@ -0,0 +1,32 @@
"""Settings — 환경변수 로드. TM_ 접두사로 ai_trade와 분리."""
from __future__ import annotations
import os
from dataclasses import dataclass
@dataclass
class Settings:
nas_base_url: str
webai_api_key: str
redis_url: str
kis_app_key: str
kis_app_secret: str
kis_account: str
kis_is_virtual: bool
loop_interval: int
climax_vol_mult: float
def load_settings() -> Settings:
return Settings(
nas_base_url=os.getenv("NAS_BASE_URL", "http://192.168.45.54:18500"),
webai_api_key=os.getenv("WEBAI_API_KEY", ""),
redis_url=os.getenv("REDIS_URL", "redis://192.168.45.54:6379"),
kis_app_key=os.getenv("TM_KIS_APP_KEY", ""),
kis_app_secret=os.getenv("TM_KIS_APP_SECRET", ""),
kis_account=os.getenv("TM_KIS_ACCOUNT", ""),
kis_is_virtual=os.getenv("TM_KIS_IS_VIRTUAL", "0") == "1",
loop_interval=int(os.getenv("TM_LOOP_INTERVAL", "60")),
climax_vol_mult=float(os.getenv("TM_CLIMAX_VOL_MULT", "3.0")),
)

View File

@@ -0,0 +1,5 @@
"""services/ 루트를 sys.path에 추가 — from _shared.heartbeat import 가능하게."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

View File

@@ -0,0 +1,38 @@
"""순수 TA 지표 — sma / rsi_series / highest_high."""
from __future__ import annotations
def sma(values: list[float], period: int) -> float | None:
if period <= 0 or len(values) < period:
return None
return sum(values[-period:]) / period
def highest_high(highs: list[float], period: int) -> float | None:
if period <= 0 or len(highs) < period:
return None
return max(highs[-period:])
def rsi_series(closes: list[float], period: int = 14) -> list[float]:
"""Wilder RSI. 반환 리스트는 closes[period:]에 1:1 정렬. 부족하면 []."""
if len(closes) <= period:
return []
deltas = [closes[i] - closes[i - 1] for i in range(1, len(closes))]
gains = [d if d > 0 else 0.0 for d in deltas]
losses = [-d if d < 0 else 0.0 for d in deltas]
def _rsi(ag: float, al: float) -> float:
if al == 0:
return 100.0
rs = ag / al
return 100.0 - 100.0 / (1.0 + rs)
avg_gain = sum(gains[:period]) / period
avg_loss = sum(losses[:period]) / period
out = [_rsi(avg_gain, avg_loss)]
for i in range(period, len(deltas)):
avg_gain = (avg_gain * (period - 1) + gains[i]) / period
avg_loss = (avg_loss * (period - 1) + losses[i]) / period
out.append(_rsi(avg_gain, avg_loss))
return out

View File

@@ -0,0 +1,124 @@
"""KIS REST client — 자체 OAuth 토큰(TM_KIS_*) + quote + 일봉 + throttle."""
from __future__ import annotations
import asyncio
import logging
import time
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import httpx
logger = logging.getLogger(__name__)
KST = ZoneInfo("Asia/Seoul")
_MAX_ATTEMPTS = 3
_THROTTLE_INTERVAL = 0.5 # 초당 2회
_TOKEN_MARGIN = 600 # 만료 10분 전 재발급
class KISClient:
def __init__(self, app_key, app_secret, account, is_virtual, timeout: float = 10.0):
self._app_key = app_key
self._app_secret = app_secret
self._account = account
self._base_url = (
"https://openapivts.koreainvestment.com:29443" if is_virtual
else "https://openapi.koreainvestment.com:9443"
)
self._client = httpx.AsyncClient(timeout=timeout)
self._token: str | None = None
self._token_exp: float = 0.0
self._last_throttle_at = 0.0
self._throttle_lock = asyncio.Lock()
self._token_lock = asyncio.Lock()
async def close(self) -> None:
await self._client.aclose()
async def _issue_token(self) -> str:
async with self._token_lock:
now = time.time()
if self._token and now < self._token_exp - _TOKEN_MARGIN:
return self._token
r = await self._client.post(
f"{self._base_url}/oauth2/tokenP",
json={"grant_type": "client_credentials",
"appkey": self._app_key, "appsecret": self._app_secret},
)
r.raise_for_status()
data = r.json()
self._token = data["access_token"]
self._token_exp = now + int(data.get("expires_in", 86400))
return self._token
async def _throttle(self) -> None:
async with self._throttle_lock:
elapsed = time.monotonic() - self._last_throttle_at
if elapsed < _THROTTLE_INTERVAL:
await asyncio.sleep(_THROTTLE_INTERVAL - elapsed)
self._last_throttle_at = time.monotonic()
async def _request(self, method: str, path: str, tr_id: str, **kwargs) -> dict:
token = await self._issue_token()
headers = {
"authorization": f"Bearer {token}",
"appkey": self._app_key, "appsecret": self._app_secret,
"tr_id": tr_id, "custtype": "P",
}
url = f"{self._base_url}{path}"
for attempt in range(_MAX_ATTEMPTS):
await self._throttle()
try:
resp = await self._client.request(method, url, headers=headers, **kwargs)
if resp.status_code == 429 and attempt < _MAX_ATTEMPTS - 1:
await asyncio.sleep(2 ** attempt)
continue
resp.raise_for_status()
return resp.json()
except httpx.TimeoutException:
if attempt < _MAX_ATTEMPTS - 1:
await asyncio.sleep(2 ** attempt)
continue
raise
raise RuntimeError("retry exhausted")
async def get_quote(self, ticker: str) -> dict:
raw = await self._request(
"GET", "/uapi/domestic-stock/v1/quotations/inquire-price",
tr_id="FHKST01010100",
params={"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": ticker},
)
o = raw.get("output", {})
return {
"price": int(o["stck_prpr"]),
"day_open": int(o["stck_oprc"]),
"day_high": int(o["stck_hgpr"]),
"today_volume": int(o["acml_vol"]),
"as_of": datetime.now(KST).isoformat(),
}
async def get_daily_ohlcv(self, ticker: str, days: int = 250) -> list[dict]:
today = datetime.now(KST).strftime("%Y%m%d")
start = (datetime.now(KST) - timedelta(days=days * 2)).strftime("%Y%m%d")
raw = await self._request(
"GET", "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice",
tr_id="FHKST03010100",
params={"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": ticker,
"FID_INPUT_DATE_1": start, "FID_INPUT_DATE_2": today,
"FID_PERIOD_DIV_CODE": "D", "FID_ORG_ADJ_PRC": "1"},
)
bars = []
for row in raw.get("output2", []):
try:
d = row["stck_bsop_date"]
bars.append({
"datetime": f"{d[:4]}-{d[4:6]}-{d[6:]}",
"open": int(row["stck_oprc"]), "high": int(row["stck_hgpr"]),
"low": int(row["stck_lwpr"]), "close": int(row["stck_clpr"]),
"volume": int(row["acml_vol"]),
})
except (KeyError, ValueError):
continue
bars.reverse()
return bars[-days:]

View File

@@ -0,0 +1,62 @@
"""trade-monitor FastAPI entry — lifespan(monitor_loop + heartbeat_loop) + /health."""
from __future__ import annotations
import asyncio
import logging
from contextlib import asynccontextmanager
import redis.asyncio as aioredis
from fastapi import FastAPI
import monitor
from config import load_settings
from kis_client import KISClient
from nas_client import NASClient
from _shared.heartbeat import heartbeat_loop, WorkerStats
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(name)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
HEARTBEAT_INTERVAL = 15 # 60초 루프 > TTL 45초 → 독립 15초 발신으로 만료갭 해소
HEARTBEAT_TTL = 45
@asynccontextmanager
async def lifespan(app: FastAPI):
settings = load_settings()
nas = NASClient(settings.nas_base_url, settings.webai_api_key)
kis = KISClient(settings.kis_app_key, settings.kis_app_secret,
settings.kis_account, settings.kis_is_virtual)
state = monitor.MonitorState()
stats = WorkerStats()
redis = aioredis.from_url(settings.redis_url, decode_responses=False)
mon_task = asyncio.create_task(
monitor.monitor_loop(nas, kis, state, stats, settings))
hb_task = asyncio.create_task(heartbeat_loop(
redis, "trade-monitor", "trader", stats,
interval=HEARTBEAT_INTERVAL, ttl=HEARTBEAT_TTL,
state_fn=monitor.make_state_fn(state)))
logger.info("trade-monitor lifespan 시작")
try:
yield
finally:
for t in (mon_task, hb_task):
t.cancel()
try:
await t
except asyncio.CancelledError:
pass
await kis.close()
await nas.close()
await redis.aclose()
logger.info("trade-monitor lifespan 종료")
app = FastAPI(lifespan=lifespan)
@app.get("/health")
def health():
return {"ok": True, "service": "trade-monitor"}

View File

@@ -0,0 +1,114 @@
"""오케스트레이션 — monitor-set 조회 → 조건 평가 → report + heartbeat state."""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime
from zoneinfo import ZoneInfo
from conditions import evaluate_buy, evaluate_sell
logger = logging.getLogger(__name__)
KST = ZoneInfo("Asia/Seoul")
class MonitorState:
"""monitor_loop가 갱신, heartbeat state_fn이 읽는 공유 상태."""
def __init__(self):
self.session_state = "idle" # market_open | market_closed | idle
self.last_alert_at: str | None = None
def filter_krx(targets: list[dict]) -> list[dict]:
"""6자리 숫자 티커(KRX)만. 알파벳 티커 skip."""
out = []
for t in targets:
tk = str(t.get("ticker", ""))
if tk.isdigit() and len(tk) == 6:
out.append(t)
return out
async def _build_ctx(kis, target: dict, settings) -> dict:
ticker = target["ticker"]
quote = await kis.get_quote(ticker)
daily = await kis.get_daily_ohlcv(ticker, 250)
return {
"ticker": ticker, "name": target.get("name", ""),
"price": quote["price"], "day_open": quote["day_open"],
"day_high": quote["day_high"],
"today_volume": quote["today_volume"],
"closes": [b["close"] for b in daily],
"highs": [b["high"] for b in daily],
"lows": [b["low"] for b in daily],
"volumes": [b["volume"] for b in daily],
"avg_price": target.get("avg_price"),
"qty": target.get("qty"),
"holding_high": target.get("holding_high"),
"climax_vol_mult": settings.climax_vol_mult,
}
async def run_cycle(nas, kis, state, stats, settings) -> None:
try:
ms = await nas.get_monitor_set()
except Exception:
logger.exception("monitor-set 조회 실패")
state.session_state = "idle"
stats.jobs_failed += 1
return
session = ms.get("session", "closed")
if session == "closed":
state.session_state = "market_closed"
return
buy_targets = filter_krx(ms.get("buy_targets", []))
sell_targets = filter_krx(ms.get("sell_targets", []))
buy_params = ms.get("buy_params", {})
exit_params = ms.get("exit_params", {})
firing: list[dict] = []
for t in buy_targets:
try:
firing += evaluate_buy(await _build_ctx(kis, t, settings), buy_params)
except Exception:
logger.exception("buy 평가 실패 %s", t.get("ticker"))
for t in sell_targets:
try:
firing += evaluate_sell(await _build_ctx(kis, t, settings), exit_params)
except Exception:
logger.exception("sell 평가 실패 %s", t.get("ticker"))
as_of = datetime.now(KST).isoformat(timespec="seconds")
if firing:
state.last_alert_at = as_of
logger.info("firing %d개: %s", len(firing),
[f"{f['ticker']}:{f['condition']}" for f in firing])
try:
await nas.post_report(as_of, firing) # 빈 배열도 전송(edge clear)
except Exception:
logger.exception("report 전송 실패")
state.session_state = "market_open"
stats.jobs_done += 1
stats.last_job_at = as_of
async def monitor_loop(nas, kis, state, stats, settings) -> None:
logger.info("trade-monitor loop 시작 interval=%ds", settings.loop_interval)
while True:
try:
await run_cycle(nas, kis, state, stats, settings)
except asyncio.CancelledError:
logger.info("monitor_loop cancelled")
raise
except Exception:
logger.exception("monitor_loop iteration 실패")
await asyncio.sleep(settings.loop_interval)
def make_state_fn(state):
async def state_fn(redis, stats):
return state.session_state, {"last_alert_at": state.last_alert_at}
return state_fn

View File

@@ -0,0 +1,48 @@
"""NAS stock 백엔드 trade-alert 계약 — X-WebAI-Key + retry."""
from __future__ import annotations
import asyncio
import logging
import httpx
logger = logging.getLogger(__name__)
_MAX_ATTEMPTS = 3
_RETRY_STATUSES = {429, 500, 502, 503, 504}
class NASClient:
def __init__(self, base_url: str, api_key: str, timeout: float = 10.0):
self._base_url = base_url.rstrip("/")
self._api_key = api_key
self._client = httpx.AsyncClient(timeout=timeout)
async def close(self) -> None:
await self._client.aclose()
async def get_monitor_set(self) -> dict:
return await self._request("GET", "/api/webai/trade-alert/monitor-set")
async def post_report(self, as_of: str, firing: list[dict]) -> dict:
return await self._request(
"POST", "/api/webai/trade-alert/report",
json={"as_of": as_of, "firing": firing})
async def _request(self, method: str, path: str, **kwargs) -> dict:
url = f"{self._base_url}{path}"
headers = {"X-WebAI-Key": self._api_key}
for attempt in range(_MAX_ATTEMPTS):
try:
resp = await self._client.request(method, url, headers=headers, **kwargs)
if resp.status_code in _RETRY_STATUSES and attempt < _MAX_ATTEMPTS - 1:
await asyncio.sleep(2 ** attempt)
continue
resp.raise_for_status()
return resp.json()
except httpx.TimeoutException:
if attempt < _MAX_ATTEMPTS - 1:
await asyncio.sleep(2 ** attempt)
continue
raise
raise RuntimeError("retry exhausted")

View File

@@ -0,0 +1,2 @@
[pytest]
asyncio_mode = auto

View File

@@ -0,0 +1,7 @@
fastapi==0.115.6
uvicorn[standard]==0.34.0
redis>=5.0
httpx>=0.27
pytest>=8.0
pytest-asyncio>=0.24
respx>=0.21

View File

View File

@@ -0,0 +1,66 @@
"""evaluate_buy — 3개 매수 조건 경계."""
from conditions import evaluate_buy
BUY_PARAMS = {"rsi_oversold": 30, "breakout_vol_mult": 1.5, "pullback_pct": 0.02}
def _ctx(**over):
base = dict(
ticker="005930", name="삼성전자", price=100.0, day_open=99.0,
today_volume=1000.0, closes=[], highs=[], lows=[], volumes=[],
avg_price=None, qty=None, holding_high=None, climax_vol_mult=3.0,
)
base.update(over)
return base
def _conditions(firing):
return {f["condition"] for f in firing}
def test_ma20_pullback_fires():
# 정배열(ma20>ma50>ma200), 최근 저가가 ma20 근처, price가 ma20 위로 반등
closes = [90.0] * 200 + [100.0] * 20 # ma20=100, ma50/ma200 낮음 → 정배열
lows = [90.0] * 217 + [100.5, 100.4, 100.3] # 최근 3봉 저가 ~ma20*(1.02)=102 이하
ctx = _ctx(price=101.0, closes=closes, highs=closes, lows=lows,
volumes=[1.0] * len(closes))
assert "buy_ma20_pullback" in _conditions(evaluate_buy(ctx, BUY_PARAMS))
def test_ma20_pullback_skips_when_not_aligned():
closes = [100.0] * 200 + [90.0] * 20 # 역배열
ctx = _ctx(price=91.0, closes=closes, highs=closes, lows=closes,
volumes=[1.0] * len(closes))
assert "buy_ma20_pullback" not in _conditions(evaluate_buy(ctx, BUY_PARAMS))
def test_breakout_fires():
closes = [50.0] * 25
highs = [60.0] * 25 # 직전 20봉 최고 60
vols = [100.0] * 25 # avg20=100
ctx = _ctx(price=61.0, today_volume=200.0, closes=closes, highs=highs,
lows=closes, volumes=vols) # 61>60, 200>1.5*100
assert "buy_breakout" in _conditions(evaluate_buy(ctx, BUY_PARAMS))
def test_breakout_skips_on_low_volume():
highs = [60.0] * 25
ctx = _ctx(price=61.0, today_volume=120.0, closes=[50.0] * 25, highs=highs,
lows=[50.0] * 25, volumes=[100.0] * 25) # 120 < 1.5*100=150
assert "buy_breakout" not in _conditions(evaluate_buy(ctx, BUY_PARAMS))
def test_rsi_bounce_fires():
# 14봉 급락으로 RSI<30 찍고 5봉 반등하여 30 위로 복귀
closes = [100.0]
for _ in range(14):
closes.append(closes[-1] * 0.97) # 하락 → RSI 저하
for _ in range(5):
closes.append(closes[-1] * 1.05) # 반등 → RSI 30 위로
ctx = _ctx(price=closes[-1], closes=closes, highs=closes, lows=closes,
volumes=[1.0] * len(closes))
assert "buy_rsi_bounce" in _conditions(evaluate_buy(ctx, BUY_PARAMS))
def test_empty_series_no_fire():
assert evaluate_buy(_ctx(), BUY_PARAMS) == []

View File

@@ -0,0 +1,89 @@
"""evaluate_sell — 5개 매도 조건 경계."""
from conditions import evaluate_sell
EXIT = {"stop_pct": 0.08, "take_pct": 0.25, "trailing_pct": 0.10,
"climax_vol_x": 3.0, "climax_close_pct": 0.97}
def _ctx(**over):
base = dict(
ticker="000660", name="SK하이닉스", price=100.0, day_open=100.0,
day_high=100.0, today_volume=100.0, closes=[100.0] * 60,
highs=[100.0] * 60, lows=[100.0] * 60, volumes=[100.0] * 60,
avg_price=100.0, qty=10, holding_high=100.0, climax_vol_mult=3.0,
)
base.update(over)
return base
def _c(firing):
return {f["condition"] for f in firing}
def test_stop_loss_fires():
ctx = _ctx(price=90.0, avg_price=100.0) # -10% <= -8%
assert "sell_stop_loss" in _c(evaluate_sell(ctx, EXIT))
def test_stop_loss_skips_above_threshold():
ctx = _ctx(price=95.0, avg_price=100.0) # -5% > -8%
assert "sell_stop_loss" not in _c(evaluate_sell(ctx, EXIT))
def test_take_profit_fires():
ctx = _ctx(price=130.0, avg_price=100.0) # +30% >= 25%
assert "sell_take_profit" in _c(evaluate_sell(ctx, EXIT))
def test_trailing_stop_fires():
ctx = _ctx(price=89.0, holding_high=100.0) # 89 <= 100*0.9=90
assert "sell_trailing_stop" in _c(evaluate_sell(ctx, EXIT))
def test_ma_break_severity_high():
# price가 ma50/ma200 아래 → severity high (ma200 계산 위해 200봉 필요)
closes = [200.0] * 200
ctx = _ctx(price=100.0, closes=closes, avg_price=100.0, holding_high=100.0)
firing = evaluate_sell(ctx, EXIT)
mb = [f for f in firing if f["condition"] == "sell_ma_break"]
assert mb and mb[0]["detail"]["severity"] == "high"
def test_climax_fires():
# holdings_intel 정합: 거래량 3배 이상 + 종가 < 당일고가×0.97 (윗꼬리)
ctx = _ctx(price=96.0, day_high=100.0, today_volume=400.0,
volumes=[100.0] * 60) # 400>=3*100, 96 < 100*0.97=97
assert "sell_climax" in _c(evaluate_sell(ctx, EXIT))
def test_climax_skips_when_not_reversal():
# 종가가 당일고가의 97% 이상 → 윗꼬리 아님
ctx = _ctx(price=99.0, day_high=100.0, today_volume=400.0,
volumes=[100.0] * 60) # 99 >= 100*0.97=97 → 반전 아님
assert "sell_climax" not in _c(evaluate_sell(ctx, EXIT))
def test_climax_uses_exit_params_vol_x():
# exit_params.climax_vol_x=5.0 → 400 < 5*100=500 → 미발화
exit5 = {**EXIT, "climax_vol_x": 5.0}
ctx = _ctx(price=96.0, day_high=100.0, today_volume=400.0,
volumes=[100.0] * 60)
assert "sell_climax" not in _c(evaluate_sell(ctx, exit5))
def test_climax_uses_exit_params_close_pct():
# climax_close_pct=0.90 → 임계 90, price=95 → 95<90? No → 미발화
exit90 = {**EXIT, "climax_close_pct": 0.90}
ctx = _ctx(price=95.0, day_high=100.0, today_volume=400.0,
volumes=[100.0] * 60)
assert "sell_climax" not in _c(evaluate_sell(ctx, exit90))
# 기본 0.97이면 95 < 97 → 발화
assert "sell_climax" in _c(evaluate_sell(ctx, EXIT))
def test_no_avg_no_pnl_conditions():
# avg_price None(보유정보 없음) → stop/take 미발화
ctx = _ctx(price=50.0, avg_price=None, holding_high=None,
closes=[100.0] * 60)
conds = _c(evaluate_sell(ctx, EXIT))
assert "sell_stop_loss" not in conds and "sell_take_profit" not in conds

View File

@@ -0,0 +1,27 @@
"""Settings env 로드 — 기본값 + override."""
from config import load_settings
def test_defaults(monkeypatch):
for k in ("NAS_BASE_URL", "WEBAI_API_KEY", "REDIS_URL", "TM_KIS_APP_KEY",
"TM_KIS_APP_SECRET", "TM_KIS_ACCOUNT", "TM_KIS_IS_VIRTUAL",
"TM_LOOP_INTERVAL", "TM_CLIMAX_VOL_MULT"):
monkeypatch.delenv(k, raising=False)
s = load_settings()
assert s.nas_base_url == "http://192.168.45.54:18500"
assert s.redis_url == "redis://192.168.45.54:6379"
assert s.kis_is_virtual is False
assert s.loop_interval == 60
assert s.climax_vol_mult == 3.0
def test_override(monkeypatch):
monkeypatch.setenv("TM_KIS_IS_VIRTUAL", "1")
monkeypatch.setenv("TM_LOOP_INTERVAL", "30")
monkeypatch.setenv("TM_CLIMAX_VOL_MULT", "2.5")
monkeypatch.setenv("WEBAI_API_KEY", "secret")
s = load_settings()
assert s.kis_is_virtual is True
assert s.loop_interval == 30
assert s.climax_vol_mult == 2.5
assert s.webai_api_key == "secret"

View File

@@ -0,0 +1,6 @@
"""/health — 라우트 핸들러 직접 검증."""
from main import health
def test_health():
assert health() == {"ok": True, "service": "trade-monitor"}

View File

@@ -0,0 +1,39 @@
"""indicators — 순수 수치 검증."""
from indicators import sma, rsi_series, highest_high
def test_sma_basic():
assert sma([1, 2, 3, 4, 5], 5) == 3.0
assert sma([1, 2, 3, 4, 5], 2) == 4.5
def test_sma_insufficient():
assert sma([1, 2], 5) is None
assert sma([], 3) is None
def test_highest_high():
assert highest_high([1, 9, 3, 4], 3) == 9
assert highest_high([1, 2, 3], 3) == 3
assert highest_high([1, 2], 3) is None
def test_rsi_all_gains_is_100():
# 단조 증가 → 손실 0 → RSI 100
closes = [float(i) for i in range(1, 20)]
rs = rsi_series(closes, 14)
assert rs, "series should not be empty"
assert rs[-1] == 100.0
def test_rsi_insufficient():
assert rsi_series([1, 2, 3], 14) == []
def test_rsi_known_range():
# 등락 섞인 시계열 → RSI는 0~100 사이
closes = [10, 11, 10.5, 11.5, 11, 12, 11.8, 12.5, 12, 13,
12.7, 13.2, 12.9, 13.5, 13.1, 13.8]
rs = rsi_series(closes, 14)
assert len(rs) == len(closes) - 14
assert all(0.0 <= v <= 100.0 for v in rs)

View File

@@ -0,0 +1,56 @@
"""KISClient — 토큰 발급/캐시 + quote/daily 파싱 (respx)."""
import httpx
import respx
from kis_client import KISClient
BASE = "https://openapi.koreainvestment.com:9443"
def _client():
return KISClient("APPKEY", "APPSECRET", "12345678-01", is_virtual=False)
@respx.mock
async def test_issue_token_cached():
route = respx.post(f"{BASE}/oauth2/tokenP").mock(
return_value=httpx.Response(200, json={"access_token": "TKN", "expires_in": 86400}))
c = _client()
t1 = await c._issue_token()
t2 = await c._issue_token()
assert t1 == "TKN" and t2 == "TKN"
assert route.call_count == 1 # 캐시 → 1회만 발급
await c.close()
@respx.mock
async def test_get_quote_parses():
respx.post(f"{BASE}/oauth2/tokenP").mock(
return_value=httpx.Response(200, json={"access_token": "TKN", "expires_in": 86400}))
respx.get(f"{BASE}/uapi/domestic-stock/v1/quotations/inquire-price").mock(
return_value=httpx.Response(200, json={"output": {
"stck_prpr": "71500", "stck_oprc": "71000", "stck_hgpr": "72000",
"acml_vol": "1234567"}}))
c = _client()
q = await c.get_quote("005930")
assert q["price"] == 71500 and q["day_open"] == 71000 and q["today_volume"] == 1234567
assert q["day_high"] == 72000
await c.close()
@respx.mock
async def test_get_daily_ascending():
respx.post(f"{BASE}/oauth2/tokenP").mock(
return_value=httpx.Response(200, json={"access_token": "TKN", "expires_in": 86400}))
# KIS는 내림차순 반환 → 오름차순으로 뒤집혀야 함
respx.get(f"{BASE}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice").mock(
return_value=httpx.Response(200, json={"output2": [
{"stck_bsop_date": "20260702", "stck_oprc": "100", "stck_hgpr": "110",
"stck_lwpr": "90", "stck_clpr": "105", "acml_vol": "5"},
{"stck_bsop_date": "20260701", "stck_oprc": "95", "stck_hgpr": "102",
"stck_lwpr": "94", "stck_clpr": "100", "acml_vol": "4"}]}))
c = _client()
bars = await c.get_daily_ohlcv("005930", days=250)
assert bars[0]["datetime"] == "2026-07-01"
assert bars[-1]["close"] == 105
await c.close()

View File

@@ -0,0 +1,95 @@
"""monitor.run_cycle — 게이트/필터/조립/격리."""
from monitor import MonitorState, filter_krx, run_cycle
from config import load_settings
from _shared.heartbeat import WorkerStats
def test_filter_krx_keeps_only_numeric6():
targets = [{"ticker": "005930"}, {"ticker": "AAPL"}, {"ticker": "00660"},
{"ticker": "000660"}, {"ticker": "0059301"}]
kept = {t["ticker"] for t in filter_krx(targets)}
assert kept == {"005930", "000660"}
class _FakeNAS:
def __init__(self, ms):
self._ms = ms
self.reported = None
async def get_monitor_set(self):
return self._ms
async def post_report(self, as_of, firing):
self.reported = {"as_of": as_of, "firing": firing}
return {"new_alerts": len(firing), "cleared": 0}
class _FakeKIS:
def __init__(self, price=100, fail_on=None):
self._price = price
self._fail_on = fail_on or set()
async def get_quote(self, ticker):
if ticker in self._fail_on:
raise RuntimeError("KIS down")
return {"price": self._price, "day_open": 99, "day_high": 100,
"today_volume": 1000, "as_of": "x"}
async def get_daily_ohlcv(self, ticker, days=250):
# 정배열 + 저가 근접 → ma20_pullback 발화 유도
return [{"open": 90, "high": 90, "low": 90, "close": 90, "volume": 1}] * 200 \
+ [{"open": 100, "high": 100, "low": 100, "close": 100, "volume": 1}] * 20
async def test_closed_session_skips_kis():
nas = _FakeNAS({"session": "closed"})
state, stats = MonitorState(), WorkerStats()
await run_cycle(nas, _FakeKIS(), state, stats, load_settings())
assert state.session_state == "market_closed"
assert nas.reported is None # report도 안 함
async def test_non_krx_skipped_and_report_sent():
nas = _FakeNAS({"session": "regular",
"buy_targets": [{"ticker": "AAPL", "name": "Apple"}],
"sell_targets": [], "buy_params": {}, "exit_params": {}})
state, stats = MonitorState(), WorkerStats()
await run_cycle(nas, _FakeKIS(), state, stats, load_settings())
assert state.session_state == "market_open"
assert nas.reported is not None
assert nas.reported["firing"] == [] # 알파벳 티커 skip → 빈 발화
async def test_firing_assembled_and_last_alert_set():
nas = _FakeNAS({"session": "regular",
"buy_targets": [{"ticker": "005930", "name": "삼성전자"}],
"sell_targets": [], "buy_params": {"pullback_pct": 0.02},
"exit_params": {}})
state, stats = MonitorState(), WorkerStats()
await run_cycle(nas, _FakeKIS(price=101), state, stats, load_settings())
conds = {f["condition"] for f in nas.reported["firing"]}
assert "buy_ma20_pullback" in conds
assert state.last_alert_at is not None
async def test_per_ticker_failure_isolated():
nas = _FakeNAS({"session": "regular",
"buy_targets": [{"ticker": "005930"}, {"ticker": "000660"}],
"sell_targets": [], "buy_params": {}, "exit_params": {}})
state, stats = MonitorState(), WorkerStats()
# 005930은 실패, 000660은 성공 → 루프가 죽지 않고 report 전송
await run_cycle(nas, _FakeKIS(fail_on={"005930"}), state, stats, load_settings())
assert nas.reported is not None
assert state.session_state == "market_open"
async def test_monitor_set_failure_sets_idle():
class _BadNAS(_FakeNAS):
async def get_monitor_set(self):
raise RuntimeError("NAS down")
nas = _BadNAS({})
state, stats = MonitorState(), WorkerStats()
await run_cycle(nas, _FakeKIS(), state, stats, load_settings())
assert state.session_state == "idle"
assert nas.reported is None

View File

@@ -0,0 +1,39 @@
"""NASClient — monitor-set/report + X-WebAI-Key (respx)."""
import json as _json
import httpx
import respx
from nas_client import NASClient
BASE = "http://nas.test"
@respx.mock
async def test_get_monitor_set_sends_key():
route = respx.get(f"{BASE}/api/webai/trade-alert/monitor-set").mock(
return_value=httpx.Response(200, json={"session": "regular", "buy_targets": []}))
c = NASClient(BASE, "KEY")
ms = await c.get_monitor_set()
assert ms["session"] == "regular"
assert route.calls.last.request.headers["X-WebAI-Key"] == "KEY"
await c.close()
@respx.mock
async def test_post_report_payload():
captured = {}
def _resp(request):
captured.update(_json.loads(request.content))
return httpx.Response(200, json={"new_alerts": 1, "cleared": 0})
respx.post(f"{BASE}/api/webai/trade-alert/report").mock(side_effect=_resp)
c = NASClient(BASE, "KEY")
firing = [{"ticker": "005930", "kind": "buy", "condition": "buy_breakout",
"price": 71500, "detail": {}}]
out = await c.post_report("2026-07-02T09:01:00+09:00", firing)
assert out["new_alerts"] == 1
assert captured["as_of"] == "2026-07-02T09:01:00+09:00"
assert captured["firing"] == firing
await c.close()

View File

@@ -3,11 +3,14 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
import os
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
import redis.asyncio as aioredis
from fastapi import FastAPI from fastapi import FastAPI
import worker import worker
from _shared.heartbeat import heartbeat_loop
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s") logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -16,15 +19,19 @@ logger = logging.getLogger(__name__)
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
worker_task = asyncio.create_task(worker.worker_loop()) worker_task = asyncio.create_task(worker.worker_loop())
hb_redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://192.168.45.54:6379"), decode_responses=False)
hb_task = asyncio.create_task(heartbeat_loop(hb_redis, "video-render", "render", worker.stats))
logger.info("video-render lifespan 시작") logger.info("video-render lifespan 시작")
try: try:
yield yield
finally: finally:
worker_task.cancel() for t in (worker_task, hb_task):
try: t.cancel()
await worker_task try:
except asyncio.CancelledError: await t
pass except asyncio.CancelledError:
pass
await hb_redis.aclose()
logger.info("video-render lifespan 종료") logger.info("video-render lifespan 종료")

View File

@@ -94,3 +94,25 @@ async def test_poll_once_returns_false_on_timeout(monkeypatch):
assert handled is False assert handled is False
fake_queue.ack.assert_not_awaited() fake_queue.ack.assert_not_awaited()
fake_queue.fail.assert_not_awaited() fake_queue.fail.assert_not_awaited()
# ----- heartbeat stats 카운터 -----
class _OneJobQueue:
def __init__(self): self.acked = False
async def dequeue(self, timeout=5):
if self.acked: return None
return ({"job_type": "sora_generation", "task_id": "t1", "params": {}}, b"raw")
async def ack(self, raw): self.acked = True
async def fail(self, raw, payload): pass
@pytest.mark.asyncio
async def test_poll_once_increments_jobs_done(monkeypatch):
worker.stats.jobs_done = 0
monkeypatch.setattr(worker, "run_sora_generation", lambda task_id, params: None)
handled = await worker.poll_once(_OneJobQueue())
assert handled is True
assert worker.stats.jobs_done == 1
assert worker.stats.busy is False
assert worker.stats.last_job_at is not None

View File

@@ -19,6 +19,7 @@ from providers.veo import run_veo_generation
from providers.kling import run_kling_generation from providers.kling import run_kling_generation
from providers.seedance import run_seedance_generation from providers.seedance import run_seedance_generation
from _shared.reliable_queue import ReliableQueue from _shared.reliable_queue import ReliableQueue
from _shared.heartbeat import WorkerStats, utc_now_iso
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -26,6 +27,8 @@ REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
QUEUE_KEY = "queue:video-render" QUEUE_KEY = "queue:video-render"
PAUSED_KEY = "queue:paused" PAUSED_KEY = "queue:paused"
stats = WorkerStats()
# string names so `unittest.mock.patch` on `worker.<name>` is correctly intercepted # string names so `unittest.mock.patch` on `worker.<name>` is correctly intercepted
_DISPATCH_TABLE = { _DISPATCH_TABLE = {
"sora_generation": "run_sora_generation", "sora_generation": "run_sora_generation",
@@ -60,14 +63,21 @@ async def poll_once(queue: ReliableQueue) -> bool:
if result is None: if result is None:
return False return False
payload, raw = result payload, raw = result
stats.busy = True
try: try:
await asyncio.to_thread(_dispatch, payload) await asyncio.to_thread(_dispatch, payload)
except Exception: except Exception:
logger.exception("dispatch unhandled exception task_id=%s", logger.exception("dispatch unhandled exception task_id=%s",
payload.get("task_id")) payload.get("task_id"))
await queue.fail(raw, payload) await queue.fail(raw, payload)
stats.jobs_failed += 1
stats.last_job_at = utc_now_iso()
stats.busy = False
return True return True
await queue.ack(raw) await queue.ack(raw)
stats.jobs_done += 1
stats.last_job_at = utc_now_iso()
stats.busy = False
return True return True