17 Commits

Author SHA1 Message Date
c4b2fffeb4 docs(CLAUDE.md): 실시간 매매 알람 엔드포인트 카탈로그 등재 (stock watchlist/webai + agent-office notify/봇명령) 2026-07-02 20:09:07 +09:00
c6540b2417 feat(agent-office): /watch /unwatch /watchlist 봇 명령 2026-07-02 20:05:59 +09:00
2bce07c367 feat(agent-office): 매매알람 텔레그램 notify(너+아내) 엔드포인트 2026-07-02 20:01:10 +09:00
2906a2ae3e feat(stock): webai report — edge diff→agent-office push→상태/이력(전송성공시만) 2026-07-02 19:56:58 +09:00
134b9e5d07 feat(stock): session 판정 + webai monitor-set 엔드포인트
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-02 19:51:57 +09:00
bf84328d59 feat(stock): edge diff(신규/해제/재무장) 순수 함수 2026-07-02 19:48:45 +09:00
d8b3267b98 feat(stock): 감시대상(monitor-set) 조립 로직 2026-07-02 15:51:06 +09:00
89c52b1fb6 feat(stock): watchlist CRUD + 알람 이력 API 2026-07-02 15:45:14 +09:00
01a8aee226 fix(stock): 매매알람 이력 days 필터 포맷을 ISO로 통일 (경계일 과다포함 수정, 리뷰 Important) 2026-07-02 15:43:22 +09:00
b2c4ca0e0b feat(stock): 매매알람 DB — watchlist/alert_state/history 테이블+헬퍼
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-02 15:34:53 +09:00
baa3a3075d docs(stock): 실시간 매매 알람 BE 구현 계획 (9 tasks, TDD)
watchlist/alert_state/history DB → CRUD API → monitor-set 조립 → edge diff →
webai monitor-set/report → agent-office 텔레그램(너+아내) → /watch 봇 명령 → 회귀/배포.
워커(web-ai)·탭(web-ui)은 계약(스펙 §5)만 정의해 각 세션 핸드오프.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-02 15:25:26 +09:00
4cb9dc6a7c docs(stock): 실시간 매매 알람 설계 스펙 (watchlist∪screener buy + exit+trailing sell, 1분 Windows 워커, NAS edge dedup)
브레인스토밍 확정 요구사항 6종 + 아키텍처 A(신규 Windows docker 워커). TA/조건판정은
Windows, edge 중복판정 상태는 NAS 영속(재시작 스팸 방지). cross-repo 계약(webai
monitor-set/report, agent-office notify, watchlist CRUD, heartbeat) 정의.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-02 15:19:00 +09:00
36e8d11060 fix(stock): AI 뉴스 리포트 하루 밀림 해소 — asof를 KST로 보정 + LLM에 현재 일자 주입
근본원인: stock 컨테이너는 python:3.12-alpine + tzdata 미설치라 TZ=Asia/Seoul이
무효 → date.today()가 UTC를 반환. AI 뉴스 리포트 cron은 08:00 KST(=전날 23:00 UTC)라
asof가 어제로 계산돼 라벨·기사 윈도우·news_sentiment 저장이 전부 하루 밀렸음
(월요일은 일요일 UTC로 계산돼 skip_weekend까지).

- screener/router.py: _today_kst()(=utcnow+9h, holdings_intel 관용) 추가.
  /snapshot/refresh · /snapshot/refresh-news-sentiment의 asof 기본값을 KST로.
- ai_news/analyzer.py: score_sentiment(asof=...) → 프롬프트 앞에 "오늘 날짜" 명시,
  LLM이 현재 일자 기준으로 뉴스 평가(사용자 요청).
- ai_news/pipeline.py: refresh_daily가 asof를 score_sentiment까지 스레딩.
- 테스트: _today_kst KST 보정 + analyzer asof 주입 2종 TDD Red→Green.
  기존 pipeline 목 시그니처에 asof 반영. stock 전체 149 passed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-02 14:38:51 +09:00
db6fed72b3 feat(music-lab): 파이프라인 하드 삭제 엔드포인트 DELETE /api/music/pipeline/{id}
cancel(state→cancelled, active/failed 뷰에서만 제거)만으론 status=all 뷰에
행이 남아 옛 dead 파이프라인을 완전히 치울 수 없었음. DELETE로 하드 삭제 추가.

- db.delete_pipeline(pid)→bool: 자식행(pipeline_feedback, pipeline_jobs) 먼저
  삭제 후 video_pipelines 삭제(SQLite FK 미강제라 명시적 cascade). 존재 여부 bool.
- DELETE /api/music/pipeline/{id}: 없으면 404, 있으면 {"ok":true,"deleted":id}.
  상태 가드 없음(관리자 정리 용도, cancel과 동일한 단순 정책).
