27 Commits

Author SHA1 Message Date
c4b2fffeb4 docs(CLAUDE.md): 실시간 매매 알람 엔드포인트 카탈로그 등재 (stock watchlist/webai + agent-office notify/봇명령) 2026-07-02 20:09:07 +09:00
c6540b2417 feat(agent-office): /watch /unwatch /watchlist 봇 명령 2026-07-02 20:05:59 +09:00
2bce07c367 feat(agent-office): 매매알람 텔레그램 notify(너+아내) 엔드포인트 2026-07-02 20:01:10 +09:00
2906a2ae3e feat(stock): webai report — edge diff→agent-office push→상태/이력(전송성공시만) 2026-07-02 19:56:58 +09:00
134b9e5d07 feat(stock): session 판정 + webai monitor-set 엔드포인트
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-02 19:51:57 +09:00
bf84328d59 feat(stock): edge diff(신규/해제/재무장) 순수 함수 2026-07-02 19:48:45 +09:00
d8b3267b98 feat(stock): 감시대상(monitor-set) 조립 로직 2026-07-02 15:51:06 +09:00
89c52b1fb6 feat(stock): watchlist CRUD + 알람 이력 API 2026-07-02 15:45:14 +09:00
01a8aee226 fix(stock): 매매알람 이력 days 필터 포맷을 ISO로 통일 (경계일 과다포함 수정, 리뷰 Important) 2026-07-02 15:43:22 +09:00
b2c4ca0e0b feat(stock): 매매알람 DB — watchlist/alert_state/history 테이블+헬퍼
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-02 15:34:53 +09:00
baa3a3075d docs(stock): 실시간 매매 알람 BE 구현 계획 (9 tasks, TDD)
watchlist/alert_state/history DB → CRUD API → monitor-set 조립 → edge diff →
webai monitor-set/report → agent-office 텔레그램(너+아내) → /watch 봇 명령 → 회귀/배포.
워커(web-ai)·탭(web-ui)은 계약(스펙 §5)만 정의해 각 세션 핸드오프.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-02 15:25:26 +09:00
4cb9dc6a7c docs(stock): 실시간 매매 알람 설계 스펙 (watchlist∪screener buy + exit+trailing sell, 1분 Windows 워커, NAS edge dedup)
브레인스토밍 확정 요구사항 6종 + 아키텍처 A(신규 Windows docker 워커). TA/조건판정은
Windows, edge 중복판정 상태는 NAS 영속(재시작 스팸 방지). cross-repo 계약(webai
monitor-set/report, agent-office notify, watchlist CRUD, heartbeat) 정의.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-02 15:19:00 +09:00
36e8d11060 fix(stock): AI 뉴스 리포트 하루 밀림 해소 — asof를 KST로 보정 + LLM에 현재 일자 주입
근본원인: stock 컨테이너는 python:3.12-alpine + tzdata 미설치라 TZ=Asia/Seoul이
무효 → date.today()가 UTC를 반환. AI 뉴스 리포트 cron은 08:00 KST(=전날 23:00 UTC)라
asof가 어제로 계산돼 라벨·기사 윈도우·news_sentiment 저장이 전부 하루 밀렸음
(월요일은 일요일 UTC로 계산돼 skip_weekend까지).

- screener/router.py: _today_kst()(=utcnow+9h, holdings_intel 관용) 추가.
  /snapshot/refresh · /snapshot/refresh-news-sentiment의 asof 기본값을 KST로.
- ai_news/analyzer.py: score_sentiment(asof=...) → 프롬프트 앞에 "오늘 날짜" 명시,
  LLM이 현재 일자 기준으로 뉴스 평가(사용자 요청).
- ai_news/pipeline.py: refresh_daily가 asof를 score_sentiment까지 스레딩.
- 테스트: _today_kst KST 보정 + analyzer asof 주입 2종 TDD Red→Green.
  기존 pipeline 목 시그니처에 asof 반영. stock 전체 149 passed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-02 14:38:51 +09:00
db6fed72b3 feat(music-lab): 파이프라인 하드 삭제 엔드포인트 DELETE /api/music/pipeline/{id}
cancel(state→cancelled, active/failed 뷰에서만 제거)만으론 status=all 뷰에
행이 남아 옛 dead 파이프라인을 완전히 치울 수 없었음. DELETE로 하드 삭제 추가.

- db.delete_pipeline(pid)→bool: 자식행(pipeline_feedback, pipeline_jobs) 먼저
  삭제 후 video_pipelines 삭제(SQLite FK 미강제라 명시적 cascade). 존재 여부 bool.
- DELETE /api/music/pipeline/{id}: 없으면 404, 있으면 {"ok":true,"deleted":id}.
  상태 가드 없음(관리자 정리 용도, cancel과 동일한 단순 정책).
- 테스트 3종(삭제+404+자식행 cascade) TDD Red→Green. music-lab 152 passed.
- CLAUDE.md 엔드포인트 카탈로그 갱신.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-02 13:52:11 +09:00
7cce5c422f fix(agent-office): 파이프라인 실패 알림 dedup을 DB 영속화 (재시작 재알림 스팸 해소)
youtube_publisher._notified_failed(인메모리 set)가 컨테이너 재시작 시 소실되어
기존 failed 파이프라인(예: video 인코딩 구버전 실패 #3)을 매 재시작마다 "신규"로
재알림하던 스팸 버그를 notified_failed_pipelines 테이블로 영속화해 해결.

부수 버그 fix: failed 폴링이 예외를 던지면 failed=[]로 오해해 원장을 통째로
비우던 코드 → 예외 시 early-return(원장 보존).

진행 중 *_pending 승인 dedup(_notified_state_per_pipeline)은 의도적으로 인메모리
유지(재시작 시 살아있는 파이프라인 승인 재알림은 유용한 리마인더).

테스트: 재시작 지속성 + 일시적 폴링 예외 재현 테스트 2종 추가(TDD Red→Green).
DB_PATH 첫 import 고정으로 인한 테스트 간 영속 테이블 누수를 monkeypatch로 격리.
agent-office 전체 140개 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-01 15:20:07 +09:00
94beecbfaf docs(CLAUDE.md): agent-office 카탈로그에 /nodes 엔드포인트 + node_monitor.py 등재
분산 워커 관측 시스템 — GET /api/agent-office/nodes(heartbeat 생사+큐깊이+
dead-letter 집계, web-ui /infra 소비) 엔드포인트 표 추가 + 핵심파일에
node_monitor.py 추가. 상세는 infra_distributed_workers.md 메모리.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-07-01 02:56:37 +09:00
98b17f3a3a fix(redis): bgsave fork 실패로 인한 쓰기 차단 해소 (--save "" + stop-writes off)
근본원인: NAS vm.overcommit_memory=0 + Committed_AS≈CommitLimit(98%)로 redis
bgsave fork()가 거부되어 stop-writes-on-bgsave-error(기본 yes)가 모든 쓰기를
차단(6/29 20:36 이후). AOF가 durability를 담당하므로 실패하는 RDB 스냅샷을
비활성화(--save "")하고 stop-writes-on-bgsave-error no로 안전망 추가.
호스트 vm.overcommit_memory=1(sudo)은 별도 권장.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-06-30 14:21:09 +09:00
94cddccaa7 fix(agent-office): alive를 heartbeat staleness로 판정 + 다운/복구 전이 발송실패 시 재시도 (최종 리뷰 I1·I2)
I1: collect_status - heartbeat 키 존재 여부가 아닌 ts age 기반으로 alive 판정.
    age > NODE_STALE_THRESHOLD_SEC(90s, env 주입 가능)이면 키 있어도 dead.
    config.py에 NODE_STALE_THRESHOLD_SEC=90 추가.
I2: check_and_alert - 다운/복구 전이 시 send_raw 실패하면 _node_state 갱신 보류.
    다음 사이클에서 동일 전이 재감지 → 재발송 시도 (다운 이벤트 유실 방지).
테스트: _hb 헬퍼 현재 시각 기본값으로 수정 + 신규 2개 (stale→dead, I2 재시도 회귀).
14 passed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-06-29 18:50:45 +09:00
b49cc14ef3 fix(agent-office): dead-letter _dl_notified 갱신을 발송성공 시로 한정 + collect_status 예외방어 (B4 리뷰)
- _dl_notified[name] = dl을 if ok: 블록 안으로 이동 — 텔레그램 실패 시 갱신 방지
- check_and_alert에 collect_status try/except 추가 — 스케줄러 잡 생존 보장
- tests: import app.node_monitor as nm 최상단 이동
- tests: test_dl_notified_not_updated_on_telegram_failure 회귀 테스트 추가

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-06-29 18:13:33 +09:00
5d5ff27d29 feat(agent-office): 노드 헬스 1분 cron + 텔레그램 경보(다운/복구/dead-letter) 2026-06-29 18:06:38 +09:00
2a0090a1d4 feat(agent-office): GET /api/agent-office/nodes 엔드포인트
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-06-29 18:01:00 +09:00
ea1f0d103d fix(agent-office): node_monitor 루프 예외 방어 + 테스트 보강 (B2 리뷰)
- per-worker 루프 전체를 try/except로 감싸 Redis 예외 시 redis_ok=False+break (Blocker)
- heartbeat 파싱 except에 UnicodeDecodeError 추가 (Important)
- hb.get('ts') or '' 로 null ts 안전 처리 (Minor)
- 테스트 3개 추가: paused 폴백·processing 집계·llen 예외 회귀

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-06-29 17:56:18 +09:00
a3ae85cde1 feat(agent-office): node_monitor.collect_status (heartbeat+큐+dead-letter 집계) 2026-06-29 17:50:16 +09:00
363e95c5a9 chore(agent-office): redis 의존성 + REDIS_URL/dead-letter 임계 설정 2026-06-29 17:44:45 +09:00
c69b18243b docs: 분산 워커 관측 시스템 구현 계획(3-repo TDD plan) 추가
Part A(web-ai heartbeat) / Part B(agent-office 집계+경보) / Part C(web-ui
Three.js 대시보드). 각 Part 독립 실행·테스트 가능, 계약 2개를 Global
Constraints로 잠금.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-06-29 17:33:16 +09:00
f0fad05f2d docs: 분산 워커 관측 시스템(NAS↔Windows) 설계 스펙 추가
music/video/image/insta-render + task-watcher + ai_trade의 heartbeat 기반
관측, agent-office /nodes 집계 API + 텔레그램 경보, web-ui Three.js 파이프라인
시각화를 다루는 3-repo 설계. heartbeat 키 스키마 + /nodes 응답 스키마를
잠그는 계약으로 정의.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-06-29 17:25:13 +09:00
ed8ffdf343 docs: co-gahusb를 서비스 목록·포트·nginx 라우팅 테이블에 등재
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-13 03:52:30 +09:00
39 changed files with 4140 additions and 28 deletions

View File

@@ -21,7 +21,7 @@
## 1. 프로젝트 개요 ## 1. 프로젝트 개요
Synology NAS 기반의 개인 웹 플랫폼 백엔드 모노레포. Synology NAS 기반의 개인 웹 플랫폼 백엔드 모노레포.
- **서비스 14개**: lotto, stock, music-lab, video-lab, image-lab, insta-lab, realestate-lab, agent-office, tarot-lab, saju-lab, personal, packs-lab, travel-proxy, deployer - **서비스 15개**: lotto, stock, music-lab, video-lab, image-lab, insta-lab, realestate-lab, agent-office, tarot-lab, saju-lab, personal, packs-lab, travel-proxy, co-gahusb, deployer
- **공유 인프라**: `_shared/access_log` 모듈 (5개 서비스 공유), `redis` (music/video/image/insta-lab 큐 공유) - **공유 인프라**: `_shared/access_log` 모듈 (5개 서비스 공유), `redis` (music/video/image/insta-lab 큐 공유)
- **렌더/생성 위임**: music/video/image/insta의 무거운 생성·렌더는 **Windows AI 워커**(`web-ai` 별도 레포)가 담당. NAS 서비스는 Redis 큐 push + 결과 webhook 수신만 한다. - **렌더/생성 위임**: music/video/image/insta의 무거운 생성·렌더는 **Windows AI 워커**(`web-ai` 별도 레포)가 담당. NAS 서비스는 Redis 큐 push + 결과 webhook 수신만 한다.
- **프론트엔드**: 별도 레포 (React + Vite SPA), 빌드 산출물만 NAS에 배포 - **프론트엔드**: 별도 레포 (React + Vite SPA), 빌드 산출물만 NAS에 배포
@@ -80,7 +80,8 @@ Synology NAS 기반의 개인 웹 플랫폼 백엔드 모노레포.
| `packs-lab` | 18950 | NAS 자료 다운로드 자동화 (DSM 공유 링크 + 5GB 업로드, Vercel SaaS와 HMAC 통신) | | `packs-lab` | 18950 | NAS 자료 다운로드 자동화 (DSM 공유 링크 + 5GB 업로드, Vercel SaaS와 HMAC 통신) |
| `personal` | 18850 | 개인 서비스 (포트폴리오·블로그·투두 통합) | | `personal` | 18850 | 개인 서비스 (포트폴리오·블로그·투두 통합) |
| `travel-proxy` | 19000 | 여행 사진 API + 썸네일 생성 | | `travel-proxy` | 19000 | 여행 사진 API + 썸네일 생성 |
| `redis` | 6379 | 비동기 큐 (music/video/image/insta-lab 공유) | | `co-gahusb` | 18920 | 세션 간 협업 팀 버스 (FastMCP streamable-http + Redis, Bearer `CO_BUS_KEY`, DNS-rebinding 보호 off) |
| `redis` | 6379 | 비동기 큐 (music/video/image/insta-lab + co-gahusb 공유) |
| `frontend` (nginx) | 8080 | 정적 SPA 서빙 + API 리버스 프록시 | | `frontend` (nginx) | 8080 | 정적 SPA 서빙 + API 리버스 프록시 |
| `webpage-deployer` | 19010 | Gitea Webhook 수신 → 자동 배포 | | `webpage-deployer` | 19010 | Gitea Webhook 수신 → 자동 배포 |
@@ -106,6 +107,7 @@ Synology NAS 기반의 개인 웹 플랫폼 백엔드 모노레포.
| `/api/blog/` | `personal:8000` | 블로그 API | | `/api/blog/` | `personal:8000` | 블로그 API |
| `/api/profile/` | `personal:8000` | 포트폴리오 API | | `/api/profile/` | `personal:8000` | 포트폴리오 API |
| `/api/agent-office/` | `agent-office:8000` | AI 에이전트 오피스 API + WebSocket (86400s) | | `/api/agent-office/` | `agent-office:8000` | AI 에이전트 오피스 API + WebSocket (86400s) |
| `/api/co/` | `co-gahusb:8000/` | MCP 팀 버스 (trailing-slash strip → `/mcp`, `Authorization` forward, `proxy_buffering off`, 3600s) |
| `/api/packs/upload` | `packs-lab:8000` | 5GB multipart 업로드 (`client_max_body_size 5G`, `proxy_request_buffering off`, **1800s** timeout) | | `/api/packs/upload` | `packs-lab:8000` | 5GB multipart 업로드 (`client_max_body_size 5G`, `proxy_request_buffering off`, **1800s** timeout) |
| `/api/packs/` | `packs-lab:8000` | 다운로드/list | | `/api/packs/` | `packs-lab:8000` | 다운로드/list |
| `/api/internal/insta/` | `insta-lab:8000` | Windows 워커 webhook (nginx IP 화이트리스트 + 앱 `X-Internal-Key`) | | `/api/internal/insta/` | `insta-lab:8000` | Windows 워커 webhook (nginx IP 화이트리스트 + 앱 `X-Internal-Key`) |
@@ -244,6 +246,10 @@ docker compose up -d
| GET | `/api/portfolio/snapshot/history` | 스냅샷 이력 (`days`) | | GET | `/api/portfolio/snapshot/history` | 스냅샷 이력 (`days`) |
| GET/POST | `/api/portfolio/sell-history` | 매도 내역 조회/저장 | | GET/POST | `/api/portfolio/sell-history` | 매도 내역 조회/저장 |
| PUT/DELETE | `/api/portfolio/sell-history/{id}` | 매도 기록 수정/삭제 | | PUT/DELETE | `/api/portfolio/sell-history/{id}` | 매도 기록 수정/삭제 |
| GET/POST/DELETE | `/api/stock/watchlist` (+ `/{ticker}`) | 실시간 매수 알람 관심종목 CRUD |
| GET | `/api/stock/trade-alerts` | 매매 알람 이력 (`days`) |
| GET | `/api/webai/trade-alert/monitor-set` | (워커) 감시대상 조립 = watchlistscreener보유 + session/params (X-WebAI-Key) |
| POST | `/api/webai/trade-alert/report` | (워커) 발화집합 수신 → edge diff → 신규만 텔레그램 push (X-WebAI-Key) |
### music-lab (music-lab/) ### music-lab (music-lab/)
듀얼 프로바이더 음악 생성(Suno + MusicGen) + YouTube 영상 자동화 파이프라인 + 시장 트렌드. 듀얼 프로바이더 음악 생성(Suno + MusicGen) + YouTube 영상 자동화 파이프라인 + 시장 트렌드.
@@ -266,6 +272,7 @@ docker compose up -d
| POST/GET | `/api/music/compile` (+ `/compiles/{id}/export`) | 컴파일 | | POST/GET | `/api/music/compile` (+ `/compiles/{id}/export`) | 컴파일 |
| POST/GET/DELETE | `/api/music/video-project` (+ `/{id}/render`, `/export`) | 영상 프로젝트 | | POST/GET/DELETE | `/api/music/video-project` (+ `/{id}/render`, `/export`) | 영상 프로젝트 |
| ALL | `/api/music/pipeline` (생성/start/feedback/cancel/publish/retry/telegram-msg/lookup) | YouTube 자동화 파이프라인. `POST /{id}/retry`=실패 step 재개(publish+업로드완료 시 409) | | ALL | `/api/music/pipeline` (생성/start/feedback/cancel/publish/retry/telegram-msg/lookup) | YouTube 자동화 파이프라인. `POST /{id}/retry`=실패 step 재개(publish+업로드완료 시 409) |
| DELETE | `/api/music/pipeline/{id}` | 파이프라인 행 하드 삭제(자식 jobs/feedback 포함, 전체 목록에서 제거). 없으면 404 |
| GET/PUT | `/api/music/setup` | 파이프라인 설정 | | GET/PUT | `/api/music/setup` | 파이프라인 설정 |
| GET | `/api/music/youtube/auth-url`, `/callback`, `/status`; POST `/disconnect` | YouTube OAuth | | GET | `/api/music/youtube/auth-url`, `/callback`, `/status`; POST `/disconnect` | YouTube OAuth |
| GET/POST/PUT/DELETE | `/api/music/revenue` (+ `/dashboard`) | 수익 기록 | | GET/POST/PUT/DELETE | `/api/music/revenue` (+ `/dashboard`) | 수익 기록 |
@@ -345,7 +352,7 @@ docker compose up -d
### agent-office (agent-office/) ### agent-office (agent-office/)
AI 에이전트 가상 오피스 — 기존 서비스 API를 프록시로 호출, 실시간 WebSocket + 텔레그램 봇. AI 에이전트 가상 오피스 — 기존 서비스 API를 프록시로 호출, 실시간 WebSocket + 텔레그램 봇.
- 핵심 파일: `main.py`, `db.py`, `config.py`, `websocket_manager.py`, `service_proxy.py`, `telegram_bot.py`, `scheduler.py`, `agents/`(stock/music/realestate/youtube/youtube_publisher/lotto/base) - 핵심 파일: `main.py`, `db.py`, `config.py`, `websocket_manager.py`, `service_proxy.py`, `telegram_bot.py`, `scheduler.py`, `node_monitor.py`(분산 워커 관측 집계+경보), `agents/`(stock/music/realestate/youtube/youtube_publisher/lotto/base)
- 에이전트 7종 레지스트리. 명령 API body 필드명 → `reference_agent_office_command_api.md` - 에이전트 7종 레지스트리. 명령 API body 필드명 → `reference_agent_office_command_api.md`
- 📌 상세(DB 9테이블·FSM·전체 cron 목록·AGENT_CONTAINER_MAP·텔레그램 캐싱·env): **`service_agent_office.md`** - 📌 상세(DB 9테이블·FSM·전체 cron 목록·AGENT_CONTAINER_MAP·텔레그램 캐싱·env): **`service_agent_office.md`**
@@ -360,11 +367,13 @@ AI 에이전트 가상 오피스 — 기존 서비스 API를 프록시로 호출
| POST | `/api/agent-office/telegram/webhook` | 텔레그램 Webhook (realestate_bookmark_* 콜백 포함) | | POST | `/api/agent-office/telegram/webhook` | 텔레그램 Webhook (realestate_bookmark_* 콜백 포함) |
| POST | `/api/agent-office/realestate/notify` | realestate-lab 전용 push 수신 → 텔레그램 | | POST | `/api/agent-office/realestate/notify` | realestate-lab 전용 push 수신 → 텔레그램 |
| GET | `/api/agent-office/states` | 전체 에이전트 상태 | | GET | `/api/agent-office/states` | 전체 에이전트 상태 |
| GET | `/api/agent-office/nodes` | 분산 워커(NAS↔Windows) 관측 — heartbeat 생사+큐깊이+dead-letter 집계 (web-ui `/infra` Three.js 시각화 소비). 상세 → `infra_distributed_workers.md` |
| GET | `/api/agent-office/activity` | 전 에이전트 통합 활동 피드 (tasks+logs UNION). 필터 `agent_id`/`type`(task\|log)/`status`/`days` + `limit`/`offset` | | GET | `/api/agent-office/activity` | 전 에이전트 통합 활동 피드 (tasks+logs UNION). 필터 `agent_id`/`type`(task\|log)/`status`/`days` + `limit`/`offset` |
| GET | `/api/agent-office/conversation/stats` | 텔레그램 대화 토큰·캐시 통계 (`days`) | | GET | `/api/agent-office/conversation/stats` | 텔레그램 대화 토큰·캐시 통계 (`days`) |
| POST/GET | `/api/agent-office/youtube/research` (+ `/status`) | YouTube 트렌드 수집 트리거/상태 | | POST/GET | `/api/agent-office/youtube/research` (+ `/status`) | YouTube 트렌드 수집 트리거/상태 |
| GET | `/api/agent-office/lotto/signals`, `/lotto/baselines` | 로또 시그널 이력·baseline | | GET | `/api/agent-office/lotto/signals`, `/lotto/baselines` | 로또 시그널 이력·baseline |
| POST | `/api/agent-office/lotto/signal-check` | 로또 시그널 평가 트리거 (light/sim/deep) | | POST | `/api/agent-office/lotto/signal-check` | 로또 시그널 평가 트리거 (light/sim/deep) |
| POST | `/api/agent-office/stock/trade-alert` | stock에서 push된 매매 알람 → 텔레그램(너+아내). 봇 명령 `/watch`·`/unwatch`·`/watchlist`로 watchlist 관리 |
### tarot-lab (tarot-lab/) ### tarot-lab (tarot-lab/)
타로 카드 해석 (Claude Sonnet, agent-office에서 2026-05-25 독립). 타로 카드 해석 (Claude Sonnet, agent-office에서 2026-05-25 독립).

View File

@@ -4,7 +4,12 @@ import logging
from .base import BaseAgent from .base import BaseAgent
from . import classify_intent from . import classify_intent
from .. import service_proxy from .. import service_proxy
from ..db import add_log from ..db import (
add_log,
get_notified_failed_pipelines,
add_notified_failed_pipeline,
prune_notified_failed_pipelines,
)
from ..telegram.messaging import send_raw from ..telegram.messaging import send_raw
logger = logging.getLogger("agent-office.youtube_publisher") logger = logging.getLogger("agent-office.youtube_publisher")
@@ -25,8 +30,9 @@ class YoutubePublisherAgent(BaseAgent):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
# 진행 중(*_pending) 승인 요청 dedup — 인메모리 유지(의도적).
# 재시작 시 살아있는 파이프라인 승인 재알림은 유용한 리마인더라 스팸 아님.
self._notified_state_per_pipeline: dict[int, tuple] = {} self._notified_state_per_pipeline: dict[int, tuple] = {}
self._notified_failed: set[int] = set()
async def poll_state_changes(self) -> None: async def poll_state_changes(self) -> None:
"""주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송.""" """주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송."""
@@ -52,18 +58,21 @@ class YoutubePublisherAgent(BaseAgent):
try: try:
failed = await service_proxy.list_failed_pipelines() failed = await service_proxy.list_failed_pipelines()
except Exception as e: except Exception as e:
# 일시적 폴링 실패를 "failed 없음"으로 오해하면 원장을 비워 재알림 스팸이 남.
# → 원장을 건드리지 않고 조용히 종료(다음 폴링에서 재시도).
logger.warning("failed 폴링 실패: %s", e) logger.warning("failed 폴링 실패: %s", e)
failed = [] return
notified = get_notified_failed_pipelines()
for p in failed: for p in failed:
pid = p.get("id") pid = p.get("id")
if pid is None: if pid is None:
continue continue
if pid not in self._notified_failed: if pid not in notified:
await self._notify_failed(p) await self._notify_failed(p)
self._notified_failed.add(pid) add_notified_failed_pipeline(pid)
# 재개되어 failed에서 벗어난 파이프라인은 재알림 가능하도록 해제 # 재개되어 failed에서 벗어난 파이프라인은 재알림 가능하도록 원장에서 제거
failed_ids = {p.get("id") for p in failed} failed_ids = {p.get("id") for p in failed if p.get("id") is not None}
self._notified_failed &= failed_ids prune_notified_failed_pipelines(failed_ids)
async def _notify_failed(self, p: dict) -> None: async def _notify_failed(self, p: dict) -> None:
reason = p.get("failed_reason") or "?" reason = p.get("failed_reason") or "?"

View File

@@ -51,3 +51,9 @@ AGENT_CONTAINER_MAP: dict[str, tuple[str, int, _re.Pattern]] = {
"insta": ("insta-lab", 8000, _re.compile(r"^/api/insta")), "insta": ("insta-lab", 8000, _re.compile(r"^/api/insta")),
"realestate": ("realestate-lab", 8000, _re.compile(r"^/api/realestate")), "realestate": ("realestate-lab", 8000, _re.compile(r"^/api/realestate")),
} }
# Redis (node monitor)
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379")
NODE_ALERT_DEADLETTER_THRESHOLD = int(os.getenv("NODE_ALERT_DEADLETTER_THRESHOLD", "1"))
# heartbeat TTL(45s)의 2배 — 키가 남아있어도 age>90s면 dead 판정
NODE_STALE_THRESHOLD_SEC = int(os.getenv("NODE_STALE_THRESHOLD_SEC", "90"))

