diff --git a/CHECK_POINT.md b/CHECK_POINT.md new file mode 100644 index 0000000..413f448 --- /dev/null +++ b/CHECK_POINT.md @@ -0,0 +1,109 @@ +# web-ui CHECK_POINT + +> React 18 + Vite + react-router-dom v6. Dev port 3007. NAS Docker 백엔드의 프론트엔드 (nginx :8080). +> 2026-05-22 갱신. + +--- + +## 🟢 현재 상태 (양호) + +- 라우트 16개 (12 메인 + 4 서브) 정상 운영 +- agent-office 3×3 그리드 재설계 완료 (5/7~14, WebP 93% 축소, WS 재연결 백오프) +- `/insta` 슬레이트 캐러셀 + 반응형 (5/15~16) +- Vite proxy 7개 (NAS API + Fear&Greed + VIX + Treasury + WTI + Brent) + +--- + +## ✅ 최근 완료 (5/18~22) + +- 2026-05-22: **거래 데스크 AI 투자 탭 제거** (e42b643) — web-ai signal_v1 legacy 이전과 정합 (V2 단독 운영 반영) +- 2026-05-22: stock 총 매입을 각 종목 매입가 단순 합으로 표시 (6533743) +- 2026-05-22: agent-office 모바일 사이드패널 전체화면 토글 + music 에이전트 이미지 교체 (ee5700d) +- 2026-05-14: agent-office Grid 재설계 (canvas 폐기), AGENT_META + GRID_SLOTS 중앙화 +- 2026-05-15~16: `/insta` 신규 페이지 + InstaCards.jsx + src/api.js(530줄) 음악·인스타·텔레그램 API 확장 + +--- + +## 🔴 즉시 (1~3일) + +### 1. `/insta` 비동기 폴링 구현 ⭐ (백엔드 준비 완료 → 구현 시점 도래) +- **배경**: web-backend insta-lab이 Redis 분할(SP-4) 완료 → `_bg_render`가 Redis push, `GET /api/insta/tasks/{task_id}` 폴링 엔드포인트 존재. **이제 frontend가 비동기 폴링으로 전환해야 정합** +- **파일**: `src/pages/insta/InstaCards.jsx` +- [ ] 슬레이트 생성 → `task_id` 받고 폴링 (2~5초 간격, NAS 부담 ↓) +- [ ] progress bar UI (0~100%) + `queue:paused` 상태 표시 (박재오 작업 중 = Windows 워커 정지) +- [ ] failed 상태 처리 (오류 메시지·재시도 버튼) + +### 2. agent-office WebSocket 안정성 점검 +- 5/7~14 재설계 + 5/22 모바일 토글 직후 운영 확인 +- [ ] 브라우저 콘솔 WS 끊김 → 재연결 지수 백오프 실제 작동 +- [ ] 4 테스트(TaskTab·CommandTab·AgentCard·ScoreNodeCard) 통과 재확인 + +### 3. agent-office lotto sim_consensus 노출 +- **배경**: web-backend `/api/lotto/best`에 5종 점수 array 노출됨 (lotto-signals) + weight-evolver 자율 학습 도입 +- [ ] agent-office lotto 에이전트 카드에 5종 점수·시그널 상태 표시 +- [ ] (선택) weight-evolver 진화 상태 미니 패널 + +--- + +## 🟡 중기 (1~2주) + +### 4. `/insta` 카드 템플릿 UI 고도화 +- 현재 default theme PNG 미리보기만. 
hedgy75 테마 추가 시 theme 선택 UI 필요 +- [ ] 테마 선택 dropdown (default / hedgy75) +- [ ] 미리보기 컴포넌트 페이지 종류별 분기 + +### 5. `/music` Sonic Forge 발행 모니터링 +- music-lab Redis 분할(SP-6) + Windows music-render 도입 → 발행 상태 모니터링 패널 필요 +- [ ] 발행 큐·실패·재시도 로그 표시 (Redis 큐 길이 연동) +- [ ] 텔레그램 5단계 승인 UX 점검 + +### 6. NAS↔Windows 작업 흐름 가시화 (신규) +- web-ai 워커 3종 + Redis 큐 도입으로 작업 분산 흐름이 복잡해짐 +- [ ] agent-office 또는 신규 `/node` 페이지에 큐 상태·Windows 노드 헬스 표시 (web-ai/web-backend 추가 아이디어와 연동) + +--- + +## 🟢 장기 (1개월+) + +### 7. 모바일 UX 일관 적용 +- BottomNav + PullToRefresh + MobileSheet + SwipeableView 있음. 신규 페이지 적용 부족 +- [ ] `/insta` 모바일 캐러셀 swipe + `/agent-office` 모바일 그리드 압축 + +### 8. `/lab` 페이지 확장 +- 현재 sword-stream · day-calc 2개 +- [ ] 박재오 데모 콘텐츠 큐 결정 (예: weight-evolver 진화 그래프, AI 음악 빠른 청취) + +--- + +## 💡 추가 아이디어 (신규 2026-05-22) + +- **`/node` Windows AI 노드 대시보드** — ai_trade + insta/music/video-render + task-watcher 상태, Redis 큐 길이, `queue:paused` 토글 버튼(task-watcher C안 = "토글 UI 1개"). web-ai/web-backend 모니터링 아이디어의 frontend 진입점 +- **video 생성 미리보기 페이지** — video-lab(SP-8) + Windows video-render 4 provider 결과 비교 그리드. 무신사 공모전 MU-진 영상 버전 관리에 활용 +- **weight-evolver 진화 시각화** — auto_picks 적중 추이 + weight base diff 그래프 (`/lab` 또는 lotto 페이지) +- **위키 페이지 수 정합** — [[사업-개인-웹-플랫폼]]에 "17개" 박혀 있으나 실제 16개 (12 메인 + 4 서브). *박재오 위키 갱신 항목* (web-ui 코드 아님) + +--- + +## 🚀 빌드 & 배포 + +```bash +npm run dev # 개발 (port 3007, Vite proxy) +npm run build # 빌드 (rimraf dist + Vite build) +npm run release:nas # 자동 배포 (deploy-nas.cjs) +``` + +배포: Windows `robocopy dist Z:\docker\webpage\frontend\` / macOS `rsync` → nginx 자동 reload + +--- + +## 📚 참고 + +- 위키: [[사업-개인-웹-플랫폼]] (백엔드 통합 인덱스) +- 라우트: `src/routes.jsx` (navLinks 메타) / Vite 프록시: `vite.config.js` +- API: 모든 페이지 `/api/` 상대 경로 (Mixed Content 방지) +- 백엔드 짝: web-backend CHECK_POINT (insta-lab Redis 분할 → /insta 비동기 폴링 정합 필요) + +## 변경 이력 + +- 2026-05-18: 페이지 신설. 즉시 3 + 중기 3 + 장기 2. +- 2026-05-22: 최근 완료 3건 반영(AI 투자 탭 제거·stock 매입 표시·모바일 사이드패널). 
**`/insta` 비동기 폴링을 즉시 1순위로 승격** (백엔드 insta-lab Redis 분할 완료 → frontend 정합 필요). lotto sim_consensus 노출 + NAS↔Windows 작업 흐름 가시화 항목 추가. 추가 아이디어 4건 신설 (/node 대시보드·video 미리보기·evolver 시각화·위키 페이지 수 정합). diff --git a/docs/superpowers/plans/2026-05-25-ai-trade-hotfix.md b/docs/superpowers/plans/2026-05-25-ai-trade-hotfix.md new file mode 100644 index 0000000..3f20d80 --- /dev/null +++ b/docs/superpowers/plans/2026-05-25-ai-trade-hotfix.md @@ -0,0 +1,642 @@ +# ai_trade Hotfix — Code Review F1·F2·F3·F4 Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** ai_trade(V2) 코드 리뷰 7개 finding 중 High 3건(F1·F2·F3) + Medium 1건(F4)을 TDD로 수정. F5/F6은 별도 plan, F7은 pushback. + +**Architecture:** 모두 ai_trade/ 내부 단일 모듈 수정. (1) config.py default 경로 — legacy/ 경유. (2) kis_client.py — asyncio.Lock으로 `_throttle()` 직렬화. (3) scheduler.py + pull_worker.py — post-close를 시간 윈도우가 아닌 "일 1회 + 16:00 이후" 상태기반으로 변경. (4) chronos_predictor.py — confidence 산식을 absolute spread 기반으로 통일. + +**Tech Stack:** Python 3.12, asyncio, pytest + pytest-asyncio + respx, httpx. + +**Test runner:** `.venv` 한글 경로 깨짐 + 리뷰어 Python 312 경로 부재 보고로, 시스템 Python 사용. 정확한 경로는 `where python` 으로 우선 확인. 기본 시도 순서: +1. `python -m pytest ai_trade/tests -q` (PATH의 Python) +2. `py -3.12 -m pytest ai_trade/tests -q` (py launcher) +3. 둘 다 실패 시 환경 셋업이 선행 작업 — plan 진행 중단하고 박재오에게 보고. + +**Working directory:** `C:\Users\jaeoh\Desktop\workspace\web-ai` (web-ai repo). Commit/push도 이 디렉토리에서만. 
+
+---
+
+## File Map
+
+| File | Change | Responsibility |
+|------|--------|----------------|
+| `ai_trade/config.py` | Modify L31-36 | Change the `V1_TOKEN_PATH` default to `legacy/signal_v1/data/kis_token.json` |
+| `ai_trade/kis_client.py` | Modify L40-62 | Add `_throttle_lock: asyncio.Lock`; run `_throttle()` inside the lock |
+| `ai_trade/scheduler.py` | Modify L79-84 | Change the signature to `_is_post_close_trigger(now, last_post_close_date)` (state-based) |
+| `ai_trade/pull_worker.py` | Modify L1-58 | Add `last_post_close_date` state to `poll_loop`; update the call site |
+| `ai_trade/chronos_predictor.py` | Modify L106, L127 | Compute spread as absolute (q90-q10); confidence formula `max(0, 1 - spread/0.6)` |
+| `ai_trade/tests/test_config_token_path.py` | Create (2 tests) | Default path goes through `legacy/`; env override wins |
+| `ai_trade/tests/test_kis_client.py` | Add 1 test | Concurrent `gather` throttle test |
+| `ai_trade/tests/test_scheduler.py` | Add 5 tests | State-based post-close trigger |
+| `ai_trade/tests/test_pull_worker.py` | Add 1 test | Post-close not yet run today, then fired by a 5-minute cycle that wakes after 16:00 |
+| `ai_trade/tests/test_chronos_predictor.py` | Add 3 tests | conf stays sane at median≈0; conf drops as spread grows |
+| `ai_trade/tests/test_main.py` | Modify | Reflect the new `v1_token_path` default (if it is referenced) |
+
+---
+
+## Task 1: F1 - Route the KIS token path default through legacy/
+
+**Files:**
+- Modify: `ai_trade/config.py:31-36`
+- Test: `ai_trade/tests/test_config_token_path.py` (Create)
+
+- [ ] **Step 1: Write the failing test**
+
+```python
+# ai_trade/tests/test_config_token_path.py
+"""F1: verify that the V1_TOKEN_PATH default goes through legacy/signal_v1/."""
+import os
+from pathlib import Path
+
+from ai_trade.config import Settings
+
+
+def test_v1_token_default_path_uses_legacy_dir(monkeypatch):
+    """Without V1_TOKEN_PATH in env, the default is legacy/signal_v1/data/kis_token.json."""
+    monkeypatch.delenv("V1_TOKEN_PATH", raising=False)
+    settings = Settings()
+    expected_suffix = Path("legacy") / "signal_v1" / "data" / "kis_token.json"
+    assert str(settings.v1_token_path).endswith(str(expected_suffix)), (
+        f"expected default to end with {expected_suffix}, got {settings.v1_token_path}"
+    )
+
+
+def test_v1_token_env_override_wins(monkeypatch, tmp_path):
+
"""env로 명시한 경로가 default를 덮어씀.""" + custom = tmp_path / "custom_token.json" + monkeypatch.setenv("V1_TOKEN_PATH", str(custom)) + settings = Settings() + assert settings.v1_token_path == custom +``` + +- [ ] **Step 2: Run test to verify it fails** + +``` +python -m pytest ai_trade/tests/test_config_token_path.py -v +``` + +Expected: `test_v1_token_default_path_uses_legacy_dir` FAILs (default가 `signal_v1/...` 임). env override는 PASS. + +- [ ] **Step 3: Fix config.py** + +`ai_trade/config.py:31-36` 변경: + +```python + v1_token_path: Path = field( + default_factory=lambda: Path( + os.getenv("V1_TOKEN_PATH", + str(Path(__file__).parent.parent / "legacy" / "signal_v1" / "data" / "kis_token.json")) + ) + ) +``` + +- [ ] **Step 4: Run test to verify it passes** + +``` +python -m pytest ai_trade/tests/test_config_token_path.py -v +``` + +Expected: 2 passed. + +- [ ] **Step 5: Verify full test suite still passes** + +``` +python -m pytest ai_trade/tests -q +``` + +Expected: 모든 기존 테스트 PASS (token path 기본값 변경이 다른 test에 영향 없는지 확인). + +- [ ] **Step 6: Commit** + +```bash +git add ai_trade/config.py ai_trade/tests/test_config_token_path.py +git commit -m "fix(ai_trade): V1_TOKEN_PATH default를 legacy/signal_v1/ 경유로 수정 (F1)" +``` + +--- + +## Task 2: F2 — KIS throttle을 asyncio.Lock으로 직렬화 + +**Files:** +- Modify: `ai_trade/kis_client.py:40-62` +- Test: `ai_trade/tests/test_kis_client.py` (Modify — 새 test 추가) + +- [ ] **Step 1: Write the failing test** + +`ai_trade/tests/test_kis_client.py` 파일 끝에 추가: + +```python +import asyncio +import time as time_module + + +@respx.mock +async def test_throttle_serializes_concurrent_gather(kis_client_factory): + """5개 동시 요청이 asyncio.gather로 들어와도 0.5초 간격으로 직렬화되어야 함 (F2). + + 초당 2회 = 0.5초 간격. 5개 요청이면 최소 (5-1)*0.5 = 2.0초 소요. + Race condition 있으면 5개가 거의 동시에 나가서 2초 훨씬 안쪽에 끝남. 
+ """ + sample = {"output2": []} + respx.get( + "https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice" + ).mock(return_value=httpx.Response(200, json=sample)) + + client = kis_client_factory() + try: + start = time_module.monotonic() + await asyncio.gather(*[client.get_minute_ohlcv(f"00593{i}") for i in range(5)]) + elapsed = time_module.monotonic() - start + # 5개 throttle = 최소 (5-1)*0.5 = 2.0초. tolerance 0.3초. + assert elapsed >= 1.7, ( + f"throttle race condition: 5 concurrent calls took only {elapsed:.2f}s, " + f"expected >=1.7s (0.5s * 4 inter-call gaps)" + ) + finally: + await client.close() +``` + +- [ ] **Step 2: Run test to verify it fails** + +``` +python -m pytest ai_trade/tests/test_kis_client.py::test_throttle_serializes_concurrent_gather -v +``` + +Expected: FAIL — elapsed가 0.5초 이하 (race로 동시 깸). + +- [ ] **Step 3: Add asyncio.Lock to KISClient** + +`ai_trade/kis_client.py:40` `__init__` 끝에 한 줄 추가: + +```python + self._token_cache: tuple[str, float] | None = None # (token, file_mtime) + self._last_throttle_at = 0.0 + self._throttle_lock = asyncio.Lock() +``` + +그리고 `_throttle()` (L58-62)을 lock으로 감쌈: + +```python + async def _throttle(self) -> None: + async with self._throttle_lock: + elapsed = time.monotonic() - self._last_throttle_at + if elapsed < _THROTTLE_INTERVAL: + await asyncio.sleep(_THROTTLE_INTERVAL - elapsed) + self._last_throttle_at = time.monotonic() +``` + +- [ ] **Step 4: Run test to verify it passes** + +``` +python -m pytest ai_trade/tests/test_kis_client.py::test_throttle_serializes_concurrent_gather -v +``` + +Expected: PASS — elapsed >= 1.7s. + +- [ ] **Step 5: Verify full kis_client suite still passes** + +``` +python -m pytest ai_trade/tests/test_kis_client.py -v +``` + +Expected: 모든 test PASS (기존 429 retry 등 영향 없는지 확인). 
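The effect of the lock in Task 2 can be reproduced in isolation. The sketch below is a minimal, self-contained model, not the `KISClient` code itself: the `Throttle` class, the `burst` helper, and the 0.05 s interval are hypothetical stand-ins chosen so the demo runs quickly (the plan's real interval is 0.5 s).

```python
import asyncio
import time

INTERVAL = 0.05  # hypothetical demo spacing; the plan's KIS throttle uses 0.5 s


class Throttle:
    """Keeps calls at least INTERVAL apart; the lock is optional for comparison."""

    def __init__(self, use_lock: bool) -> None:
        self._last = 0.0
        self._lock = asyncio.Lock() if use_lock else None

    async def _wait_turn(self) -> None:
        # Same shape as the plan's _throttle(): sleep off the remainder, then stamp.
        elapsed = time.monotonic() - self._last
        if elapsed < INTERVAL:
            await asyncio.sleep(INTERVAL - elapsed)
        self._last = time.monotonic()

    async def wait(self) -> None:
        if self._lock is None:
            # Racy: concurrent callers all read the same stale _last value.
            await self._wait_turn()
        else:
            # Serialized: each caller sees the stamp written by the previous one.
            async with self._lock:
                await self._wait_turn()


async def burst(use_lock: bool, n: int = 5) -> float:
    """Fire n concurrent waits and return the total elapsed seconds."""
    throttle = Throttle(use_lock)
    start = time.monotonic()
    await asyncio.gather(*(throttle.wait() for _ in range(n)))
    return time.monotonic() - start
```

With the lock, five concurrent calls need at least four inter-call gaps, which is the same arithmetic the plan's test relies on (`(5-1) * 0.5 = 2.0` seconds at the real interval). Without it, every caller passes the check before any of them updates the timestamp.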
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add ai_trade/kis_client.py ai_trade/tests/test_kis_client.py
+git commit -m "fix(ai_trade): KIS throttle을 asyncio.Lock으로 직렬화 (F2)"
+```
+
+---
+
+## Task 3: F3 - Make the post-close trigger state-based
+
+**Files:**
+- Modify: `ai_trade/scheduler.py:79-84`
+- Modify: `ai_trade/pull_worker.py:1-58`
+- Test: `ai_trade/tests/test_scheduler.py` (add 5 tests)
+- Test: `ai_trade/tests/test_pull_worker.py` (add 1 test)
+
+**Why state-based:** The 16:00:00-16:00:59 window collides with the 5-minute sleep and the non-deterministic cycle start time, so a cycle can wake after the window has already passed and the trigger never fires. Change the rule to "if post-close has not run today and the current time is ≥ 16:00, trigger and then record today".
+
+- [ ] **Step 1: Write the failing tests**
+
+Append to the end of `ai_trade/tests/test_scheduler.py`:
+
+```python
+from datetime import date as _date
+from ai_trade.scheduler import _is_post_close_trigger
+
+
+def test_post_close_trigger_fires_at_1601_if_not_yet_today():
+    """A cycle that wakes at 16:01 still triggers if post-close has not run today (F3)."""
+    now = _kst(2026, 5, 18, 16, 1)
+    assert _is_post_close_trigger(now, last_post_close_date=None) is True
+
+
+def test_post_close_trigger_skips_if_already_today():
+    """No trigger if post-close already ran today."""
+    now = _kst(2026, 5, 18, 16, 5)
+    today = _date(2026, 5, 18)
+    assert _is_post_close_trigger(now, last_post_close_date=today) is False
+
+
+def test_post_close_trigger_skips_before_1600():
+    """No trigger before 16:00."""
+    now = _kst(2026, 5, 18, 15, 59)
+    assert _is_post_close_trigger(now, last_post_close_date=None) is False
+
+
+def test_post_close_trigger_fires_next_day_after_reset():
+    """On the next business day last_post_close_date is earlier than today's date, so it triggers again."""
+    now = _kst(2026, 5, 19, 16, 0)
+    yesterday = _date(2026, 5, 18)
+    assert _is_post_close_trigger(now, last_post_close_date=yesterday) is True
+
+
+def test_post_close_trigger_skips_on_holiday():
+    """No trigger on a market holiday (2026-05-05, Children's Day)."""
+    now = _kst(2026, 5, 5, 16, 30)
+    assert _is_post_close_trigger(now, last_post_close_date=None) is False
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+```
+python -m pytest ai_trade/tests/test_scheduler.py -v -k post_close
+```
+
+Expected: FAIL — `_is_post_close_trigger`가 신규 시그니처(`last_post_close_date` 인자) 미지원. + +- [ ] **Step 3: Modify scheduler.py:79-84** + +```python +def _is_post_close_trigger(now: datetime, last_post_close_date) -> bool: + """16:00 KST 이후 오늘 아직 post-close cycle 안 돌렸으면 True (F3 상태기반). + + Args: + now: 현재 KST datetime. + last_post_close_date: 마지막 post-close 실행 영업일 date 객체 (None=미실행). + """ + if not _is_market_day(now): + return False + if now.time() < time(16, 0): + return False + today = now.date() + return last_post_close_date != today +``` + +- [ ] **Step 4: Run scheduler tests** + +``` +python -m pytest ai_trade/tests/test_scheduler.py -v +``` + +Expected: 신규 5개 PASS. 기존 test도 PASS (다른 함수 영향 없음). + +- [ ] **Step 5: Update pull_worker.py to track last_post_close_date** + +`ai_trade/pull_worker.py` 의 `poll_loop` (L18-58)을 다음으로 교체: + +```python +async def poll_loop( + client: StockClient, state: PollState, shutdown: asyncio.Event, + kis_client: KISClient | None = None, + chronos=None, + dedup=None, + settings=None, +) -> None: + """FastAPI lifespan 에서 asyncio.create_task 로 시작.""" + logger.info("poll_loop started") + last_post_close_date = None + while not shutdown.is_set(): + now = datetime.now(KST) + if _is_market_day(now) and _is_polling_window(now): + try: + await _run_polling_cycle(client, state, kis_client=kis_client) + except Exception: + logger.exception("poll cycle failed") + # Minute momentum 갱신 (매 cycle) + try: + update_minute_momentum_for_all(state) + except Exception: + logger.exception("minute momentum update failed") + # Post-close trigger (상태기반: 16:00 이후 + 오늘 미실행) + if ( + _is_post_close_trigger(now, last_post_close_date) + and chronos is not None and kis_client is not None + ): + try: + await _run_post_close_cycle(kis_client, chronos, state) + last_post_close_date = now.date() + except Exception: + logger.exception("post-close cycle failed") + # Phase 4: generate signals + if dedup is not None and settings is not None: + try: + from 
ai_trade.signal_generator import generate_signals + generate_signals(state, dedup, settings) + except Exception: + logger.exception("generate_signals failed") + interval = _next_interval(now) + try: + await asyncio.wait_for(shutdown.wait(), timeout=interval) + break + except asyncio.TimeoutError: + continue + logger.info("poll_loop ended") +``` + +- [ ] **Step 6: Add pull_worker test** + +`ai_trade/tests/test_pull_worker.py` 파일 끝에 추가: + +```python +from unittest.mock import AsyncMock, MagicMock +from datetime import datetime as _dt +from zoneinfo import ZoneInfo as _ZI +import asyncio as _asyncio + + +async def test_post_close_fires_at_1601_when_not_yet_today(monkeypatch): + """16:01에 깬 cycle도 post_close 안 돌렸으면 호출됨 (F3 회귀).""" + from ai_trade import pull_worker + + _kst = _ZI("Asia/Seoul") + now_at_1601 = _dt(2026, 5, 18, 16, 1, tzinfo=_kst) + + real_dt = _dt + + class FrozenDateTime: + @staticmethod + def now(tz=None): + return now_at_1601 + + monkeypatch.setattr(pull_worker, "datetime", FrozenDateTime) + monkeypatch.setattr( + pull_worker, "_is_market_day", lambda n: True, + ) + monkeypatch.setattr( + pull_worker, "_is_polling_window", lambda n: True, + ) + monkeypatch.setattr( + pull_worker, "_next_interval", lambda n: 0.01, + ) + monkeypatch.setattr( + pull_worker, "_run_polling_cycle", AsyncMock(), + ) + monkeypatch.setattr( + pull_worker, "update_minute_momentum_for_all", lambda s: None, + ) + post_close = AsyncMock() + monkeypatch.setattr(pull_worker, "_run_post_close_cycle", post_close) + + state = MagicMock() + chronos = MagicMock() + kis = MagicMock() + shutdown = _asyncio.Event() + + async def _stop_soon(): + await _asyncio.sleep(0.05) + shutdown.set() + + _asyncio.create_task(_stop_soon()) + await pull_worker.poll_loop( + client=MagicMock(), + state=state, + shutdown=shutdown, + kis_client=kis, + chronos=chronos, + dedup=None, + settings=None, + ) + + assert post_close.await_count >= 1, "post-close가 16:01에 호출되지 않음 (F3 회귀)" +``` + +- [ ] **Step 7: Run 
pull_worker test** + +``` +python -m pytest ai_trade/tests/test_pull_worker.py::test_post_close_fires_at_1601_when_not_yet_today -v +``` + +Expected: PASS. + +- [ ] **Step 8: Run full ai_trade suite** + +``` +python -m pytest ai_trade/tests -q +``` + +Expected: 모두 PASS. + +- [ ] **Step 9: Commit** + +```bash +git add ai_trade/scheduler.py ai_trade/pull_worker.py ai_trade/tests/test_scheduler.py ai_trade/tests/test_pull_worker.py +git commit -m "fix(ai_trade): post-close trigger를 상태기반으로 변경 (F3)" +``` + +--- + +## Task 4: F4 — Chronos confidence를 absolute spread 기반으로 통일 + +**Files:** +- Modify: `ai_trade/chronos_predictor.py:106, 127` +- Test: `ai_trade/tests/test_chronos_predictor.py` (add 2 tests) + +**Why absolute:** Phase 4 spec amendment (web-ui commit 534ded5)가 absolute spread로 hard gate를 결정. confidence도 같은 철학으로. 새 산식: `conf = max(0, min(1, 1 - spread / SPREAD_THRESHOLD))` — spread가 0.6에 도달하면 conf=0, 0이면 conf=1. + +- [ ] **Step 1: Write the failing tests** + +기존 `ai_trade/tests/test_chronos_predictor.py` 끝에 추가 (파일이 없거나 비어있으면 신규): + +```python +import numpy as np +import pytest +import torch + + +@pytest.fixture +def fake_pipeline(): + """predict_quantiles만 stub하는 가짜 pipeline.""" + class FakePipeline: + def __init__(self, q10_price, q50_price, q90_price): + self._q10, self._q50, self._q90 = q10_price, q50_price, q90_price + def predict_quantiles(self, contexts, prediction_length, quantile_levels): + n = len(contexts) + tensor = torch.tensor( + [[[self._q10, self._q50, self._q90]]] * n, + dtype=torch.float32, + ) + return tensor, None + return FakePipeline + + +def _make_predictor_with(pipeline_obj): + """ChronosPredictor 인스턴스 (실제 모델 안 부르고 pipeline만 주입).""" + from ai_trade.chronos_predictor import ChronosPredictor + p = ChronosPredictor.__new__(ChronosPredictor) + p._pipeline = pipeline_obj + p._device = "cpu" + return p + + +def test_confidence_high_when_spread_near_zero(fake_pipeline): + """median≈0, spread≈0 (q10=q90=last_close)일 때 conf≈1 (F4).""" + last_close 
= 100000.0 + p = _make_predictor_with(fake_pipeline(last_close, last_close, last_close)) + ohlcv = {"A": [{"close": last_close}] * 30} + out = p.predict_batch(ohlcv) + assert out["A"].conf > 0.95, ( + f"median≈0 + spread≈0인데 conf={out['A'].conf} (F4 회귀: relative spread로 폭증)" + ) + + +def test_confidence_drops_with_spread(fake_pipeline): + """spread 0.3일 때 conf≈0.5 (1 - 0.3/0.6 = 0.5).""" + last_close = 100000.0 + # q10=85000 → -0.15, q90=115000 → 0.15, spread=0.30 + p = _make_predictor_with(fake_pipeline(85000.0, 100000.0, 115000.0)) + ohlcv = {"A": [{"close": last_close}] * 30} + out = p.predict_batch(ohlcv) + # 1 - 0.30/0.60 = 0.50 + assert 0.45 < out["A"].conf < 0.55, ( + f"absolute spread 0.30에서 conf={out['A'].conf} (expected ≈0.5)" + ) + + +def test_confidence_zero_at_threshold_spread(fake_pipeline): + """spread가 threshold(0.6) 이상이면 conf=0.""" + last_close = 100000.0 + # q10=70000 → -0.30, q90=130000 → 0.30, spread=0.60 + p = _make_predictor_with(fake_pipeline(70000.0, 100000.0, 130000.0)) + ohlcv = {"A": [{"close": last_close}] * 30} + out = p.predict_batch(ohlcv) + assert out["A"].conf < 0.05, ( + f"spread=threshold에서 conf={out['A'].conf} (expected ≈0)" + ) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +``` +python -m pytest ai_trade/tests/test_chronos_predictor.py -v -k confidence +``` + +Expected: `test_confidence_high_when_spread_near_zero` 가 FAIL — 현행 relative spread 산식 때문에 median≈0에서 conf가 0으로 폭락. 
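To see why a relative formula collapses near `median≈0`, the two approaches can be compared side by side. This is a sketch under a stated assumption: the plan does not show the old formula, so `conf_relative` below is only a guess at its shape (spread divided by `|median|`). `conf_absolute` follows the formula the plan states for F4.

```python
SPREAD_THRESHOLD = 0.6  # same value the plan uses for the absolute-spread gate


def conf_relative(median: float, q10: float, q90: float) -> float:
    """ASSUMED shape of the old formula: spread normalized by |median|."""
    rel = (q90 - q10) / max(abs(median), 1e-9)  # blows up as median approaches 0
    return max(0.0, min(1.0, 1.0 - rel))


def conf_absolute(q10: float, q90: float) -> float:
    """Formula from the plan: 1 - spread / threshold, clamped to [0, 1]."""
    spread = q90 - q10
    return max(0.0, min(1.0, 1.0 - spread / SPREAD_THRESHOLD))


# A tight forecast (±1%) around an almost-flat median:
tight_relative = conf_relative(median=0.0005, q10=-0.01, q90=0.01)
tight_absolute = conf_absolute(q10=-0.01, q90=0.01)
```

For the tight forecast the assumed relative version clamps to 0 because the spread is 40 times the median, while the absolute version stays close to 1. The absolute version also reproduces the plan's own checkpoints: a spread of 0.30 gives 0.5 and a spread of 0.60 gives 0.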
+
+- [ ] **Step 3: Fix chronos_predictor.py**
+
+Add a constant near the top of `ai_trade/chronos_predictor.py` (around L13):
+
+```python
+_SPREAD_THRESHOLD = 0.6  # F4: same as the signal_generator hard gate (absolute return spread)
+```
+
+Change L106 (modern API path):
+
+```python
+        # shape: [num_series, prediction_length, 3]
+        for i, ticker in enumerate(tickers):
+            q10_price, q50_price, q90_price = quantiles_np[i, 0, :]
+            last_close = daily_ohlcv_dict[ticker][-1]["close"]
+            median = float((q50_price - last_close) / last_close)
+            q10 = float((q10_price - last_close) / last_close)
+            q90 = float((q90_price - last_close) / last_close)
+            # F4: based on absolute spread (q90-q10), consistent with the signal_generator hard gate.
+            # Removes the relative formula that collapsed conf to 0 in the median≈0 zero-shot case.
+            spread = q90 - q10
+            conf = float(max(0.0, min(1.0, 1.0 - spread / _SPREAD_THRESHOLD)))
+            results[ticker] = ChronosPrediction(
+                median=median, q10=q10, q90=q90, conf=conf, as_of=now_iso,
+            )
+        return results
+```
+
+Apply the same change at L127 (legacy API path):
+
+```python
+            spread = q90 - q10
+            conf = float(max(0.0, min(1.0, 1.0 - spread / _SPREAD_THRESHOLD)))
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+```
+python -m pytest ai_trade/tests/test_chronos_predictor.py -v
+```
+
+Expected: all 3 new tests PASS. Existing tests also PASS.
+
+- [ ] **Step 5: Run full ai_trade suite**
+
+```
+python -m pytest ai_trade/tests -q
+```
+
+Expected: all PASS. The signal_generator tests (`_compute_buy_confidence` uses `pred["conf"]`) may also be affected, so watch them.
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add ai_trade/chronos_predictor.py ai_trade/tests/test_chronos_predictor.py
+git commit -m "fix(ai_trade): Chronos confidence를 absolute spread 기반으로 통일 (F4)"
+```
+
+---
+
+## Task 5: Full regression check + push
+
+- [ ] **Step 1: Run full ai_trade suite + count**
+
+```
+python -m pytest ai_trade/tests -v
+```
+
+Expected:
+- Existing 56 tests + new (config 2 + kis_client 1 + scheduler 5 + pull_worker 1 + chronos_predictor 3) = roughly **68 tests** PASS.
+
+- [ ] **Step 2: Quick sanity check: server boot smoke test (time permitting)**
+
+```
+cd ai_trade && python -c "from main import app; print('app import OK')"
+```
+
+Expected: no import errors.
+
+- [ ] **Step 3: Push**
+
+```bash
+git push origin main
+```
+
+---
+
+## Self-Review Checklist
+
+After writing this plan, confirm the following:
+
+1. **F1**: config.py default + 2 tests (default + env override) ✅
+2. **F2**: `_throttle_lock` added + 1 concurrent test ✅
+3. **F3**: `_is_post_close_trigger(now, last_post_close_date)` signature change + `poll_loop` state tracking + 5 scheduler tests + 1 pull_worker test ✅
+4. **F4**: `_SPREAD_THRESHOLD=0.6` constant + absolute spread applied to both branches (modern + legacy) + 3 chronos_predictor tests ✅
+
+**Possible omissions**:
+- If `test_main.py` asserts the `v1_token_path` default directly, update it together with Task 1. The patch above only goes through the `Settings` object, so there is no impact (verified).
+- `FrozenDateTime.now` in the Task 3 pull_worker test stubs only the `datetime.now(KST)` call. Other datetime usages are unaffected (verified L28).
+- The Task 4 tests bypass construction via ChronosPredictor `__new__`, so no real HuggingFace model is loaded.
+
+---
+
+## Execution Handoff
+
+**Plan complete and saved to `docs/superpowers/plans/2026-05-25-ai-trade-hotfix.md`.**
+
+Two execution options:
+
+**1. Subagent-Driven (recommended)**: dispatch a fresh subagent per task with a two-stage review. Better suited to subtle concurrency and state changes such as F2/F3.
+
+**2. Inline Execution**: work through the tasks directly in the current session, with checkpoints.
+
+Awaiting 박재오's decision.
diff --git a/docs/superpowers/plans/2026-05-25-render-queue-reliability.md b/docs/superpowers/plans/2026-05-25-render-queue-reliability.md
new file mode 100644
index 0000000..5d8f642
--- /dev/null
+++ b/docs/superpowers/plans/2026-05-25-render-queue-reliability.md
@@ -0,0 +1,704 @@
+# Render Queue Reliability — Code Review F6 Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
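+
+**Goal:** Fix the problem where the four render workers (insta/music/video/image-render) lose a task if they crash right after BLPOP. Use BLMOVE (or BRPOPLPUSH) for atomic dequeue, together with the processing-list pattern, startup recovery, and retry/dead-letter handling.
+
+**Architecture:**
+1. Each worker has a unique `worker_id`: `<queue_key>-<hostname>-<pid>` (overridable via the `WORKER_ID` env var).
+2. Atomic dequeue: `BLMOVE <queue_key> processing:<queue_key>:<worker_id> RIGHT LEFT 5`, where `<queue_key>` is the worker's main queue (`queue:<kind>-render`) and 5 is the timeout in seconds. (`BRPOPLPUSH` is deprecated as of Redis 6.2; `BLMOVE` is its successor.)
+3. Task success: `LREM processing:<queue_key>:<worker_id> 1 <raw_payload>` removes exactly one entry.
+4. Task failure: increment the payload's `attempts` counter and LPUSH it back onto the main queue; once `attempts` reaches the limit (default 3), move it to `dead_letter:<queue_key>` instead.
+5. **Startup recovery**: when a worker starts and its own processing list is not empty, push every entry back to the main queue (retry) with `attempts` incremented.
+6. The NAS-side producer is unchanged (LPUSH as before). The payload schema does gain an optional `attempts: int` field; the producer does not need to set it, and the worker defaults it to 0.
+
+**Shared module strategy:** All four workers follow the same pattern, so create a single `services/_shared/reliable_queue.py`, add `COPY services/_shared /app/_shared` to each Dockerfile, and import it with `from _shared.reliable_queue import ReliableQueue`. This means 4 compose entry/Dockerfile changes. (DRY beats 4 inline copies.)
+
+**Tech Stack:** Python 3.12, redis.asyncio 5.x, fakeredis (pytest dep), pytest-asyncio.
+
+**Working directory:** `C:\Users\jaeoh\Desktop\workspace\web-ai`.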
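The state transitions in the Architecture list can be sketched without Redis. The model below is a toy illustration only: `InMemoryReliableQueue` and its three deques are hypothetical names, and plain `deque` operations stand in for LPUSH, BLMOVE, and LREM. It assumes, as the plan's tests do, that a task is dead-lettered once `attempts` reaches the limit.

```python
from collections import deque
from typing import Optional

MAX_ATTEMPTS = 3  # default limit named in the plan


class InMemoryReliableQueue:
    """Toy model of the main / processing / dead-letter lists (no Redis involved)."""

    def __init__(self) -> None:
        self.main = deque()
        self.processing = deque()
        self.dead_letter = deque()

    def push(self, task: dict) -> None:
        self.main.appendleft(task)  # producer side: LPUSH at the head

    def dequeue(self) -> Optional[dict]:
        if not self.main:
            return None
        task = self.main.pop()  # take from the tail (RIGHT) ...
        self.processing.appendleft(task)  # ... and park it at the head (LEFT)
        return task

    def ack(self, task: dict) -> None:
        self.processing.remove(task)  # success: drop exactly one entry

    def _retry_or_bury(self, task: dict) -> None:
        task["attempts"] = task.get("attempts", 0) + 1
        if task["attempts"] >= MAX_ATTEMPTS:
            self.dead_letter.appendleft(task)
        else:
            self.main.appendleft(task)

    def fail(self, task: dict) -> None:
        self.processing.remove(task)
        self._retry_or_bury(task)

    def recover(self) -> int:
        """Startup: treat every leftover processing entry as an implicit failure."""
        count = 0
        while self.processing:
            self._retry_or_bury(self.processing.popleft())
            count += 1
        return count
```

A task that is dequeued and never acknowledged stays in `processing`, which is what makes the crash recoverable: the next startup finds it there instead of finding nothing.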
+ +--- + +## File Map + +| 파일 | 변경 | 책임 | +|------|------|------| +| `services/_shared/__init__.py` | Create | namespace package | +| `services/_shared/reliable_queue.py` | Create | `ReliableQueue` 클래스 — dequeue, ack, fail, recover | +| `services/_shared/tests/test_reliable_queue.py` | Create | fakeredis 단위 테스트 6개 | +| `services/_shared/requirements.txt` | Create | redis>=5.0, fakeredis (test only) | +| `services/insta-render/Dockerfile` | Modify | `COPY services/_shared /app/_shared` + PYTHONPATH | +| `services/insta-render/worker.py` | Modify L1~ | BLPOP → ReliableQueue 사용 | +| `services/insta-render/tests/test_worker.py` | Append | 1 integration test (recovery) | +| `services/music-render/Dockerfile` | Modify | shared copy | +| `services/music-render/worker.py` | Modify | ReliableQueue 사용 | +| `services/music-render/tests/test_worker.py` | Append | recovery test | +| `services/video-render/Dockerfile` | Modify | shared copy | +| `services/video-render/worker.py` | Modify | ReliableQueue 사용 | +| `services/video-render/tests/test_worker.py` | Append | recovery test | +| `services/image-render/Dockerfile` | Modify | shared copy | +| `services/image-render/worker.py` | Modify | ReliableQueue 사용 | +| `services/image-render/tests/test_worker.py` | Append | recovery test | +| `services/docker-compose.yml` (있다면) | Verify | build context가 services/ 루트 포함하는지 | + +--- + +## Task 1: ReliableQueue 공유 모듈 작성 + +**Files:** +- Create: `services/_shared/__init__.py` +- Create: `services/_shared/reliable_queue.py` +- Create: `services/_shared/tests/__init__.py` +- Create: `services/_shared/tests/test_reliable_queue.py` +- Create: `services/_shared/requirements.txt` + +- [ ] **Step 1: Create namespace package** + +```python +# services/_shared/__init__.py +``` +(빈 파일) + +```python +# services/_shared/tests/__init__.py +``` +(빈 파일) + +- [ ] **Step 2: Write failing tests first** + +```python +# services/_shared/tests/test_reliable_queue.py +"""F6 — ReliableQueue: atomic dequeue + 
recovery + retry.""" +import json + +import fakeredis.aioredis +import pytest + +from _shared.reliable_queue import ReliableQueue + + +@pytest.fixture +async def redis(): + r = fakeredis.aioredis.FakeRedis(decode_responses=False) + yield r + await r.flushall() + await r.aclose() + + +async def test_dequeue_atomically_moves_to_processing(redis): + """BLMOVE: queue → processing 원자적 이동.""" + q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1") + await redis.lpush("queue:test", json.dumps({"task_id": "t1"}).encode()) + payload, raw = await q.dequeue(timeout=1) + assert payload["task_id"] == "t1" + # main queue는 비어있고, processing list에 들어있어야 함 + assert await redis.llen("queue:test") == 0 + assert await redis.llen("processing:queue:test:w1") == 1 + + +async def test_dequeue_returns_none_on_timeout(redis): + q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1") + result = await q.dequeue(timeout=1) + assert result is None + + +async def test_ack_removes_from_processing(redis): + q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1") + await redis.lpush("queue:test", json.dumps({"task_id": "t1"}).encode()) + payload, raw = await q.dequeue(timeout=1) + await q.ack(raw) + assert await redis.llen("processing:queue:test:w1") == 0 + + +async def test_recover_returns_orphaned_to_main_queue(redis): + """startup recovery: 잔존 processing list 항목을 main queue로 되돌림.""" + # 이전 crash 시뮬레이션: processing list에 잔존 + orphan = json.dumps({"task_id": "t1", "attempts": 0}).encode() + await redis.lpush("processing:queue:test:w1", orphan) + q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1") + recovered = await q.recover() + assert recovered == 1 + assert await redis.llen("processing:queue:test:w1") == 0 + # 다시 dequeue 가능 + payload, raw = await q.dequeue(timeout=1) + assert payload["task_id"] == "t1" + assert payload["attempts"] == 1 # incremented on recover + + +async def test_fail_below_max_attempts_returns_to_main_queue(redis): + q = 
ReliableQueue(redis, queue_key="queue:test", worker_id="w1", max_attempts=3) + await redis.lpush("queue:test", json.dumps({"task_id": "t1", "attempts": 0}).encode()) + payload, raw = await q.dequeue(timeout=1) + await q.fail(raw, payload) + assert await redis.llen("processing:queue:test:w1") == 0 + assert await redis.llen("queue:test") == 1 + # attempts 증가됐는지 + requeued_raw = await redis.lindex("queue:test", 0) + requeued = json.loads(requeued_raw) + assert requeued["attempts"] == 1 + + +async def test_fail_at_max_attempts_moves_to_dead_letter(redis): + q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1", max_attempts=3) + await redis.lpush( + "queue:test", json.dumps({"task_id": "t1", "attempts": 2}).encode() + ) + payload, raw = await q.dequeue(timeout=1) + await q.fail(raw, payload) + # attempts 2 → 3 (== max) → dead-letter + assert await redis.llen("queue:test") == 0 + assert await redis.llen("processing:queue:test:w1") == 0 + assert await redis.llen("dead_letter:queue:test") == 1 +``` + +- [ ] **Step 3: Add requirements** + +```text +# services/_shared/requirements.txt +redis>=5.0.0 +``` + +별도 dev requirements (test): + +```text +# services/_shared/tests/requirements-dev.txt (optional) +fakeredis>=2.20.0 +pytest>=8.0.0 +pytest-asyncio>=0.23.0 +``` + +- [ ] **Step 4: Run tests to verify they fail** + +``` +cd services/_shared && python -m pytest tests/ -v +``` + +Expected: ImportError — reliable_queue.py 미존재. + +- [ ] **Step 5: Write reliable_queue.py** + +```python +# services/_shared/reliable_queue.py +"""F6 — Reliable Redis queue with processing list + recovery + retry. + +Pattern: BLMOVE main → processing (atomic), then either ack (LREM processing) or +fail (LREM processing + re-enqueue or dead-letter). + +Startup recovery: any items left in the worker's processing list from a previous +crash are pushed back to main queue (with attempts incremented). 
+""" +from __future__ import annotations + +import json +import logging +import os +import socket +from typing import Optional + +logger = logging.getLogger(__name__) + + +def default_worker_id(queue_key: str) -> str: + """env > hostname-pid.""" + explicit = os.getenv("WORKER_ID") + if explicit: + return explicit + return f"{queue_key}-{socket.gethostname()}-{os.getpid()}" + + +class ReliableQueue: + """Wraps a redis client to provide BLMOVE-backed atomic dequeue + + processing list + retry/dead-letter. + + Producer side stays unchanged: LPUSH queue: . + Worker side: dequeue() → process → ack(raw) on success or fail(raw, payload) on error. + Startup: await queue.recover() to re-enqueue orphans. + """ + + def __init__( + self, + redis, + queue_key: str, + worker_id: Optional[str] = None, + max_attempts: int = 3, + ): + self._redis = redis + self._queue_key = queue_key + self._worker_id = worker_id or default_worker_id(queue_key) + self._processing_key = f"processing:{queue_key}:{self._worker_id}" + self._dead_letter_key = f"dead_letter:{queue_key}" + self._max_attempts = max_attempts + + @property + def processing_key(self) -> str: + return self._processing_key + + async def dequeue(self, timeout: int = 5) -> Optional[tuple[dict, bytes]]: + """Atomically move 1 item from main queue tail to processing head. + + Returns (parsed_dict, raw_bytes) or None on timeout. + Caller MUST call ack(raw) on success or fail(raw, payload) on error. 
+ """ + raw = await self._redis.blmove( + self._queue_key, self._processing_key, + timeout=timeout, src="RIGHT", dest="LEFT", + ) + if raw is None: + return None + try: + payload = json.loads(raw) + except json.JSONDecodeError: + logger.error("invalid payload on dequeue, moving to dead-letter: %r", raw[:200]) + await self._redis.lrem(self._processing_key, 1, raw) + await self._redis.lpush(self._dead_letter_key, raw) + return None + return payload, raw + + async def ack(self, raw: bytes) -> None: + """Successful processing — remove from processing list.""" + removed = await self._redis.lrem(self._processing_key, 1, raw) + if removed == 0: + logger.warning("ack on missing payload (already removed?): %r", raw[:100]) + + async def fail(self, raw: bytes, payload: dict) -> None: + """Failed processing — remove from processing list and either re-enqueue or dead-letter.""" + await self._redis.lrem(self._processing_key, 1, raw) + attempts = int(payload.get("attempts", 0)) + 1 + if attempts >= self._max_attempts: + payload["attempts"] = attempts + await self._redis.lpush(self._dead_letter_key, json.dumps(payload).encode()) + logger.error( + "task moved to dead-letter after %d attempts: task_id=%s", + attempts, payload.get("task_id"), + ) + return + payload["attempts"] = attempts + await self._redis.lpush(self._queue_key, json.dumps(payload).encode()) + logger.info( + "task re-enqueued (attempt %d/%d): task_id=%s", + attempts, self._max_attempts, payload.get("task_id"), + ) + + async def recover(self) -> int: + """Startup: move all orphans from this worker's processing list back to main queue. + + Increments attempts counter (orphan == implicit failure). + Returns count of recovered items. 
+        """
+        count = 0
+        while True:
+            raw = await self._redis.lpop(self._processing_key)
+            if raw is None:
+                break
+            try:
+                payload = json.loads(raw)
+            except json.JSONDecodeError:
+                # Unparseable orphan: send it straight to dead-letter.
+                await self._redis.lpush(self._dead_letter_key, raw)
+                count += 1
+                continue
+            payload["attempts"] = int(payload.get("attempts", 0)) + 1
+            if payload["attempts"] >= self._max_attempts:
+                await self._redis.lpush(self._dead_letter_key, json.dumps(payload).encode())
+            else:
+                await self._redis.lpush(self._queue_key, json.dumps(payload).encode())
+            count += 1
+        if count:
+            logger.info("recovered %d orphaned items for worker %s", count, self._worker_id)
+        return count
+```
+
+**Note on the redis-py `blmove` API**: `client.blmove(first_list, second_list, timeout, src=..., dest=...)`. `timeout=0` blocks forever. Payloads arrive as bytes (this assumes `decode_responses=False`).
+
+- [ ] **Step 6: Run tests to verify they pass**
+
+```
+cd services/_shared && python -m pytest tests/ -v
+```
+
+Expected: 6 PASS.
+
+If an ImportError occurs because `fakeredis` is not installed:
+
+```
+python -m pip install fakeredis pytest-asyncio
+```
+
+The async tests also need to run under pytest-asyncio, either by auto-marking coroutine tests from `conftest.py` or by setting `asyncio_mode = auto` in `pytest.ini`.
신규 conftest: + +```python +# services/_shared/tests/conftest.py +import pytest +pytest_plugins = ["pytest_asyncio"] + + +def pytest_collection_modifyitems(config, items): + for item in items: + if "asyncio" in item.fixturenames or item.get_closest_marker("asyncio") is not None: + continue + # auto-mark all async tests + if item.function.__name__.startswith("test_"): + import asyncio, inspect + if inspect.iscoroutinefunction(item.function): + item.add_marker(pytest.mark.asyncio) +``` + +또는 더 간단히 `services/_shared/pytest.ini`: + +```ini +[pytest] +asyncio_mode = auto +``` + +- [ ] **Step 7: Commit** + +```bash +git add services/_shared/ +git commit -m "feat(services): _shared/reliable_queue 신설 — BLMOVE + processing list + retry (F6 part 1)" +``` + +--- + +## Task 2: insta-render에 ReliableQueue 적용 + +**Files:** +- Modify: `services/insta-render/Dockerfile` +- Modify: `services/insta-render/worker.py` +- Modify: `services/insta-render/tests/test_worker.py` (append) + +- [ ] **Step 1: Update Dockerfile** + +`services/insta-render/Dockerfile` 에 `_shared` 복사 추가. 기존 Dockerfile 패턴을 먼저 읽고, `COPY services/insta-render /app` 같은 라인이 있다면 그 위 또는 옆에: + +```dockerfile +COPY services/_shared /app/_shared +ENV PYTHONPATH=/app:/app/_shared:${PYTHONPATH} +``` + +build context가 `services/` 루트여야 함. compose에서 `build: { context: ./services, dockerfile: insta-render/Dockerfile }` 인지 확인 — 아니라면 context 조정 필요. 
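As a rough check of the `PYTHONPATH` setting in Step 1, the sketch below rebuilds the `/app/_shared` layout in a temporary directory and shows which path entry each import style relies on. The directory names mirror the Dockerfile lines; the stub module contents and the helper names `build_fake_app` and `importable` are assumptions made only for illustration.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def build_fake_app(root: Path) -> Path:
    """Create <root>/app/_shared/reliable_queue.py with a stub class (illustrative only)."""
    shared = root / "app" / "_shared"
    shared.mkdir(parents=True)
    (shared / "reliable_queue.py").write_text("class ReliableQueue:\n    pass\n")
    return root / "app"


def importable(module_name: str, path_entries: list[str]) -> bool:
    """Return True if module_name imports with path_entries prepended to sys.path."""
    saved_path = list(sys.path)
    saved_modules = set(sys.modules)
    sys.path[:0] = path_entries
    importlib.invalidate_caches()
    try:
        importlib.import_module(module_name)
        return True
    except ImportError:
        return False
    finally:
        # Restore sys.path and drop anything this probe imported.
        sys.path[:] = saved_path
        for name in set(sys.modules) - saved_modules:
            del sys.modules[name]


with tempfile.TemporaryDirectory() as tmp:
    app = build_fake_app(Path(tmp))
    # Package-style import (the form worker.py uses) needs the parent directory, /app.
    pkg_via_app = importable("_shared.reliable_queue", [str(app)])
    # With only /app/_shared on the path, the package-style import does not resolve...
    pkg_via_shared = importable("_shared.reliable_queue", [str(app / "_shared")])
    # ...but a flat import of the module does.
    flat_via_shared = importable("reliable_queue", [str(app / "_shared")])
```

In this layout the `/app` entry is what satisfies `from _shared.reliable_queue import ReliableQueue`; the extra `/app/_shared` entry only matters if some module imports `reliable_queue` directly.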
+ +- [ ] **Step 2: Modify worker.py — failing test first** + +`services/insta-render/tests/test_worker.py` 끝에 추가: + +```python +import json +from unittest.mock import AsyncMock, patch +import pytest + + +@pytest.mark.asyncio +async def test_worker_calls_ack_on_success(): + """성공 시 ack() 호출 (F6).""" + import worker + fake_payload = {"task_id": "t1", "job_type": "card_generation", "params": {}} + fake_raw = json.dumps(fake_payload).encode() + + fake_queue = AsyncMock() + fake_queue.dequeue = AsyncMock(side_effect=[(fake_payload, fake_raw), None]) + fake_queue.ack = AsyncMock() + fake_queue.fail = AsyncMock() + fake_queue.recover = AsyncMock(return_value=0) + + with patch.object(worker, "ReliableQueue", return_value=fake_queue), \ + patch.object(worker, "_dispatch") as disp: + # poll_once로 1 cycle만 실행 (실제 loop 끊기 위해) + await worker.poll_once(fake_queue) + disp.assert_called_once() + fake_queue.ack.assert_called_once_with(fake_raw) + fake_queue.fail.assert_not_called() + + +@pytest.mark.asyncio +async def test_worker_calls_fail_on_dispatch_exception(): + """dispatch 예외 시 fail() 호출 — 작업 손실 안 됨 (F6).""" + import worker + fake_payload = {"task_id": "t2", "job_type": "card_generation", "params": {}} + fake_raw = json.dumps(fake_payload).encode() + + fake_queue = AsyncMock() + fake_queue.dequeue = AsyncMock(return_value=(fake_payload, fake_raw)) + fake_queue.ack = AsyncMock() + fake_queue.fail = AsyncMock() + + with patch.object(worker, "_dispatch", side_effect=RuntimeError("boom")): + await worker.poll_once(fake_queue) + fake_queue.fail.assert_called_once_with(fake_raw, fake_payload) + fake_queue.ack.assert_not_called() +``` + +- [ ] **Step 3: Run test to fail** + +``` +cd services/insta-render && python -m pytest tests/ -v -k "ack_on_success or fail_on_dispatch" +``` + +Expected: AttributeError (`worker.poll_once` 미존재, `worker.ReliableQueue` 미존재). 
+ +- [ ] **Step 4: Rewrite insta-render worker.py** + +```python +"""Redis 기반 worker — F6 신뢰성 패턴 적용 (BLMOVE + processing list + retry).""" +from __future__ import annotations + +import asyncio +import logging +import os +import sys + +import redis.asyncio as aioredis + +from _shared.reliable_queue import ReliableQueue +from nas_client import webhook_update_task +# 기존 dispatch 대상 import 유지 +from card_renderer import render_card + +logger = logging.getLogger(__name__) + +REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379") +QUEUE_KEY = "queue:insta-render" +PAUSED_KEY = "queue:paused" + + +_DISPATCH_TABLE = { + "card_generation": "render_card", +} + + +def _dispatch(payload: dict) -> None: + job_type = payload.get("job_type", "") + task_id = payload.get("task_id", "") + params = payload.get("params", {}) + fn_name = _DISPATCH_TABLE.get(job_type) + if fn_name is None: + logger.error("unknown job_type=%s task=%s", job_type, task_id) + webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}") + return + try: + fn = getattr(sys.modules[__name__], fn_name) + except AttributeError: + webhook_update_task(task_id, "failed", 0, "", error=f"internal dispatch error: {fn_name}") + return + fn(task_id, params) + + +async def poll_once(queue: ReliableQueue) -> bool: + """1 cycle: dequeue → dispatch → ack/fail. 
Returns True if a job was handled.""" + result = await queue.dequeue(timeout=5) + if result is None: + return False + payload, raw = result + try: + await asyncio.to_thread(_dispatch, payload) + except Exception: + logger.exception("dispatch failed task_id=%s", payload.get("task_id")) + await queue.fail(raw, payload) + return True + await queue.ack(raw) + return True + + +async def worker_loop(): + redis = aioredis.from_url(REDIS_URL, decode_responses=False) + queue = ReliableQueue(redis, queue_key=QUEUE_KEY) + logger.info("insta-render worker started worker_id=%s", queue._worker_id) + # F6: startup recovery + try: + recovered = await queue.recover() + if recovered: + logger.info("recovered %d orphaned items at startup", recovered) + except Exception: + logger.exception("startup recover failed") + while True: + try: + paused = await redis.get(PAUSED_KEY) + if paused == b"1": + await asyncio.sleep(10) + continue + await poll_once(queue) + except asyncio.CancelledError: + logger.info("worker_loop cancelled") + raise + except Exception: + logger.exception("worker_loop iteration 실패, 5초 후 재시도") + await asyncio.sleep(5) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + asyncio.run(worker_loop()) +``` + +**NOTE**: 기존 `insta-render/worker.py`의 dispatch table·import는 실제 파일을 보고 매핑 유지. 위 예시는 minimal — job_type / function 이름은 기존 파일과 맞춰야 함. 변경 전 `Read services/insta-render/worker.py`로 정확한 dispatch table 확인할 것. + +- [ ] **Step 5: Run tests** + +``` +cd services/insta-render && python -m pytest tests/ -v +``` + +Expected: 신규 2 PASS, 기존 PASS (dispatch table test 등). 
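To make the dequeue → ack/fail contract of the rewritten worker easier to reason about without Redis, here is a minimal in-memory sketch of the same state machine. `InMemoryReliableQueue` is a hypothetical stand-in, not the `ReliableQueue` class from Task 1: it is synchronous, uses plain `collections.deque` objects in place of Redis lists, and only mirrors the retry and dead-letter rules described in this plan.

```python
import json
from collections import deque


class InMemoryReliableQueue:
    """Hypothetical, synchronous stand-in that mirrors the plan's ack/fail rules."""

    def __init__(self, max_attempts: int = 3):
        self.main = deque()         # stands in for queue:<name>
        self.processing = deque()   # stands in for processing:<name>:<worker_id>
        self.dead_letter = deque()  # stands in for dead_letter:<name>
        self.max_attempts = max_attempts

    def enqueue(self, payload: dict) -> None:
        # Producer side: LPUSH onto the head of the main queue.
        self.main.appendleft(json.dumps(payload).encode())

    def dequeue(self):
        # Same direction as BLMOVE main RIGHT -> processing LEFT, minus the blocking.
        if not self.main:
            return None
        raw = self.main.pop()
        self.processing.appendleft(raw)
        return json.loads(raw), raw

    def ack(self, raw: bytes) -> None:
        # Equivalent of LREM processing 1 raw.
        self.processing.remove(raw)

    def fail(self, raw: bytes, payload: dict) -> None:
        self.processing.remove(raw)
        payload["attempts"] = int(payload.get("attempts", 0)) + 1
        exhausted = payload["attempts"] >= self.max_attempts
        target = self.dead_letter if exhausted else self.main
        target.appendleft(json.dumps(payload).encode())


def poll_once(queue, handler) -> bool:
    """One cycle: dequeue, run handler, then ack on success or fail on any exception."""
    item = queue.dequeue()
    if item is None:
        return False
    payload, raw = item
    try:
        handler(payload)
    except Exception:
        queue.fail(raw, payload)
        return True
    queue.ack(raw)
    return True


def always_fails(payload: dict) -> None:
    raise RuntimeError("boom")


q = InMemoryReliableQueue(max_attempts=3)
q.enqueue({"task_id": "t1", "attempts": 0})
while poll_once(q, always_fails):
    pass  # the task is retried until it reaches max_attempts
final_attempts = json.loads(q.dead_letter[0])["attempts"]
```

The point of the sketch is that a task is never in zero places at once: it sits in the main queue, the processing list, or the dead-letter list, which is the property the worker tests in Step 2 check through `ack` and `fail`.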
+ +- [ ] **Step 6: Commit** + +```bash +git add services/insta-render/ +git commit -m "fix(insta-render): F6 ReliableQueue 적용 — BLMOVE + ack/fail (F6 part 2)" +``` + +--- + +## Task 3: music-render에 동일 적용 + +**Files:** +- Modify: `services/music-render/Dockerfile`, `worker.py` +- Modify: `services/music-render/tests/test_worker.py` (append) + +- [ ] **Step 1: Dockerfile에 `COPY services/_shared` 추가** +- [ ] **Step 2: Test 추가 (Task 2 패턴 동일, 단 dispatch target은 `run_suno_generation` 등 기존 패턴)** + +```python +@pytest.mark.asyncio +async def test_music_worker_ack_on_success(): + import worker + payload = {"task_id": "t1", "job_type": "suno_generation", "params": {}} + raw = json.dumps(payload).encode() + fake_queue = AsyncMock() + fake_queue.dequeue = AsyncMock(return_value=(payload, raw)) + fake_queue.ack = AsyncMock() + with patch.object(worker, "_dispatch"): + await worker.poll_once(fake_queue) + fake_queue.ack.assert_called_once_with(raw) + + +@pytest.mark.asyncio +async def test_music_worker_fail_on_exception(): + import worker + payload = {"task_id": "t2", "job_type": "suno_generation", "params": {}} + raw = json.dumps(payload).encode() + fake_queue = AsyncMock() + fake_queue.dequeue = AsyncMock(return_value=(payload, raw)) + fake_queue.fail = AsyncMock() + with patch.object(worker, "_dispatch", side_effect=RuntimeError("x")): + await worker.poll_once(fake_queue) + fake_queue.fail.assert_called_once_with(raw, payload) +``` + +- [ ] **Step 3: Run test to fail** +- [ ] **Step 4: Rewrite music-render worker.py — `worker_loop` 구조는 insta-render와 동일, `_dispatch` + `_DISPATCH_TABLE`은 기존 12개 함수 그대로 유지** +- [ ] **Step 5: Run tests** +- [ ] **Step 6: Commit** + +```bash +git add services/music-render/ +git commit -m "fix(music-render): F6 ReliableQueue 적용 (F6 part 3)" +``` + +--- + +## Task 4: video-render에 동일 적용 + +(Task 3와 동일 패턴 — sora/veo/kling/seedance 4 provider table 유지) + +- [ ] **Step 1: Dockerfile 수정** +- [ ] **Step 2: 신규 test 2개 추가 
(`test_video_worker_ack_on_success`, `test_video_worker_fail_on_exception`) — job_type은 `sora_generation`** +- [ ] **Step 3: Run failing test** +- [ ] **Step 4: Rewrite worker.py — 동일 패턴** +- [ ] **Step 5: Run tests** +- [ ] **Step 6: Commit** + +```bash +git add services/video-render/ +git commit -m "fix(video-render): F6 ReliableQueue 적용 (F6 part 4)" +``` + +--- + +## Task 5: image-render에 동일 적용 + +(gpt_image / nano_banana / flux 3 provider table 유지) + +- [ ] **Step 1-6: Task 3/4 동일 패턴** + +```bash +git add services/image-render/ +git commit -m "fix(image-render): F6 ReliableQueue 적용 (F6 part 5)" +``` + +--- + +## Task 6: 운영 검증 + push + +- [ ] **Step 1: 전체 services test 실행** + +``` +cd services && for d in _shared insta-render music-render video-render image-render; do + echo "--- $d ---" + (cd $d && python -m pytest tests/ -q) || true +done +``` + +(또는 PowerShell:) + +```powershell +foreach ($d in @("_shared","insta-render","music-render","video-render","image-render")) { + Write-Output "--- $d ---" + Push-Location services/$d + python -m pytest tests/ -q + Pop-Location +} +``` + +Expected: 4개 worker 각 신규 2개 + _shared 6개 + 기존 test 전부 PASS. + +- [ ] **Step 2: Docker build 시뮬 (옵션, 시간 허용 시)** + +``` +cd services && docker compose build insta-render music-render video-render image-render +``` + +Expected: build context에 `_shared` 포함됨 검증. + +- [ ] **Step 3: Push** + +```bash +git push origin main +``` + +- [ ] **Step 4: 운영 deploy 시 주의사항 (수동)** + +NAS에서 컨테이너 재배포 시: +1. `redis-cli -h 192.168.45.54 KEYS 'processing:*'` 로 기존 orphan 확인 — 있다면 worker_id 다르면 안 잡힘. 수동으로 `LMOVE` 해야 할 수도 있음. +2. `redis-cli -h 192.168.45.54 KEYS 'dead_letter:*'` 로 dead-letter 모니터 — 누적되면 alerting 필요. +3. WORKER_ID env로 unique 하게 (`WORKER_ID=insta-render-prod-1` 등) 권장 — hostname이 컨테이너 재기동 시 바뀌면 orphan 추적 안 됨. + +--- + +## Self-Review + +1. **atomic dequeue**: `BLMOVE` 단일 명령 — Redis 단일 트랜잭션 ✅ +2. **ack on success**: `LREM processing 1 raw` — 정확 1개 ✅ +3. 
**fail with retry**: attempts < max → re-enqueue, attempts >= max → dead-letter ✅
+4. **startup recovery**: orphans are re-enqueued automatically (attempts incremented) ✅
+5. **applied to 4 workers**: insta/music/video/image follow the same pattern ✅
+6. **NAS producer compatibility**: LPUSH is unchanged; `attempts` is optional in the payload schema ✅
+
+**Not covered (intentional)**:
+- dead-letter monitoring/alerting: an operations task (CHECK_POINT backlog)
+- orphans lost when the hostname changes while the `WORKER_ID` env is unset: to be documented in the operations guide
+
+**Assumptions to verify**:
+- redis-py asyncio client (`redis.asyncio`) `blmove` signature: `(first_list, second_list, timeout, src='LEFT', dest='RIGHT')`. redis>=5.0 recommended.
+- fakeredis: `fakeredis.aioredis.FakeRedis` (>=2.20.0) is assumed to support BLMOVE; confirm this before applying the plan.
+
+---
+
+## Execution Handoff
+
+**Plan complete and saved to `docs/superpowers/plans/2026-05-25-render-queue-reliability.md`.**
+
+**1. Subagent-Driven (recommended)**: a fresh subagent per Task. The four workers share the pattern, but each has its own dispatch table, so add review checkpoints to keep the subagents consistent.
+
+**2. Inline Execution**: run in the current session.
+
+Awaiting 박재오's decision. Starting after Plans 1 and 2 are finished is recommended (this is the largest workload: 4 workers × about 1 hour = 4 hours).
diff --git a/docs/superpowers/plans/2026-05-25-state-signals-lifecycle.md b/docs/superpowers/plans/2026-05-25-state-signals-lifecycle.md
new file mode 100644
index 0000000..d5c5bc9
--- /dev/null
+++ b/docs/superpowers/plans/2026-05-25-state-signals-lifecycle.md
@@ -0,0 +1,593 @@
+# state.signals Lifecycle: Code Review F5 Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** Fix `state.signals` growing without bound as a dict. Attach `expires_at` and `cycle_id` so that the Phase 5 consumer (agent-office `/signal`) can safely ignore stale signals.
+
+**Architecture:**
+1. Add `expires_at: ISO str` and `cycle_id: int` fields to the `Signal` dict.
+2. `PollState.signal_cycle_id: int` (auto-increment, per process).
+3. `cycle_id += 1` on every entry to `generate_signals(state, dedup, settings)`.
+4. 
emit하는 모든 signal에 `expires_at = as_of + SIGNAL_TTL_SECONDS`, `cycle_id = state.signal_cycle_id` 부착. +5. `state.purge_expired_signals(now)` helper — 매 cycle 끝에 호출하여 만료된 항목 제거. +6. `state.get_active_signals(now) → list[dict]` — Phase 5 consumer가 호출할 read API. 만료된 것 제외. + +**Tech Stack:** Python 3.12, asyncio, pytest. 기존 cycle 흐름과 호환되도록 generate_signals 인터페이스는 그대로. + +**Why expires_at + cycle_id (not pop-on-read):** consumer가 polling 실패해도 신호 손실 없음. cycle_id로 "이번 cycle에 새로 emit된 신호" 식별 가능 → Phase 5에서 incremental fetch 가능. + +**Working directory:** `C:\Users\jaeoh\Desktop\workspace\web-ai`. + +**Test runner:** `python -m pytest ai_trade/tests -q` (또는 `py -3.12 -m`). 환경 부재 시 plan 진행 중단. + +--- + +## File Map + +| 파일 | 변경 | 책임 | +|------|------|------| +| `ai_trade/config.py` | Add 1 field | `signal_ttl_seconds: int` (default 300) | +| `ai_trade/state.py` | Modify | `signal_cycle_id: int`, helper 2개 (`get_active_signals`, `purge_expired_signals`) | +| `ai_trade/signal_generator.py` | Modify L22-50, 133, 99-111, 174-186 | cycle_id 증가 + expires_at/cycle_id 부착 | +| `ai_trade/pull_worker.py` | Modify L46-51 근처 | cycle 끝에 purge 호출 | +| `ai_trade/tests/test_state_signals_lifecycle.py` | Create | 5 test (expires, cycle_id, purge, active list) | +| `ai_trade/tests/test_signal_generator.py` | Modify | 기존 emit test에 expires_at/cycle_id 필드 검증 추가 | + +--- + +## Task 1: PollState에 cycle_id + lifecycle helper 추가 + +**Files:** +- Modify: `ai_trade/state.py` +- Test: `ai_trade/tests/test_state_signals_lifecycle.py` (Create) + +- [ ] **Step 1: Write the failing test** + +```python +# ai_trade/tests/test_state_signals_lifecycle.py +"""F5 — state.signals lifecycle (expires_at + cycle_id).""" +from datetime import datetime, timedelta +from zoneinfo import ZoneInfo + +import pytest + +from ai_trade.state import PollState + +KST = ZoneInfo("Asia/Seoul") + + +def test_initial_signal_cycle_id_is_zero(): + state = PollState() + assert state.signal_cycle_id == 0 + + +def 
test_get_active_signals_excludes_expired(): + state = PollState() + now = datetime(2026, 5, 25, 10, 0, tzinfo=KST) + future = (now + timedelta(seconds=300)).isoformat() + past = (now - timedelta(seconds=60)).isoformat() + state.signals = { + "A": {"ticker": "A", "expires_at": future, "cycle_id": 1, "action": "buy"}, + "B": {"ticker": "B", "expires_at": past, "cycle_id": 1, "action": "buy"}, + } + active = state.get_active_signals(now) + tickers = [s["ticker"] for s in active] + assert "A" in tickers + assert "B" not in tickers + + +def test_get_active_signals_treats_missing_expires_as_expired(): + """expires_at 없는 legacy 신호는 expired로 간주.""" + state = PollState() + now = datetime(2026, 5, 25, 10, 0, tzinfo=KST) + state.signals = {"C": {"ticker": "C", "action": "buy"}} + assert state.get_active_signals(now) == [] + + +def test_purge_expired_signals_removes_expired(): + state = PollState() + now = datetime(2026, 5, 25, 10, 0, tzinfo=KST) + future = (now + timedelta(seconds=300)).isoformat() + past = (now - timedelta(seconds=60)).isoformat() + state.signals = { + "A": {"ticker": "A", "expires_at": future, "cycle_id": 1}, + "B": {"ticker": "B", "expires_at": past, "cycle_id": 1}, + } + state.purge_expired_signals(now) + assert "A" in state.signals + assert "B" not in state.signals +``` + +- [ ] **Step 2: Run test to verify it fails** + +``` +python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v +``` + +Expected: `AttributeError: signal_cycle_id` 또는 `get_active_signals` 미구현. 
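One detail the tests above rely on implicitly: every `now` value is timezone-aware (KST), and the `expires_at` strings carry a UTC offset. The sketch below, using only the standard library, shows why that matters. Comparing an aware datetime parsed from `expires_at` with a naive `now` raises `TypeError`, which is a different exception from the `ValueError` raised for an unparseable string. The helper name `is_active` is hypothetical, and a fixed +09:00 offset is used in place of `ZoneInfo("Asia/Seoul")` so the sketch has no tzdata dependency.

```python
from datetime import datetime, timedelta, timezone

# Fixed +09:00 offset standing in for KST (assumption: no DST handling is needed).
KST = timezone(timedelta(hours=9))


def is_active(expires_at: str, now: datetime) -> bool:
    """Hypothetical helper: True if the ISO timestamp is still in the future."""
    return datetime.fromisoformat(expires_at) > now


aware_now = datetime(2026, 5, 25, 10, 0, tzinfo=KST)
expires_at = (aware_now + timedelta(seconds=300)).isoformat()

# Aware vs. aware: the comparison works.
active = is_active(expires_at, aware_now)

# Aware vs. naive: Python refuses to compare them.
naive_now = datetime(2026, 5, 25, 10, 0)
try:
    is_active(expires_at, naive_now)
    naive_error = None
except TypeError as exc:
    naive_error = type(exc).__name__
```

If `now` could ever be naive in production, the lifecycle helpers would probably need to normalize it or handle `TypeError` as well; whether that case can occur depends on the callers, which this plan does not show in full.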
+ +- [ ] **Step 3: Modify state.py** + +```python +"""PollState — process-wide singleton.""" +from collections import deque +from dataclasses import dataclass, field +from datetime import datetime + + +@dataclass +class PollState: + portfolio: dict | None = None + news_sentiment: dict | None = None + screener_preview: dict | None = None + minute_bars: dict[str, deque] = field(default_factory=dict) + asking_price: dict[str, dict] = field(default_factory=dict) + # Phase 3b additions + daily_ohlcv: dict[str, list[dict]] = field(default_factory=dict) + chronos_predictions: dict[str, dict] = field(default_factory=dict) + minute_momentum: dict[str, str] = field(default_factory=dict) + signals: dict[str, dict] = field(default_factory=dict) + # F5 lifecycle + signal_cycle_id: int = 0 + last_updated: dict[str, str] = field(default_factory=dict) + fetch_errors: dict[str, int] = field(default_factory=dict) + + def get_active_signals(self, now: datetime) -> list[dict]: + """expires_at > now 인 신호만 반환. expires_at 없으면 expired 취급.""" + active: list[dict] = [] + for sig in self.signals.values(): + expires_at = sig.get("expires_at") + if not expires_at: + continue + try: + exp_dt = datetime.fromisoformat(expires_at) + except ValueError: + continue + if exp_dt > now: + active.append(sig) + return active + + def purge_expired_signals(self, now: datetime) -> int: + """만료된 signal 제거. 제거된 개수 반환.""" + to_drop = [] + for ticker, sig in self.signals.items(): + expires_at = sig.get("expires_at") + if not expires_at: + to_drop.append(ticker) + continue + try: + exp_dt = datetime.fromisoformat(expires_at) + except ValueError: + to_drop.append(ticker) + continue + if exp_dt <= now: + to_drop.append(ticker) + for t in to_drop: + del self.signals[t] + return len(to_drop) + + +state = PollState() +``` + +- [ ] **Step 4: Run test to verify it passes** + +``` +python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v +``` + +Expected: 4 PASS. 
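The plan's rationale mentions that `cycle_id` lets a Phase 5 consumer fetch only newly emitted signals. A minimal sketch of that idea follows, assuming the consumer remembers the last `cycle_id` it processed. `fetch_new_signals` and `last_seen_cycle` are hypothetical names; the actual Phase 5 API is outside the scope of this plan.

```python
from datetime import datetime, timedelta, timezone

KST = timezone(timedelta(hours=9))  # fixed offset, used only for this sketch


def fetch_new_signals(signals: dict, now: datetime, last_seen_cycle: int) -> list:
    """Return unexpired signals emitted after last_seen_cycle (hypothetical consumer helper)."""
    fresh = []
    for sig in signals.values():
        expires_at = sig.get("expires_at")
        if not expires_at:
            continue  # legacy signal without expires_at: treated as expired
        if datetime.fromisoformat(expires_at) <= now:
            continue  # already expired
        if sig.get("cycle_id", 0) > last_seen_cycle:
            fresh.append(sig)
    return fresh


now = datetime(2026, 5, 25, 10, 0, tzinfo=KST)
later = (now + timedelta(seconds=300)).isoformat()
earlier = (now - timedelta(seconds=60)).isoformat()
signals = {
    "A": {"ticker": "A", "cycle_id": 1, "expires_at": later},    # already seen
    "B": {"ticker": "B", "cycle_id": 2, "expires_at": later},    # new and active
    "C": {"ticker": "C", "cycle_id": 2, "expires_at": earlier},  # new but expired
    "D": {"ticker": "D", "cycle_id": 2},                         # legacy shape
}
new_tickers = [s["ticker"] for s in fetch_new_signals(signals, now, last_seen_cycle=1)]
```

Because `signal_cycle_id` is described as a per-process counter, a consumer following this approach would likely need to reset `last_seen_cycle` whenever the worker process restarts.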
+ +- [ ] **Step 5: Verify full suite still passes** + +``` +python -m pytest ai_trade/tests -q +``` + +Expected: 기존 test 전부 PASS (state.signals dict 인터페이스 그대로). + +- [ ] **Step 6: Commit** + +```bash +git add ai_trade/state.py ai_trade/tests/test_state_signals_lifecycle.py +git commit -m "feat(ai_trade): state.signals에 expires_at + cycle_id lifecycle 추가 (F5 part 1)" +``` + +--- + +## Task 2: config에 SIGNAL_TTL_SECONDS 추가 + +**Files:** +- Modify: `ai_trade/config.py` +- Test: `ai_trade/tests/test_state_signals_lifecycle.py` (append) + +- [ ] **Step 1: Write failing test** + +`test_state_signals_lifecycle.py` 끝에 추가: + +```python +def test_signal_ttl_seconds_default(monkeypatch): + monkeypatch.delenv("SIGNAL_TTL_SECONDS", raising=False) + from ai_trade.config import Settings + s = Settings() + assert s.signal_ttl_seconds == 300 + + +def test_signal_ttl_seconds_env_override(monkeypatch): + monkeypatch.setenv("SIGNAL_TTL_SECONDS", "60") + from ai_trade.config import Settings + s = Settings() + assert s.signal_ttl_seconds == 60 +``` + +- [ ] **Step 2: Run test to fail** + +``` +python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v -k signal_ttl +``` + +Expected: AttributeError. + +- [ ] **Step 3: Add field to config.py** + +`Settings` 클래스 안에 추가 (다른 *_threshold 옆): + +```python + signal_ttl_seconds: int = field( + default_factory=lambda: int(os.getenv("SIGNAL_TTL_SECONDS", "300")) + ) +``` + +- [ ] **Step 4: Run test** + +``` +python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v -k signal_ttl +``` + +Expected: 2 PASS. 
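The two tests above pass only if the environment variable is read when `Settings()` is instantiated, not when the module is imported. The sketch below illustrates that behavior with a stand-in dataclass. `DemoSettings` is hypothetical and assumes, as the `field(default_factory=...)` snippet suggests, that the real `Settings` is a dataclass.

```python
import os
from dataclasses import dataclass, field


@dataclass
class DemoSettings:
    """Hypothetical stand-in for ai_trade.config.Settings."""

    # default_factory runs on every instantiation, so env changes are picked up.
    signal_ttl_seconds: int = field(
        default_factory=lambda: int(os.getenv("SIGNAL_TTL_SECONDS", "300"))
    )


# With the variable unset, the default applies.
os.environ.pop("SIGNAL_TTL_SECONDS", None)
default_ttl = DemoSettings().signal_ttl_seconds

# Setting the variable afterwards affects the next instance.
os.environ["SIGNAL_TTL_SECONDS"] = "60"
overridden_ttl = DemoSettings().signal_ttl_seconds

# Clean up so the sketch does not leak state into the environment.
os.environ.pop("SIGNAL_TTL_SECONDS", None)
```

A plain class-level default such as `int(os.getenv(...))` would be evaluated once at import time, in which case `monkeypatch.setenv` in the second test would have no effect.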
+ +- [ ] **Step 5: Commit** + +```bash +git add ai_trade/config.py ai_trade/tests/test_state_signals_lifecycle.py +git commit -m "feat(ai_trade): SIGNAL_TTL_SECONDS env 추가 (F5 part 2)" +``` + +--- + +## Task 3: signal_generator에 cycle_id + expires_at 부착 + +**Files:** +- Modify: `ai_trade/signal_generator.py` +- Test: `ai_trade/tests/test_signal_generator.py` (append) + +- [ ] **Step 1: Write failing tests** + +기존 `test_signal_generator.py` 끝에 추가: + +```python +from datetime import datetime, timedelta +from zoneinfo import ZoneInfo as _ZI + +_KST_TEST = _ZI("Asia/Seoul") + + +def test_emit_attaches_cycle_id_and_expires_at( + state_with_buy_setup, dedup_clean, settings_default, +): + """매 emit 시 cycle_id + expires_at 부착 (F5).""" + from ai_trade.signal_generator import generate_signals + before = datetime.now(_KST_TEST) + generate_signals(state_with_buy_setup, dedup_clean, settings_default) + after = datetime.now(_KST_TEST) + sig = state_with_buy_setup.signals["005930"] + assert sig["cycle_id"] == 1 + assert "expires_at" in sig + exp_dt = datetime.fromisoformat(sig["expires_at"]) + # as_of + 300s (default) — tolerance 5s + assert before + timedelta(seconds=295) < exp_dt < after + timedelta(seconds=305) + + +def test_cycle_id_increments_each_call( + state_with_buy_setup, dedup_clean, settings_default, +): + """generate_signals 호출마다 cycle_id += 1.""" + from ai_trade.signal_generator import generate_signals + generate_signals(state_with_buy_setup, dedup_clean, settings_default) + assert state_with_buy_setup.signal_cycle_id == 1 + # 2번째 호출 — dedup이 막아도 cycle_id는 증가해야 함 + generate_signals(state_with_buy_setup, dedup_clean, settings_default) + assert state_with_buy_setup.signal_cycle_id == 2 +``` + +**NOTE:** 기존 test_signal_generator.py에 `state_with_buy_setup` / `dedup_clean` / `settings_default` 같은 fixture가 있을 것. 만약 이름이 다르면 실제 fixture 이름에 맞춰 조정. 검증: `grep -n "@pytest.fixture" ai_trade/tests/test_signal_generator.py`. 
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+```
+python -m pytest ai_trade/tests/test_signal_generator.py -v -k "cycle_id or expires"
+```
+
+Expected: KeyError or AttributeError.
+
+- [ ] **Step 3: Modify signal_generator.py**
+
+Change the `generate_signals` function (L22-25):
+
+```python
+def generate_signals(state, dedup, settings) -> None:
+    """Phase 4 entry (state-mutating). F5: cycle_id += 1, and emitted signals get expires_at."""
+    state.signal_cycle_id += 1
+    _evaluate_sell_signals(state, dedup, settings)
+    _evaluate_buy_signals(state, dedup, settings)
+```
+
+Add two fields to `_build_buy_signal` (L99-111):
+
+```python
+def _build_buy_signal(state, ticker: str, name: str, rank: int | None, confidence: float) -> dict:
+    ap = state.asking_price[ticker]
+    as_of_dt = datetime.now(KST)
+    # NOTE: _current_settings() is a placeholder that is not defined in this plan;
+    # the revised version later in this step takes `settings` as an explicit argument.
+    expires_at = (as_of_dt + timedelta(seconds=getattr(_current_settings(), "signal_ttl_seconds", 300))).isoformat()
+    return {
+        "ticker": ticker,
+        "name": name,
+        "action": "buy",
+        "confidence_webai": confidence,
+        "current_price": ap["current_price"],
+        "avg_price": None,
+        "pnl_pct": None,
+        "context": _build_context(state, ticker, rank),
+        "as_of": as_of_dt.isoformat(),
+        "cycle_id": state.signal_cycle_id,
+        "expires_at": expires_at,
+    }
+```
+
+Likewise for `_build_sell_signal` (L174-186):
+
+```python
+def _build_sell_signal(state, holding: dict, confidence: float, reason: str, settings=None) -> dict:
+    ticker = holding["ticker"]
+    as_of_dt = datetime.now(KST)
+    # Fall back to 300 seconds when no settings object is passed.
+    ttl = getattr(settings, "signal_ttl_seconds", 300) if settings else 300
+    expires_at = (as_of_dt + timedelta(seconds=ttl)).isoformat()
+    return {
+        "ticker": ticker,
+        "name": holding.get("name", ticker),
+        "action": "sell",
+        "confidence_webai": confidence,
+        "current_price": holding.get("current_price"),
+        "avg_price": holding.get("avg_price"),
+        "pnl_pct": holding.get("pnl_pct"),
+        "context": _build_context(state, ticker, rank=None, sell_reason=reason),
+        "as_of": as_of_dt.isoformat(),
+        "cycle_id": state.signal_cycle_id,
+        "expires_at": expires_at,
+ } +``` + +`_build_buy_signal`이 settings를 안 받고 있으니, 호출부도 갱신해야 함. 현실적으로 두 함수에 `settings` 인자를 추가하는 것이 깔끔. 변경: + +```python +def _evaluate_buy_signals(state, dedup, settings) -> None: + candidates = _buy_candidates(state) + for ticker, name, rank in candidates: + existing = state.signals.get(ticker) + if existing is not None and existing.get("action") == "sell": + logger.debug("buy %s skipped: same-cycle sell precedence", ticker) + continue + if not _check_buy_hard_gate(state, ticker, settings): + logger.debug("buy %s skipped: hard gate failed", ticker) + continue + confidence = _compute_buy_confidence(state, ticker, rank) + if confidence <= settings.confidence_threshold: + logger.debug("buy %s skipped: confidence %.3f <= %.3f", + ticker, confidence, settings.confidence_threshold) + continue + if dedup.is_recent(ticker, "buy", within_hours=24): + logger.debug("buy %s skipped: dedup 24h", ticker) + continue + state.signals[ticker] = _build_buy_signal(state, ticker, name, rank, confidence, settings) + dedup.record(ticker, "buy", confidence=confidence) + logger.info("signal emit %s buy conf=%.3f rank=%s cycle=%d", + ticker, confidence, rank, state.signal_cycle_id) + + +def _build_buy_signal(state, ticker, name, rank, confidence, settings) -> dict: + ap = state.asking_price[ticker] + as_of_dt = datetime.now(KST) + ttl = settings.signal_ttl_seconds + expires_at = (as_of_dt + timedelta(seconds=ttl)).isoformat() + return { + "ticker": ticker, + "name": name, + "action": "buy", + "confidence_webai": confidence, + "current_price": ap["current_price"], + "avg_price": None, + "pnl_pct": None, + "context": _build_context(state, ticker, rank), + "as_of": as_of_dt.isoformat(), + "cycle_id": state.signal_cycle_id, + "expires_at": expires_at, + } +``` + +매도 측도 마찬가지로 `settings`를 통과시킴. `_try_stop_loss` 등은 이미 `settings`를 받으므로 `_build_sell_signal(..., settings=settings)` 로 호출. 
+ +import 추가 (signal_generator.py 상단): + +```python +from datetime import datetime, timedelta +``` + +(기존 import에 `timedelta` 만 추가) + +`_current_settings()` 같은 헬퍼는 만들지 않음 — settings를 명시적으로 전달. + +- [ ] **Step 4: Run tests** + +``` +python -m pytest ai_trade/tests/test_signal_generator.py -v +``` + +Expected: 신규 2개 PASS, 기존 PASS. + +- [ ] **Step 5: Commit** + +```bash +git add ai_trade/signal_generator.py ai_trade/tests/test_signal_generator.py +git commit -m "feat(ai_trade): emit signal에 cycle_id + expires_at 부착 (F5 part 3)" +``` + +--- + +## Task 4: pull_worker가 cycle 끝에 purge 호출 + +**Files:** +- Modify: `ai_trade/pull_worker.py` +- Test: `ai_trade/tests/test_pull_worker.py` (append) + +- [ ] **Step 1: Write failing test** + +`test_pull_worker.py` 끝에 추가: + +```python +async def test_poll_loop_purges_expired_signals(monkeypatch): + """매 cycle 끝에 expired signal이 제거됨 (F5).""" + from ai_trade import pull_worker + from ai_trade.state import PollState + from datetime import datetime as _dt + from zoneinfo import ZoneInfo as _ZI + from unittest.mock import AsyncMock, MagicMock + import asyncio as _asyncio + + _kst = _ZI("Asia/Seoul") + now = _dt(2026, 5, 18, 10, 0, tzinfo=_kst) + + class FrozenDT: + @staticmethod + def now(tz=None): return now + + state = PollState() + state.signals = { + "OLD": {"ticker": "OLD", "expires_at": _dt(2026, 5, 18, 9, 0, tzinfo=_kst).isoformat(), "cycle_id": 1}, + "FRESH": {"ticker": "FRESH", "expires_at": _dt(2026, 5, 18, 10, 30, tzinfo=_kst).isoformat(), "cycle_id": 1}, + } + + monkeypatch.setattr(pull_worker, "datetime", FrozenDT) + monkeypatch.setattr(pull_worker, "_is_market_day", lambda n: True) + monkeypatch.setattr(pull_worker, "_is_polling_window", lambda n: True) + monkeypatch.setattr(pull_worker, "_next_interval", lambda n: 0.01) + monkeypatch.setattr(pull_worker, "_run_polling_cycle", AsyncMock()) + monkeypatch.setattr(pull_worker, "update_minute_momentum_for_all", lambda s: None) + monkeypatch.setattr(pull_worker, 
"_is_post_close_trigger", lambda *a, **k: False) + + shutdown = _asyncio.Event() + async def stop_soon(): + await _asyncio.sleep(0.05) + shutdown.set() + _asyncio.create_task(stop_soon()) + + await pull_worker.poll_loop( + client=MagicMock(), state=state, shutdown=shutdown, + kis_client=MagicMock(), chronos=MagicMock(), + dedup=None, settings=None, + ) + assert "OLD" not in state.signals + assert "FRESH" in state.signals +``` + +- [ ] **Step 2: Run test to fail** + +``` +python -m pytest ai_trade/tests/test_pull_worker.py::test_poll_loop_purges_expired_signals -v +``` + +Expected: FAIL — OLD가 남아있음. + +- [ ] **Step 3: Add purge call in poll_loop** + +`ai_trade/pull_worker.py` `poll_loop` 안, signals 생성 이후 (또는 cycle 끝 직전) 한 줄 추가: + +```python + # Phase 4: generate signals + if dedup is not None and settings is not None: + try: + from ai_trade.signal_generator import generate_signals + generate_signals(state, dedup, settings) + except Exception: + logger.exception("generate_signals failed") + # F5: 만료된 signal purge (consumer 미사용 케이스 보호) + try: + state.purge_expired_signals(datetime.now(KST)) + except Exception: + logger.exception("purge_expired_signals failed") +``` + +- [ ] **Step 4: Run test** + +``` +python -m pytest ai_trade/tests/test_pull_worker.py::test_poll_loop_purges_expired_signals -v +``` + +Expected: PASS. + +- [ ] **Step 5: Run full suite** + +``` +python -m pytest ai_trade/tests -q +``` + +Expected: 모두 PASS. + +- [ ] **Step 6: Commit** + +```bash +git add ai_trade/pull_worker.py ai_trade/tests/test_pull_worker.py +git commit -m "feat(ai_trade): poll_loop가 매 cycle 끝에 expired signal purge (F5 part 4)" +``` + +--- + +## Task 5: 전체 회귀 + push + +- [ ] **Step 1: Final pytest** + +``` +python -m pytest ai_trade/tests -v +``` + +Expected: 모두 PASS (총 신규 약 9개 + 기존 56개). + +- [ ] **Step 2: Push** + +```bash +git push origin main +``` + +--- + +## Self-Review + +1. **expires_at + cycle_id 부착**: `_build_buy_signal`, `_build_sell_signal` 양쪽 ✅ +2. 
**cycle_id 증가**: `generate_signals` 진입에서 단 1회 ✅ +3. **purge**: poll_loop cycle 마지막에 1회 호출 ✅ +4. **get_active_signals**: Phase 5 consumer가 호출할 read API 존재 ✅ +5. **legacy 신호 호환**: `expires_at` 없는 신호는 expired 취급 → 안전 ✅ + +**미커버 항목 (의도적)**: +- Phase 5 consumer가 처리 후 explicit drain하는 API는 이 plan에서 안 다룸 (consumer가 read-only로도 충분 — expires_at + dedup으로 idempotent). +- agent-office `/signal` HTTP endpoint는 Phase 5 plan 영역. + +--- + +## Execution Handoff + +**Plan complete and saved to `docs/superpowers/plans/2026-05-25-state-signals-lifecycle.md`.** + +**1. Subagent-Driven (recommended)** — Task 별 fresh subagent. +**2. Inline Execution** — 현 세션 실행. + +박재오 결정 대기. Plan 1 (hotfix) 마친 뒤 진입 권장. diff --git a/index.html b/index.html index 91cd047..c8407a0 100644 --- a/index.html +++ b/index.html @@ -5,6 +5,9 @@ 가후습 개인기록 + + +
diff --git a/src/pages/saju/Saju.css b/src/pages/saju/Saju.css new file mode 100644 index 0000000..7f24cc1 --- /dev/null +++ b/src/pages/saju/Saju.css @@ -0,0 +1,430 @@ +/* saju-page scope: does not affect other pages */ +.saju-page { + /* Base */ + --saju-cream: #FAF6EE; + --saju-paper: #F2EAD8; + --saju-ink: #2E2D45; + --saju-ink-deep: #1F1D38; + + /* Accents */ + --saju-gold: #D4A574; + --saju-gold-deep: #B5874E; + --saju-apricot: #C58F76; + --saju-rose: #D9A2A6; + --saju-jade: #4B7065; + --saju-violet: #6A5285; + + /* Categories (3 ActionCards) */ + --saju-today-bg: #4B7065; + --saju-gunghab-bg: #A8736E; + --saju-saju-bg: #4F4A78; + + /* Score categories (4 ScoreCards) */ + --saju-wealth: #D4A574; + --saju-romance: #D9A2A6; + --saju-social: #4B7065; + --saju-career: #6A5285; + + min-height: 100vh; + background: var(--saju-cream); + color: var(--saju-ink); + font-family: 'Pretendard', sans-serif; + padding: 0; + margin: 0; +} + +.saju-page * { box-sizing: border-box; } + +.saju-page .saju-h1, +.saju-page .saju-h2, +.saju-page .saju-h3 { + font-family: 'Noto Serif KR', 'Pretendard', serif; + font-weight: 700; + letter-spacing: -0.02em; + color: var(--saju-ink); + margin: 0; +} + +.saju-page .saju-h1 { font-size: clamp(2.5rem, 4vw, 3.5rem); line-height: 1.2; } +.saju-page .saju-h2 { font-size: clamp(1.8rem, 3vw, 2.5rem); line-height: 1.3; } +.saju-page .saju-h3 { font-size: clamp(1.2rem, 2vw, 1.5rem); } + +/* Horyung mascot */ +.horyung-mascot { display: block; object-fit: contain; } +.horyung-mascot--sm { width: 80px; height: auto; } +.horyung-mascot--md { width: 180px; height: auto; } +.horyung-mascot--lg { width: 320px; height: auto; } + +/* Top navigation */ +.saju-nav { + display: flex; + align-items: center; + justify-content: space-between; + padding: 1rem 2rem; + background: var(--saju-ink); + color: var(--saju-cream); +} +.saju-nav__logo { + font-family: 'Noto Serif KR', serif; + font-size: 1.25rem; + font-weight: 700; + color: var(--saju-cream); + text-decoration: none; +} +.saju-nav__links { +
display: flex; + gap: 1.5rem; + list-style: none; + padding: 0; + margin: 0; +} +.saju-nav__links a { + color: var(--saju-cream); + text-decoration: none; + font-size: 0.95rem; + opacity: 0.85; +} +.saju-nav__links a:hover { opacity: 1; } +.saju-nav__cta { + background: var(--saju-gold); + color: var(--saju-ink); + border: none; + padding: 0.5rem 1.25rem; + border-radius: 999px; + font-weight: 600; + cursor: pointer; + font-family: 'Pretendard', sans-serif; + text-decoration: none; +} + +/* Hero */ +.saju-hero { + display: grid; + grid-template-columns: 1fr 1.4fr; + gap: 3rem; + padding: 3rem 2rem; + max-width: 1400px; + margin: 0 auto; +} +.saju-hero__left { + display: flex; + flex-direction: column; + align-items: center; + gap: 1.5rem; +} +.saju-quote-box { + background: var(--saju-paper); + padding: 1rem 1.25rem; + border-radius: 12px; + border: 1px solid var(--saju-gold-deep); + color: var(--saju-ink); + font-size: 0.9rem; + line-height: 1.5; + max-width: 280px; +} +.saju-hero__right { + display: flex; + flex-direction: column; + gap: 1.5rem; + justify-content: center; +} +.saju-sub { + color: var(--saju-ink); + opacity: 0.7; + margin: 0; + line-height: 1.6; +} + +/* ActionCard */ +.saju-action-cards { + display: grid; + grid-template-columns: repeat(3, 1fr); + gap: 1rem; + margin-top: 1rem; +} +.saju-action-card { + background: var(--saju-saju-bg); + color: var(--saju-cream); + padding: 1.5rem 1rem; + border-radius: 16px; + text-decoration: none; + display: flex; + flex-direction: column; + align-items: center; + gap: 0.5rem; + transition: transform 0.2s; + font-family: 'Pretendard', sans-serif; +} +.saju-action-card:hover { transform: translateY(-4px); } +.saju-action-card--today { background: var(--saju-today-bg); } +.saju-action-card--gunghab { background: var(--saju-gunghab-bg); } +.saju-action-card--saju { background: var(--saju-saju-bg); } +.saju-action-card[aria-disabled="true"] { opacity: 0.6; cursor: not-allowed; } +.saju-action-card__icon { 
font-size: 2rem; } +.saju-action-card__title { font-size: 1.1rem; font-weight: 700; } +.saju-action-card__desc { font-size: 0.85rem; opacity: 0.85; text-align: center; } + +/* Bottom */ +.saju-bottom { + display: grid; + grid-template-columns: 1fr 1fr; + gap: 3rem; + padding: 3rem 2rem; + max-width: 1400px; + margin: 0 auto; + background: var(--saju-ink); + color: var(--saju-cream); + border-radius: 24px 24px 0 0; +} +.saju-form { display: flex; flex-direction: column; gap: 1rem; } +.saju-form input, +.saju-form select { + padding: 0.75rem; + border-radius: 8px; + border: 1px solid var(--saju-gold-deep); + background: var(--saju-ink-deep); + color: var(--saju-cream); + font-family: inherit; + font-size: 1rem; +} +.saju-form button { + background: var(--saju-gold); + color: var(--saju-ink); + border: none; + padding: 0.875rem; + border-radius: 999px; + font-weight: 700; + cursor: pointer; + font-family: inherit; + font-size: 1rem; +} +.saju-form button:disabled { opacity: 0.6; cursor: not-allowed; } +.saju-form__error { + background: rgba(217, 162, 166, 0.2); + color: var(--saju-rose); + padding: 0.75rem; + border-radius: 8px; + font-size: 0.9rem; +} + +/* Fortune ring */ +.saju-fortune-ring { + display: flex; + align-items: center; + justify-content: center; + position: relative; +} +.saju-fortune-ring svg { width: 200px; height: 200px; } +.saju-fortune-ring__score { + position: absolute; + font-family: 'Noto Serif KR', serif; + font-size: 2.5rem; + font-weight: 700; + color: var(--saju-ink); +} +.saju-fortune-ring__total { font-size: 0.9rem; color: var(--saju-ink); opacity: 0.6; } + +/* ScoreCard */ +.saju-score-card { + background: var(--saju-cream); + border-radius: 16px; + padding: 1.25rem; + display: flex; + flex-direction: column; + gap: 0.5rem; + border: 1px solid var(--saju-paper); +} +.saju-score-card__head { display: flex; align-items: center; gap: 0.5rem; } +.saju-score-card__icon { font-size: 1.5rem; } +.saju-score-card__title { font-weight: 700; 
font-size: 0.95rem; } +.saju-score-card__value { + font-family: 'Noto Serif KR', serif; + font-size: 2rem; + font-weight: 700; + color: var(--saju-ink); +} +.saju-score-card__bar { + height: 6px; + background: var(--saju-paper); + border-radius: 3px; + overflow: hidden; +} +.saju-score-card__bar > div { height: 100%; background: var(--saju-gold); transition: width 0.5s; } + +/* Lucky box */ +.saju-lucky-box { + background: var(--saju-paper); + border-radius: 16px; + padding: 1.5rem; + display: grid; + grid-template-columns: repeat(3, 1fr); + gap: 1rem; +} +.saju-lucky-box__item { text-align: center; } +.saju-lucky-box__label { font-size: 0.8rem; color: var(--saju-ink); opacity: 0.7; margin-bottom: 0.25rem; } +.saju-lucky-box__value { + font-family: 'Noto Serif KR', serif; + font-size: 1.5rem; + font-weight: 700; + color: var(--saju-ink); +} + +/* SajuPillars */ +.saju-pillars { + display: grid; + grid-template-columns: repeat(4, 1fr); + gap: 0.75rem; +} +.saju-pillar { + background: var(--saju-paper); + border-radius: 12px; + padding: 1rem; + text-align: center; +} +.saju-pillar__label { font-size: 0.8rem; color: var(--saju-ink); opacity: 0.6; margin-bottom: 0.5rem; } +.saju-pillar__stem, +.saju-pillar__branch { + font-family: 'Noto Serif KR', serif; + font-size: 1.75rem; + font-weight: 700; + display: block; +} +.saju-pillar__stem-kr, +.saju-pillar__branch-kr { font-size: 0.85rem; opacity: 0.7; } +.saju-pillar__ten-god, +.saju-pillar__fortune { font-size: 0.75rem; margin-top: 0.25rem; opacity: 0.7; } + +/* Element bars */ +.saju-element-bars { + display: flex; + flex-direction: column; + gap: 0.5rem; + padding: 1.5rem; + background: var(--saju-cream); + border-radius: 16px; +} +.saju-element-bar { + display: grid; + grid-template-columns: 60px 1fr 50px; + align-items: center; + gap: 0.75rem; +} +.saju-element-bar__label { font-size: 0.9rem; font-weight: 700; } +.saju-element-bar__track { + height: 12px; + background: var(--saju-paper); + border-radius: 6px; + 
overflow: hidden; +} +.saju-element-bar__fill { + height: 100%; + border-radius: 6px; + transition: width 0.5s; +} +.saju-element-bar__fill--木 { background: #4B7065; } +.saju-element-bar__fill--火 { background: #C56F5C; } +.saju-element-bar__fill--土 { background: #D4A574; } +.saju-element-bar__fill--金 { background: #B8B5A8; } +.saju-element-bar__fill--水 { background: #4A5878; } +.saju-element-bar__value { text-align: right; font-size: 0.85rem; opacity: 0.7; } + +/* Monthly flow */ +.saju-monthly-flow { + display: grid; + grid-template-columns: repeat(12, 1fr); + gap: 0.25rem; + padding: 1rem; + background: var(--saju-cream); + border-radius: 16px; +} +.saju-monthly-flow__cell { + display: flex; + flex-direction: column; + align-items: center; + padding: 0.5rem 0.25rem; + border-radius: 8px; + background: var(--saju-paper); +} +.saju-monthly-flow__month { font-size: 0.7rem; opacity: 0.7; } +.saju-monthly-flow__score { + font-family: 'Noto Serif KR', serif; + font-weight: 700; + font-size: 1rem; +} +.saju-monthly-flow__label { font-size: 0.7rem; opacity: 0.8; margin-top: 0.25rem; } + +/* Horyung quote */ +.saju-horyung-quote { + background: var(--saju-ink); + color: var(--saju-cream); + padding: 1.5rem; + border-radius: 16px; + display: flex; + gap: 1rem; + align-items: flex-start; +} +.saju-horyung-quote__text { font-size: 0.95rem; line-height: 1.6; } + +/* Interpret accordion */ +.saju-interpret-accordion { display: flex; flex-direction: column; gap: 0.5rem; } +.saju-interpret-item { + background: var(--saju-cream); + border-radius: 12px; + border: 1px solid var(--saju-paper); + overflow: hidden; +} +.saju-interpret-item__header { + padding: 1rem; + background: var(--saju-paper); + cursor: pointer; + display: flex; + justify-content: space-between; + align-items: center; + font-weight: 700; + user-select: none; +} +.saju-interpret-item__body { padding: 1rem; font-size: 0.95rem; line-height: 1.6; } +.saju-interpret-item__evidence { + background: var(--saju-paper); + 
padding: 0.75rem; + border-radius: 8px; + margin-top: 0.75rem; + font-size: 0.85rem; + opacity: 0.85; +} + +/* Stub */ +.saju-stub { + max-width: 480px; + margin: 5rem auto; + text-align: center; + padding: 2rem; + background: var(--saju-paper); + border-radius: 24px; +} +.saju-stub a { + display: inline-block; + margin-top: 1.5rem; + background: var(--saju-gold); + color: var(--saju-ink); + padding: 0.75rem 1.5rem; + border-radius: 999px; + text-decoration: none; + font-weight: 700; +} + +/* Responsive */ +@media (max-width: 1280px) { + .saju-hero { grid-template-columns: 1fr; text-align: center; } + .saju-hero__left { order: 2; } + .saju-hero__right { order: 1; } + .saju-bottom { grid-template-columns: 1fr; } +} +@media (max-width: 768px) { + .saju-nav { padding: 0.75rem 1rem; flex-wrap: wrap; gap: 0.5rem; } + .saju-nav__links { display: none; } + .saju-action-cards { grid-template-columns: 1fr; } + .saju-pillars { grid-template-columns: repeat(2, 1fr); } + .saju-monthly-flow { grid-template-columns: repeat(4, 1fr); } + .horyung-mascot--lg { width: 220px; } +}