- 테스트 3종(삭제+404+자식행 cascade) TDD Red→Green. music-lab 152 passed.
- CLAUDE.md 엔드포인트 카탈로그 갱신.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-02 13:52:11 +09:00
7cce5c422f fix(agent-office): 파이프라인 실패 알림 dedup을 DB 영속화 (재시작 재알림 스팸 해소)
youtube_publisher._notified_failed(인메모리 set)가 컨테이너 재시작 시 소실되어
기존 failed 파이프라인(예: video 인코딩 구버전 실패 #3)을 매 재시작마다 "신규"로
재알림하던 스팸 버그를 notified_failed_pipelines 테이블로 영속화해 해결.

부수 버그 fix: failed 폴링이 예외를 던지면 failed=[]로 오해해 원장을 통째로
비우던 코드 → 예외 시 early-return(원장 보존).

진행 중 *_pending 승인 dedup(_notified_state_per_pipeline)은 의도적으로 인메모리
유지(재시작 시 살아있는 파이프라인 승인 재알림은 유용한 리마인더).

테스트: 재시작 지속성 + 일시적 폴링 예외 재현 테스트 2종 추가(TDD Red→Green).
DB_PATH 첫 import 고정으로 인한 테스트 간 영속 테이블 누수를 monkeypatch로 격리.
agent-office 전체 140개 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-01 15:20:07 +09:00
94beecbfaf docs(CLAUDE.md): agent-office 카탈로그에 /nodes 엔드포인트 + node_monitor.py 등재
분산 워커 관측 시스템 — GET /api/agent-office/nodes(heartbeat 생사+큐깊이+
dead-letter 집계, web-ui /infra 소비) 엔드포인트 표 추가 + 핵심파일에
node_monitor.py 추가. 상세는 infra_distributed_workers.md 메모리.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-07-01 02:56:37 +09:00
98b17f3a3a fix(redis): bgsave fork 실패로 인한 쓰기 차단 해소 (--save "" + stop-writes off)
근본원인: NAS vm.overcommit_memory=0 + Committed_AS≈CommitLimit(98%)로 redis
bgsave fork()가 거부되어 stop-writes-on-bgsave-error(기본 yes)가 모든 쓰기를
차단(6/29 20:36 이후). AOF가 durability를 담당하므로 실패하는 RDB 스냅샷을
비활성화(--save "")하고 stop-writes-on-bgsave-error no로 안전망 추가.
호스트 vm.overcommit_memory=1(sudo)은 별도 권장.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-06-30 14:21:09 +09:00
31 changed files with 2444 additions and 26 deletions

View File

@@ -246,6 +246,10 @@ docker compose up -d
| GET | `/api/portfolio/snapshot/history` | 스냅샷 이력 (`days`) | | GET | `/api/portfolio/snapshot/history` | 스냅샷 이력 (`days`) |
| GET/POST | `/api/portfolio/sell-history` | 매도 내역 조회/저장 | | GET/POST | `/api/portfolio/sell-history` | 매도 내역 조회/저장 |
| PUT/DELETE | `/api/portfolio/sell-history/{id}` | 매도 기록 수정/삭제 | | PUT/DELETE | `/api/portfolio/sell-history/{id}` | 매도 기록 수정/삭제 |
| GET/POST/DELETE | `/api/stock/watchlist` (+ `/{ticker}`) | 실시간 매수 알람 관심종목 CRUD |
| GET | `/api/stock/trade-alerts` | 매매 알람 이력 (`days`) |
| GET | `/api/webai/trade-alert/monitor-set` | (워커) 감시대상 조립 = watchlistscreener보유 + session/params (X-WebAI-Key) |
| POST | `/api/webai/trade-alert/report` | (워커) 발화집합 수신 → edge diff → 신규만 텔레그램 push (X-WebAI-Key) |
### music-lab (music-lab/) ### music-lab (music-lab/)
듀얼 프로바이더 음악 생성(Suno + MusicGen) + YouTube 영상 자동화 파이프라인 + 시장 트렌드. 듀얼 프로바이더 음악 생성(Suno + MusicGen) + YouTube 영상 자동화 파이프라인 + 시장 트렌드.
@@ -268,6 +272,7 @@ docker compose up -d
| POST/GET | `/api/music/compile` (+ `/compiles/{id}/export`) | 컴파일 | | POST/GET | `/api/music/compile` (+ `/compiles/{id}/export`) | 컴파일 |
| POST/GET/DELETE | `/api/music/video-project` (+ `/{id}/render`, `/export`) | 영상 프로젝트 | | POST/GET/DELETE | `/api/music/video-project` (+ `/{id}/render`, `/export`) | 영상 프로젝트 |
| ALL | `/api/music/pipeline` (생성/start/feedback/cancel/publish/retry/telegram-msg/lookup) | YouTube 자동화 파이프라인. `POST /{id}/retry`=실패 step 재개(publish+업로드완료 시 409) | | ALL | `/api/music/pipeline` (생성/start/feedback/cancel/publish/retry/telegram-msg/lookup) | YouTube 자동화 파이프라인. `POST /{id}/retry`=실패 step 재개(publish+업로드완료 시 409) |
| DELETE | `/api/music/pipeline/{id}` | 파이프라인 행 하드 삭제(자식 jobs/feedback 포함, 전체 목록에서 제거). 없으면 404 |
| GET/PUT | `/api/music/setup` | 파이프라인 설정 | | GET/PUT | `/api/music/setup` | 파이프라인 설정 |
| GET | `/api/music/youtube/auth-url`, `/callback`, `/status`; POST `/disconnect` | YouTube OAuth | | GET | `/api/music/youtube/auth-url`, `/callback`, `/status`; POST `/disconnect` | YouTube OAuth |
| GET/POST/PUT/DELETE | `/api/music/revenue` (+ `/dashboard`) | 수익 기록 | | GET/POST/PUT/DELETE | `/api/music/revenue` (+ `/dashboard`) | 수익 기록 |
@@ -347,7 +352,7 @@ docker compose up -d
### agent-office (agent-office/) ### agent-office (agent-office/)
AI 에이전트 가상 오피스 — 기존 서비스 API를 프록시로 호출, 실시간 WebSocket + 텔레그램 봇. AI 에이전트 가상 오피스 — 기존 서비스 API를 프록시로 호출, 실시간 WebSocket + 텔레그램 봇.
- 핵심 파일: `main.py`, `db.py`, `config.py`, `websocket_manager.py`, `service_proxy.py`, `telegram_bot.py`, `scheduler.py`, `agents/`(stock/music/realestate/youtube/youtube_publisher/lotto/base) - 핵심 파일: `main.py`, `db.py`, `config.py`, `websocket_manager.py`, `service_proxy.py`, `telegram_bot.py`, `scheduler.py`, `node_monitor.py`(분산 워커 관측 집계+경보), `agents/`(stock/music/realestate/youtube/youtube_publisher/lotto/base)
- 에이전트 7종 레지스트리. 명령 API body 필드명 → `reference_agent_office_command_api.md` - 에이전트 7종 레지스트리. 명령 API body 필드명 → `reference_agent_office_command_api.md`
- 📌 상세(DB 9테이블·FSM·전체 cron 목록·AGENT_CONTAINER_MAP·텔레그램 캐싱·env): **`service_agent_office.md`** - 📌 상세(DB 9테이블·FSM·전체 cron 목록·AGENT_CONTAINER_MAP·텔레그램 캐싱·env): **`service_agent_office.md`**
@@ -362,11 +367,13 @@ AI 에이전트 가상 오피스 — 기존 서비스 API를 프록시로 호출
| POST | `/api/agent-office/telegram/webhook` | 텔레그램 Webhook (realestate_bookmark_* 콜백 포함) | | POST | `/api/agent-office/telegram/webhook` | 텔레그램 Webhook (realestate_bookmark_* 콜백 포함) |
| POST | `/api/agent-office/realestate/notify` | realestate-lab 전용 push 수신 → 텔레그램 | | POST | `/api/agent-office/realestate/notify` | realestate-lab 전용 push 수신 → 텔레그램 |
| GET | `/api/agent-office/states` | 전체 에이전트 상태 | | GET | `/api/agent-office/states` | 전체 에이전트 상태 |
| GET | `/api/agent-office/nodes` | 분산 워커(NAS↔Windows) 관측 — heartbeat 생사+큐깊이+dead-letter 집계 (web-ui `/infra` Three.js 시각화 소비). 상세 → `infra_distributed_workers.md` |
| GET | `/api/agent-office/activity` | 전 에이전트 통합 활동 피드 (tasks+logs UNION). 필터 `agent_id`/`type`(task\|log)/`status`/`days` + `limit`/`offset` | | GET | `/api/agent-office/activity` | 전 에이전트 통합 활동 피드 (tasks+logs UNION). 필터 `agent_id`/`type`(task\|log)/`status`/`days` + `limit`/`offset` |
| GET | `/api/agent-office/conversation/stats` | 텔레그램 대화 토큰·캐시 통계 (`days`) | | GET | `/api/agent-office/conversation/stats` | 텔레그램 대화 토큰·캐시 통계 (`days`) |
| POST/GET | `/api/agent-office/youtube/research` (+ `/status`) | YouTube 트렌드 수집 트리거/상태 | | POST/GET | `/api/agent-office/youtube/research` (+ `/status`) | YouTube 트렌드 수집 트리거/상태 |
| GET | `/api/agent-office/lotto/signals`, `/lotto/baselines` | 로또 시그널 이력·baseline | | GET | `/api/agent-office/lotto/signals`, `/lotto/baselines` | 로또 시그널 이력·baseline |
| POST | `/api/agent-office/lotto/signal-check` | 로또 시그널 평가 트리거 (light/sim/deep) | | POST | `/api/agent-office/lotto/signal-check` | 로또 시그널 평가 트리거 (light/sim/deep) |
| POST | `/api/agent-office/stock/trade-alert` | stock에서 push된 매매 알람 → 텔레그램(너+아내). 봇 명령 `/watch`·`/unwatch`·`/watchlist`로 watchlist 관리 |
### tarot-lab (tarot-lab/) ### tarot-lab (tarot-lab/)
타로 카드 해석 (Claude Sonnet, agent-office에서 2026-05-25 독립). 타로 카드 해석 (Claude Sonnet, agent-office에서 2026-05-25 독립).

View File

@@ -4,7 +4,12 @@ import logging
from .base import BaseAgent from .base import BaseAgent
from . import classify_intent from . import classify_intent
from .. import service_proxy from .. import service_proxy
from ..db import add_log from ..db import (
add_log,
get_notified_failed_pipelines,
add_notified_failed_pipeline,
prune_notified_failed_pipelines,
)
from ..telegram.messaging import send_raw from ..telegram.messaging import send_raw
logger = logging.getLogger("agent-office.youtube_publisher") logger = logging.getLogger("agent-office.youtube_publisher")
@@ -25,8 +30,9 @@ class YoutubePublisherAgent(BaseAgent):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
# 진행 중(*_pending) 승인 요청 dedup — 인메모리 유지(의도적).
# 재시작 시 살아있는 파이프라인 승인 재알림은 유용한 리마인더라 스팸 아님.
self._notified_state_per_pipeline: dict[int, tuple] = {} self._notified_state_per_pipeline: dict[int, tuple] = {}
self._notified_failed: set[int] = set()
async def poll_state_changes(self) -> None: async def poll_state_changes(self) -> None:
"""주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송.""" """주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송."""
@@ -52,18 +58,21 @@ class YoutubePublisherAgent(BaseAgent):
try: try:
failed = await service_proxy.list_failed_pipelines() failed = await service_proxy.list_failed_pipelines()
except Exception as e: except Exception as e:
# 일시적 폴링 실패를 "failed 없음"으로 오해하면 원장을 비워 재알림 스팸이 남.
# → 원장을 건드리지 않고 조용히 종료(다음 폴링에서 재시도).
logger.warning("failed 폴링 실패: %s", e) logger.warning("failed 폴링 실패: %s", e)
failed = [] return
notified = get_notified_failed_pipelines()
for p in failed: for p in failed:
pid = p.get("id") pid = p.get("id")
if pid is None: if pid is None:
continue continue
if pid not in self._notified_failed: if pid not in notified:
await self._notify_failed(p) await self._notify_failed(p)
self._notified_failed.add(pid) add_notified_failed_pipeline(pid)
# 재개되어 failed에서 벗어난 파이프라인은 재알림 가능하도록 해제 # 재개되어 failed에서 벗어난 파이프라인은 재알림 가능하도록 원장에서 제거
failed_ids = {p.get("id") for p in failed} failed_ids = {p.get("id") for p in failed if p.get("id") is not None}
self._notified_failed &= failed_ids prune_notified_failed_pipelines(failed_ids)
async def _notify_failed(self, p: dict) -> None: async def _notify_failed(self, p: dict) -> None:
reason = p.get("failed_reason") or "?" reason = p.get("failed_reason") or "?"

View File

@@ -158,6 +158,12 @@ def init_db() -> None:
CREATE INDEX IF NOT EXISTS idx_tarot_favorite CREATE INDEX IF NOT EXISTS idx_tarot_favorite
ON tarot_readings(favorite, created_at DESC) ON tarot_readings(favorite, created_at DESC)
""") """)
conn.execute("""
CREATE TABLE IF NOT EXISTS notified_failed_pipelines (
pipeline_id INTEGER PRIMARY KEY,
notified_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
# Seed default agent configs # Seed default agent configs
for agent_id, name in [ for agent_id, name in [
("stock", "주식 트레이더"), ("stock", "주식 트레이더"),
@@ -826,6 +832,47 @@ def get_all_baselines() -> List[Dict[str, Any]]:
return out return out
# --- notified_failed_pipelines (파이프라인 실패 알림 dedup 원장, 재시작 지속) ---
def get_notified_failed_pipelines() -> set:
"""이미 실패 알림을 발송한 pipeline_id 집합."""
with _conn() as conn:
rows = conn.execute(
"SELECT pipeline_id FROM notified_failed_pipelines"
).fetchall()
return {r["pipeline_id"] for r in rows}
def add_notified_failed_pipeline(pipeline_id: int) -> None:
with _conn() as conn:
conn.execute(
"INSERT OR IGNORE INTO notified_failed_pipelines(pipeline_id) VALUES(?)",
(pipeline_id,),
)
def prune_notified_failed_pipelines(active_failed_ids) -> None:
"""현재 failed 목록에 없는 pipeline_id를 원장에서 제거.
재개되어 failed에서 벗어난 파이프라인이 다시 실패하면 재알림 가능하도록 함.
(기존 인메모리 `_notified_failed &= failed_ids`의 영속 버전)
"""
keep = set(active_failed_ids)
with _conn() as conn:
existing = {
r["pipeline_id"]
for r in conn.execute(
"SELECT pipeline_id FROM notified_failed_pipelines"
).fetchall()
}
stale = existing - keep
for pid in stale:
conn.execute(
"DELETE FROM notified_failed_pipelines WHERE pipeline_id=?",
(pid,),
)
def get_tasks_by_agent_date_kind(agent_id: str, date_iso: str, task_type: str) -> List[Dict[str, Any]]: def get_tasks_by_agent_date_kind(agent_id: str, date_iso: str, task_type: str) -> List[Dict[str, Any]]:
"""같은 (agent, date, task_type)으로 이미 생성된 task 조회. 멱등 guard.""" """같은 (agent, date, task_type)으로 이미 생성된 task 조회. 멱등 guard."""
with _conn() as conn: with _conn() as conn:

View File

@@ -278,3 +278,19 @@ async def trigger_signal_check(source: str = "light"):
if not agent: if not agent:
raise HTTPException(status_code=503, detail="lotto agent not registered") raise HTTPException(status_code=503, detail="lotto agent not registered")
return await agent.run_signal_check(source=source) return await agent.run_signal_check(source=source)
# --- Trade Alert Notify Endpoint ---
class TradeAlertBody(BaseModel):
alerts: List[Dict[str, Any]] = []
@app.post("/api/agent-office/stock/trade-alert")
async def stock_trade_alert(body: TradeAlertBody):
from .notifiers.telegram_trade import send_trade_alerts
from .db import add_log
res = await send_trade_alerts(body.alerts)
for a in body.alerts:
add_log("stock", f"매매알람 {a.get('kind')} {a.get('ticker')} {a.get('condition')}", "info")
return res

View File

@@ -0,0 +1,45 @@
"""매매 알람 텔레그램 포맷+전송 (본인+아내 각각)."""
import logging
from typing import Any, Dict, List
from ..telegram.messaging import send_raw
from ..config import TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID
logger = logging.getLogger("agent-office")
_KIND_LABEL = {"buy": "🟢 매수", "sell": "🔴 매도"}
_COND_LABEL = {
"buy_ma20_pullback": "지지선 되돌림", "buy_breakout": "돌파", "buy_rsi_bounce": "RSI 과매도 반등",
"sell_stop_loss": "손절", "sell_ma_break": "이평 이탈", "sell_take_profit": "익절",
"sell_climax": "급등 소진", "sell_trailing_stop": "트레일링 스톱",
}
def format_trade_alert(a: Dict[str, Any]) -> str:
kind = _KIND_LABEL.get(a["kind"], a["kind"])
cond = _COND_LABEL.get(a["condition"], a["condition"])
name = a.get("name") or a["ticker"]
price = a.get("price")
price_s = f"{int(price):,}" if price else "-"
return f"{kind} 알람\n<b>{name}</b> ({a['ticker']})\n조건: {cond}\n현재가: {price_s}"
async def send_trade_alerts(alerts: List[Dict[str, Any]]) -> dict:
"""알람마다 본인+아내 chat_id 각각으로 send_raw. 실패해도 계속 진행."""
sent = 0
all_ok = True
chat_ids = [c for c in (TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID) if c]
for a in alerts:
text = format_trade_alert(a)
for cid in chat_ids:
try:
r = await send_raw(text, chat_id=cid)
except Exception as e:
logger.warning(f"[telegram_trade] send failed (chat_id={cid}): {e}")
all_ok = False
continue
if r.get("ok"):
sent += 1
else:
all_ok = False
return {"sent": sent, "ok": all_ok}

View File

@@ -111,6 +111,29 @@ async def stock_holdings_brief() -> Dict[str, Any]:
return resp.json() return resp.json()
# --- stock watchlist (실시간 매매 알람) ---
async def watchlist_add(ticker: str) -> Dict[str, Any]:
"""stock의 관심종목 추가 (POST, 이미 존재하면 멱등하게 갱신)."""
resp = await _client.post(f"{STOCK_URL}/api/stock/watchlist", json={"ticker": ticker})
resp.raise_for_status()
return resp.json()
async def watchlist_remove(ticker: str) -> Dict[str, Any]:
"""stock의 관심종목 삭제."""
resp = await _client.delete(f"{STOCK_URL}/api/stock/watchlist/{ticker}")
resp.raise_for_status()
return resp.json()
async def watchlist_list() -> Dict[str, Any]:
"""stock의 관심종목 목록 조회 → {"watchlist": [...]}."""
resp = await _client.get(f"{STOCK_URL}/api/stock/watchlist")
resp.raise_for_status()
return resp.json()
async def generate_music(payload: dict) -> Dict[str, Any]: async def generate_music(payload: dict) -> Dict[str, Any]:
resp = await _client.post(f"{MUSIC_LAB_URL}/api/music/generate", json=payload) resp = await _client.post(f"{MUSIC_LAB_URL}/api/music/generate", json=payload)
resp.raise_for_status() resp.raise_for_status()

View File

@@ -1,6 +1,7 @@
"""텔레그램 Webhook 이벤트 처리.""" """텔레그램 Webhook 이벤트 처리."""
from typing import Optional from typing import Optional
from .. import service_proxy
from ..db import get_telegram_callback, mark_telegram_responded from ..db import get_telegram_callback, mark_telegram_responded
from .client import _enabled, api_call from .client import _enabled, api_call
@@ -23,12 +24,43 @@ async def handle_webhook(data: dict, agent_dispatcher=None) -> Optional[dict]:
if message: if message:
chat = message.get("chat", {}) chat = message.get("chat", {})
print(f"[TG-WEBHOOK] chat.id={chat.get('id')} type={chat.get('type')} text={message.get('text')!r}", flush=True) print(f"[TG-WEBHOOK] chat.id={chat.get('id')} type={chat.get('type')} text={message.get('text')!r}", flush=True)
if message and message.get("text"):
if await handle_watch_command(message):
return None
if message and message.get("text") and agent_dispatcher is not None: if message and message.get("text") and agent_dispatcher is not None:
return await _handle_message(message, agent_dispatcher) return await _handle_message(message, agent_dispatcher)
return None return None
async def handle_watch_command(message: dict) -> bool:
"""/watch /unwatch /watchlist 명령을 처리해 stock watchlist API로 프록시.
처리했으면(응답 전송 포함) True, 매칭되지 않는 텍스트면 False."""
text = (message.get("text") or "").strip()
chat_id = message.get("chat", {}).get("id")
parts = text.split()
cmd = parts[0].lower() if parts else ""
if cmd == "/watch" and len(parts) >= 2:
await service_proxy.watchlist_add(parts[1])
reply = f"관심종목 추가: {parts[1]}"
elif cmd == "/unwatch" and len(parts) >= 2:
await service_proxy.watchlist_remove(parts[1])
reply = f"관심종목 삭제: {parts[1]}"
elif cmd == "/watchlist":
res = await service_proxy.watchlist_list()
items = res.get("watchlist", [])
reply = "관심종목:\n" + (
"\n".join(f"- {w.get('name') or ''} ({w['ticker']})" for w in items) or "(없음)"
)
else:
return False
await api_call("sendMessage", {"chat_id": chat_id, "text": reply})
return True
async def _handle_callback(callback_query: dict) -> Optional[dict]: async def _handle_callback(callback_query: dict) -> Optional[dict]:
"""승인/거절 및 realestate 북마크 콜백 처리.""" """승인/거절 및 realestate 북마크 콜백 처리."""
callback_id = callback_query.get("data", "") callback_id = callback_query.get("data", "")

View File

@@ -0,0 +1,55 @@
import os
import sys
import tempfile
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import pytest
from unittest.mock import AsyncMock, patch
@pytest.fixture(autouse=True)
def _init_db(monkeypatch):
import gc
gc.collect()
# config.DB_PATH는 첫 import 시 1회 고정되므로, 다른 테스트 파일과 조합 실행 시
# db가 이 파일의 _TMP가 아닌 다른 경로를 쓸 수 있다. db.DB_PATH를 이 파일 전용으로
# 강제해 영속 테이블의 테스트 간 누수를 결정적으로 차단.
import app.db as _db
monkeypatch.setattr(_db, "DB_PATH", _TMP)
# WAL 사이드카(-wal/-shm)까지 지워야 영속 상태가 남지 않음
for suffix in ("", "-wal", "-shm"):
p = _TMP + suffix
if os.path.exists(p):
os.remove(p)
_db.init_db()
yield
gc.collect()
@pytest.mark.asyncio
async def test_send_trade_alerts_to_user_and_wife():
from app.notifiers import telegram_trade
alerts = [{"ticker": "005930", "name": "삼성전자", "kind": "buy",
"condition": "buy_breakout", "price": 71500, "detail": {}}]
with patch("app.notifiers.telegram_trade.send_raw",
new=AsyncMock(return_value={"ok": True})) as m, \
patch("app.notifiers.telegram_trade.TELEGRAM_CHAT_ID", "U"), \
patch("app.notifiers.telegram_trade.TELEGRAM_WIFE_CHAT_ID", "W"):
res = await telegram_trade.send_trade_alerts(alerts)
assert res["ok"] is True
chat_ids = {c.kwargs.get("chat_id") for c in m.await_args_list}
assert chat_ids == {"U", "W"} # 둘 다 발송
@pytest.mark.asyncio
async def test_format_trade_alert_has_direction():
from app.notifiers.telegram_trade import format_trade_alert
txt = format_trade_alert({"ticker": "005930", "name": "삼성전자", "kind": "sell",
"condition": "sell_stop_loss", "price": 60000, "detail": {}})
assert "매도" in txt and "삼성전자" in txt

View File

@@ -0,0 +1,93 @@
import os
import sys
import tempfile
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import pytest
from unittest.mock import AsyncMock, patch
@pytest.fixture(autouse=True)
def _init_db(monkeypatch):
import gc
gc.collect()
# config.DB_PATH는 첫 import 시 1회 고정되므로, 다른 테스트 파일과 조합 실행 시
# db가 이 파일의 _TMP가 아닌 다른 경로를 쓸 수 있다. db.DB_PATH를 이 파일 전용으로
# 강제해 영속 테이블의 테스트 간 누수를 결정적으로 차단.
import app.db as _db
monkeypatch.setattr(_db, "DB_PATH", _TMP)
for suffix in ("", "-wal", "-shm"):
p = _TMP + suffix
if os.path.exists(p):
os.remove(p)
_db.init_db()
yield
gc.collect()
@pytest.mark.asyncio
async def test_watch_command_calls_add():
from app.telegram import webhook
msg = {"chat": {"id": 1}, "text": "/watch 005930"}
with patch("app.telegram.webhook.service_proxy.watchlist_add",
new=AsyncMock(return_value={"ok": True})) as m, \
patch("app.telegram.webhook.api_call", new=AsyncMock(return_value={"ok": True})):
handled = await webhook.handle_watch_command(msg)
assert handled is True
m.assert_awaited_once_with("005930")
@pytest.mark.asyncio
async def test_non_watch_text_ignored():
from app.telegram import webhook
msg = {"chat": {"id": 1}, "text": "안녕"}
assert await webhook.handle_watch_command(msg) is False
@pytest.mark.asyncio
async def test_unwatch_command_calls_remove():
from app.telegram import webhook
msg = {"chat": {"id": 1}, "text": "/unwatch 005930"}
with patch("app.telegram.webhook.service_proxy.watchlist_remove",
new=AsyncMock(return_value={"ok": True})) as m, \
patch("app.telegram.webhook.api_call", new=AsyncMock(return_value={"ok": True})) as sent:
handled = await webhook.handle_watch_command(msg)
assert handled is True
m.assert_awaited_once_with("005930")
sent.assert_awaited_once()
@pytest.mark.asyncio
async def test_watchlist_command_calls_list_and_formats_items():
from app.telegram import webhook
msg = {"chat": {"id": 1}, "text": "/watchlist"}
items = {"watchlist": [{"ticker": "005930", "name": "삼성전자"}]}
with patch("app.telegram.webhook.service_proxy.watchlist_list",
new=AsyncMock(return_value=items)) as m, \
patch("app.telegram.webhook.api_call", new=AsyncMock(return_value={"ok": True})) as sent:
handled = await webhook.handle_watch_command(msg)
assert handled is True
m.assert_awaited_once_with()
text = sent.await_args.args[1]["text"]
assert "005930" in text and "삼성전자" in text
@pytest.mark.asyncio
async def test_watch_command_reaches_handle_webhook_before_slash_dispatch():
"""handle_webhook이 /watch 를 agent_dispatcher 호출 전에 가로채야 한다."""
from app.telegram import webhook
data = {"message": {"chat": {"id": 1}, "text": "/watch 005930"}}
dispatcher = AsyncMock(side_effect=AssertionError("agent_dispatcher가 호출되면 안 됨"))
with patch("app.telegram.webhook.service_proxy.watchlist_add",
new=AsyncMock(return_value={"ok": True})) as m, \
patch("app.telegram.webhook.api_call", new=AsyncMock(return_value={"ok": True})):
result = await webhook.handle_webhook(data, agent_dispatcher=dispatcher)
assert result is None
m.assert_awaited_once_with("005930")
dispatcher.assert_not_awaited()

View File

@@ -14,13 +14,20 @@ from unittest.mock import AsyncMock, patch
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def _init_db(): def _init_db(monkeypatch):
import gc import gc
gc.collect() gc.collect()
if os.path.exists(_TMP): # config.DB_PATH는 첫 import 시 1회 고정되므로, 다른 테스트 파일과 조합 실행 시
os.remove(_TMP) # db가 이 파일의 _TMP가 아닌 다른 경로를 쓸 수 있다. db.DB_PATH를 이 파일 전용으로
from app.db import init_db # 강제해 영속 테이블(notified_failed_pipelines 등)의 테스트 간 누수를 결정적으로 차단.
init_db() import app.db as _db
monkeypatch.setattr(_db, "DB_PATH", _TMP)
# WAL 사이드카(-wal/-shm)까지 지워야 영속 상태가 남지 않음
for suffix in ("", "-wal", "-shm"):
p = _TMP + suffix
if os.path.exists(p):
os.remove(p)
_db.init_db()
yield yield
gc.collect() gc.collect()
@@ -211,3 +218,70 @@ async def test_failed_poll_exception_is_silent():
# active 알림은 정상 발송 # active 알림은 정상 발송
assert sent.await_count == 1 assert sent.await_count == 1
@pytest.mark.asyncio
async def test_failed_notification_persists_across_restart():
"""컨테이너 재시작(새 에이전트 인스턴스)해도 이미 알린 failed는 재알림하지 않음."""
from app.agents.youtube_publisher import YoutubePublisherAgent
failed_pipeline = {
"id": 3,
"state": "failed",
"failed_reason": "video: timeout",
"track_title": "beat music v2",
}
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
with patch(
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
new=AsyncMock(return_value=[]),
), patch(
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
new=AsyncMock(return_value=[failed_pipeline]),
), patch(
"app.agents.youtube_publisher.send_raw",
new=sent,
):
agent1 = YoutubePublisherAgent()
await agent1.poll_state_changes()
# 컨테이너 재시작 시뮬레이션: 완전히 새로운 인스턴스(인메모리 상태 소실)
agent2 = YoutubePublisherAgent()
await agent2.poll_state_changes()
# 재시작해도 DB 원장으로 중복 방지 → 1회만 알림
assert sent.await_count == 1
@pytest.mark.asyncio
async def test_transient_failed_poll_keeps_ledger():
"""failed 폴링이 일시적으로 예외를 던져도 원장을 비우지 않아 다음 폴링에서 재알림하지 않음."""
from app.agents.youtube_publisher import YoutubePublisherAgent
failed_pipeline = {
"id": 3,
"state": "failed",
"failed_reason": "video: timeout",
"track_title": "beat music v2",
}
list_failed = AsyncMock(
side_effect=[[failed_pipeline], Exception("boom"), [failed_pipeline]]
)
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
with patch(
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
new=AsyncMock(return_value=[]),
), patch(
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
new=list_failed,
), patch(
"app.agents.youtube_publisher.send_raw",
new=sent,
):
agent = YoutubePublisherAgent()
await agent.poll_state_changes() # #3 최초 알림
await agent.poll_state_changes() # 예외 → 원장 유지되어야 (섣부른 정리 금지)
await agent.poll_state_changes() # #3 여전히 failed → 재알림 없어야
assert sent.await_count == 1

View File

@@ -464,7 +464,7 @@ services:
- "6379:6379" - "6379:6379"
volumes: volumes:
- ${RUNTIME_PATH}/redis-data:/data - ${RUNTIME_PATH}/redis-data:/data
command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru command: redis-server --appendonly yes --save "" --stop-writes-on-bgsave-error no --maxmemory 256mb --maxmemory-policy allkeys-lru
healthcheck: healthcheck:
test: ["CMD", "redis-cli", "ping"] test: ["CMD", "redis-cli", "ping"]
interval: 60s interval: 60s

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,220 @@
# 실시간 매매 알람 (Real-time Trade Alerts) — 설계 스펙
- 작성일: 2026-07-02
- 상태: 설계 승인됨 (사용자 리뷰 대기)
- 관련 세션: BE(web-backend, 본 스펙 주도) · AI(web-ai 워커) · FE(web-ui 탭)
## 1. 목표
장이 열려 있는 동안(**시간외 포함**) 실시간으로 주가 기준치를 분석해, 조건 충족 시 **매수/매도 알람**을 텔레그램으로 **사용자 + 아내** 둘 다에게 전송한다. 기술적 분석(TA) 계산은 **Windows PC의 docker 워커**에서 수행한다.
기존에는 이 판단들이 EOD(하루 1회)로만 돌았다:
- 매수 후보 = 스크리너(평일 16:30) · 매도/보유 advisory = holdings_intel(08:30/16:50).
이번 작업의 핵심 = **동일 판단을 장중(+시간외) 1분 주기 실시간으로 전환 + 조건 충족 즉시 알람**.
## 2. 확정된 요구사항 (사용자 결정)
| 항목 | 결정 |
|------|------|
| 매수 유니버스 | **watchlist(사용자 관리) 당일 스크리너 후보** |
| 매수 트리거 | **TA 자동 시그널**(수동 목표가 없음) |
| 매도 트리거 | **기존 exit 룰 + 트레일링 스톱** |
| 감시 주기/세션 | **1분 폴링** · 장전 시간외 08:3009:00 · 정규장 09:0015:30 · 시간외 단일가 16:0018:00 |
| 중복 방지 | **상태 전이(edge-triggered)** — 거짓→참 전이 시만 알림, 참 유지 중 무알림, 재무장 |
| watchlist 관리 | **텔레그램 봇 명령 + web-ui 탭 둘 다** |
| 수신자 | **사용자 + 아내 둘 다**(매수·매도 모두) |
| TA 연산 위치 | **Windows WSL2 docker 신규 워커** |
| 트레일링 스톱 기본값 | 보유기간 고점 대비 **10%**(파라미터화) |
| 매수 신호 | 지지선 되돌림(MA20/50) · 돌파(전고점/52주) · RSI 과매도 반등 |
## 3. 아키텍처
```
[Windows WSL2 docker] trade-monitor 워커 (web-ai · AI세션)
1분 루프 (KST 세션 게이팅)
① GET NAS /api/webai/trade-alert/monitor-set (X-WebAI-Key)
② KIS 실시간/시간외 시세 + 분봉/일봉 → TA 계산
③ 조건 평가 → 현재 발화집합 F = {(ticker, kind, condition)}
④ POST NAS /api/webai/trade-alert/report {firing: F} (X-WebAI-Key)
⑤ heartbeat: worker:trade-monitor:heartbeat (EX45, 관측 편입)
[NAS] stock (:18500 · web-backend · BE)
• watchlist·alert_state(edge dedup, 영속)·alert_history·holding high-water
• monitor-set 조립(watchlist screener 후보 보유) + 세션/휴장 게이팅
• report 수신 → edge diff(F vs 직전 발화) → 신규 edge를 agent-office로 push
│ (텔레그램 전송 성공 시에만 alert_state 갱신)
[NAS] agent-office (:18900 · web-backend · BE)
• POST /api/agent-office/stock/trade-alert → 텔레그램(너+아내)
• 봇 명령 /watch /unwatch /watchlist → stock watchlist CRUD
• 알람 activity feed 편입
[web-ui] 관심종목 탭 (FE세션) — watchlist CRUD + 알람 이력 뷰
```
**설계 원칙**
- TA/조건판정 = Windows(요구사항). **edge 중복판정 상태 = NAS 영속** → 워커 재시작해도 재알림 스팸 없음(youtube_publisher 교훈 재적용).
- 워커는 dedup 상태를 **안 가진다**. 매 사이클 "현재 발화집합 전체"만 보고 → NAS가 diff(단일 진실원천).
- 워커의 대외 채널은 **NAS stock 한 곳**(기존 ai_trade↔stock의 `X-WebAI-Key` 재사용). 텔레그램 발송은 stock→agent-office push(기존 realestate→agent-office/notify 패턴).
## 4. DB 스키마 (stock.db)
```sql
-- 매수 감시 관심종목 (사용자 관리)
CREATE TABLE IF NOT EXISTS watchlist (
ticker TEXT PRIMARY KEY,
name TEXT,
note TEXT,
params_json TEXT NOT NULL DEFAULT '{}', -- 종목별 조건 오버라이드(선택)
added_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
);
-- edge 중복판정 상태 (영속 — 재시작 스팸 방지의 핵심)
CREATE TABLE IF NOT EXISTS trade_alert_state (
ticker TEXT NOT NULL,
kind TEXT NOT NULL, -- 'buy' | 'sell'
condition TEXT NOT NULL, -- ex) buy_ma20_pullback, sell_trailing_stop
currently_firing INTEGER NOT NULL DEFAULT 0,
first_fired_at TEXT,
last_fired_at TEXT,
last_seen_at TEXT,
PRIMARY KEY (ticker, kind, condition)
);
-- 알람 이력
CREATE TABLE IF NOT EXISTS trade_alert_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL,
name TEXT,
kind TEXT NOT NULL,
condition TEXT NOT NULL,
price REAL,
detail_json TEXT,
fired_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
);
CREATE INDEX IF NOT EXISTS idx_tah_fired ON trade_alert_history(fired_at DESC);
```
보유기간 고점(트레일링 스톱용) high-water는 `krx_daily_prices`(기존)에서 lookback max로 계산하거나 별도 컬럼으로 관리 — 구현 계획에서 확정(v1: 포지션 최초 관측 이후 일봉 고가 max, 없으면 최근 N일).
## 5. 계약 (Contracts) — cross-repo 잠금 대상
### 5.1 NAS stock ↔ Windows 워커 (X-WebAI-Key)
`GET /api/webai/trade-alert/monitor-set`
```json
{
"session": "pre | regular | after | closed",
"as_of": "2026-07-02T09:01:00+09:00",
"buy_targets": [{"ticker":"005930","name":"삼성전자","source":"watch|screener","params":{}}],
"sell_targets": [{"ticker":"000660","name":"SK하이닉스","avg_price":180000,"qty":10,
"holding_high":210000,"params":{}}],
"buy_params": {"rsi_oversold":30,"breakout_vol_mult":1.5,"pullback_pct":0.02},
"exit_params": {"stop_pct":0.08,"take_pct":0.25,"trailing_pct":0.10}
}
```
- `session=closed`면 워커는 KIS 호출 없이 sleep.
`POST /api/webai/trade-alert/report`
```json
{ "as_of":"2026-07-02T09:01:00+09:00",
"firing":[ {"ticker":"005930","kind":"buy","condition":"buy_ma20_pullback",
"price":71500,"detail":{"ma20":71200,"rsi":34}} ] }
```
응답: `{ "new_alerts": <int>, "cleared": <int> }`
- NAS가 `firing` vs `trade_alert_state[firing=1]` diff → 신규 edge만 텔레그램.
### 5.2 stock → agent-office (내부)
`POST /api/agent-office/stock/trade-alert`
```json
{ "alerts":[ {"ticker":"005930","name":"삼성전자","kind":"buy",
"condition":"buy_ma20_pullback","price":71500,
"detail":{...},"fired_at":"..."} ] }
```
→ agent-office가 너+아내에게 텔레그램. (realestate/notify 패턴)
### 5.3 stock watchlist CRUD (web-ui + agent-office 봇)
- `GET /api/stock/watchlist`
- `POST /api/stock/watchlist` `{ticker, note?}`
- `DELETE /api/stock/watchlist/{ticker}`
- `GET /api/stock/trade-alerts?days=N` (이력, web-ui용)
### 5.4 워커 heartbeat (관측 편입)
`worker:trade-monitor:heartbeat` EX45, 값 JSON `{name:"trade-monitor",kind:"trader",state:"market_open|market_closed|idle",ts,last_alert_at,...}`. `/api/agent-office/nodes` workers[]에 추가.
## 6. 알람 조건 (Windows 워커가 계산)
**매수** (buy_targets):
- `buy_ma20_pullback` — MA20>MA50>MA200 정렬 + 저가가 MA20/50에 `pullback_pct` 이내 접근 후 종가 반등
- `buy_breakout` — 종가 > (전 N일 고점 또는 52주 신고가) + 거래량 > `breakout_vol_mult`×20일평균
- `buy_rsi_bounce` — RSI(14)가 `rsi_oversold` 아래로 내려갔다가 **봉 시리즈 내에서** 다시 상향 돌파(최근 봉에서 30 상향 크로스). 워커는 무상태 — 매 사이클 봉 데이터로 크로스를 계산(cross-cycle 메모리 불필요)
**매도** (sell_targets):
- `sell_stop_loss` — (priceavg)/avg ≤ `stop_pct`
- `sell_ma_break` — 종가 < MA50 (심각: < MA200)
- `sell_take_profit` — (priceavg)/avg ≥ `take_pct`
- `sell_climax` — 급등 소진(holdings_intel climax 로직 이식)
- `sell_trailing_stop` — price ≤ holding_high × (1 `trailing_pct`)
## 7. 데이터 흐름 — edge dedup (NAS)
```
매 1분 report 수신 시:
F = report.firing 집합
prev = SELECT (ticker,kind,condition) FROM trade_alert_state WHERE currently_firing=1
new_edge = F prev
cleared = prev F
for e in new_edge:
ok = agent_office.send_trade_alert(e) # 텔레그램
if ok:
INSERT trade_alert_history(e)
UPSERT trade_alert_state(e, firing=1, fired/last=now)
# 실패 시 상태 미갱신 → 다음 사이클 재시도
for c in cleared:
UPDATE trade_alert_state SET firing=0 WHERE key=c # 재무장
UPDATE last_seen_at for all F
```
- 영속 `trade_alert_state` → 워커·NAS 재시작에도 재알림 스팸 없음.
- 텔레그램 실패 시 firing 미표시 → 재시도 보장(node_monitor "성공 시만 갱신" 관용).
## 8. 세션/휴장 게이팅
NAS `monitor-set.session` 필드가 KST 시각 + `holidays.json`(`is_market_open`)으로 판정:
- pre 08:3009:00 / regular 09:0015:30 / after 16:0018:00 → 그 외/휴장 = closed.
- 워커는 `closed`면 sleep. (불필요 KIS 호출·알람 차단)
## 9. 에러 처리
- 워커: KIS 실패 → 해당 사이클 skip + 다음 분 재시도, 종목별 실패 격리. heartbeat로 생사 노출.
- NAS: 워커 인증 `X-WebAI-Key`. 텔레그램 실패 → 상태 미갱신. `report`는 멱등(같은 F 재전송 무해).
- 워커 다운 시 알람 정지 → node_monitor 경보(기존 관측)로 감지.
## 10. 테스트 전략 (BE, TDD)
- watchlist CRUD (추가/중복/삭제/조회)
- monitor-set 조립 (watchlist screener 보유, 세션 게이팅, 휴장)
- **edge diff 로직**: 신규 edge만 알림 / 참 유지 무알림 / 해제 후 재발화 재알림 / 재시작 지속성(영속 상태)
- 텔레그램 전송 실패 시 상태 미갱신(재시도)
- alert_history 기록 / trade-alerts 조회
- agent-office: /watch·/unwatch·/watchlist 봇 명령 → stock CRUD, trade-alert notify → 텔레그램 포맷(너+아내)
- webai 계약 엔드포인트(monitor-set/report) 스키마·인증
## 11. 작업 분담
| repo | 세션 | 산출물 | 상태 |
|------|------|--------|------|
| **web-backend** (stock + agent-office) | **BE(본 세션)** | DB·watchlist·edge·webai 계약·텔레그램·봇 | 이번에 구현 |
| **web-ai** (`services/trade-monitor/` WSL2 docker) | AI세션 | 1분 루프·KIS·TA·조건평가·report·heartbeat | 계약 넘김 |
| **web-ui** (관심종목 탭) | FE세션 | watchlist CRUD·조건·이력 뷰 | 계약 넘김 |
- 계약(§5)은 co-gahusb로 잠근 뒤 3세션 병렬.
- 워커 재빌드는 로컬 docker(사용자): `wsl -d Ubuntu-24.04 -- docker compose up -d --build trade-monitor`.
## 12. 범위 밖 (YAGNI / 후속)
- 실주문 자동 집행(알람 전용, KIS 주문 X).
- KIS 웹소켓 실시간 틱(1분 폴링으로 충분).
- 종목별 수동 목표가(이번은 TA 자동만).
- 백테스트/성과 추적(후속 슬라이스).

View File

@@ -1100,6 +1100,19 @@ def get_pipeline(pid: int) -> Optional[Dict[str, Any]]:
return _parse_pipeline_row(row) return _parse_pipeline_row(row)
def delete_pipeline(pid: int) -> bool:
"""파이프라인과 자식행(pipeline_feedback, pipeline_jobs)을 하드 삭제.
SQLite FK를 강제하지 않으므로 자식행을 명시적으로 먼저 삭제한다.
파이프라인이 존재했으면 True, 없었으면 False.
"""
with _conn() as conn:
conn.execute("DELETE FROM pipeline_feedback WHERE pipeline_id = ?", (pid,))
conn.execute("DELETE FROM pipeline_jobs WHERE pipeline_id = ?", (pid,))
cur = conn.execute("DELETE FROM video_pipelines WHERE id = ?", (pid,))
return cur.rowcount > 0
def update_pipeline_state(pid: int, state: str, **fields) -> None: def update_pipeline_state(pid: int, state: str, **fields) -> None:
"""파이프라인 state를 갱신하고 옵션 컬럼을 함께 업데이트한다. """파이프라인 state를 갱신하고 옵션 컬럼을 함께 업데이트한다.

View File

@@ -1133,6 +1133,14 @@ def cancel_pipeline(pid: int):
return {"ok": True} return {"ok": True}
@app.delete("/api/music/pipeline/{pid}")
def delete_pipeline_endpoint(pid: int):
"""파이프라인 행을 하드 삭제(전체 목록에서 완전 제거). 없으면 404."""
if not _db_module.delete_pipeline(pid):
raise HTTPException(404)
return {"ok": True, "deleted": pid}
@app.post("/api/music/pipeline/{pid}/retry", status_code=202) @app.post("/api/music/pipeline/{pid}/retry", status_code=202)
async def retry_pipeline(pid: int, bg: BackgroundTasks): async def retry_pipeline(pid: int, bg: BackgroundTasks):
from .pipeline.state_machine import STEPS from .pipeline.state_machine import STEPS

View File

@@ -105,6 +105,29 @@ def test_cancel_pipeline(client):
assert db.get_pipeline(pid)["state"] == "cancelled" assert db.get_pipeline(pid)["state"] == "cancelled"
def test_delete_pipeline_removes_from_db(client):
pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
r = client.request("DELETE", f"/api/music/pipeline/{pid}")
assert r.status_code == 200
assert r.json()["ok"] is True
assert db.get_pipeline(pid) is None
all_ids = [p["id"] for p in client.get("/api/music/pipeline?status=all").json()["pipelines"]]
assert pid not in all_ids
def test_delete_pipeline_not_found_returns_404(client):
r = client.request("DELETE", "/api/music/pipeline/99999")
assert r.status_code == 404
def test_delete_pipeline_removes_child_jobs(client):
pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
db.create_pipeline_job(pid, "cover")
assert len(db.list_pipeline_jobs(pid)) == 1
client.request("DELETE", f"/api/music/pipeline/{pid}")
assert db.list_pipeline_jobs(pid) == []
def test_setup_get_returns_defaults(client): def test_setup_get_returns_defaults(client):
r = client.get("/api/music/setup") r = client.get("/api/music/setup")
assert r.status_code == 200 assert r.status_code == 200

View File

@@ -2,6 +2,7 @@ import sqlite3
import os import os
import hashlib import hashlib
import json import json
import datetime as dt
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
from app.screener.schema import ensure_screener_schema from app.screener.schema import ensure_screener_schema
@@ -125,6 +126,42 @@ def init_db():
conn.execute("CREATE INDEX IF NOT EXISTS idx_holdings_sig_ticker " conn.execute("CREATE INDEX IF NOT EXISTS idx_holdings_sig_ticker "
"ON holdings_signals(ticker, date DESC);") "ON holdings_signals(ticker, date DESC);")
# 실시간 매매 알람: watchlist / alert_state / alert_history
conn.execute("""
CREATE TABLE IF NOT EXISTS watchlist (
ticker TEXT PRIMARY KEY,
name TEXT,
note TEXT,
params_json TEXT NOT NULL DEFAULT '{}',
added_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS trade_alert_state (
ticker TEXT NOT NULL,
kind TEXT NOT NULL,
condition TEXT NOT NULL,
currently_firing INTEGER NOT NULL DEFAULT 0,
first_fired_at TEXT,
last_fired_at TEXT,
last_seen_at TEXT,
PRIMARY KEY (ticker, kind, condition)
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS trade_alert_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL,
name TEXT,
kind TEXT NOT NULL,
condition TEXT NOT NULL,
price REAL,
detail_json TEXT,
fired_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_tah_fired ON trade_alert_history(fired_at DESC)")
# Screener 스키마 부트스트랩 (7테이블 + 디폴트 설정 시드) # Screener 스키마 부트스트랩 (7테이블 + 디폴트 설정 시드)
ensure_screener_schema(conn) ensure_screener_schema(conn)
@@ -379,3 +416,105 @@ def get_holdings_signal_history(ticker: str, limit: int = 30) -> list:
"SELECT * FROM holdings_signals WHERE ticker=? ORDER BY date DESC LIMIT ?", "SELECT * FROM holdings_signals WHERE ticker=? ORDER BY date DESC LIMIT ?",
(ticker, limit)).fetchall() (ticker, limit)).fetchall()
return [_row_to_signal(r) for r in rows] return [_row_to_signal(r) for r in rows]
# --- 실시간 매매 알람: 공통 유틸 ---
def _now_iso() -> str:
return dt.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%fZ")
# --- Watchlist CRUD ---
def add_watchlist(ticker: str, name: str = None, note: str = None) -> None:
with _conn() as conn:
conn.execute(
"INSERT OR IGNORE INTO watchlist(ticker,name,note) VALUES(?,?,?)",
(ticker, name, note),
)
# 이름/노트 갱신(이미 있으면)
conn.execute(
"UPDATE watchlist SET name=COALESCE(?,name), note=COALESCE(?,note) WHERE ticker=?",
(name, note, ticker),
)
def remove_watchlist(ticker: str) -> bool:
with _conn() as conn:
cur = conn.execute("DELETE FROM watchlist WHERE ticker=?", (ticker,))
return cur.rowcount > 0
def get_watchlist() -> list:
with _conn() as conn:
rows = conn.execute("SELECT * FROM watchlist ORDER BY added_at").fetchall()
return [
{"ticker": r["ticker"], "name": r["name"], "note": r["note"],
"params": json.loads(r["params_json"] or "{}"), "added_at": r["added_at"]}
for r in rows
]
# --- Trade Alert State ---
def get_alert_state_firing() -> set:
with _conn() as conn:
rows = conn.execute(
"SELECT ticker,kind,condition FROM trade_alert_state WHERE currently_firing=1"
).fetchall()
return {(r["ticker"], r["kind"], r["condition"]) for r in rows}
def set_alert_firing(ticker: str, kind: str, condition: str, firing: bool, at_iso: str = None) -> None:
now = at_iso or _now_iso()
with _conn() as conn:
if firing:
conn.execute(
"""INSERT INTO trade_alert_state(ticker,kind,condition,currently_firing,first_fired_at,last_fired_at,last_seen_at)
VALUES(?,?,?,1,?,?,?)
ON CONFLICT(ticker,kind,condition) DO UPDATE SET
currently_firing=1,
first_fired_at=COALESCE(first_fired_at,excluded.first_fired_at),
last_fired_at=excluded.last_fired_at,
last_seen_at=excluded.last_seen_at""",
(ticker, kind, condition, now, now, now),
)
else:
conn.execute(
"UPDATE trade_alert_state SET currently_firing=0, last_seen_at=? WHERE ticker=? AND kind=? AND condition=?",
(now, ticker, kind, condition),
)
def touch_alert_seen(keys: list, at_iso: str) -> None:
with _conn() as conn:
for (ticker, kind, condition) in keys:
conn.execute(
"UPDATE trade_alert_state SET last_seen_at=? WHERE ticker=? AND kind=? AND condition=?",
(at_iso, ticker, kind, condition),
)
# --- Trade Alert History ---
def add_alert_history(ticker: str, name: str, kind: str, condition: str, price, detail: dict) -> int:
with _conn() as conn:
cur = conn.execute(
"INSERT INTO trade_alert_history(ticker,name,kind,condition,price,detail_json) VALUES(?,?,?,?,?,?)",
(ticker, name, kind, condition, price, json.dumps(detail or {}, ensure_ascii=False)),
)
return cur.lastrowid
def get_alert_history(days: int = 7) -> list:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM trade_alert_history WHERE fired_at >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?) ORDER BY fired_at DESC",
(f"-{int(days)} days",),
).fetchall()
return [
{"id": r["id"], "ticker": r["ticker"], "name": r["name"], "kind": r["kind"],
"condition": r["condition"], "price": r["price"],
"detail": json.loads(r["detail_json"] or "{}"), "fired_at": r["fired_at"]}
for r in rows
]