View File

@@ -158,6 +158,12 @@ def init_db() -> None:
CREATE INDEX IF NOT EXISTS idx_tarot_favorite CREATE INDEX IF NOT EXISTS idx_tarot_favorite
ON tarot_readings(favorite, created_at DESC) ON tarot_readings(favorite, created_at DESC)
""") """)
conn.execute("""
CREATE TABLE IF NOT EXISTS notified_failed_pipelines (
pipeline_id INTEGER PRIMARY KEY,
notified_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
# Seed default agent configs # Seed default agent configs
for agent_id, name in [ for agent_id, name in [
("stock", "주식 트레이더"), ("stock", "주식 트레이더"),
@@ -826,6 +832,47 @@ def get_all_baselines() -> List[Dict[str, Any]]:
return out return out
# --- notified_failed_pipelines (파이프라인 실패 알림 dedup 원장, 재시작 지속) ---
def get_notified_failed_pipelines() -> set:
"""이미 실패 알림을 발송한 pipeline_id 집합."""
with _conn() as conn:
rows = conn.execute(
"SELECT pipeline_id FROM notified_failed_pipelines"
).fetchall()
return {r["pipeline_id"] for r in rows}
def add_notified_failed_pipeline(pipeline_id: int) -> None:
with _conn() as conn:
conn.execute(
"INSERT OR IGNORE INTO notified_failed_pipelines(pipeline_id) VALUES(?)",
(pipeline_id,),
)
def prune_notified_failed_pipelines(active_failed_ids) -> None:
"""현재 failed 목록에 없는 pipeline_id를 원장에서 제거.
재개되어 failed에서 벗어난 파이프라인이 다시 실패하면 재알림 가능하도록 함.
(기존 인메모리 `_notified_failed &= failed_ids`의 영속 버전)
"""
keep = set(active_failed_ids)
with _conn() as conn:
existing = {
r["pipeline_id"]
for r in conn.execute(
"SELECT pipeline_id FROM notified_failed_pipelines"
).fetchall()
}
stale = existing - keep
for pid in stale:
conn.execute(
"DELETE FROM notified_failed_pipelines WHERE pipeline_id=?",
(pid,),
)
def get_tasks_by_agent_date_kind(agent_id: str, date_iso: str, task_type: str) -> List[Dict[str, Any]]: def get_tasks_by_agent_date_kind(agent_id: str, date_iso: str, task_type: str) -> List[Dict[str, Any]]:
"""같은 (agent, date, task_type)으로 이미 생성된 task 조회. 멱등 guard.""" """같은 (agent, date, task_type)으로 이미 생성된 task 조회. 멱등 guard."""
with _conn() as conn: with _conn() as conn:

View File

@@ -187,6 +187,11 @@ async def telegram_webhook(data: dict):
def all_states(): def all_states():
return {"agents": get_all_agent_states()} return {"agents": get_all_agent_states()}
@app.get("/api/agent-office/nodes")
async def nodes_status():
from .node_monitor import collect_status
return await collect_status()
@app.get("/api/agent-office/agents/{agent_id}/token-usage") @app.get("/api/agent-office/agents/{agent_id}/token-usage")
def agent_token_usage(agent_id: str, days: int = 1): def agent_token_usage(agent_id: str, days: int = 1):
from .db import get_token_usage_stats from .db import get_token_usage_stats
@@ -273,3 +278,19 @@ async def trigger_signal_check(source: str = "light"):
if not agent: if not agent:
raise HTTPException(status_code=503, detail="lotto agent not registered") raise HTTPException(status_code=503, detail="lotto agent not registered")
return await agent.run_signal_check(source=source) return await agent.run_signal_check(source=source)
# --- Trade Alert Notify Endpoint ---
class TradeAlertBody(BaseModel):
alerts: List[Dict[str, Any]] = []
@app.post("/api/agent-office/stock/trade-alert")
async def stock_trade_alert(body: TradeAlertBody):
from .notifiers.telegram_trade import send_trade_alerts
from .db import add_log
res = await send_trade_alerts(body.alerts)
for a in body.alerts:
add_log("stock", f"매매알람 {a.get('kind')} {a.get('ticker')} {a.get('condition')}", "info")
return res

View File

@@ -0,0 +1,147 @@
"""분산 워커 상태 집계 (read-only). Global Constraints 계약 2 스키마 생성."""
from __future__ import annotations
import datetime as dt, json, logging
import redis.asyncio as aioredis
from .config import REDIS_URL, NODE_ALERT_DEADLETTER_THRESHOLD, NODE_STALE_THRESHOLD_SEC
logger = logging.getLogger("agent-office.node_monitor")
_node_state: dict[str, bool] = {} # name -> 직전 alive
_dl_notified: dict[str, int] = {} # name -> 직전 알린 dead_letter 수
WORKER_REGISTRY = [
{"name": "music-render", "kind": "render", "queue": "queue:music-render"},
{"name": "video-render", "kind": "render", "queue": "queue:video-render"},
{"name": "image-render", "kind": "render", "queue": "queue:image-render"},
{"name": "insta-render", "kind": "render", "queue": "queue:insta-render"},
{"name": "task-watcher", "kind": "watcher", "queue": None},
{"name": "ai_trade", "kind": "trader", "queue": None},
]
_redis = None
def _get_redis():
global _redis
if _redis is None:
_redis = aioredis.from_url(REDIS_URL, decode_responses=False)
return _redis
def _beat_age(ts_str, now):
try:
beat = dt.datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
return max(0, int((now - beat).total_seconds()))
except Exception:
return None
def _render_link_status(w):
if not w["alive"]:
return "down"
if w["state"] == "paused":
return "paused"
if w["dead_letter"] > 0:
return "degraded"
return "healthy"
async def collect_status(redis=None) -> dict:
r = redis or _get_redis()
now = dt.datetime.now(dt.timezone.utc)
out = {"redis_ok": True, "paused": False, "paused_reason": None,
"generated_at": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
"workers": [], "links": []}
try:
out["paused"] = (await r.get("queue:paused")) == b"1"
except Exception:
logger.exception("redis 접근 실패")
out["redis_ok"] = False
return out
for w in WORKER_REGISTRY:
try:
info = {"name": w["name"], "kind": w["kind"], "alive": False, "state": None,
"last_beat_age_s": None, "queue_depth": 0, "dead_letter": 0,
"processing": 0, "jobs_done": 0, "jobs_failed": 0, "last_job_at": None}
raw = await r.get(f"worker:{w['name']}:heartbeat")
if raw:
try:
hb = json.loads(raw)
age = _beat_age(hb.get("ts") or "", now)
info["last_beat_age_s"] = age
info["alive"] = age is not None and age <= NODE_STALE_THRESHOLD_SEC
info["state"] = hb.get("state")
info["jobs_done"] = hb.get("jobs_done", 0)
info["jobs_failed"] = hb.get("jobs_failed", 0)
info["last_job_at"] = hb.get("last_job_at")
if w["kind"] == "watcher" and hb.get("mode"):
out["paused_reason"] = hb["mode"]
except (json.JSONDecodeError, UnicodeDecodeError):
logger.warning("heartbeat JSON 파싱 실패 name=%s", w["name"])
if w["queue"]:
info["queue_depth"] = await r.llen(w["queue"])
info["dead_letter"] = await r.llen(f"dead_letter:{w['queue']}")
proc = 0
async for key in r.scan_iter(match=f"processing:{w['queue']}:*"):
proc += await r.llen(key)
info["processing"] = proc
out["workers"].append(info)
except Exception:
logger.exception("워커 상태 수집 실패 name=%s", w["name"])
out["redis_ok"] = False
break
for w in out["workers"]:
if w["kind"] == "trader":
out["links"].append({"from": "ai_trade", "to": "nas-stock", "type": "http-pull",
"status": "healthy" if w["alive"] else "down"})
elif w["kind"] == "render":
out["links"].append({"from": "nas", "to": w["name"], "type": "redis-queue",
"status": _render_link_status(w)})
if out["paused"] and not out["paused_reason"]:
out["paused_reason"] = "trading"
return out
async def check_and_alert(status=None) -> list[str]:
"""워커 상태를 점검해 다운/복구/dead-letter 전이를 텔레그램으로 경보한다.
첫 관측(prev=None)엔 경보 없음 — 부팅 시 false alarm 방지.
반환값: 실제로 전송된 경보 텍스트 목록 (테스트용).
"""
from .telegram.messaging import send_raw
from .db import add_log
try:
st = status or await collect_status()
except Exception:
logger.exception("collect_status 예외")
return []
sent: list[str] = []
for w in st["workers"]:
name = w["name"]
alive = w.get("alive", False)
prev = _node_state.get(name)
transition_send_failed = False
if prev is True and not alive:
text = f"🔴 [{name}] 워커 다운"
if (await send_raw(text=text)).get("ok"):
add_log("node_monitor", f"{name} 다운", "warning"); sent.append(text)
else:
transition_send_failed = True
elif prev is False and alive:
text = f"🟢 [{name}] 워커 복구"
if (await send_raw(text=text)).get("ok"):
add_log("node_monitor", f"{name} 복구", "info"); sent.append(text)
else:
transition_send_failed = True
if not transition_send_failed:
_node_state[name] = alive
dl = w.get("dead_letter", 0)
if dl >= NODE_ALERT_DEADLETTER_THRESHOLD and dl != _dl_notified.get(name, 0):
text = f"❌ [{name}] 실패 누적 {dl}건 (dead-letter)"
if (await send_raw(text=text)).get("ok"):
add_log("node_monitor", f"{name} dead-letter {dl}", "warning")
sent.append(text)
_dl_notified[name] = dl
elif dl == 0:
_dl_notified.pop(name, None)
return sent

View File

@@ -0,0 +1,45 @@
"""매매 알람 텔레그램 포맷+전송 (본인+아내 각각)."""
import logging
from typing import Any, Dict, List
from ..telegram.messaging import send_raw
from ..config import TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID
logger = logging.getLogger("agent-office")
_KIND_LABEL = {"buy": "🟢 매수", "sell": "🔴 매도"}
_COND_LABEL = {
"buy_ma20_pullback": "지지선 되돌림", "buy_breakout": "돌파", "buy_rsi_bounce": "RSI 과매도 반등",
"sell_stop_loss": "손절", "sell_ma_break": "이평 이탈", "sell_take_profit": "익절",
"sell_climax": "급등 소진", "sell_trailing_stop": "트레일링 스톱",
}
def format_trade_alert(a: Dict[str, Any]) -> str:
kind = _KIND_LABEL.get(a["kind"], a["kind"])
cond = _COND_LABEL.get(a["condition"], a["condition"])
name = a.get("name") or a["ticker"]
price = a.get("price")
price_s = f"{int(price):,}" if price else "-"
return f"{kind} 알람\n<b>{name}</b> ({a['ticker']})\n조건: {cond}\n현재가: {price_s}"
async def send_trade_alerts(alerts: List[Dict[str, Any]]) -> dict:
"""알람마다 본인+아내 chat_id 각각으로 send_raw. 실패해도 계속 진행."""
sent = 0
all_ok = True
chat_ids = [c for c in (TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID) if c]
for a in alerts:
text = format_trade_alert(a)
for cid in chat_ids:
try:
r = await send_raw(text, chat_id=cid)
except Exception as e:
logger.warning(f"[telegram_trade] send failed (chat_id={cid}): {e}")
all_ok = False
continue
if r.get("ok"):
sent += 1
else:
all_ok = False
return {"sent": sent, "ok": all_ok}

View File

@@ -4,6 +4,7 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler
from .agents import AGENT_REGISTRY from .agents import AGENT_REGISTRY
from .db import delete_old_logs from .db import delete_old_logs
from . import node_monitor
scheduler = AsyncIOScheduler(timezone="Asia/Seoul") scheduler = AsyncIOScheduler(timezone="Asia/Seoul")
@@ -98,6 +99,9 @@ async def _poll_pipelines():
if agent: if agent:
await agent.poll_state_changes() await agent.poll_state_changes()
async def _run_node_health_check():
await node_monitor.check_and_alert()
def _cleanup_old_logs(): def _cleanup_old_logs():
n = delete_old_logs(days=90) n = delete_old_logs(days=90)
if n: if n:
@@ -142,5 +146,6 @@ def init_scheduler():
scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=10, id="youtube_research") scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=10, id="youtube_research")
scheduler.add_job(_send_youtube_weekly_report, "cron", day_of_week="mon", hour=8, minute=0, id="youtube_weekly_report") scheduler.add_job(_send_youtube_weekly_report, "cron", day_of_week="mon", hour=8, minute=0, id="youtube_weekly_report")
scheduler.add_job(_poll_pipelines, "interval", seconds=30, id="pipeline_poll") scheduler.add_job(_poll_pipelines, "interval", seconds=30, id="pipeline_poll")
scheduler.add_job(_run_node_health_check, "interval", seconds=60, id="node_health_check", replace_existing=True)
scheduler.add_job(_cleanup_old_logs, "cron", hour=3, minute=0, id="cleanup_old_logs", replace_existing=True) scheduler.add_job(_cleanup_old_logs, "cron", hour=3, minute=0, id="cleanup_old_logs", replace_existing=True)
scheduler.start() scheduler.start()

View File

@@ -111,6 +111,29 @@ async def stock_holdings_brief() -> Dict[str, Any]:
return resp.json() return resp.json()
# --- stock watchlist (실시간 매매 알람) ---
async def watchlist_add(ticker: str) -> Dict[str, Any]:
"""stock의 관심종목 추가 (POST, 이미 존재하면 멱등하게 갱신)."""
resp = await _client.post(f"{STOCK_URL}/api/stock/watchlist", json={"ticker": ticker})
resp.raise_for_status()
return resp.json()
async def watchlist_remove(ticker: str) -> Dict[str, Any]:
"""stock의 관심종목 삭제."""
resp = await _client.delete(f"{STOCK_URL}/api/stock/watchlist/{ticker}")
resp.raise_for_status()
return resp.json()
async def watchlist_list() -> Dict[str, Any]:
"""stock의 관심종목 목록 조회 → {"watchlist": [...]}."""
resp = await _client.get(f"{STOCK_URL}/api/stock/watchlist")
resp.raise_for_status()
return resp.json()
async def generate_music(payload: dict) -> Dict[str, Any]: async def generate_music(payload: dict) -> Dict[str, Any]:
resp = await _client.post(f"{MUSIC_LAB_URL}/api/music/generate", json=payload) resp = await _client.post(f"{MUSIC_LAB_URL}/api/music/generate", json=payload)
resp.raise_for_status() resp.raise_for_status()

View File

@@ -1,6 +1,7 @@
"""텔레그램 Webhook 이벤트 처리.""" """텔레그램 Webhook 이벤트 처리."""
from typing import Optional from typing import Optional
from .. import service_proxy
from ..db import get_telegram_callback, mark_telegram_responded from ..db import get_telegram_callback, mark_telegram_responded
from .client import _enabled, api_call from .client import _enabled, api_call
@@ -23,12 +24,43 @@ async def handle_webhook(data: dict, agent_dispatcher=None) -> Optional[dict]:
if message: if message:
chat = message.get("chat", {}) chat = message.get("chat", {})
print(f"[TG-WEBHOOK] chat.id={chat.get('id')} type={chat.get('type')} text={message.get('text')!r}", flush=True) print(f"[TG-WEBHOOK] chat.id={chat.get('id')} type={chat.get('type')} text={message.get('text')!r}", flush=True)
if message and message.get("text"):
if await handle_watch_command(message):
return None
if message and message.get("text") and agent_dispatcher is not None: if message and message.get("text") and agent_dispatcher is not None:
return await _handle_message(message, agent_dispatcher) return await _handle_message(message, agent_dispatcher)
return None return None
async def handle_watch_command(message: dict) -> bool:
"""/watch /unwatch /watchlist 명령을 처리해 stock watchlist API로 프록시.
처리했으면(응답 전송 포함) True, 매칭되지 않는 텍스트면 False."""
text = (message.get("text") or "").strip()
chat_id = message.get("chat", {}).get("id")
parts = text.split()
cmd = parts[0].lower() if parts else ""
if cmd == "/watch" and len(parts) >= 2:
await service_proxy.watchlist_add(parts[1])
reply = f"관심종목 추가: {parts[1]}"
elif cmd == "/unwatch" and len(parts) >= 2:
await service_proxy.watchlist_remove(parts[1])
reply = f"관심종목 삭제: {parts[1]}"
elif cmd == "/watchlist":
res = await service_proxy.watchlist_list()
items = res.get("watchlist", [])
reply = "관심종목:\n" + (
"\n".join(f"- {w.get('name') or ''} ({w['ticker']})" for w in items) or "(없음)"
)
else:
return False
await api_call("sendMessage", {"chat_id": chat_id, "text": reply})
return True
async def _handle_callback(callback_query: dict) -> Optional[dict]: async def _handle_callback(callback_query: dict) -> Optional[dict]:
"""승인/거절 및 realestate 북마크 콜백 처리.""" """승인/거절 및 realestate 북마크 콜백 처리."""
callback_id = callback_query.get("data", "") callback_id = callback_query.get("data", "")

View File

@@ -7,3 +7,4 @@ respx>=0.21
pytest-asyncio>=0.23 pytest-asyncio>=0.23
google-api-python-client>=2.100.0 google-api-python-client>=2.100.0
pytrends>=4.9.2 pytrends>=4.9.2
redis>=5.0

View File

@@ -0,0 +1,196 @@
# agent-office/tests/test_node_monitor.py
import datetime as dt
import json, pytest
from app import node_monitor
import app.node_monitor as nm
class FakeRedis:
"""worker heartbeat + queue llen + scan_iter 흉내."""
def __init__(self, kv=None, lists=None):
self._kv = kv or {} # key(str) -> bytes
self._lists = lists or {} # key(str) -> length(int)
async def get(self, key):
return self._kv.get(key)
async def llen(self, key):
return self._lists.get(key, 0)
async def scan_iter(self, match=None):
prefix = match.rstrip("*")
for k in list(self._lists):
if k.startswith(prefix):
yield k
def _hb(name, kind, state, ts=None, **extra):
"""heartbeat 페이로드 생성. ts 기본값은 현재 시각(신선한 heartbeat)."""
if ts is None:
ts = dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
return json.dumps({"name": name, "kind": kind, "state": state, "ts": ts,
"last_job_at": None, "jobs_done": 0, "jobs_failed": 0, **extra}).encode()
@pytest.mark.asyncio
async def test_alive_worker_healthy_link():
r = FakeRedis(kv={"worker:image-render:heartbeat": _hb("image-render","render","idle")})
st = await node_monitor.collect_status(redis=r)
img = next(w for w in st["workers"] if w["name"] == "image-render")
assert img["alive"] is True and img["state"] == "idle"
link = next(l for l in st["links"] if l["to"] == "image-render")
assert link["status"] == "healthy" and link["type"] == "redis-queue"
@pytest.mark.asyncio
async def test_missing_heartbeat_is_dead_and_down():
r = FakeRedis() # heartbeat 없음
st = await node_monitor.collect_status(redis=r)
img = next(w for w in st["workers"] if w["name"] == "image-render")
assert img["alive"] is False
link = next(l for l in st["links"] if l["to"] == "image-render")
assert link["status"] == "down"
@pytest.mark.asyncio
async def test_dead_letter_makes_degraded():
r = FakeRedis(kv={"worker:video-render:heartbeat": _hb("video-render","render","idle")},
lists={"dead_letter:queue:video-render": 2})
st = await node_monitor.collect_status(redis=r)
vid = next(w for w in st["workers"] if w["name"] == "video-render")
assert vid["dead_letter"] == 2
link = next(l for l in st["links"] if l["to"] == "video-render")
assert link["status"] == "degraded"
@pytest.mark.asyncio
async def test_paused_reason_from_watcher():
r = FakeRedis(kv={"queue:paused": b"1",
"worker:task-watcher:heartbeat": _hb("task-watcher","watcher","trading",mode="trading")})
st = await node_monitor.collect_status(redis=r)
assert st["paused"] is True and st["paused_reason"] == "trading"
@pytest.mark.asyncio
async def test_trader_http_pull_link():
r = FakeRedis(kv={"worker:ai_trade:heartbeat": _hb("ai_trade","trader","market_open")})
st = await node_monitor.collect_status(redis=r)
link = next(l for l in st["links"] if l["from"] == "ai_trade")
assert link["type"] == "http-pull" and link["status"] == "healthy"
@pytest.mark.asyncio
async def test_paused_no_watcher_heartbeat_fallback_reason():
"""paused=True인데 watcher heartbeat 없으면 paused_reason == 'trading' 폴백."""
r = FakeRedis(kv={"queue:paused": b"1"}) # watcher heartbeat 없음
st = await node_monitor.collect_status(redis=r)
assert st["paused"] is True
assert st["paused_reason"] == "trading"
@pytest.mark.asyncio
async def test_processing_count_image_render():
"""processing:<queue>:<worker_id> 리스트가 있으면 processing 필드에 합산된다."""
worker_id = "abc123"
proc_key = f"processing:queue:image-render:{worker_id}"
r = FakeRedis(
kv={"worker:image-render:heartbeat": _hb("image-render", "render", "busy")},
lists={proc_key: 3},
)
st = await node_monitor.collect_status(redis=r)
img = next(w for w in st["workers"] if w["name"] == "image-render")
assert img["processing"] == 3
@pytest.mark.asyncio
async def test_llen_exception_returns_redis_ok_false():
"""워커 루프 중 llen 예외 발생 시 예외를 전파하지 않고 redis_ok=False 반환 (Blocker 회귀)."""
class BrokenLlenRedis(FakeRedis):
async def llen(self, key):
raise ConnectionError("Redis 연결 끊김")
r = BrokenLlenRedis(
kv={"worker:music-render:heartbeat": _hb("music-render", "render", "idle")}
)
st = await node_monitor.collect_status(redis=r)
assert st["redis_ok"] is False
@pytest.mark.asyncio
async def test_alert_on_alive_to_dead(monkeypatch):
sent = []
async def fake_send_raw(text, **kw): sent.append(text); return {"ok": True}
monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
nm._node_state.clear(); nm._dl_notified.clear()
alive = {"workers": [{"name":"image-render","alive":True,"dead_letter":0}], "links": []}
dead = {"workers": [{"name":"image-render","alive":False,"dead_letter":0}], "links": []}
await nm.check_and_alert(status=alive) # 첫 관측 — 경보 없음
assert sent == []
await nm.check_and_alert(status=dead) # alive→dead 전이
assert any("다운" in t for t in sent)
@pytest.mark.asyncio
async def test_alert_on_dead_letter_growth(monkeypatch):
sent = []
async def fake_send_raw(text, **kw): sent.append(text); return {"ok": True}
monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
nm._node_state.clear(); nm._dl_notified.clear()
s = {"workers": [{"name":"video-render","alive":True,"dead_letter":2}], "links": []}
await nm.check_and_alert(status=s)
assert any("dead-letter" in t for t in sent)
@pytest.mark.asyncio
async def test_dl_notified_not_updated_on_telegram_failure(monkeypatch):
"""텔레그램 실패(ok=False) 시 _dl_notified 갱신 안 됨 → 다음 사이클에서 재시도."""
calls = []
async def fake_send_raw(text, **kw):
calls.append(text)
if len(calls) == 1:
return {"ok": False} # 첫 호출: 텔레그램 다운
return {"ok": True} # 두 번째 호출: 성공
monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
nm._node_state.clear(); nm._dl_notified.clear()
s = {"workers": [{"name": "video-render", "alive": True, "dead_letter": 2}], "links": []}
# 첫 호출: 텔레그램 다운 → ok=False → _dl_notified 갱신 안 됨
result1 = await nm.check_and_alert(status=s)
assert result1 == []
assert nm._dl_notified.get("video-render", 0) == 0
# 두 번째 호출: 같은 dl=2 → _dl_notified 미갱신으로 조건 재만족 → 재시도 발송
result2 = await nm.check_and_alert(status=s)
assert any("dead-letter" in t for t in result2)
assert nm._dl_notified.get("video-render") == 2
# ── I1: staleness 판정 신규 테스트 ─────────────────────────────────────────────
@pytest.mark.asyncio
async def test_stale_heartbeat_is_dead():
"""heartbeat 키가 존재해도 ts가 90s 초과면 alive=False (staleness 판정)."""
stale_ts = (dt.datetime.now(dt.timezone.utc) - dt.timedelta(seconds=300)).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
r = FakeRedis(kv={"worker:image-render:heartbeat": _hb("image-render", "render", "idle", ts=stale_ts)})
st = await node_monitor.collect_status(redis=r)
img = next(w for w in st["workers"] if w["name"] == "image-render")
assert img["alive"] is False
link = next(l for l in st["links"] if l["to"] == "image-render")
assert link["status"] == "down"
# ── I2: 전이 발송 실패 시 재시도 회귀 테스트 ──────────────────────────────────
@pytest.mark.asyncio
async def test_transition_send_failure_retries_next_cycle(monkeypatch):
"""alive→dead 전이 시 send_raw 실패하면 _node_state 갱신 안 됨 → 다음 사이클 재시도."""
calls = []
async def fake_send_raw(text, **kw):
calls.append(text)
if len(calls) == 1:
return {"ok": False} # 첫 호출: 텔레그램 다운
return {"ok": True} # 두 번째 호출: 성공
monkeypatch.setattr("app.telegram.messaging.send_raw", fake_send_raw)
monkeypatch.setattr("app.db.add_log", lambda *a, **k: None)
nm._node_state.clear(); nm._dl_notified.clear()
alive = {"workers": [{"name": "music-render", "alive": True, "dead_letter": 0}], "links": []}
dead = {"workers": [{"name": "music-render", "alive": False, "dead_letter": 0}], "links": []}
# 첫 관측: baseline 설정(전이 없음)
await nm.check_and_alert(status=alive)
assert nm._node_state.get("music-render") is True
# alive→dead 전이, send_raw 실패 → _node_state 갱신 안 됨
result1 = await nm.check_and_alert(status=dead)
assert result1 == [] # 경보 미발송
assert nm._node_state.get("music-render") is True # 여전히 True
# 두 번째 사이클: 동일 dead, send_raw 성공 → 경보 발송
result2 = await nm.check_and_alert(status=dead)
assert any("다운" in t for t in result2)
assert nm._node_state.get("music-render") is False # 이제 갱신

View File

@@ -0,0 +1,18 @@
# agent-office/tests/test_nodes_endpoint.py
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def client(monkeypatch):
from app import main
async def fake_collect(redis=None):
return {"redis_ok": True, "paused": False, "paused_reason": None,
"generated_at": "2026-06-29T00:00:00Z", "workers": [], "links": []}
monkeypatch.setattr("app.node_monitor.collect_status", fake_collect)
return TestClient(main.app)
def test_nodes_endpoint_returns_contract(client):
resp = client.get("/api/agent-office/nodes")
assert resp.status_code == 200
body = resp.json()
assert set(["redis_ok","paused","workers","links"]).issubset(body)

View File

@@ -0,0 +1,55 @@
import os
import sys
import tempfile
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import pytest
from unittest.mock import AsyncMock, patch
@pytest.fixture(autouse=True)
def _init_db(monkeypatch):
import gc
gc.collect()
# config.DB_PATH는 첫 import 시 1회 고정되므로, 다른 테스트 파일과 조합 실행 시
# db가 이 파일의 _TMP가 아닌 다른 경로를 쓸 수 있다. db.DB_PATH를 이 파일 전용으로
# 강제해 영속 테이블의 테스트 간 누수를 결정적으로 차단.
import app.db as _db
monkeypatch.setattr(_db, "DB_PATH", _TMP)
# WAL 사이드카(-wal/-shm)까지 지워야 영속 상태가 남지 않음
for suffix in ("", "-wal", "-shm"):
p = _TMP + suffix
if os.path.exists(p):
os.remove(p)
_db.init_db()
yield
gc.collect()
@pytest.mark.asyncio
async def test_send_trade_alerts_to_user_and_wife():
from app.notifiers import telegram_trade
alerts = [{"ticker": "005930", "name": "삼성전자", "kind": "buy",
"condition": "buy_breakout", "price": 71500, "detail": {}}]
with patch("app.notifiers.telegram_trade.send_raw",
new=AsyncMock(return_value={"ok": True})) as m, \
patch("app.notifiers.telegram_trade.TELEGRAM_CHAT_ID", "U"), \
patch("app.notifiers.telegram_trade.TELEGRAM_WIFE_CHAT_ID", "W"):
res = await telegram_trade.send_trade_alerts(alerts)
assert res["ok"] is True
chat_ids = {c.kwargs.get("chat_id") for c in m.await_args_list}
assert chat_ids == {"U", "W"} # 둘 다 발송
@pytest.mark.asyncio
async def test_format_trade_alert_has_direction():
from app.notifiers.telegram_trade import format_trade_alert
txt = format_trade_alert({"ticker": "005930", "name": "삼성전자", "kind": "sell",
"condition": "sell_stop_loss", "price": 60000, "detail": {}})
assert "매도" in txt and "삼성전자" in txt

View File

@@ -0,0 +1,93 @@
import os
import sys
import tempfile
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import pytest
from unittest.mock import AsyncMock, patch
@pytest.fixture(autouse=True)
def _init_db(monkeypatch):
import gc
gc.collect()
# config.DB_PATH는 첫 import 시 1회 고정되므로, 다른 테스트 파일과 조합 실행 시
# db가 이 파일의 _TMP가 아닌 다른 경로를 쓸 수 있다. db.DB_PATH를 이 파일 전용으로
# 강제해 영속 테이블의 테스트 간 누수를 결정적으로 차단.
import app.db as _db
monkeypatch.setattr(_db, "DB_PATH", _TMP)
for suffix in ("", "-wal", "-shm"):
p = _TMP + suffix
if os.path.exists(p):
os.remove(p)
_db.init_db()
yield
gc.collect()
@pytest.mark.asyncio
async def test_watch_command_calls_add():
from app.telegram import webhook
msg = {"chat": {"id": 1}, "text": "/watch 005930"}
with patch("app.telegram.webhook.service_proxy.watchlist_add",
new=AsyncMock(return_value={"ok": True})) as m, \
patch("app.telegram.webhook.api_call", new=AsyncMock(return_value={"ok": True})):
handled = await webhook.handle_watch_command(msg)
assert handled is True
m.assert_awaited_once_with("005930")
@pytest.mark.asyncio
async def test_non_watch_text_ignored():
from app.telegram import webhook
msg = {"chat": {"id": 1}, "text": "안녕"}
assert await webhook.handle_watch_command(msg) is False
@pytest.mark.asyncio
async def test_unwatch_command_calls_remove():
from app.telegram import webhook
msg = {"chat": {"id": 1}, "text": "/unwatch 005930"}
with patch("app.telegram.webhook.service_proxy.watchlist_remove",
new=AsyncMock(return_value={"ok": True})) as m, \
patch("app.telegram.webhook.api_call", new=AsyncMock(return_value={"ok": True})) as sent:
handled = await webhook.handle_watch_command(msg)
assert handled is True
m.assert_awaited_once_with("005930")
sent.assert_awaited_once()
@pytest.mark.asyncio
async def test_watchlist_command_calls_list_and_formats_items():
from app.telegram import webhook
msg = {"chat": {"id": 1}, "text": "/watchlist"}
items = {"watchlist": [{"ticker": "005930", "name": "삼성전자"}]}
with patch("app.telegram.webhook.service_proxy.watchlist_list",
new=AsyncMock(return_value=items)) as m, \
patch("app.telegram.webhook.api_call", new=AsyncMock(return_value={"ok": True})) as sent:
handled = await webhook.handle_watch_command(msg)
assert handled is True
m.assert_awaited_once_with()
text = sent.await_args.args[1]["text"]
assert "005930" in text and "삼성전자" in text
@pytest.mark.asyncio
async def test_watch_command_reaches_handle_webhook_before_slash_dispatch():
"""handle_webhook이 /watch 를 agent_dispatcher 호출 전에 가로채야 한다."""
from app.telegram import webhook
data = {"message": {"chat": {"id": 1}, "text": "/watch 005930"}}
dispatcher = AsyncMock(side_effect=AssertionError("agent_dispatcher가 호출되면 안 됨"))
with patch("app.telegram.webhook.service_proxy.watchlist_add",
new=AsyncMock(return_value={"ok": True})) as m, \
patch("app.telegram.webhook.api_call", new=AsyncMock(return_value={"ok": True})):
result = await webhook.handle_webhook(data, agent_dispatcher=dispatcher)
assert result is None
m.assert_awaited_once_with("005930")
dispatcher.assert_not_awaited()

View File

@@ -14,13 +14,20 @@ from unittest.mock import AsyncMock, patch
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def _init_db(): def _init_db(monkeypatch):
import gc import gc
gc.collect() gc.collect()
if os.path.exists(_TMP): # config.DB_PATH는 첫 import 시 1회 고정되므로, 다른 테스트 파일과 조합 실행 시
os.remove(_TMP) # db가 이 파일의 _TMP가 아닌 다른 경로를 쓸 수 있다. db.DB_PATH를 이 파일 전용으로
from app.db import init_db # 강제해 영속 테이블(notified_failed_pipelines 등)의 테스트 간 누수를 결정적으로 차단.
init_db() import app.db as _db
monkeypatch.setattr(_db, "DB_PATH", _TMP)
# WAL 사이드카(-wal/-shm)까지 지워야 영속 상태가 남지 않음
for suffix in ("", "-wal", "-shm"):
p = _TMP + suffix
if os.path.exists(p):
os.remove(p)
_db.init_db()
yield yield
gc.collect() gc.collect()
@@ -211,3 +218,70 @@ async def test_failed_poll_exception_is_silent():
# active 알림은 정상 발송 # active 알림은 정상 발송
assert sent.await_count == 1 assert sent.await_count == 1
@pytest.mark.asyncio
async def test_failed_notification_persists_across_restart():
"""컨테이너 재시작(새 에이전트 인스턴스)해도 이미 알린 failed는 재알림하지 않음."""
from app.agents.youtube_publisher import YoutubePublisherAgent
failed_pipeline = {
"id": 3,
"state": "failed",
"failed_reason": "video: timeout",
"track_title": "beat music v2",
}
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
with patch(
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
new=AsyncMock(return_value=[]),
), patch(
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
new=AsyncMock(return_value=[failed_pipeline]),
), patch(
"app.agents.youtube_publisher.send_raw",
new=sent,
):
agent1 = YoutubePublisherAgent()
await agent1.poll_state_changes()
# 컨테이너 재시작 시뮬레이션: 완전히 새로운 인스턴스(인메모리 상태 소실)
agent2 = YoutubePublisherAgent()
await agent2.poll_state_changes()
# 재시작해도 DB 원장으로 중복 방지 → 1회만 알림
assert sent.await_count == 1
@pytest.mark.asyncio
async def test_transient_failed_poll_keeps_ledger():
"""failed 폴링이 일시적으로 예외를 던져도 원장을 비우지 않아 다음 폴링에서 재알림하지 않음."""
from app.agents.youtube_publisher import YoutubePublisherAgent
failed_pipeline = {
"id": 3,
"state": "failed",
"failed_reason": "video: timeout",
"track_title": "beat music v2",
}
list_failed = AsyncMock(
side_effect=[[failed_pipeline], Exception("boom"), [failed_pipeline]]
)
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
with patch(
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
new=AsyncMock(return_value=[]),
), patch(
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
new=list_failed,
), patch(
"app.agents.youtube_publisher.send_raw",
new=sent,
):
agent = YoutubePublisherAgent()
await agent.poll_state_changes() # #3 최초 알림
await agent.poll_state_changes() # 예외 → 원장 유지되어야 (섣부른 정리 금지)
await agent.poll_state_changes() # #3 여전히 failed → 재알림 없어야
assert sent.await_count == 1

View File

@@ -268,6 +268,7 @@ services:
- CONVERSATION_HISTORY_LIMIT=${CONVERSATION_HISTORY_LIMIT:-20} - CONVERSATION_HISTORY_LIMIT=${CONVERSATION_HISTORY_LIMIT:-20}
- CONVERSATION_RATE_PER_MIN=${CONVERSATION_RATE_PER_MIN:-6} - CONVERSATION_RATE_PER_MIN=${CONVERSATION_RATE_PER_MIN:-6}
- YOUTUBE_DATA_API_KEY=${YOUTUBE_DATA_API_KEY:-} - YOUTUBE_DATA_API_KEY=${YOUTUBE_DATA_API_KEY:-}
- REDIS_URL=${REDIS_URL:-redis://redis:6379}
volumes: volumes:
- ${RUNTIME_PATH:-.}/data/agent-office:/app/data - ${RUNTIME_PATH:-.}/data/agent-office:/app/data
depends_on: depends_on:
@@ -275,6 +276,7 @@ services:
- music-lab - music-lab
- insta-lab - insta-lab
- realestate-lab - realestate-lab
- redis
healthcheck: healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"] test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 60s interval: 60s
@@ -462,7 +464,7 @@ services:
- "6379:6379" - "6379:6379"
volumes: volumes:
- ${RUNTIME_PATH}/redis-data:/data - ${RUNTIME_PATH}/redis-data:/data
command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru command: redis-server --appendonly yes --save "" --stop-writes-on-bgsave-error no --maxmemory 256mb --maxmemory-policy allkeys-lru
healthcheck: healthcheck:
test: ["CMD", "redis-cli", "ping"] test: ["CMD", "redis-cli", "ping"]
interval: 60s interval: 60s

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,207 @@
# 분산 워커 관측 시스템 (Distributed Worker Observability) — 설계 문서
> 작성일: 2026-06-29 · 작성 세션: BE (web-backend 소유)
> 대상 repo 3종: `web-ai`(워커) · `web-backend`(NAS 집계/경보) · `web-ui`(Three.js 대시보드)
---
## 1. 문제 정의 (Problem)
NAS 백엔드의 음악/영상/이미지/인스타 생성은 **무거운 작업을 Windows AI 머신(192.168.45.59)의 WSL2 Docker 워커**에 위임한다. NAS 게이트웨이(`music/video/image/insta-lab`)가 Redis 큐(`queue:<svc>-render`)에 job을 push하면, Windows 워커가 BLMOVE로 꺼내 처리하고 `/api/internal/<svc>/update` webhook으로 결과를 회신한다. 트레이딩봇 `ai_trade`(:8001)는 별도로 NAS stock(:18500)에서 HTTP pull을 한다.
**핵심 문제: 이 분산 워커들이 살아있는지 NAS·사용자가 알 길이 없다.**
- 각 워커에 로컬 `/health` 엔드포인트가 있으나 Windows 머신 안에서만 접근 가능.
- 실제 사고: `insta-render` 워커가 redis 블로킹 read 버그로 **2026-05-22 ~ 06-08 약 2주간 사일런트로 죽어 있었고**(모든 슬레이트 draft 정지) 아무도 몰랐다. 일감이 없을 때의 "한가함"과 "죽음"을 구분할 수단이 없었던 것이 근본 원인.
## 2. 목표 / 비목표 (Goals / Non-goals)
**목표 (Phase 1)**
- G1. 6개 워커(`music/video/image/insta-render` + `task-watcher` + `ai_trade`)의 생사·상태를 NAS에서 인지.
- G2. 큐 깊이·실패(dead-letter)·고아작업(processing)·일시정지(paused) 상태를 집계.
- G3. 상태 전이(다운/복구/실패누적)를 텔레그램으로 자동 경보.
- G4. web-ui 신규 페이지 `/infra`에서 NAS↔Windows 파이프라인을 **Three.js로 시각화** — 정상이면 통신이 흐르는 애니메이션, 장애면 해당 구간을 끊김/빨강으로 표시.
**비목표 (Phase 2 이후로 보류)**
- 원격 제어(워커 재시작, 큐 pause/resume, dead-letter 재처리) — Windows 머신 제어가 필요해 보안·구현 복잡도 큼.
- GPU 사용률(VRAM) 모니터링, stuck-task 자동 감지, WebSocket 라이브 푸시.
- 다중 노드 확장(현재 Windows 노드 1대).
## 3. 아키텍처 & 토폴로지
```
web-backend (NAS, 192.168.45.54) Windows 노드 (192.168.45.59)
┌──────────────────────────────────┐ ┌────────────────────────────────────┐
│ music-lab ─┐ │ ① job │ WSL2 Docker: │
│ video-lab ─┤ │ push │ ┌─ music-render │
│ image-lab ─┼─► [ Redis 큐 버스 ]═╪══════════╪══►├─ video-render (ReliableQueue) │
│ insta-lab ─┘ queue:*-render │ │ ├─ image-render │
│ queue:paused │◄═════════╪═══├─ insta-render │
│ │ ② webhook│ └─ task-watcher (paused 토글) │
│ agent-office │◄─────────╪── 각 워커 → worker:<name>:heartbeat│
│ ├─ node_monitor (집계) │◄─heartbeat (Redis SET, TTL 45s) │
│ └─ scheduler (1분 경보 cron) │ │ │
│ │ │ Windows 호스트(WSL 밖): │
│ stock (:18500) ◄── HTTP pull ────╪──────────╪── ai_trade (:8001) ─ heartbeat ───►│
└──────────────┬───────────────────┘ └────────────────────────────────────┘
│ GET /api/agent-office/nodes (FE 2~3초 폴링)
web-ui /infra ← Three.js 파이프라인 시각화
```
**설계 기반(이미 존재하는 자산)**
- 워커들은 이미 NAS Redis(`redis://192.168.45.54:6379`)에 BLMOVE로 연결 → heartbeat도 같은 Redis에 SET하면 방화벽/인바운드 포트 불필요, `queue:paused`여도 heartbeat는 계속 뛰므로 "정지 중이지만 살아있음"과 "죽음"을 구분 가능.
- `_shared/reliable_queue.py`(ReliableQueue)가 이미 `processing:queue:<svc>-render:<worker_id>` 리스트와 `dead_letter:queue:<svc>-render` 리스트를 Redis에 남김 → 집계기가 **신규 워커 코드 없이** 큐 깊이·실패·고아작업을 읽을 수 있음.
**채택하지 않은 대안**
- 집계기를 게이트웨이 중 하나에 배치 → "어느 게이트웨이가 전체 노드 상태를 소유하나"가 의미상 어색. `agent-office`가 ops 브레인(텔레그램·스케줄러·WebSocket·서비스 로그 수집 보유)이라 의미상 정확.
- NAS→워커 HTTP `/health` 폴링 → 워커별 포트 노출 + NAS→Windows 인바운드 접속 필요. Redis heartbeat가 단방향(워커→Redis)이라 더 단순.
- 라이브 갱신을 WebSocket으로 → Phase 1은 2~3초 폴링으로 충분(단순). WebSocket은 Phase 2 강화.
## 4. 컴포넌트 설계
### 4.1 web-ai — heartbeat 생산자 (AI 세션 소유)
**4.1.1 render 워커 4종 (`services/*-render/`)**
- 신규 공용 모듈 `services/_shared/heartbeat.py`:
- `async def heartbeat_loop(redis, name, stats, interval=15, ttl=45)``interval`초마다 `worker:<name>:heartbeat` 키에 JSON 값을 `SET ... EX ttl`.
- 값 스키마는 §5.1 참조. 죽으면 키가 TTL 만료 → 집계기가 "missing = dead" 판정.
- 각 워커 `main.py` lifespan에서 `worker_loop`와 함께 `heartbeat_loop` 태스크 spawn.
- `state` 산정: `queue:paused`가 set이면 `paused`, 현재 job 처리 중이면 `busy`, 아니면 `idle`. 처리 중 여부와 카운터(`jobs_done`/`jobs_failed`/`last_job_at`)는 `poll_once`가 갱신하는 모듈 레벨 `stats` 객체로 추적.
- TTL=45s = interval(15s)의 3배 → 1~2회 누락은 dead로 오판하지 않음.
**4.1.2 task-watcher (`services/task-watcher/`)**
- `watcher_loop`에 동일 heartbeat 추가. `worker:task-watcher:heartbeat``state` + 현재 `mode`(`trading`/`free`)를 함께 발행 → 대시보드가 paused의 **이유**("작업중(트레이딩)")를 표시.
**4.1.3 ai_trade (`ai_trade/`) — 다른 런타임**
- ai_trade는 Windows **호스트**에서 직접 uvicorn 실행(WSL Docker 아님), NAS Redis 큐에 연결되어 있지 않음(현재 NAS stock으로 HTTP pull만).
- 변경: `redis.asyncio` 의존성 추가 → `main.py` lifespan에 heartbeat 태스크 추가 → 같은 NAS Redis(`192.168.45.54:6379`)에 `worker:ai_trade:heartbeat` SET.
- Redis는 Windows 머신에서 이미 도달 가능(render 워커들이 같은 호스트에서 BLMOVE 중).
- heartbeat 로직은 ~10줄이므로 `ai_trade` 자체 미니 헬퍼로 둔다(`_shared` import 경로 의존 회피 — render 워커는 컨테이너 PYTHONPATH로 `_shared` 접근, ai_trade는 호스트 실행이라 경로가 다름). **계약(키 스키마)만 동일**하면 코드 공유 불필요.
- `state` 의미가 다름: render 워커의 idle/busy/paused가 아니라 `market_open`(poll_loop 활성·신호 생성 중) / `market_closed`(휴장·장외 idle). **task-watcher의 `queue:paused`와 무관**(트레이딩은 일시정지 대상 아님).
- 토폴로지 표현: Redis 큐 버스가 아니라 **HTTP pull 파이프라인**(ai_trade ⇄ NAS stock :18500)으로 별도 표시.
### 4.2 web-backend / agent-office — 집계기 + 경보 (이 BE 세션 소유)
**4.2.1 Redis 클라이언트 추가**
- `agent-office`는 현재 Redis 미사용 → `requirements.txt``redis>=5.0`(asyncio) 추가, `docker-compose.yml` agent-office 블록에 `REDIS_URL` 환경변수 + `depends_on: redis` 추가.
**4.2.2 `app/node_monitor.py` 신규**
- 워커 레지스트리(상수): 각 워커의 `name`, 연관 `queue`(있으면), `internal webhook` 경로, 토폴로지 link 타입(`redis-queue` | `http-pull`).
- `async def collect_status() -> dict`:
- 각 워커: `GET worker:<name>:heartbeat` → 존재하면 `alive=True` + JSON 파싱 + `last_beat_age_s = now - ts`; 없으면 `alive=False`(dead).
- 각 render 큐: `LLEN queue:<svc>-render`(depth), `LLEN dead_letter:queue:<svc>-render`, `processing:queue:<svc>-render:*` 키 스캔으로 in-flight 수.
- `GET queue:paused` + TTL → paused 플래그 + reason(task-watcher heartbeat의 mode).
- Redis 연결 실패 → `redis_ok=False`(전 구간 degrade).
- link 상태 합성(§5.2).
- 응답 스키마는 §5.2.
**4.2.3 엔드포인트**
- `GET /api/agent-office/nodes``collect_status()`. nginx `/api/agent-office/` 이미 라우팅됨 → **nginx 변경 불필요**.
**4.2.4 경보 cron (scheduler)**
- `_run_node_health_check` (APScheduler, 1분 간격):
- 직전 상태 `_node_state`(인메모리 dict)와 비교:
- `alive → dead`: 🔴 `<name> 워커 다운 (last beat Xs ago)`
- `dead → alive`: 🟢 `<name> 워커 복구`
- `dead_letter` 카운트가 임계(`NODE_ALERT_DEADLETTER_THRESHOLD`, 기본 1) 신규 초과: ❌ `<queue> 실패 누적 N건`
- `_notified` 패턴(기존 `youtube_publisher.poll_state_changes` 재사용)으로 스팸 방지, 복구 시 재알림 가능하도록 set 차집합.
- 텔레그램 발송은 agent-office 기존 봇 재사용.
### 4.3 web-ui — Three.js 대시보드 (FE 세션 소유)
- 신규 의존성: `three` + `@react-three/fiber` + `@react-three/drei`(React 코드베이스이므로 r3f가 관용적).
- 신규 라우트 `/infra`(Router.jsx) + Nav 등록.
- `pages/infra/InfraMonitor.jsx`:
- r3f `<Canvas>` 토폴로지 — 좌측 NAS(게이트웨이 sub-node) / 중앙 Redis 큐 버스(글로우 코어) / 우측 Windows 노드(워커 sub-node). ai_trade는 별도 HTTP-pull 파이프라인.
- 노드 간 파이프라인(튜브) + 상태별 머티리얼/애니메이션(§6).
- `useNodeStatus` 훅: `GET /api/agent-office/nodes`를 2~3초 폴링 → 상태를 시각 상태로 매핑(`src/api.js`에 헬퍼 추가).
- **2D 폴백**: WebGL 미지원/모바일 대비 카드·테이블 요약 뷰 토글.
- 실제 구현 시 `designer` 스킬 활성화(브레인스토밍 단계에서는 금지).
## 5. 잠그는 계약 (Contracts)
> 3 세션이 독립 병렬 작업하려면 이 두 스키마만 고정하면 된다.
### 5.1 Heartbeat 키 스키마
- **키**: `worker:<name>:heartbeat` (name ∈ `music-render`, `video-render`, `image-render`, `insta-render`, `task-watcher`, `ai_trade`)
- **값**(JSON 문자열), `SET ... EX 45`:
```json
{
"name": "image-render",
"kind": "render", // "render" | "watcher" | "trader"
"state": "idle", // render: idle|busy|paused / watcher: trading|free / trader: market_open|market_closed
"ts": "2026-06-29T12:34:56Z", // UTC ISO8601 (heartbeat 발신 시각)
"last_job_at": "2026-06-29T12:30:00Z", // nullable
"jobs_done": 42,
"jobs_failed": 1,
"mode": "free" // task-watcher 전용(paused 이유), 그 외 생략 가능
}
```
### 5.2 `/api/agent-office/nodes` 응답 스키마
```json
{
"redis_ok": true,
"paused": false,
"paused_reason": "trading", // queue:paused가 set일 때 task-watcher mode
"generated_at": "2026-06-29T12:34:57Z",
"workers": [
{
"name": "image-render", "kind": "render",
"alive": true, "state": "idle", "last_beat_age_s": 3,
"queue_depth": 0, "dead_letter": 0, "processing": 0,
"jobs_done": 42, "jobs_failed": 1, "last_job_at": "2026-06-29T12:30:00Z"
}
],
"links": [
{ "from": "nas", "to": "image-render", "type": "redis-queue", "status": "healthy" },
{ "from": "ai_trade", "to": "nas-stock", "type": "http-pull", "status": "healthy" }
]
}
```
- `link.status``healthy` | `paused` | `down` | `degraded`. 산정: 워커 dead → `down`; paused → `paused`; dead_letter>0 → `degraded`; redis_ok=false → 전 링크 `down`.
## 6. 시각화 상태 (Three.js)
| 상태 | 파이프라인(튜브) | 노드 |
|------|------------------|------|
| **정상 idle** | 시안/그린, 파티클이 NAS→워커→NAS 루프로 흐름(느림) | 초록 글로우 + 큐깊이/처리수 HUD |
| **정상 busy** | 파티클 빠르게 흐름 | "처리 중 N" |
| **일시정지 paused** | 앰버, 파티클 느려짐/정지 | "⏸ 작업중(트레이딩)" 라벨 |
| **장애 dead / link down** | 빨강, 흐름 멈춤, 끊긴 지점 스파크/단절 | 빨강 + ⚠ 경고, "last beat Xs ago" |
| **실패누적 dead-letter>0** | 해당 튜브 ❌ 뱃지 | dead-letter 카운트 강조 |
| **Redis/집계기 다운** | 중앙 버스 전체 빨강 | "집계 서버 연결 끊김" 오버레이 |
- ai_trade의 HTTP-pull 파이프라인은 큐 흐름이 아닌 pull 방향(ai_trade→NAS stock) 파티클로 구분 표현. `market_closed`는 정상 idle과 동일 톤(휴장은 장애 아님).
## 7. 에러 처리
- heartbeat TTL 만료 = dead 판정(권위 신호). 큐가 비어 일감이 없어도 heartbeat가 살아있으면 alive로 정확 판정(2주 사일런트 사고 재발 방지).
- Redis 다운 → `/nodes``redis_ok=false` 반환(500 아님) → 대시보드가 전 구간 degrade 표시.
- agent-office 다운 → FE 폴링 실패 → "집계 서버 연결 끊김" 오버레이.
- 집계기는 read-only(Redis에 쓰지 않음) → 워커 동작에 영향 0.
## 8. 테스트
- **web-ai**: `heartbeat.py` 단위 테스트(fakeredis/mock) — 발신 주기·TTL·state 전이·카운터. ai_trade heartbeat 별도 테스트.
- **web-backend**: `node_monitor.collect_status` 테스트(mock redis: 키 존재/만료/큐 깊이/dead-letter 케이스) + 경보 전이 테스트(alive→dead→alive, dead-letter 증가). TDD 적용.
- **web-ui**: `InfraMonitor` 컴포넌트가 mock 상태로 렌더 + 상태→색상 매핑 단위 테스트(r3f는 렌더 스모크 수준).
## 9. 단계 (Phasing)
- **Phase 1 (본 스펙 전체)**: 6 워커(render 4 + task-watcher + ai_trade) heartbeat / `/nodes` API / 텔레그램 경보 / Three.js `/infra` 대시보드.
- **Phase 2 (후속)**: GPU 사용률(VRAM 16GB 경합 가시화), stuck-task 감지, WebSocket 라이브 푸시, 원격 제어(워커 재시작·pause/resume·dead-letter 재처리).
## 10. 세션 분담 & 협업 (co-gahusb)
- **소유권**: BE(이 세션)=web-backend, AI 세션=web-ai, FE 세션=web-ui. 각자 자기 repo만 커밋.
- **선행 게이트**: §5의 두 계약(heartbeat 키 스키마 + `/nodes` 응답 스키마)을 먼저 확정·공유 → 3 세션 병렬 진행.
- **공유 리소스 락**: agent-office 의존성/compose 변경은 `compose` 락, nginx 무변경(불필요). 배포는 `nas-deploy` 락.
- BE 작업: agent-office redis 추가 + `node_monitor.py` + `/nodes` + 경보 cron + 본 메모리 기록. AI/FE 작업은 co-gahusb 태스크로 배분.
## 11. 메모리 갱신 계획
- 신규 cross-cutting 메모리 `infra_distributed_workers.md` 작성: 큐 계약 / webhook 계약 / ReliableQueue 키 / heartbeat 키 스키마 / task-watcher paused / node_monitor·`/nodes`·경보. `MEMORY.md` 인덱스 등재.
- 관련 서비스 메모리(`service_video/image/music/insta`)에 heartbeat·관측 추가 사실을 cross-link.
```

View File

@@ -0,0 +1,220 @@
# 실시간 매매 알람 (Real-time Trade Alerts) — 설계 스펙
- 작성일: 2026-07-02
- 상태: 설계 승인됨 (사용자 리뷰 대기)
- 관련 세션: BE(web-backend, 본 스펙 주도) · AI(web-ai 워커) · FE(web-ui 탭)
## 1. 목표
장이 열려 있는 동안(**시간외 포함**) 실시간으로 주가 기준치를 분석해, 조건 충족 시 **매수/매도 알람**을 텔레그램으로 **사용자 + 아내** 둘 다에게 전송한다. 기술적 분석(TA) 계산은 **Windows PC의 docker 워커**에서 수행한다.
기존에는 이 판단들이 EOD(하루 1회)로만 돌았다:
- 매수 후보 = 스크리너(평일 16:30) · 매도/보유 advisory = holdings_intel(08:30/16:50).
이번 작업의 핵심 = **동일 판단을 장중(+시간외) 1분 주기 실시간으로 전환 + 조건 충족 즉시 알람**.
## 2. 확정된 요구사항 (사용자 결정)
| 항목 | 결정 |
|------|------|
| 매수 유니버스 | **watchlist(사용자 관리) 당일 스크리너 후보** |
| 매수 트리거 | **TA 자동 시그널**(수동 목표가 없음) |
| 매도 트리거 | **기존 exit 룰 + 트레일링 스톱** |
| 감시 주기/세션 | **1분 폴링** · 장전 시간외 08:3009:00 · 정규장 09:0015:30 · 시간외 단일가 16:0018:00 |
| 중복 방지 | **상태 전이(edge-triggered)** — 거짓→참 전이 시만 알림, 참 유지 중 무알림, 재무장 |
| watchlist 관리 | **텔레그램 봇 명령 + web-ui 탭 둘 다** |
| 수신자 | **사용자 + 아내 둘 다**(매수·매도 모두) |
| TA 연산 위치 | **Windows WSL2 docker 신규 워커** |
| 트레일링 스톱 기본값 | 보유기간 고점 대비 **10%**(파라미터화) |
| 매수 신호 | 지지선 되돌림(MA20/50) · 돌파(전고점/52주) · RSI 과매도 반등 |
## 3. 아키텍처
```
[Windows WSL2 docker] trade-monitor 워커 (web-ai · AI세션)
1분 루프 (KST 세션 게이팅)
① GET NAS /api/webai/trade-alert/monitor-set (X-WebAI-Key)
② KIS 실시간/시간외 시세 + 분봉/일봉 → TA 계산
③ 조건 평가 → 현재 발화집합 F = {(ticker, kind, condition)}
④ POST NAS /api/webai/trade-alert/report {firing: F} (X-WebAI-Key)
⑤ heartbeat: worker:trade-monitor:heartbeat (EX45, 관측 편입)
[NAS] stock (:18500 · web-backend · BE)
• watchlist·alert_state(edge dedup, 영속)·alert_history·holding high-water
• monitor-set 조립(watchlist screener 후보 보유) + 세션/휴장 게이팅
• report 수신 → edge diff(F vs 직전 발화) → 신규 edge를 agent-office로 push
│ (텔레그램 전송 성공 시에만 alert_state 갱신)
[NAS] agent-office (:18900 · web-backend · BE)
• POST /api/agent-office/stock/trade-alert → 텔레그램(너+아내)
• 봇 명령 /watch /unwatch /watchlist → stock watchlist CRUD
• 알람 activity feed 편입
[web-ui] 관심종목 탭 (FE세션) — watchlist CRUD + 알람 이력 뷰
```
**설계 원칙**
- TA/조건판정 = Windows(요구사항). **edge 중복판정 상태 = NAS 영속** → 워커 재시작해도 재알림 스팸 없음(youtube_publisher 교훈 재적용).
- 워커는 dedup 상태를 **안 가진다**. 매 사이클 "현재 발화집합 전체"만 보고 → NAS가 diff(단일 진실원천).
- 워커의 대외 채널은 **NAS stock 한 곳**(기존 ai_trade↔stock의 `X-WebAI-Key` 재사용). 텔레그램 발송은 stock→agent-office push(기존 realestate→agent-office/notify 패턴).
## 4. DB 스키마 (stock.db)
```sql
-- 매수 감시 관심종목 (사용자 관리)
CREATE TABLE IF NOT EXISTS watchlist (
ticker TEXT PRIMARY KEY,
name TEXT,
note TEXT,
params_json TEXT NOT NULL DEFAULT '{}', -- 종목별 조건 오버라이드(선택)
added_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
);
-- edge 중복판정 상태 (영속 — 재시작 스팸 방지의 핵심)
CREATE TABLE IF NOT EXISTS trade_alert_state (
ticker TEXT NOT NULL,
kind TEXT NOT NULL, -- 'buy' | 'sell'
condition TEXT NOT NULL, -- ex) buy_ma20_pullback, sell_trailing_stop
currently_firing INTEGER NOT NULL DEFAULT 0,
first_fired_at TEXT,
last_fired_at TEXT,
last_seen_at TEXT,
PRIMARY KEY (ticker, kind, condition)
);
-- 알람 이력
CREATE TABLE IF NOT EXISTS trade_alert_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL,
name TEXT,
kind TEXT NOT NULL,
condition TEXT NOT NULL,
price REAL,
detail_json TEXT,
fired_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
);
CREATE INDEX IF NOT EXISTS idx_tah_fired ON trade_alert_history(fired_at DESC);
```
보유기간 고점(트레일링 스톱용) high-water는 `krx_daily_prices`(기존)에서 lookback max로 계산하거나 별도 컬럼으로 관리 — 구현 계획에서 확정(v1: 포지션 최초 관측 이후 일봉 고가 max, 없으면 최근 N일).
## 5. 계약 (Contracts) — cross-repo 잠금 대상
### 5.1 NAS stock ↔ Windows 워커 (X-WebAI-Key)
`GET /api/webai/trade-alert/monitor-set`
```json
{
"session": "pre | regular | after | closed",
"as_of": "2026-07-02T09:01:00+09:00",
"buy_targets": [{"ticker":"005930","name":"삼성전자","source":"watch|screener","params":{}}],
"sell_targets": [{"ticker":"000660","name":"SK하이닉스","avg_price":180000,"qty":10,
"holding_high":210000,"params":{}}],
"buy_params": {"rsi_oversold":30,"breakout_vol_mult":1.5,"pullback_pct":0.02},
"exit_params": {"stop_pct":0.08,"take_pct":0.25,"trailing_pct":0.10}
}
```
- `session=closed`면 워커는 KIS 호출 없이 sleep.
`POST /api/webai/trade-alert/report`
```json
{ "as_of":"2026-07-02T09:01:00+09:00",
"firing":[ {"ticker":"005930","kind":"buy","condition":"buy_ma20_pullback",
"price":71500,"detail":{"ma20":71200,"rsi":34}} ] }
```
응답: `{ "new_alerts": <int>, "cleared": <int> }`
- NAS가 `firing` vs `trade_alert_state[firing=1]` diff → 신규 edge만 텔레그램.
### 5.2 stock → agent-office (내부)
`POST /api/agent-office/stock/trade-alert`
```json
{ "alerts":[ {"ticker":"005930","name":"삼성전자","kind":"buy",
"condition":"buy_ma20_pullback","price":71500,
"detail":{...},"fired_at":"..."} ] }
```
→ agent-office가 너+아내에게 텔레그램. (realestate/notify 패턴)
### 5.3 stock watchlist CRUD (web-ui + agent-office 봇)
- `GET /api/stock/watchlist`
- `POST /api/stock/watchlist` `{ticker, note?}`
- `DELETE /api/stock/watchlist/{ticker}`
- `GET /api/stock/trade-alerts?days=N` (이력, web-ui용)
### 5.4 워커 heartbeat (관측 편입)
`worker:trade-monitor:heartbeat` EX45, 값 JSON `{name:"trade-monitor",kind:"trader",state:"market_open|market_closed|idle",ts,last_alert_at,...}`. `/api/agent-office/nodes` workers[]에 추가.
## 6. 알람 조건 (Windows 워커가 계산)
**매수** (buy_targets):
- `buy_ma20_pullback` — MA20>MA50>MA200 정렬 + 저가가 MA20/50에 `pullback_pct` 이내 접근 후 종가 반등
- `buy_breakout` — 종가 > (전 N일 고점 또는 52주 신고가) + 거래량 > `breakout_vol_mult`×20일평균
- `buy_rsi_bounce` — RSI(14)가 `rsi_oversold` 아래로 내려갔다가 **봉 시리즈 내에서** 다시 상향 돌파(최근 봉에서 30 상향 크로스). 워커는 무상태 — 매 사이클 봉 데이터로 크로스를 계산(cross-cycle 메모리 불필요)
**매도** (sell_targets):
- `sell_stop_loss` — (priceavg)/avg ≤ `stop_pct`
- `sell_ma_break` — 종가 < MA50 (심각: < MA200)
- `sell_take_profit` — (priceavg)/avg ≥ `take_pct`
- `sell_climax` — 급등 소진(holdings_intel climax 로직 이식)
- `sell_trailing_stop` — price ≤ holding_high × (1 `trailing_pct`)
## 7. 데이터 흐름 — edge dedup (NAS)
```
매 1분 report 수신 시:
F = report.firing 집합
prev = SELECT (ticker,kind,condition) FROM trade_alert_state WHERE currently_firing=1
new_edge = F prev
cleared = prev F
for e in new_edge:
ok = agent_office.send_trade_alert(e) # 텔레그램
if ok:
INSERT trade_alert_history(e)
UPSERT trade_alert_state(e, firing=1, fired/last=now)
# 실패 시 상태 미갱신 → 다음 사이클 재시도
for c in cleared:
UPDATE trade_alert_state SET firing=0 WHERE key=c # 재무장
UPDATE last_seen_at for all F
```
- 영속 `trade_alert_state` → 워커·NAS 재시작에도 재알림 스팸 없음.
- 텔레그램 실패 시 firing 미표시 → 재시도 보장(node_monitor "성공 시만 갱신" 관용).
## 8. 세션/휴장 게이팅
NAS `monitor-set.session` 필드가 KST 시각 + `holidays.json`(`is_market_open`)으로 판정:
- pre 08:3009:00 / regular 09:0015:30 / after 16:0018:00 → 그 외/휴장 = closed.
- 워커는 `closed`면 sleep. (불필요 KIS 호출·알람 차단)
## 9. 에러 처리
- 워커: KIS 실패 → 해당 사이클 skip + 다음 분 재시도, 종목별 실패 격리. heartbeat로 생사 노출.
- NAS: 워커 인증 `X-WebAI-Key`. 텔레그램 실패 → 상태 미갱신. `report`는 멱등(같은 F 재전송 무해).
- 워커 다운 시 알람 정지 → node_monitor 경보(기존 관측)로 감지.
## 10. 테스트 전략 (BE, TDD)
- watchlist CRUD (추가/중복/삭제/조회)
- monitor-set 조립 (watchlist screener 보유, 세션 게이팅, 휴장)
- **edge diff 로직**: 신규 edge만 알림 / 참 유지 무알림 / 해제 후 재발화 재알림 / 재시작 지속성(영속 상태)
- 텔레그램 전송 실패 시 상태 미갱신(재시도)
- alert_history 기록 / trade-alerts 조회
- agent-office: /watch·/unwatch·/watchlist 봇 명령 → stock CRUD, trade-alert notify → 텔레그램 포맷(너+아내)
- webai 계약 엔드포인트(monitor-set/report) 스키마·인증
## 11. 작업 분담
| repo | 세션 | 산출물 | 상태 |
|------|------|--------|------|
| **web-backend** (stock + agent-office) | **BE(본 세션)** | DB·watchlist·edge·webai 계약·텔레그램·봇 | 이번에 구현 |
| **web-ai** (`services/trade-monitor/` WSL2 docker) | AI세션 | 1분 루프·KIS·TA·조건평가·report·heartbeat | 계약 넘김 |
| **web-ui** (관심종목 탭) | FE세션 | watchlist CRUD·조건·이력 뷰 | 계약 넘김 |
- 계약(§5)은 co-gahusb로 잠근 뒤 3세션 병렬.
- 워커 재빌드는 로컬 docker(사용자): `wsl -d Ubuntu-24.04 -- docker compose up -d --build trade-monitor`.
## 12. 범위 밖 (YAGNI / 후속)
- 실주문 자동 집행(알람 전용, KIS 주문 X).
- KIS 웹소켓 실시간 틱(1분 폴링으로 충분).
- 종목별 수동 목표가(이번은 TA 자동만).
- 백테스트/성과 추적(후속 슬라이스).

View File

@@ -1100,6 +1100,19 @@ def get_pipeline(pid: int) -> Optional[Dict[str, Any]]:
return _parse_pipeline_row(row) return _parse_pipeline_row(row)
def delete_pipeline(pid: int) -> bool:
"""파이프라인과 자식행(pipeline_feedback, pipeline_jobs)을 하드 삭제.
SQLite FK를 강제하지 않으므로 자식행을 명시적으로 먼저 삭제한다.
파이프라인이 존재했으면 True, 없었으면 False.
"""
with _conn() as conn:
conn.execute("DELETE FROM pipeline_feedback WHERE pipeline_id = ?", (pid,))
conn.execute("DELETE FROM pipeline_jobs WHERE pipeline_id = ?", (pid,))
cur = conn.execute("DELETE FROM video_pipelines WHERE id = ?", (pid,))
return cur.rowcount > 0
def update_pipeline_state(pid: int, state: str, **fields) -> None: def update_pipeline_state(pid: int, state: str, **fields) -> None:
"""파이프라인 state를 갱신하고 옵션 컬럼을 함께 업데이트한다. """파이프라인 state를 갱신하고 옵션 컬럼을 함께 업데이트한다.

View File

@@ -1133,6 +1133,14 @@ def cancel_pipeline(pid: int):
return {"ok": True} return {"ok": True}
@app.delete("/api/music/pipeline/{pid}")
def delete_pipeline_endpoint(pid: int):
"""파이프라인 행을 하드 삭제(전체 목록에서 완전 제거). 없으면 404."""
if not _db_module.delete_pipeline(pid):
raise HTTPException(404)
return {"ok": True, "deleted": pid}
@app.post("/api/music/pipeline/{pid}/retry", status_code=202) @app.post("/api/music/pipeline/{pid}/retry", status_code=202)
async def retry_pipeline(pid: int, bg: BackgroundTasks): async def retry_pipeline(pid: int, bg: BackgroundTasks):
from .pipeline.state_machine import STEPS from .pipeline.state_machine import STEPS

View File

@@ -105,6 +105,29 @@ def test_cancel_pipeline(client):
assert db.get_pipeline(pid)["state"] == "cancelled" assert db.get_pipeline(pid)["state"] == "cancelled"
def test_delete_pipeline_removes_from_db(client):
pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
r = client.request("DELETE", f"/api/music/pipeline/{pid}")
assert r.status_code == 200
assert r.json()["ok"] is True
assert db.get_pipeline(pid) is None
all_ids = [p["id"] for p in client.get("/api/music/pipeline?status=all").json()["pipelines"]]
assert pid not in all_ids
def test_delete_pipeline_not_found_returns_404(client):
r = client.request("DELETE", "/api/music/pipeline/99999")
assert r.status_code == 404
def test_delete_pipeline_removes_child_jobs(client):
pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
db.create_pipeline_job(pid, "cover")
assert len(db.list_pipeline_jobs(pid)) == 1
client.request("DELETE", f"/api/music/pipeline/{pid}")
assert db.list_pipeline_jobs(pid) == []
def test_setup_get_returns_defaults(client): def test_setup_get_returns_defaults(client):
r = client.get("/api/music/setup") r = client.get("/api/music/setup")
assert r.status_code == 200 assert r.status_code == 200

View File

@@ -2,6 +2,7 @@ import sqlite3
import os import os
import hashlib import hashlib
import json import json
import datetime as dt
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
from app.screener.schema import ensure_screener_schema from app.screener.schema import ensure_screener_schema
@@ -125,6 +126,42 @@ def init_db():
conn.execute("CREATE INDEX IF NOT EXISTS idx_holdings_sig_ticker " conn.execute("CREATE INDEX IF NOT EXISTS idx_holdings_sig_ticker "
"ON holdings_signals(ticker, date DESC);") "ON holdings_signals(ticker, date DESC);")
# 실시간 매매 알람: watchlist / alert_state / alert_history
conn.execute("""
CREATE TABLE IF NOT EXISTS watchlist (
ticker TEXT PRIMARY KEY,
name TEXT,
note TEXT,
params_json TEXT NOT NULL DEFAULT '{}',
added_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS trade_alert_state (
ticker TEXT NOT NULL,
kind TEXT NOT NULL,
condition TEXT NOT NULL,
currently_firing INTEGER NOT NULL DEFAULT 0,
first_fired_at TEXT,
last_fired_at TEXT,
last_seen_at TEXT,
PRIMARY KEY (ticker, kind, condition)
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS trade_alert_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL,
name TEXT,
kind TEXT NOT NULL,
condition TEXT NOT NULL,
price REAL,
detail_json TEXT,
fired_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_tah_fired ON trade_alert_history(fired_at DESC)")
# Screener 스키마 부트스트랩 (7테이블 + 디폴트 설정 시드) # Screener 스키마 부트스트랩 (7테이블 + 디폴트 설정 시드)
ensure_screener_schema(conn) ensure_screener_schema(conn)
@@ -379,3 +416,105 @@ def get_holdings_signal_history(ticker: str, limit: int = 30) -> list:
"SELECT * FROM holdings_signals WHERE ticker=? ORDER BY date DESC LIMIT ?", "SELECT * FROM holdings_signals WHERE ticker=? ORDER BY date DESC LIMIT ?",
(ticker, limit)).fetchall() (ticker, limit)).fetchall()
return [_row_to_signal(r) for r in rows] return [_row_to_signal(r) for r in rows]
# --- 실시간 매매 알람: 공통 유틸 ---
def _now_iso() -> str:
return dt.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%fZ")
# --- Watchlist CRUD ---
def add_watchlist(ticker: str, name: str = None, note: str = None) -> None:
with _conn() as conn:
conn.execute(
"INSERT OR IGNORE INTO watchlist(ticker,name,note) VALUES(?,?,?)",
(ticker, name, note),
)
# 이름/노트 갱신(이미 있으면)
conn.execute(
"UPDATE watchlist SET name=COALESCE(?,name), note=COALESCE(?,note) WHERE ticker=?",
(name, note, ticker),
)
def remove_watchlist(ticker: str) -> bool:
with _conn() as conn:
cur = conn.execute("DELETE FROM watchlist WHERE ticker=?", (ticker,))
return cur.rowcount > 0
def get_watchlist() -> list:
with _conn() as conn:
rows = conn.execute("SELECT * FROM watchlist ORDER BY added_at").fetchall()
return [
{"ticker": r["ticker"], "name": r["name"], "note": r["note"],
"params": json.loads(r["params_json"] or "{}"), "added_at": r["added_at"]}
for r in rows
]
# --- Trade Alert State ---
def get_alert_state_firing() -> set:
with _conn() as conn:
rows = conn.execute(
"SELECT ticker,kind,condition FROM trade_alert_state WHERE currently_firing=1"
).fetchall()
return {(r["ticker"], r["kind"], r["condition"]) for r in rows}
def set_alert_firing(ticker: str, kind: str, condition: str, firing: bool, at_iso: str = None) -> None:
now = at_iso or _now_iso()
with _conn() as conn:
if firing:
conn.execute(
"""INSERT INTO trade_alert_state(ticker,kind,condition,currently_firing,first_fired_at,last_fired_at,last_seen_at)
VALUES(?,?,?,1,?,?,?)
ON CONFLICT(ticker,kind,condition) DO UPDATE SET
currently_firing=1,
first_fired_at=COALESCE(first_fired_at,excluded.first_fired_at),
last_fired_at=excluded.last_fired_at,
last_seen_at=excluded.last_seen_at""",
(ticker, kind, condition, now, now, now),
)
else:
conn.execute(
"UPDATE trade_alert_state SET currently_firing=0, last_seen_at=? WHERE ticker=? AND kind=? AND condition=?",
(now, ticker, kind, condition),
)
def touch_alert_seen(keys: list, at_iso: str) -> None:
with _conn() as conn:
for (ticker, kind, condition) in keys:
conn.execute(
"UPDATE trade_alert_state SET last_seen_at=? WHERE ticker=? AND kind=? AND condition=?",
(at_iso, ticker, kind, condition),
)
# --- Trade Alert History ---
def add_alert_history(ticker: str, name: str, kind: str, condition: str, price, detail: dict) -> int:
with _conn() as conn:
cur = conn.execute(
"INSERT INTO trade_alert_history(ticker,name,kind,condition,price,detail_json) VALUES(?,?,?,?,?,?)",
(ticker, name, kind, condition, price, json.dumps(detail or {}, ensure_ascii=False)),
)
return cur.lastrowid
def get_alert_history(days: int = 7) -> list:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM trade_alert_history WHERE fired_at >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?) ORDER BY fired_at DESC",
(f"-{int(days)} days",),
).fetchall()
return [
{"id": r["id"], "ticker": r["ticker"], "name": r["name"], "kind": r["kind"],
"condition": r["condition"], "price": r["price"],
"detail": json.loads(r["detail_json"] or "{}"), "fired_at": r["fired_at"]}
for r in rows
]

View File

@@ -21,6 +21,8 @@ from .db import (
upsert_broker_cash, get_all_broker_cash, delete_broker_cash, upsert_broker_cash, get_all_broker_cash, delete_broker_cash,
upsert_asset_snapshot, get_asset_snapshots, upsert_asset_snapshot, get_asset_snapshots,
add_sell_history, get_sell_history, update_sell_history, delete_sell_history, add_sell_history, get_sell_history, update_sell_history, delete_sell_history,
add_watchlist, remove_watchlist, get_watchlist, get_alert_history,
get_alert_state_firing, set_alert_firing, touch_alert_seen, add_alert_history,
) )
from .scraper import fetch_market_news, fetch_major_indices from .scraper import fetch_market_news, fetch_major_indices
from .price_fetcher import get_current_prices, get_current_prices_detail from .price_fetcher import get_current_prices, get_current_prices_detail
@@ -28,6 +30,10 @@ from .ai_summarizer import summarize_news, OllamaError
from .auth import verify_webai_key from .auth import verify_webai_key
from . import webai_cache from . import webai_cache
from . import holdings_intel from . import holdings_intel
from . import trade_alerts
from .trade_alerts import (
build_monitor_set, current_session, diff_firing, DEFAULT_EXIT_PARAMS, DEFAULT_BUY_PARAMS,
)
app = FastAPI() app = FastAPI()
install_access_log(app) install_access_log(app)
@@ -506,6 +512,63 @@ def get_webai_news_sentiment(date: str | None = None):
return result return result
@app.get("/api/webai/trade-alert/monitor-set", dependencies=[Depends(verify_webai_key)])
def get_trade_alert_monitor_set():
"""web-ai(Windows 워커) 전용 — 실시간 매매 알람 감시대상 조립 (계약 §5.1).
session은 KST 시각으로 pre/regular/after 판정 후, 평일·휴장 여부(is_market_open)를
함께 게이팅해 최종 closed 여부를 결정한다.
"""
from datetime import datetime, timezone, timedelta
kst = timezone(timedelta(hours=9))
now_kst = datetime.now(kst)
session = current_session(now_kst)
if not is_market_open(now_kst.date()):
session = "closed"
from .db import _conn
conn = _conn()
try:
return build_monitor_set(conn, session, DEFAULT_EXIT_PARAMS, DEFAULT_BUY_PARAMS)
finally:
conn.close()
class TradeAlertReport(BaseModel):
as_of: str | None = None
firing: list[dict] = []
@app.post("/api/webai/trade-alert/report", dependencies=[Depends(verify_webai_key)])
def post_trade_alert_report(req: TradeAlertReport):
"""web-ai(Windows 워커) 전용 — 발화 보고 수신 (계약 §5.2).
직전 발화상태 대비 edge diff(diff_firing) 후, 신규 alert는
agent-office 전송 성공 시에만 상태(firing=True)+이력 반영한다.
전송 실패 시 상태를 채택하지 않아 다음 사이클에 동일 alert가 다시
"신규"로 잡혀 재시도된다(멱등). 해제(cleared)는 전송과 무관하게 firing=False.
"""
prev = get_alert_state_firing()
d = diff_firing(req.firing, prev)
new_count = 0
for a in d["new"]:
if trade_alerts.notify_agent_office([a]):
set_alert_firing(a["ticker"], a["kind"], a["condition"], firing=True, at_iso=req.as_of)
add_alert_history(
a["ticker"], a.get("name"), a["kind"], a["condition"],
a.get("price"), a.get("detail") or {},
)
new_count += 1
for ticker, kind, condition in d["cleared"]:
set_alert_firing(ticker, kind, condition, firing=False)
touch_alert_seen(d["seen"], req.as_of or "")
return {"new_alerts": new_count, "cleared": len(d["cleared"])}
@app.post("/api/portfolio", status_code=201) @app.post("/api/portfolio", status_code=201)
def create_portfolio_item(req: PortfolioItemRequest): def create_portfolio_item(req: PortfolioItemRequest):
"""포트폴리오 종목 추가""" """포트폴리오 종목 추가"""
@@ -653,6 +716,41 @@ def remove_sell_history(record_id: int):
return {"ok": True} return {"ok": True}
# --- Watchlist & Trade Alerts API (실시간 매매 알람) ---
class WatchlistItemRequest(BaseModel):
ticker: str
name: str | None = None
note: str | None = None
@app.get("/api/stock/watchlist")
def list_watchlist():
"""관심종목 목록 조회"""
return {"watchlist": get_watchlist()}
@app.post("/api/stock/watchlist", status_code=201)
def create_watchlist_item(req: WatchlistItemRequest):
"""관심종목 추가 (이미 존재하면 name/note 갱신, 멱등)"""
add_watchlist(req.ticker, req.name, req.note)
return {"ok": True}
@app.delete("/api/stock/watchlist/{ticker}")
def delete_watchlist_item(ticker: str):
"""관심종목 삭제"""
if not remove_watchlist(ticker):
raise HTTPException(status_code=404, detail="not in watchlist")
return {"ok": True}
@app.get("/api/stock/trade-alerts")
def list_trade_alerts(days: int = 7):
"""매매 알람 이력 조회 (최근 N일)"""
return {"alerts": get_alert_history(days)}
# --- Holdings Intelligence API --- # --- Holdings Intelligence API ---
@app.get("/api/stock/holdings/intel") @app.get("/api/stock/holdings/intel")