View File

@@ -21,6 +21,8 @@ from .db import (
upsert_broker_cash, get_all_broker_cash, delete_broker_cash, upsert_broker_cash, get_all_broker_cash, delete_broker_cash,
upsert_asset_snapshot, get_asset_snapshots, upsert_asset_snapshot, get_asset_snapshots,
add_sell_history, get_sell_history, update_sell_history, delete_sell_history, add_sell_history, get_sell_history, update_sell_history, delete_sell_history,
add_watchlist, remove_watchlist, get_watchlist, get_alert_history,
get_alert_state_firing, set_alert_firing, touch_alert_seen, add_alert_history,
) )
from .scraper import fetch_market_news, fetch_major_indices from .scraper import fetch_market_news, fetch_major_indices
from .price_fetcher import get_current_prices, get_current_prices_detail from .price_fetcher import get_current_prices, get_current_prices_detail
@@ -28,6 +30,10 @@ from .ai_summarizer import summarize_news, OllamaError
from .auth import verify_webai_key from .auth import verify_webai_key
from . import webai_cache from . import webai_cache
from . import holdings_intel from . import holdings_intel
from . import trade_alerts
from .trade_alerts import (
build_monitor_set, current_session, diff_firing, DEFAULT_EXIT_PARAMS, DEFAULT_BUY_PARAMS,
)
app = FastAPI() app = FastAPI()
install_access_log(app) install_access_log(app)
@@ -506,6 +512,63 @@ def get_webai_news_sentiment(date: str | None = None):
return result return result
@app.get("/api/webai/trade-alert/monitor-set", dependencies=[Depends(verify_webai_key)])
def get_trade_alert_monitor_set():
"""web-ai(Windows 워커) 전용 — 실시간 매매 알람 감시대상 조립 (계약 §5.1).
session은 KST 시각으로 pre/regular/after 판정 후, 평일·휴장 여부(is_market_open)를
함께 게이팅해 최종 closed 여부를 결정한다.
"""
from datetime import datetime, timezone, timedelta
kst = timezone(timedelta(hours=9))
now_kst = datetime.now(kst)
session = current_session(now_kst)
if not is_market_open(now_kst.date()):
session = "closed"
from .db import _conn
conn = _conn()
try:
return build_monitor_set(conn, session, DEFAULT_EXIT_PARAMS, DEFAULT_BUY_PARAMS)
finally:
conn.close()
class TradeAlertReport(BaseModel):
as_of: str | None = None
firing: list[dict] = []
@app.post("/api/webai/trade-alert/report", dependencies=[Depends(verify_webai_key)])
def post_trade_alert_report(req: TradeAlertReport):
"""web-ai(Windows 워커) 전용 — 발화 보고 수신 (계약 §5.2).
직전 발화상태 대비 edge diff(diff_firing) 후, 신규 alert는
agent-office 전송 성공 시에만 상태(firing=True)+이력 반영한다.
전송 실패 시 상태를 채택하지 않아 다음 사이클에 동일 alert가 다시
"신규"로 잡혀 재시도된다(멱등). 해제(cleared)는 전송과 무관하게 firing=False.
"""
prev = get_alert_state_firing()
d = diff_firing(req.firing, prev)
new_count = 0
for a in d["new"]:
if trade_alerts.notify_agent_office([a]):
set_alert_firing(a["ticker"], a["kind"], a["condition"], firing=True, at_iso=req.as_of)
add_alert_history(
a["ticker"], a.get("name"), a["kind"], a["condition"],
a.get("price"), a.get("detail") or {},
)
new_count += 1
for ticker, kind, condition in d["cleared"]:
set_alert_firing(ticker, kind, condition, firing=False)
touch_alert_seen(d["seen"], req.as_of or "")
return {"new_alerts": new_count, "cleared": len(d["cleared"])}
@app.post("/api/portfolio", status_code=201) @app.post("/api/portfolio", status_code=201)
def create_portfolio_item(req: PortfolioItemRequest): def create_portfolio_item(req: PortfolioItemRequest):
"""포트폴리오 종목 추가""" """포트폴리오 종목 추가"""
@@ -653,6 +716,41 @@ def remove_sell_history(record_id: int):
return {"ok": True} return {"ok": True}
# --- Watchlist & Trade Alerts API (실시간 매매 알람) ---
class WatchlistItemRequest(BaseModel):
ticker: str
name: str | None = None
note: str | None = None
@app.get("/api/stock/watchlist")
def list_watchlist():
"""관심종목 목록 조회"""
return {"watchlist": get_watchlist()}
@app.post("/api/stock/watchlist", status_code=201)
def create_watchlist_item(req: WatchlistItemRequest):
"""관심종목 추가 (이미 존재하면 name/note 갱신, 멱등)"""
add_watchlist(req.ticker, req.name, req.note)
return {"ok": True}
@app.delete("/api/stock/watchlist/{ticker}")
def delete_watchlist_item(ticker: str):
"""관심종목 삭제"""
if not remove_watchlist(ticker):
raise HTTPException(status_code=404, detail="not in watchlist")
return {"ok": True}
@app.get("/api/stock/trade-alerts")
def list_trade_alerts(days: int = 7):
"""매매 알람 이력 조회 (최근 N일)"""
return {"alerts": get_alert_history(days)}
# --- Holdings Intelligence API --- # --- Holdings Intelligence API ---
@app.get("/api/stock/holdings/intel") @app.get("/api/stock/holdings/intel")