View File

@@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import datetime as dt
import json import json
import logging import logging
import os import os
@@ -59,13 +60,19 @@ async def score_sentiment(
*, *,
name: str | None = None, name: str | None = None,
model: str = DEFAULT_MODEL, model: str = DEFAULT_MODEL,
asof: dt.date | None = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Returns {ticker, score_raw, reason, news_count, tokens_input, tokens_output, model}.""" """Returns {ticker, score_raw, reason, news_count, tokens_input, tokens_output, model}.
asof(현재 KST 일자)를 주면 prompt 맨 앞에 오늘 날짜를 명시해 LLM이 현재 시점 기준으로 판단한다.
"""
news_block = _format_news_block(news) news_block = _format_news_block(news)
prompt = PROMPT_TEMPLATE.format( prompt = PROMPT_TEMPLATE.format(
name=name or ticker, ticker=ticker, name=name or ticker, ticker=ticker,
n=len(news), news_block=news_block, n=len(news), news_block=news_block,
) )
if asof is not None:
prompt = f"오늘 날짜: {asof.isoformat()} (이 시점 기준으로 뉴스를 평가하세요)\n\n" + prompt
resp = await llm.messages.create( resp = await llm.messages.create(
model=model, model=model,
max_tokens=200, max_tokens=200,

View File

@@ -39,11 +39,11 @@ def _make_llm():
async def _process_one( async def _process_one(
ticker: str, name: str, articles: List[Dict[str, Any]], ticker: str, name: str, articles: List[Dict[str, Any]],
sem: asyncio.Semaphore, llm, model: str, sem: asyncio.Semaphore, llm, model: str, asof: dt.date,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
async with sem: async with sem:
return await _analyzer.score_sentiment( return await _analyzer.score_sentiment(
llm, ticker, articles, name=name, model=model, llm, ticker, articles, name=name, model=model, asof=asof,
) )
@@ -110,7 +110,7 @@ async def refresh_daily(
arts = articles_by_ticker.get(t, []) arts = articles_by_ticker.get(t, [])
if not arts: if not arts:
continue # 매핑 0 — score 미생성 continue # 매핑 0 — score 미생성
tasks.append(_process_one(t, name_map.get(t, t), arts, sem, llm, model)) tasks.append(_process_one(t, name_map.get(t, t), arts, sem, llm, model, asof))
raw_results = await asyncio.gather(*tasks, return_exceptions=True) raw_results = await asyncio.gather(*tasks, return_exceptions=True)
successes: List[Dict[str, Any]] = [] successes: List[Dict[str, Any]] = []

View File

@@ -125,6 +125,16 @@ from . import telegram as _tg
from .engine import Screener, ScreenContext from .engine import Screener, ScreenContext
def _today_kst() -> dt.date:
"""KST 오늘 날짜.
stock 컨테이너는 python:3.12-alpine + tzdata 미설치라 TZ=Asia/Seoul이 무효 →
date.today()가 UTC를 반환한다. 08시대(KST) 리포트가 하루 밀리는 것을 막기 위해
UTC+9로 명시 보정한다(holdings_intel._today_kst와 동일한 관용).
"""
return (dt.datetime.utcnow() + dt.timedelta(hours=9)).date()
def _resolve_asof(asof_str, conn: sqlite3.Connection) -> dt.date: def _resolve_asof(asof_str, conn: sqlite3.Connection) -> dt.date:
if asof_str: if asof_str:
return dt.date.fromisoformat(asof_str) return dt.date.fromisoformat(asof_str)
@@ -263,7 +273,7 @@ from . import snapshot as _snap
@router.post("/snapshot/refresh") @router.post("/snapshot/refresh")
def post_snapshot_refresh(asof: Optional[str] = None): def post_snapshot_refresh(asof: Optional[str] = None):
asof_date = dt.date.fromisoformat(asof) if asof else dt.date.today() asof_date = dt.date.fromisoformat(asof) if asof else _today_kst()
if asof_date.weekday() >= 5: if asof_date.weekday() >= 5:
return {"asof": asof_date.isoformat(), "status": "skipped_weekend"} return {"asof": asof_date.isoformat(), "status": "skipped_weekend"}
with _conn() as c: with _conn() as c:
@@ -300,7 +310,7 @@ from .ai_news import validation as _ai_validation
@router.post("/snapshot/refresh-news-sentiment") @router.post("/snapshot/refresh-news-sentiment")
async def post_refresh_news_sentiment(asof: Optional[str] = None): async def post_refresh_news_sentiment(asof: Optional[str] = None):
asof_date = dt.date.fromisoformat(asof) if asof else dt.date.today() asof_date = dt.date.fromisoformat(asof) if asof else _today_kst()
if asof_date.weekday() >= 5: if asof_date.weekday() >= 5:
return {"asof": asof_date.isoformat(), "status": "skipped_weekend"} return {"asof": asof_date.isoformat(), "status": "skipped_weekend"}
if _is_holiday(asof_date): if _is_holiday(asof_date):

137
stock/app/trade_alerts.py Normal file
View File

@@ -0,0 +1,137 @@
"""매매 알람 — 감시대상(monitor-set) 조립. 순수 조립 로직(HTTP/텔레그램 없음).
계약 §5.1 (docs/superpowers/specs/2026-07-02-realtime-trade-alerts-design.md) —
Windows 워커가 GET /api/webai/trade-alert/monitor-set 로 받는 응답을 조립한다.
NAS는 watchlist screener 최신 성공 run 후보를 buy_targets로, 보유 종목을
sell_targets로 병합해 넘긴다. TA/조건판정은 워커 쪽 책임.
"""
import os
import httpx
from datetime import datetime, timedelta, timezone, time as _time
from typing import Optional
from app.db import get_all_portfolio, get_watchlist
_KST = timezone(timedelta(hours=9))
# KST 세션 창(시:분) — 평일+휴장 판정은 호출부에서 is_market_open으로 별도 게이팅
_SESSIONS = [
("pre", (8, 30), (9, 0)),
("regular", (9, 0), (15, 30)),
("after", (16, 0), (18, 0)),
]
def current_session(now_kst) -> str:
"""now_kst의 time만으로 pre/regular/after/closed 세션 판정 (요일·휴장 무관)."""
t = now_kst.time()
for name, (sh, sm), (eh, em) in _SESSIONS:
if _time(sh, sm) <= t < _time(eh, em):
return name
return "closed"
DEFAULT_EXIT_PARAMS = {"stop_pct": 0.08, "take_pct": 0.25, "trailing_pct": 0.10}
DEFAULT_BUY_PARAMS = {"rsi_oversold": 30, "breakout_vol_mult": 1.5, "pullback_pct": 0.02}
def latest_screener_candidates(conn) -> list:
"""최신 성공(status='success') screener run의 후보 {ticker,name} 목록."""
row = conn.execute(
"SELECT id FROM screener_runs WHERE status='success' ORDER BY asof DESC, id DESC LIMIT 1"
).fetchone()
if not row:
return []
run_id = row[0]
rows = conn.execute(
"SELECT ticker, name FROM screener_results WHERE run_id=? ORDER BY rank", (run_id,)
).fetchall()
return [{"ticker": r[0], "name": r[1]} for r in rows]
def holding_high(conn, ticker: str, lookback_days: int = 60) -> Optional[float]:
"""보유기간 고점(트레일링 스톱용) — krx_daily_prices 최근 lookback_days 최고 high."""
row = conn.execute(
"SELECT MAX(high) FROM krx_daily_prices WHERE ticker=? "
"AND date >= date('now', ?)",
(ticker, f"-{int(lookback_days)} days"),
).fetchone()
return row[0] if row and row[0] is not None else None
def build_monitor_set(conn, session: str, exit_params: dict, buy_params: dict) -> dict:
"""계약 §5.1 monitor-set 응답 dict 조립.
buy_targets = watchlist 최신 screener 후보 (ticker 기준 중복 제거, watchlist 우선)
sell_targets = 보유 종목(portfolio) + avg_price/qty/holding_high
"""
buy: dict[str, dict] = {}
for w in get_watchlist():
buy[w["ticker"]] = {
"ticker": w["ticker"], "name": w["name"],
"source": "watch", "params": w.get("params") or {},
}
for c in latest_screener_candidates(conn):
if c["ticker"] not in buy:
buy[c["ticker"]] = {
"ticker": c["ticker"], "name": c["name"],
"source": "screener", "params": {},
}
sell_targets = []
for p in get_all_portfolio():
ticker = p["ticker"]
sell_targets.append({
"ticker": ticker,
"name": p.get("name"),
"avg_price": p.get("avg_price"),
"qty": p.get("quantity"),
"holding_high": holding_high(conn, ticker),
"params": {},
})
return {
"session": session,
"as_of": datetime.now(_KST).isoformat(),
"buy_targets": list(buy.values()),
"sell_targets": sell_targets,
"buy_params": buy_params,
"exit_params": exit_params,
}
def diff_firing(reported: list, prev: set) -> dict:
"""워커 발화집합(reported) vs 직전 발화상태(prev) edge diff.
reported 각 항목: {ticker,kind,condition,price,detail,name?}.
key = (ticker,kind,condition).
반환 {"new":[신규 alert...], "cleared":[해제 key...], "seen":[현재 key...]}.
"""
cur = {}
for a in reported:
key = (a["ticker"], a["kind"], a["condition"])
cur[key] = a
cur_keys = set(cur.keys())
new_keys = cur_keys - prev
cleared = sorted(prev - cur_keys)
return {
"new": [cur[k] for k in cur_keys if k in new_keys],
"cleared": cleared,
"seen": sorted(cur_keys),
}
def notify_agent_office(alerts: list) -> bool:
"""신규 alert들을 agent-office로 push (계약 §5.2). 전송 성공 시 True.
실패(네트워크 오류/비-200)는 False — 호출부가 상태/이력 미채택 후 다음
사이클에 동일 alert를 재시도하도록 한다(멱등, at-least-once).
"""
url = os.getenv("AGENT_OFFICE_URL", "http://agent-office:8000") + "/api/agent-office/stock/trade-alert"
try:
with httpx.Client(timeout=10) as c:
resp = c.post(url, json={"alerts": alerts})
return resp.status_code == 200
except httpx.HTTPError:
return False

View File

@@ -58,6 +58,18 @@ async def test_score_sentiment_clamps_negative_out_of_range():
assert out["score_raw"] == -10.0 assert out["score_raw"] == -10.0
@pytest.mark.asyncio
async def test_score_sentiment_includes_asof_date_in_prompt():
"""asof(현재 KST 일자)를 넘기면 prompt에 오늘 날짜가 포함되어 LLM이 현재 일자 기준으로 판단."""
import datetime as _dt
llm = _mk_llm(json.dumps({"score": 5.0, "reason": "ok"}))
await analyzer.score_sentiment(
llm, "005930", NEWS, name="삼성전자", asof=_dt.date(2026, 7, 2),
)
user_msg = llm.messages.create.call_args.kwargs["messages"][0]["content"]
assert "2026-07-02" in user_msg
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_score_sentiment_includes_summary_in_prompt(): async def test_score_sentiment_includes_summary_in_prompt():
"""summary 가 있으면 prompt 에 포함, 없으면 title 만.""" """summary 가 있으면 prompt 에 포함, 없으면 title 만."""

View File

@@ -39,7 +39,7 @@ async def test_refresh_daily_happy_path(conn):
scores_by_ticker = { scores_by_ticker = {
"005930": 7.5, "000660": 4.0, "373220": -6.0, "005930": 7.5, "000660": 4.0, "373220": -6.0,
} }
async def fake_score(llm, ticker, news, *, name=None, model="m"): async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
return { return {
"ticker": ticker, "score_raw": scores_by_ticker[ticker], "ticker": ticker, "score_raw": scores_by_ticker[ticker],
"reason": f"r{ticker}", "news_count": 1, "reason": f"r{ticker}", "news_count": 1,
@@ -81,7 +81,7 @@ async def test_refresh_daily_failures_isolated(conn):
} }
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3} fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
async def fake_score(llm, ticker, news, *, name=None, model="m"): async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
if ticker == "000660": if ticker == "000660":
raise RuntimeError("llm exploded") raise RuntimeError("llm exploded")
return { return {
@@ -116,7 +116,7 @@ async def test_refresh_daily_no_match_ticker_skipped(conn):
} }
fake_stats = {"total_articles": 1, "matched_pairs": 1, "hit_tickers": 1} fake_stats = {"total_articles": 1, "matched_pairs": 1, "hit_tickers": 1}
async def fake_score(llm, ticker, news, *, name=None, model="m"): async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
return { return {
"ticker": ticker, "score_raw": 5.0, "reason": "r", "ticker": ticker, "score_raw": 5.0, "reason": "r",
"news_count": 1, "tokens_input": 100, "tokens_output": 20, "news_count": 1, "tokens_input": 100, "tokens_output": 20,
@@ -152,7 +152,7 @@ async def test_refresh_daily_sign_gate_no_positive_in_neg(conn):
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3} fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
scores = {"005930": 6.0, "000660": 2.0, "373220": 0.5} # 모두 양수 scores = {"005930": 6.0, "000660": 2.0, "373220": 0.5} # 모두 양수
async def fake_score(llm, ticker, news, *, name=None, model="m"): async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
return { return {
"ticker": ticker, "score_raw": scores[ticker], "reason": "r", "ticker": ticker, "score_raw": scores[ticker], "reason": "r",
"news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model, "news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model,
@@ -183,7 +183,7 @@ async def test_refresh_daily_sign_gate_excludes_neutral(conn):
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3} fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
scores = {"005930": 3.0, "000660": 0.0, "373220": -3.0} scores = {"005930": 3.0, "000660": 0.0, "373220": -3.0}
async def fake_score(llm, ticker, news, *, name=None, model="m"): async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
return { return {
"ticker": ticker, "score_raw": scores[ticker], "reason": "r", "ticker": ticker, "score_raw": scores[ticker], "reason": "r",
"news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model, "news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model,

View File

@@ -5,6 +5,21 @@ from fastapi.testclient import TestClient
from app.main import app from app.main import app
def test_today_kst_uses_kst_offset_not_utc(monkeypatch):
"""컨테이너가 UTC(Alpine, tzdata 미설치)라 date.today()는 08시 KST에 어제를 준다.
_today_kst()는 UTC+9로 보정해 오늘(KST)을 반환해야 한다."""
from app.screener import router
class _FrozenDT(dt.datetime):
@classmethod
def utcnow(cls):
# 2026-07-01 23:30 UTC == 2026-07-02 08:30 KST (AI 뉴스 리포트 시각대)
return dt.datetime(2026, 7, 1, 23, 30, 0)
monkeypatch.setattr(router.dt, "datetime", _FrozenDT)
assert router._today_kst() == dt.date(2026, 7, 2)
def test_refresh_news_sentiment_weekend_skip(): def test_refresh_news_sentiment_weekend_skip():
# 2026-05-16 = Saturday # 2026-05-16 = Saturday
client = TestClient(app) client = TestClient(app)

View File

@@ -0,0 +1,48 @@
import os, sqlite3, tempfile, datetime as dt
import pytest
@pytest.fixture
def db(monkeypatch, tmp_path):
from app import db as _db
monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
_db.init_db()
return _db
def test_watchlist_add_get_remove(db):
db.add_watchlist("005930", "삼성전자", note="관심")
db.add_watchlist("005930", "삼성전자") # 멱등
wl = db.get_watchlist()
assert [w["ticker"] for w in wl] == ["005930"]
assert wl[0]["name"] == "삼성전자"
assert db.remove_watchlist("005930") is True
assert db.get_watchlist() == []
def test_alert_state_edge_firing_and_clear(db):
key = ("005930", "buy", "buy_breakout")
assert db.get_alert_state_firing() == set()
db.set_alert_firing(*key, firing=True, at_iso="2026-07-02T00:01:00Z")
assert key in db.get_alert_state_firing()
db.set_alert_firing(*key, firing=False)
assert key not in db.get_alert_state_firing()
def test_alert_history_records_and_reads(db):
db.add_alert_history("005930", "삼성전자", "buy", "buy_breakout", 71500, {"vol": 2.1})
rows = db.get_alert_history(days=7)
assert len(rows) == 1
assert rows[0]["ticker"] == "005930" and rows[0]["kind"] == "buy"
assert rows[0]["detail"]["vol"] == 2.1
def test_alert_history_days_filter_format_consistency(db):
"""fired_at은 ISO(T/Z)로 저장 — 필터도 ISO여야 경계일 비교가 정확.
7일 경계 밖(정확히 7일 전 자정) 레코드는 제외되어야 한다. 포맷 불일치면 잘못 포함됨."""
db.add_alert_history("005930", "삼성", "buy", "buy_breakout", 71500, {}) # now
conn = sqlite3.connect(db.DB_PATH)
conn.execute(
"INSERT INTO trade_alert_history(ticker,name,kind,condition,price,detail_json,fired_at) "
"VALUES('000660','SK','sell','sell_stop_loss',60000,'{}', "
"strftime('%Y-%m-%dT%H:%M:%fZ','now','-7 days','start of day'))"
)
conn.commit(); conn.close()
tickers = [r["ticker"] for r in db.get_alert_history(days=7)]
assert "005930" in tickers
assert "000660" not in tickers

View File

@@ -0,0 +1,18 @@
def test_diff_new_and_cleared_and_rearm():
from app.trade_alerts import diff_firing
reported = [{"ticker": "005930", "kind": "buy", "condition": "buy_breakout",
"price": 71500, "detail": {}}]
# 최초: prev 비어있음 → 신규
d1 = diff_firing(reported, prev=set())
assert [a["condition"] for a in d1["new"]] == ["buy_breakout"]
assert d1["cleared"] == []
# 유지: prev에 이미 있음 → 신규 없음
prev = {("005930", "buy", "buy_breakout")}
d2 = diff_firing(reported, prev=prev)
assert d2["new"] == []
# 해제: reported 비었고 prev에 있음 → cleared
d3 = diff_firing([], prev=prev)
assert d3["cleared"] == [("005930", "buy", "buy_breakout")]
# 재무장 후 재발화: prev 다시 비면 신규
d4 = diff_firing(reported, prev=set())
assert len(d4["new"]) == 1

View File

@@ -0,0 +1,101 @@
import sqlite3
import pytest
@pytest.fixture
def conn(monkeypatch, tmp_path):
from app import db as _db
monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
_db.init_db()
c = sqlite3.connect(_db.DB_PATH)
c.row_factory = sqlite3.Row
# 보유 1종목 (add_portfolio_item 실제 시그니처: broker/ticker/name/quantity/avg_price — market 파라미터 없음)
_db.add_portfolio_item(ticker="000660", name="SK하이닉스", quantity=10,
avg_price=180000, broker="kis")
# watchlist 1종목
_db.add_watchlist("005930", "삼성전자")
yield c
c.close()
def test_build_monitor_set_merges_sources(conn):
from app import trade_alerts as ta
ms = ta.build_monitor_set(conn, session="regular",
exit_params={"stop_pct": 0.08}, buy_params={"rsi_oversold": 30})
buy_tickers = {t["ticker"] for t in ms["buy_targets"]}
sell_tickers = {t["ticker"] for t in ms["sell_targets"]}
assert "005930" in buy_tickers # watchlist
assert "000660" in sell_tickers # 보유
assert ms["session"] == "regular"
assert ms["exit_params"]["stop_pct"] == 0.08
sell = next(t for t in ms["sell_targets"] if t["ticker"] == "000660")
assert sell["avg_price"] == 180000 and sell["qty"] == 10
def test_latest_screener_candidates_empty_when_no_run(conn):
from app import trade_alerts as ta
assert ta.latest_screener_candidates(conn) == []
def test_latest_screener_candidates_picks_latest_success_run(conn):
from app import trade_alerts as ta
now = "2026-07-02T09:00:00Z"
conn.execute(
"INSERT INTO screener_runs (asof, mode, status, started_at, weights_json, "
"node_params_json, gate_params_json, top_n) VALUES (?,?,?,?,?,?,?,?)",
(now, "manual", "failed", now, "{}", "{}", "{}", 20),
)
conn.execute(
"INSERT INTO screener_runs (asof, mode, status, started_at, weights_json, "
"node_params_json, gate_params_json, top_n) VALUES (?,?,?,?,?,?,?,?)",
(now, "manual", "success", now, "{}", "{}", "{}", 20),
)
run_id = conn.execute("SELECT id FROM screener_runs WHERE status='success'").fetchone()[0]
conn.execute(
"INSERT INTO screener_results (run_id, rank, ticker, name, total_score, scores_json) "
"VALUES (?,?,?,?,?,?)",
(run_id, 1, "035720", "카카오", 88.5, "{}"),
)
conn.commit()
candidates = ta.latest_screener_candidates(conn)
assert candidates == [{"ticker": "035720", "name": "카카오"}]
def test_holding_high_returns_max_high_within_lookback(conn):
from app import trade_alerts as ta
conn.execute(
"INSERT INTO krx_daily_prices (ticker, date, high) VALUES (?,?,?)",
("000660", "2026-06-01", 200000),
)
conn.execute(
"INSERT INTO krx_daily_prices (ticker, date, high) VALUES (?,?,?)",
("000660", "2026-06-20", 210000),
)
conn.commit()
assert ta.holding_high(conn, "000660", lookback_days=60) == 210000
def test_holding_high_none_when_no_price_history(conn):
from app import trade_alerts as ta
assert ta.holding_high(conn, "999999") is None
def test_build_monitor_set_dedupes_watchlist_and_screener_overlap(conn):
from app import trade_alerts as ta
now = "2026-07-02T09:00:00Z"
cur = conn.execute(
"INSERT INTO screener_runs (asof, mode, status, started_at, weights_json, "
"node_params_json, gate_params_json, top_n) VALUES (?,?,?,?,?,?,?,?)",
(now, "manual", "success", now, "{}", "{}", "{}", 20),
)
run_id = cur.lastrowid
# 스크리너 후보가 watchlist와 중복(005930)
conn.execute(
"INSERT INTO screener_results (run_id, rank, ticker, name, total_score, scores_json) "
"VALUES (?,?,?,?,?,?)",
(run_id, 1, "005930", "삼성전자", 90.0, "{}"),
)
conn.commit()
ms = ta.build_monitor_set(conn, session="regular", exit_params={}, buy_params={})
buy_tickers = [t["ticker"] for t in ms["buy_targets"]]
assert buy_tickers.count("005930") == 1

View File

@@ -0,0 +1,35 @@
import datetime as dt
import pytest
from fastapi.testclient import TestClient
def test_current_session_windows():
from app.trade_alerts import current_session
d = dt.date(2026, 7, 2)
assert current_session(dt.datetime.combine(d, dt.time(8, 40))) == "pre"
assert current_session(dt.datetime.combine(d, dt.time(10, 0))) == "regular"
assert current_session(dt.datetime.combine(d, dt.time(17, 0))) == "after"
assert current_session(dt.datetime.combine(d, dt.time(20, 0))) == "closed"
@pytest.fixture
def client(monkeypatch, tmp_path):
from app import db as _db
monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
_db.init_db()
monkeypatch.setenv("WEBAI_API_KEY", "k")
from app.main import app
return TestClient(app)
def test_monitor_set_requires_auth(client):
assert client.get("/api/webai/trade-alert/monitor-set").status_code == 401
def test_monitor_set_ok(client):
r = client.get("/api/webai/trade-alert/monitor-set", headers={"X-WebAI-Key": "k"})
assert r.status_code == 200
body = r.json()
assert body["session"] in ("pre", "regular", "after", "closed")
assert "buy_targets" in body and "sell_targets" in body
assert body["exit_params"]["trailing_pct"] == 0.10

View File

@@ -0,0 +1,56 @@
import pytest
from unittest.mock import patch
from fastapi.testclient import TestClient
@pytest.fixture
def client(monkeypatch, tmp_path):
from app import db as _db
monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
_db.init_db()
monkeypatch.setenv("WEBAI_API_KEY", "k")
from app.main import app
return TestClient(app)
def _report(client, firing):
return client.post("/api/webai/trade-alert/report",
headers={"X-WebAI-Key": "k"},
json={"as_of": "2026-07-02T09:01:00+09:00", "firing": firing})
def test_report_new_edge_sends_and_persists(client):
firing = [{"ticker": "005930", "name": "삼성전자", "kind": "buy",
"condition": "buy_breakout", "price": 71500, "detail": {"vol": 2.0}}]
with patch("app.trade_alerts.notify_agent_office", return_value=True) as m:
r1 = _report(client, firing)
assert r1.json()["new_alerts"] == 1
assert m.called
# 2번째 동일 firing → 유지, 신규 0
with patch("app.trade_alerts.notify_agent_office", return_value=True):
r2 = _report(client, firing)
assert r2.json()["new_alerts"] == 0
# 이력 1건
assert len(client.get("/api/stock/trade-alerts?days=1").json()["alerts"]) == 1
def test_report_send_failure_does_not_persist(client):
firing = [{"ticker": "005930", "name": "삼성전자", "kind": "buy",
"condition": "buy_breakout", "price": 71500, "detail": {}}]
with patch("app.trade_alerts.notify_agent_office", return_value=False):
r = _report(client, firing)
assert r.json()["new_alerts"] == 0 # 전송 실패 → 미채택
# 다음 사이클(전송 성공) 재시도되어 알림
with patch("app.trade_alerts.notify_agent_office", return_value=True):
r2 = _report(client, firing)
assert r2.json()["new_alerts"] == 1
def test_report_cleared_rearm(client):
firing = [{"ticker": "005930", "name": "삼성", "kind": "buy",
"condition": "buy_breakout", "price": 71500, "detail": {}}]
with patch("app.trade_alerts.notify_agent_office", return_value=True):
_report(client, firing)
_report(client, []) # 해제
r = _report(client, firing) # 재발화
assert r.json()["new_alerts"] == 1

View File

@@ -0,0 +1,22 @@
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def client(monkeypatch, tmp_path):
from app import db as _db
monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
_db.init_db()
from app.main import app
return TestClient(app)
def test_watchlist_crud(client):
assert client.get("/api/stock/watchlist").json()["watchlist"] == []
r = client.post("/api/stock/watchlist", json={"ticker": "005930", "name": "삼성전자"})
assert r.status_code == 201
wl = client.get("/api/stock/watchlist").json()["watchlist"]
assert wl[0]["ticker"] == "005930"
assert client.delete("/api/stock/watchlist/005930").status_code == 200
assert client.delete("/api/stock/watchlist/005930").status_code == 404
def test_trade_alerts_history_empty(client):
assert client.get("/api/stock/trade-alerts?days=7").json()["alerts"] == []