View File

@@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import datetime as dt
import json import json
import logging import logging
import os import os
@@ -59,13 +60,19 @@ async def score_sentiment(
*, *,
name: str | None = None, name: str | None = None,
model: str = DEFAULT_MODEL, model: str = DEFAULT_MODEL,
asof: dt.date | None = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Returns {ticker, score_raw, reason, news_count, tokens_input, tokens_output, model}.""" """Returns {ticker, score_raw, reason, news_count, tokens_input, tokens_output, model}.
asof(현재 KST 일자)를 주면 prompt 맨 앞에 오늘 날짜를 명시해 LLM이 현재 시점 기준으로 판단한다.
"""
news_block = _format_news_block(news) news_block = _format_news_block(news)
prompt = PROMPT_TEMPLATE.format( prompt = PROMPT_TEMPLATE.format(
name=name or ticker, ticker=ticker, name=name or ticker, ticker=ticker,
n=len(news), news_block=news_block, n=len(news), news_block=news_block,
) )
if asof is not None:
prompt = f"오늘 날짜: {asof.isoformat()} (이 시점 기준으로 뉴스를 평가하세요)\n\n" + prompt
resp = await llm.messages.create( resp = await llm.messages.create(
model=model, model=model,
max_tokens=200, max_tokens=200,

View File

@@ -39,11 +39,11 @@ def _make_llm():
async def _process_one( async def _process_one(
ticker: str, name: str, articles: List[Dict[str, Any]], ticker: str, name: str, articles: List[Dict[str, Any]],
sem: asyncio.Semaphore, llm, model: str, sem: asyncio.Semaphore, llm, model: str, asof: dt.date,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
async with sem: async with sem:
return await _analyzer.score_sentiment( return await _analyzer.score_sentiment(
llm, ticker, articles, name=name, model=model, llm, ticker, articles, name=name, model=model, asof=asof,
) )
@@ -110,7 +110,7 @@ async def refresh_daily(
arts = articles_by_ticker.get(t, []) arts = articles_by_ticker.get(t, [])
if not arts: if not arts:
continue # 매핑 0 — score 미생성 continue # 매핑 0 — score 미생성
tasks.append(_process_one(t, name_map.get(t, t), arts, sem, llm, model)) tasks.append(_process_one(t, name_map.get(t, t), arts, sem, llm, model, asof))
raw_results = await asyncio.gather(*tasks, return_exceptions=True) raw_results = await asyncio.gather(*tasks, return_exceptions=True)
successes: List[Dict[str, Any]] = [] successes: List[Dict[str, Any]] = []

View File

@@ -125,6 +125,16 @@ from . import telegram as _tg
from .engine import Screener, ScreenContext from .engine import Screener, ScreenContext
def _today_kst() -> dt.date:
"""KST 오늘 날짜.
stock 컨테이너는 python:3.12-alpine + tzdata 미설치라 TZ=Asia/Seoul이 무효 →
date.today()가 UTC를 반환한다. 08시대(KST) 리포트가 하루 밀리는 것을 막기 위해
UTC+9로 명시 보정한다(holdings_intel._today_kst와 동일한 관용).
"""
return (dt.datetime.utcnow() + dt.timedelta(hours=9)).date()
def _resolve_asof(asof_str, conn: sqlite3.Connection) -> dt.date: def _resolve_asof(asof_str, conn: sqlite3.Connection) -> dt.date:
if asof_str: if asof_str:
return dt.date.fromisoformat(asof_str) return dt.date.fromisoformat(asof_str)
@@ -263,7 +273,7 @@ from . import snapshot as _snap
@router.post("/snapshot/refresh") @router.post("/snapshot/refresh")
def post_snapshot_refresh(asof: Optional[str] = None): def post_snapshot_refresh(asof: Optional[str] = None):
asof_date = dt.date.fromisoformat(asof) if asof else dt.date.today() asof_date = dt.date.fromisoformat(asof) if asof else _today_kst()
if asof_date.weekday() >= 5: if asof_date.weekday() >= 5:
return {"asof": asof_date.isoformat(), "status": "skipped_weekend"} return {"asof": asof_date.isoformat(), "status": "skipped_weekend"}
with _conn() as c: with _conn() as c:
@@ -300,7 +310,7 @@ from .ai_news import validation as _ai_validation
@router.post("/snapshot/refresh-news-sentiment") @router.post("/snapshot/refresh-news-sentiment")
async def post_refresh_news_sentiment(asof: Optional[str] = None): async def post_refresh_news_sentiment(asof: Optional[str] = None):
asof_date = dt.date.fromisoformat(asof) if asof else dt.date.today() asof_date = dt.date.fromisoformat(asof) if asof else _today_kst()
if asof_date.weekday() >= 5: if asof_date.weekday() >= 5:
return {"asof": asof_date.isoformat(), "status": "skipped_weekend"} return {"asof": asof_date.isoformat(), "status": "skipped_weekend"}
if _is_holiday(asof_date): if _is_holiday(asof_date):

137
stock/app/trade_alerts.py Normal file
View File

@@ -0,0 +1,137 @@
"""매매 알람 — 감시대상(monitor-set) 조립. 순수 조립 로직(HTTP/텔레그램 없음).
계약 §5.1 (docs/superpowers/specs/2026-07-02-realtime-trade-alerts-design.md) —
Windows 워커가 GET /api/webai/trade-alert/monitor-set 로 받는 응답을 조립한다.
NAS는 watchlist screener 최신 성공 run 후보를 buy_targets로, 보유 종목을
sell_targets로 병합해 넘긴다. TA/조건판정은 워커 쪽 책임.
"""
import os
import httpx
from datetime import datetime, timedelta, timezone, time as _time
from typing import Optional
from app.db import get_all_portfolio, get_watchlist
_KST = timezone(timedelta(hours=9))
# KST 세션 창(시:분) — 평일+휴장 판정은 호출부에서 is_market_open으로 별도 게이팅
_SESSIONS = [
("pre", (8, 30), (9, 0)),
("regular", (9, 0), (15, 30)),
("after", (16, 0), (18, 0)),
]
def current_session(now_kst) -> str:
"""now_kst의 time만으로 pre/regular/after/closed 세션 판정 (요일·휴장 무관)."""
t = now_kst.time()
for name, (sh, sm), (eh, em) in _SESSIONS:
if _time(sh, sm) <= t < _time(eh, em):
return name
return "closed"
DEFAULT_EXIT_PARAMS = {"stop_pct": 0.08, "take_pct": 0.25, "trailing_pct": 0.10}
DEFAULT_BUY_PARAMS = {"rsi_oversold": 30, "breakout_vol_mult": 1.5, "pullback_pct": 0.02}
def latest_screener_candidates(conn) -> list:
"""최신 성공(status='success') screener run의 후보 {ticker,name} 목록."""
row = conn.execute(
"SELECT id FROM screener_runs WHERE status='success' ORDER BY asof DESC, id DESC LIMIT 1"
).fetchone()
if not row:
return []
run_id = row[0]
rows = conn.execute(
"SELECT ticker, name FROM screener_results WHERE run_id=? ORDER BY rank", (run_id,)
).fetchall()
return [{"ticker": r[0], "name": r[1]} for r in rows]
def holding_high(conn, ticker: str, lookback_days: int = 60) -> Optional[float]:
"""보유기간 고점(트레일링 스톱용) — krx_daily_prices 최근 lookback_days 최고 high."""
row = conn.execute(
"SELECT MAX(high) FROM krx_daily_prices WHERE ticker=? "
"AND date >= date('now', ?)",
(ticker, f"-{int(lookback_days)} days"),
).fetchone()
return row[0] if row and row[0] is not None else None
def build_monitor_set(conn, session: str, exit_params: dict, buy_params: dict) -> dict:
"""계약 §5.1 monitor-set 응답 dict 조립.
buy_targets = watchlist 최신 screener 후보 (ticker 기준 중복 제거, watchlist 우선)
sell_targets = 보유 종목(portfolio) + avg_price/qty/holding_high
"""
buy: dict[str, dict] = {}
for w in get_watchlist():
buy[w["ticker"]] = {
"ticker": w["ticker"], "name": w["name"],
"source": "watch", "params": w.get("params") or {},
}
for c in latest_screener_candidates(conn):
if c["ticker"] not in buy:
buy[c["ticker"]] = {
"ticker": c["ticker"], "name": c["name"],
"source": "screener", "params": {},
}
sell_targets = []
for p in get_all_portfolio():
ticker = p["ticker"]
sell_targets.append({
"ticker": ticker,
"name": p.get("name"),
"avg_price": p.get("avg_price"),
"qty": p.get("quantity"),
"holding_high": holding_high(conn, ticker),
"params": {},
})
return {
"session": session,
"as_of": datetime.now(_KST).isoformat(),
"buy_targets": list(buy.values()),
"sell_targets": sell_targets,
"buy_params": buy_params,
"exit_params": exit_params,
}
def diff_firing(reported: list, prev: set) -> dict:
"""워커 발화집합(reported) vs 직전 발화상태(prev) edge diff.
reported 각 항목: {ticker,kind,condition,price,detail,name?}.
key = (ticker,kind,condition).
반환 {"new":[신규 alert...], "cleared":[해제 key...], "seen":[현재 key...]}.
"""
cur = {}
for a in reported:
key = (a["ticker"], a["kind"], a["condition"])
cur[key] = a
cur_keys = set(cur.keys())
new_keys = cur_keys - prev
cleared = sorted(prev - cur_keys)
return {
"new": [cur[k] for k in cur_keys if k in new_keys],
"cleared": cleared,
"seen": sorted(cur_keys),
}
def notify_agent_office(alerts: list) -> bool:
"""신규 alert들을 agent-office로 push (계약 §5.2). 전송 성공 시 True.
실패(네트워크 오류/비-200)는 False — 호출부가 상태/이력 미채택 후 다음
사이클에 동일 alert를 재시도하도록 한다(멱등, at-least-once).
"""
url = os.getenv("AGENT_OFFICE_URL", "http://agent-office:8000") + "/api/agent-office/stock/trade-alert"
try:
with httpx.Client(timeout=10) as c:
resp = c.post(url, json={"alerts": alerts})
return resp.status_code == 200
except httpx.HTTPError:
return False

View File

@@ -58,6 +58,18 @@ async def test_score_sentiment_clamps_negative_out_of_range():
assert out["score_raw"] == -10.0 assert out["score_raw"] == -10.0
@pytest.mark.asyncio
async def test_score_sentiment_includes_asof_date_in_prompt():
"""asof(현재 KST 일자)를 넘기면 prompt에 오늘 날짜가 포함되어 LLM이 현재 일자 기준으로 판단."""
import datetime as _dt
llm = _mk_llm(json.dumps({"score": 5.0, "reason": "ok"}))
await analyzer.score_sentiment(
llm, "005930", NEWS, name="삼성전자", asof=_dt.date(2026, 7, 2),
)
user_msg = llm.messages.create.call_args.kwargs["messages"][0]["content"]
assert "2026-07-02" in user_msg
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_score_sentiment_includes_summary_in_prompt(): async def test_score_sentiment_includes_summary_in_prompt():
"""summary 가 있으면 prompt 에 포함, 없으면 title 만.""" """summary 가 있으면 prompt 에 포함, 없으면 title 만."""

View File

@@ -39,7 +39,7 @@ async def test_refresh_daily_happy_path(conn):
scores_by_ticker = { scores_by_ticker = {
"005930": 7.5, "000660": 4.0, "373220": -6.0, "005930": 7.5, "000660": 4.0, "373220": -6.0,
} }
async def fake_score(llm, ticker, news, *, name=None, model="m"): async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
return { return {
"ticker": ticker, "score_raw": scores_by_ticker[ticker], "ticker": ticker, "score_raw": scores_by_ticker[ticker],
"reason": f"r{ticker}", "news_count": 1, "reason": f"r{ticker}", "news_count": 1,
@@ -81,7 +81,7 @@ async def test_refresh_daily_failures_isolated(conn):
} }
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3} fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
async def fake_score(llm, ticker, news, *, name=None, model="m"): async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
if ticker == "000660": if ticker == "000660":
raise RuntimeError("llm exploded") raise RuntimeError("llm exploded")
return { return {
@@ -116,7 +116,7 @@ async def test_refresh_daily_no_match_ticker_skipped(conn):
} }
fake_stats = {"total_articles": 1, "matched_pairs": 1, "hit_tickers": 1} fake_stats = {"total_articles": 1, "matched_pairs": 1, "hit_tickers": 1}
async def fake_score(llm, ticker, news, *, name=None, model="m"): async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
return { return {
"ticker": ticker, "score_raw": 5.0, "reason": "r", "ticker": ticker, "score_raw": 5.0, "reason": "r",
"news_count": 1, "tokens_input": 100, "tokens_output": 20, "news_count": 1, "tokens_input": 100, "tokens_output": 20,
@@ -152,7 +152,7 @@ async def test_refresh_daily_sign_gate_no_positive_in_neg(conn):
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3} fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
scores = {"005930": 6.0, "000660": 2.0, "373220": 0.5} # 모두 양수 scores = {"005930": 6.0, "000660": 2.0, "373220": 0.5} # 모두 양수
async def fake_score(llm, ticker, news, *, name=None, model="m"): async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
return { return {
"ticker": ticker, "score_raw": scores[ticker], "reason": "r", "ticker": ticker, "score_raw": scores[ticker], "reason": "r",
"news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model, "news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model,
@@ -183,7 +183,7 @@ async def test_refresh_daily_sign_gate_excludes_neutral(conn):
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3} fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
scores = {"005930": 3.0, "000660": 0.0, "373220": -3.0} scores = {"005930": 3.0, "000660": 0.0, "373220": -3.0}
async def fake_score(llm, ticker, news, *, name=None, model="m"): async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
return { return {
"ticker": ticker, "score_raw": scores[ticker], "reason": "r", "ticker": ticker, "score_raw": scores[ticker], "reason": "r",
"news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model, "news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model,

View File

@@ -5,6 +5,21 @@ from fastapi.testclient import TestClient
from app.main import app from app.main import app
def test_today_kst_uses_kst_offset_not_utc(monkeypatch):
"""컨테이너가 UTC(Alpine, tzdata 미설치)라 date.today()는 08시 KST에 어제를 준다.
_today_kst()는 UTC+9로 보정해 오늘(KST)을 반환해야 한다."""
from app.screener import router
class _FrozenDT(dt.datetime):
@classmethod
def utcnow(cls):
# 2026-07-01 23:30 UTC == 2026-07-02 08:30 KST (AI 뉴스 리포트 시각대)
return dt.datetime(2026, 7, 1, 23, 30, 0)
monkeypatch.setattr(router.dt, "datetime", _FrozenDT)
assert router._today_kst() == dt.date(2026, 7, 2)
def test_refresh_news_sentiment_weekend_skip(): def test_refresh_news_sentiment_weekend_skip():
# 2026-05-16 = Saturday # 2026-05-16 = Saturday
client = TestClient(app) client = TestClient(app)

View File

@@ -0,0 +1,48 @@
import os, sqlite3, tempfile, datetime as dt
import pytest
@pytest.fixture
def db(monkeypatch, tmp_path):
from app import db as _db
monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
_db.init_db()
return _db
def test_watchlist_add_get_remove(db):
db.add_watchlist("005930", "삼성전자", note="관심")
db.add_watchlist("005930", "삼성전자") # 멱등
wl = db.get_watchlist()
assert [w["ticker"] for w in wl] == ["005930"]
assert wl[0]["name"] == "삼성전자"
assert db.remove_watchlist("005930") is True
assert db.get_watchlist() == []
def test_alert_state_edge_firing_and_clear(db):
key = ("005930", "buy", "buy_breakout")
assert db.get_alert_state_firing() == set()
db.set_alert_firing(*key, firing=True, at_iso="2026-07-02T00:01:00Z")
assert key in db.get_alert_state_firing()
db.set_alert_firing(*key, firing=False)
assert key not in db.get_alert_state_firing()
def test_alert_history_records_and_reads(db):
db.add_alert_history("005930", "삼성전자", "buy", "buy_breakout", 71500, {"vol": 2.1})
rows = db.get_alert_history(days=7)
assert len(rows) == 1
assert rows[0]["ticker"] == "005930" and rows[0]["kind"] == "buy"
assert rows[0]["detail"]["vol"] == 2.1
def test_alert_history_days_filter_format_consistency(db):
"""fired_at은 ISO(T/Z)로 저장 — 필터도 ISO여야 경계일 비교가 정확.
7일 경계 밖(정확히 7일 전 자정) 레코드는 제외되어야 한다. 포맷 불일치면 잘못 포함됨."""
db.add_alert_history("005930", "삼성", "buy", "buy_breakout", 71500, {}) # now
conn = sqlite3.connect(db.DB_PATH)
conn.execute(
"INSERT INTO trade_alert_history(ticker,name,kind,condition,price,detail_json,fired_at) "
"VALUES('000660','SK','sell','sell_stop_loss',60000,'{}', "
"strftime('%Y-%m-%dT%H:%M:%fZ','now','-7 days','start of day'))"
)
conn.commit(); conn.close()
tickers = [r["ticker"] for r in db.get_alert_history(days=7)]
assert "005930" in tickers
assert "000660" not in tickers

View File

@@ -0,0 +1,18 @@
def test_diff_new_and_cleared_and_rearm():
from app.trade_alerts import diff_firing
reported = [{"ticker": "005930", "kind": "buy", "condition": "buy_breakout",
"price": 71500, "detail": {}}]
# 최초: prev 비어있음 → 신규
d1 = diff_firing(reported, prev=set())
assert [a["condition"] for a in d1["new"]] == ["buy_breakout"]
assert d1["cleared"] == []
# 유지: prev에 이미 있음 → 신규 없음
prev = {("005930", "buy", "buy_breakout")}
d2 = diff_firing(reported, prev=prev)
assert d2["new"] == []
# 해제: reported 비었고 prev에 있음 → cleared
d3 = diff_firing([], prev=prev)
assert d3["cleared"] == [("005930", "buy", "buy_breakout")]
# 재무장 후 재발화: prev 다시 비면 신규
d4 = diff_firing(reported, prev=set())
assert len(d4["new"]) == 1

View File

@@ -0,0 +1,101 @@
import sqlite3
import pytest
@pytest.fixture
def conn(monkeypatch, tmp_path):
from app import db as _db
monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
_db.init_db()
c = sqlite3.connect(_db.DB_PATH)
c.row_factory = sqlite3.Row
# 보유 1종목 (add_portfolio_item 실제 시그니처: broker/ticker/name/quantity/avg_price — market 파라미터 없음)
_db.add_portfolio_item(ticker="000660", name="SK하이닉스", quantity=10,
avg_price=180000, broker="kis")
# watchlist 1종목
_db.add_watchlist("005930", "삼성전자")
yield c
c.close()
def test_build_monitor_set_merges_sources(conn):
from app import trade_alerts as ta
ms = ta.build_monitor_set(conn, session="regular",
exit_params={"stop_pct": 0.08}, buy_params={"rsi_oversold": 30})
buy_tickers = {t["ticker"] for t in ms["buy_targets"]}
sell_tickers = {t["ticker"] for t in ms["sell_targets"]}
assert "005930" in buy_tickers # watchlist
assert "000660" in sell_tickers # 보유
assert ms["session"] == "regular"
assert ms["exit_params"]["stop_pct"] == 0.08
sell = next(t for t in ms["sell_targets"] if t["ticker"] == "000660")
assert sell["avg_price"] == 180000 and sell["qty"] == 10
def test_latest_screener_candidates_empty_when_no_run(conn):
from app import trade_alerts as ta
assert ta.latest_screener_candidates(conn) == []
def test_latest_screener_candidates_picks_latest_success_run(conn):
from app import trade_alerts as ta
now = "2026-07-02T09:00:00Z"
conn.execute(
"INSERT INTO screener_runs (asof, mode, status, started_at, weights_json, "
"node_params_json, gate_params_json, top_n) VALUES (?,?,?,?,?,?,?,?)",
(now, "manual", "failed", now, "{}", "{}", "{}", 20),
)
conn.execute(
"INSERT INTO screener_runs (asof, mode, status, started_at, weights_json, "
"node_params_json, gate_params_json, top_n) VALUES (?,?,?,?,?,?,?,?)",
(now, "manual", "success", now, "{}", "{}", "{}", 20),
)
run_id = conn.execute("SELECT id FROM screener_runs WHERE status='success'").fetchone()[0]
conn.execute(
"INSERT INTO screener_results (run_id, rank, ticker, name, total_score, scores_json) "
"VALUES (?,?,?,?,?,?)",
(run_id, 1, "035720", "카카오", 88.5, "{}"),
)
conn.commit()
candidates = ta.latest_screener_candidates(conn)
assert candidates == [{"ticker": "035720", "name": "카카오"}]
def test_holding_high_returns_max_high_within_lookback(conn):
from app import trade_alerts as ta
conn.execute(
"INSERT INTO krx_daily_prices (ticker, date, high) VALUES (?,?,?)",
("000660", "2026-06-01", 200000),
)
conn.execute(
"INSERT INTO krx_daily_prices (ticker, date, high) VALUES (?,?,?)",
("000660", "2026-06-20", 210000),
)
conn.commit()
assert ta.holding_high(conn, "000660", lookback_days=60) == 210000
def test_holding_high_none_when_no_price_history(conn):
from app import trade_alerts as ta
assert ta.holding_high(conn, "999999") is None
def test_build_monitor_set_dedupes_watchlist_and_screener_overlap(conn):
from app import trade_alerts as ta
now = "2026-07-02T09:00:00Z"
cur = conn.execute(
"INSERT INTO screener_runs (asof, mode, status, started_at, weights_json, "
"node_params_json, gate_params_json, top_n) VALUES (?,?,?,?,?,?,?,?)",
(now, "manual", "success", now, "{}", "{}", "{}", 20),
)
run_id = cur.lastrowid
# 스크리너 후보가 watchlist와 중복(005930)
conn.execute(
"INSERT INTO screener_results (run_id, rank, ticker, name, total_score, scores_json) "
"VALUES (?,?,?,?,?,?)",
(run_id, 1, "005930", "삼성전자", 90.0, "{}"),
)
conn.commit()
ms = ta.build_monitor_set(conn, session="regular", exit_params={}, buy_params={})
buy_tickers = [t["ticker"] for t in ms["buy_targets"]]
assert buy_tickers.count("005930") == 1

View File

@@ -0,0 +1,35 @@
import datetime as dt
import pytest
from fastapi.testclient import TestClient
def test_current_session_windows():
from app.trade_alerts import current_session
d = dt.date(2026, 7, 2)
assert current_session(dt.datetime.combine(d, dt.time(8, 40))) == "pre"
assert current_session(dt.datetime.combine(d, dt.time(10, 0))) == "regular"
assert current_session(dt.datetime.combine(d, dt.time(17, 0))) == "after"
assert current_session(dt.datetime.combine(d, dt.time(20, 0))) == "closed"
@pytest.fixture
def client(monkeypatch, tmp_path):
from app import db as _db
monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
_db.init_db()
monkeypatch.setenv("WEBAI_API_KEY", "k")
from app.main import app
return TestClient(app)
def test_monitor_set_requires_auth(client):
assert client.get("/api/webai/trade-alert/monitor-set").status_code == 401
def test_monitor_set_ok(client):
r = client.get("/api/webai/trade-alert/monitor-set", headers={"X-WebAI-Key": "k"})
assert r.status_code == 200
body = r.json()
assert body["session"] in ("pre", "regular", "after", "closed")
assert "buy_targets" in body and "sell_targets" in body
assert body["exit_params"]["trailing_pct"] == 0.10

View File

@@ -0,0 +1,56 @@
import pytest
from unittest.mock import patch
from fastapi.testclient import TestClient
@pytest.fixture
def client(monkeypatch, tmp_path):
from app import db as _db
monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
_db.init_db()
monkeypatch.setenv("WEBAI_API_KEY", "k")
from app.main import app
return TestClient(app)
def _report(client, firing):
return client.post("/api/webai/trade-alert/report",
headers={"X-WebAI-Key": "k"},
json={"as_of": "2026-07-02T09:01:00+09:00", "firing": firing})
def test_report_new_edge_sends_and_persists(client):
firing = [{"ticker": "005930", "name": "삼성전자", "kind": "buy",
"condition": "buy_breakout", "price": 71500, "detail": {"vol": 2.0}}]
with patch("app.trade_alerts.notify_agent_office", return_value=True) as m:
r1 = _report(client, firing)
assert r1.json()["new_alerts"] == 1
assert m.called
# 2번째 동일 firing → 유지, 신규 0
with patch("app.trade_alerts.notify_agent_office", return_value=True):
r2 = _report(client, firing)
assert r2.json()["new_alerts"] == 0
# 이력 1건
assert len(client.get("/api/stock/trade-alerts?days=1").json()["alerts"]) == 1
def test_report_send_failure_does_not_persist(client):
firing = [{"ticker": "005930", "name": "삼성전자", "kind": "buy",
"condition": "buy_breakout", "price": 71500, "detail": {}}]
with patch("app.trade_alerts.notify_agent_office", return_value=False):
r = _report(client, firing)
assert r.json()["new_alerts"] == 0 # 전송 실패 → 미채택
# 다음 사이클(전송 성공) 재시도되어 알림
with patch("app.trade_alerts.notify_agent_office", return_value=True):
r2 = _report(client, firing)
assert r2.json()["new_alerts"] == 1
def test_report_cleared_rearm(client):
firing = [{"ticker": "005930", "name": "삼성", "kind": "buy",
"condition": "buy_breakout", "price": 71500, "detail": {}}]
with patch("app.trade_alerts.notify_agent_office", return_value=True):
_report(client, firing)
_report(client, []) # 해제
r = _report(client, firing) # 재발화
assert r.json()["new_alerts"] == 1

View File

@@ -0,0 +1,22 @@
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def client(monkeypatch, tmp_path):
from app import db as _db
monkeypatch.setattr(_db, "DB_PATH", str(tmp_path / "stock.db"))
_db.init_db()
from app.main import app
return TestClient(app)
def test_watchlist_crud(client):
assert client.get("/api/stock/watchlist").json()["watchlist"] == []
r = client.post("/api/stock/watchlist", json={"ticker": "005930", "name": "삼성전자"})
assert r.status_code == 201
wl = client.get("/api/stock/watchlist").json()["watchlist"]
assert wl[0]["ticker"] == "005930"
assert client.delete("/api/stock/watchlist/005930").status_code == 200
assert client.delete("/api/stock/watchlist/005930").status_code == 404
def test_trade_alerts_history_empty(client):
assert client.get("/api/stock/trade-alerts?days=7").json()["alerts"] == []