feat(saju): Saju.css 컬러 토큰 + 폰트 + 격리 + Noto Serif KR Google Fonts

This commit is contained in:
2026-05-26 08:23:00 +09:00
parent 3e30612b38
commit 8fd7f83586
6 changed files with 2481 additions and 0 deletions

109
CHECK_POINT.md Normal file
View File

@@ -0,0 +1,109 @@
# web-ui CHECK_POINT
> React 18 + Vite + react-router-dom v6. Dev port 3007. NAS Docker 백엔드의 프론트엔드 (nginx :8080).
> 2026-05-22 갱신.
---
## 🟢 현재 상태 (양호)
- 라우트 16개 (12 메인 + 4 서브) 정상 운영
- agent-office 3×3 그리드 재설계 완료 (5/7~14, WebP 93% 축소, WS 재연결 백오프)
- `/insta` 슬레이트 캐러셀 + 반응형 (5/15~16)
- Vite proxy 7개 (NAS API + Fear&Greed + VIX + Treasury + WTI + Brent)
---
## ✅ 최근 완료 (5/18~22)
- 2026-05-22: **거래 데스크 AI 투자 탭 제거** (e42b643) — web-ai signal_v1 legacy 이전과 정합 (V2 단독 운영 반영)
- 2026-05-22: stock 총 매입을 각 종목 매입가 단순 합으로 표시 (6533743)
- 2026-05-22: agent-office 모바일 사이드패널 전체화면 토글 + music 에이전트 이미지 교체 (ee5700d)
- 2026-05-14: agent-office Grid 재설계 (canvas 폐기), AGENT_META + GRID_SLOTS 중앙화
- 2026-05-15~16: `/insta` 신규 페이지 + InstaCards.jsx + src/api.js(530줄) 음악·인스타·텔레그램 API 확장
---
## 🔴 즉시 (1~3일)
### 1. `/insta` 비동기 폴링 구현 ⭐ (백엔드 준비 완료 → 구현 시점 도래)
- **배경**: web-backend insta-lab이 Redis 분할(SP-4) 완료 → `_bg_render`가 Redis push, `GET /api/insta/tasks/{task_id}` 폴링 엔드포인트 존재. **이제 frontend가 비동기 폴링으로 전환해야 정합**
- **파일**: `src/pages/insta/InstaCards.jsx`
- [ ] 슬레이트 생성 → `task_id` 받고 폴링 (2~5초 간격, NAS 부담 ↓)
- [ ] progress bar UI (0~100%) + `queue:paused` 상태 표시 (박재오 작업 중 = Windows 워커 정지)
- [ ] failed 상태 처리 (오류 메시지·재시도 버튼)
### 2. agent-office WebSocket 안정성 점검
- 5/7~14 재설계 + 5/22 모바일 토글 직후 운영 확인
- [ ] 브라우저 콘솔 WS 끊김 → 재연결 지수 백오프 실제 작동
- [ ] 4 테스트(TaskTab·CommandTab·AgentCard·ScoreNodeCard) 통과 재확인
### 3. agent-office lotto sim_consensus 노출
- **배경**: web-backend `/api/lotto/best`에 5종 점수 array 노출됨 (lotto-signals) + weight-evolver 자율 학습 도입
- [ ] agent-office lotto 에이전트 카드에 5종 점수·시그널 상태 표시
- [ ] (선택) weight-evolver 진화 상태 미니 패널
---
## 🟡 중기 (1~2주)
### 4. `/insta` 카드 템플릿 UI 고도화
- 현재 default theme PNG 미리보기만. hedgy75 테마 추가 시 theme 선택 UI 필요
- [ ] 테마 선택 dropdown (default / hedgy75)
- [ ] 미리보기 컴포넌트 페이지 종류별 분기
### 5. `/music` Sonic Forge 발행 모니터링
- music-lab Redis 분할(SP-6) + Windows music-render 도입 → 발행 상태 모니터링 패널 필요
- [ ] 발행 큐·실패·재시도 로그 표시 (Redis 큐 길이 연동)
- [ ] 텔레그램 5단계 승인 UX 점검
### 6. NAS↔Windows 작업 흐름 가시화 (신규)
- web-ai 워커 3종 + Redis 큐 도입으로 작업 분산 흐름이 복잡해짐
- [ ] agent-office 또는 신규 `/node` 페이지에 큐 상태·Windows 노드 헬스 표시 (web-ai/web-backend 추가 아이디어와 연동)
---
## 🟢 장기 (1개월+)
### 7. 모바일 UX 일관 적용
- BottomNav + PullToRefresh + MobileSheet + SwipeableView 있음. 신규 페이지 적용 부족
- [ ] `/insta` 모바일 캐러셀 swipe + `/agent-office` 모바일 그리드 압축
### 8. `/lab` 페이지 확장
- 현재 sword-stream · day-calc 2개
- [ ] 박재오 데모 콘텐츠 큐 결정 (예: weight-evolver 진화 그래프, AI 음악 빠른 청취)
---
## 💡 추가 아이디어 (신규 2026-05-22)
- **`/node` Windows AI 노드 대시보드** — ai_trade + insta/music/video-render + task-watcher 상태, Redis 큐 길이, `queue:paused` 토글 버튼(task-watcher C안 = "토글 UI 1개"). web-ai/web-backend 모니터링 아이디어의 frontend 진입점
- **video 생성 미리보기 페이지** — video-lab(SP-8) + Windows video-render 4 provider 결과 비교 그리드. 무신사 공모전 MU-진 영상 버전 관리에 활용
- **weight-evolver 진화 시각화** — auto_picks 적중 추이 + weight base diff 그래프 (`/lab` 또는 lotto 페이지)
- **위키 페이지 수 정합** — [[사업-개인-웹-플랫폼]]에 "17개" 박혀 있으나 실제 16개 (12 메인 + 4 서브). *박재오 위키 갱신 항목* (web-ui 코드 아님)
---
## 🚀 빌드 & 배포
```bash
npm run dev # 개발 (port 3007, Vite proxy)
npm run build # 빌드 (rimraf dist + Vite build)
npm run release:nas # 자동 배포 (deploy-nas.cjs)
```
배포: Windows `robocopy dist Z:\docker\webpage\frontend\` / macOS `rsync` → nginx 자동 reload
---
## 📚 참고
- 위키: [[사업-개인-웹-플랫폼]] (백엔드 통합 인덱스)
- 라우트: `src/routes.jsx` (navLinks 메타) / Vite 프록시: `vite.config.js`
- API: 모든 페이지 `/api/` 상대 경로 (Mixed Content 방지)
- 백엔드 짝: web-backend CHECK_POINT (insta-lab Redis 분할 → /insta 비동기 폴링 정합 필요)
## 변경 이력
- 2026-05-18: 페이지 신설. 즉시 3 + 중기 3 + 장기 2.
- 2026-05-22: 최근 완료 3건 반영(AI 투자 탭 제거·stock 매입 표시·모바일 사이드패널). **`/insta` 비동기 폴링을 즉시 1순위로 승격** (백엔드 insta-lab Redis 분할 완료 → frontend 정합 필요). lotto sim_consensus 노출 + NAS↔Windows 작업 흐름 가시화 항목 추가. 추가 아이디어 4건 신설 (/node 대시보드·video 미리보기·evolver 시각화·위키 페이지 수 정합).

View File

@@ -0,0 +1,642 @@
# ai_trade Hotfix — Code Review F1·F2·F3·F4 Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** ai_trade(V2) 코드 리뷰 7개 finding 중 High 3건(F1·F2·F3) + Medium 1건(F4)을 TDD로 수정. F5/F6은 별도 plan, F7은 pushback.
**Architecture:** 모두 ai_trade/ 내부 단일 모듈 수정. (1) config.py default 경로 — legacy/ 경유. (2) kis_client.py — asyncio.Lock으로 `_throttle()` 직렬화. (3) scheduler.py + pull_worker.py — post-close를 시간 윈도우가 아닌 "일 1회 + 16:00 이후" 상태기반으로 변경. (4) chronos_predictor.py — confidence 산식을 absolute spread 기반으로 통일.
**Tech Stack:** Python 3.12, asyncio, pytest + pytest-asyncio + respx, httpx.
**Test runner:** `.venv` 한글 경로 깨짐 + 리뷰어 Python 312 경로 부재 보고로, 시스템 Python 사용. 정확한 경로는 `where python` 으로 우선 확인. 기본 시도 순서:
1. `python -m pytest ai_trade/tests -q` (PATH의 Python)
2. `py -3.12 -m pytest ai_trade/tests -q` (py launcher)
3. 둘 다 실패 시 환경 셋업이 선행 작업 — plan 진행 중단하고 박재오에게 보고.
**Working directory:** `C:\Users\jaeoh\Desktop\workspace\web-ai` (web-ai repo). Commit/push도 이 디렉토리에서만.
---
## File Map
| 파일 | 변경 종류 | 책임 |
|------|-----------|------|
| `ai_trade/config.py` | Modify L31-36 | V1_TOKEN_PATH default를 `legacy/signal_v1/data/kis_token.json`로 |
| `ai_trade/kis_client.py` | Modify L40-62 | `_throttle_lock: asyncio.Lock` 추가, `_throttle()`을 lock 안에서 실행 |
| `ai_trade/scheduler.py` | Modify L79-84 | `_is_post_close_trigger(now, last_post_close_date)` 시그니처 변경 — 상태기반 |
| `ai_trade/pull_worker.py` | Modify L1-58 | `poll_loop``last_post_close_date` state 추가, 호출부 갱신 |
| `ai_trade/chronos_predictor.py` | Modify L106, L127 | spread 계산을 absolute (q90-q10)로, confidence 산식 `max(0, 1 - spread/0.6)` |
| `ai_trade/tests/test_kis_client.py` | Add 1 test | concurrent gather throttle test |
| `ai_trade/tests/test_scheduler.py` | Add 3 tests | post-close 상태기반 트리거 |
| `ai_trade/tests/test_pull_worker.py` | Add 1 test | 첫 호출 안 됐다가 16:00 이후 5분 cycle에서 호출됨 |
| `ai_trade/tests/test_chronos_predictor.py` | Add 2 tests | median≈0에서도 conf 정상, spread 클수록 conf↓ |
| `ai_trade/tests/test_main.py` | Modify | v1_token_path default 변경 반영 (있다면) |
---
## Task 1: F1 — KIS 토큰 경로 default를 legacy/ 경유로
**Files:**
- Modify: `ai_trade/config.py:31-36`
- Test: `ai_trade/tests/test_config_token_path.py` (Create)
- [ ] **Step 1: Write the failing test**
```python
# ai_trade/tests/test_config_token_path.py
"""F1 — V1_TOKEN_PATH default가 legacy/signal_v1/ 경유인지 검증."""
import os
from pathlib import Path
from ai_trade.config import Settings
def test_v1_token_default_path_uses_legacy_dir(monkeypatch):
"""env에 V1_TOKEN_PATH 없으면 legacy/signal_v1/data/kis_token.json"""
monkeypatch.delenv("V1_TOKEN_PATH", raising=False)
settings = Settings()
expected_suffix = Path("legacy") / "signal_v1" / "data" / "kis_token.json"
assert str(settings.v1_token_path).endswith(str(expected_suffix)), (
f"expected default to end with {expected_suffix}, got {settings.v1_token_path}"
)
def test_v1_token_env_override_wins(monkeypatch, tmp_path):
"""env로 명시한 경로가 default를 덮어씀."""
custom = tmp_path / "custom_token.json"
monkeypatch.setenv("V1_TOKEN_PATH", str(custom))
settings = Settings()
assert settings.v1_token_path == custom
```
- [ ] **Step 2: Run test to verify it fails**
```
python -m pytest ai_trade/tests/test_config_token_path.py -v
```
Expected: `test_v1_token_default_path_uses_legacy_dir` FAILs (default가 `signal_v1/...` 임). env override는 PASS.
- [ ] **Step 3: Fix config.py**
`ai_trade/config.py:31-36` 변경:
```python
v1_token_path: Path = field(
default_factory=lambda: Path(
os.getenv("V1_TOKEN_PATH",
str(Path(__file__).parent.parent / "legacy" / "signal_v1" / "data" / "kis_token.json"))
)
)
```
- [ ] **Step 4: Run test to verify it passes**
```
python -m pytest ai_trade/tests/test_config_token_path.py -v
```
Expected: 2 passed.
- [ ] **Step 5: Verify full test suite still passes**
```
python -m pytest ai_trade/tests -q
```
Expected: 모든 기존 테스트 PASS (token path 기본값 변경이 다른 test에 영향 없는지 확인).
- [ ] **Step 6: Commit**
```bash
git add ai_trade/config.py ai_trade/tests/test_config_token_path.py
git commit -m "fix(ai_trade): V1_TOKEN_PATH default를 legacy/signal_v1/ 경유로 수정 (F1)"
```
---
## Task 2: F2 — KIS throttle을 asyncio.Lock으로 직렬화
**Files:**
- Modify: `ai_trade/kis_client.py:40-62`
- Test: `ai_trade/tests/test_kis_client.py` (Modify — 새 test 추가)
- [ ] **Step 1: Write the failing test**
`ai_trade/tests/test_kis_client.py` 파일 끝에 추가:
```python
import asyncio
import time as time_module
@respx.mock
async def test_throttle_serializes_concurrent_gather(kis_client_factory):
"""5개 동시 요청이 asyncio.gather로 들어와도 0.5초 간격으로 직렬화되어야 함 (F2).
초당 2회 = 0.5초 간격. 5개 요청이면 최소 (5-1)*0.5 = 2.0초 소요.
Race condition 있으면 5개가 거의 동시에 나가서 2초 훨씬 안쪽에 끝남.
"""
sample = {"output2": []}
respx.get(
"https://openapivts.koreainvestment.com:29443/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
).mock(return_value=httpx.Response(200, json=sample))
client = kis_client_factory()
try:
start = time_module.monotonic()
await asyncio.gather(*[client.get_minute_ohlcv(f"00593{i}") for i in range(5)])
elapsed = time_module.monotonic() - start
# 5개 throttle = 최소 (5-1)*0.5 = 2.0초. tolerance 0.3초.
assert elapsed >= 1.7, (
f"throttle race condition: 5 concurrent calls took only {elapsed:.2f}s, "
f"expected >=1.7s (0.5s * 4 inter-call gaps)"
)
finally:
await client.close()
```
- [ ] **Step 2: Run test to verify it fails**
```
python -m pytest ai_trade/tests/test_kis_client.py::test_throttle_serializes_concurrent_gather -v
```
Expected: FAIL — elapsed가 0.5초 이하 (race로 동시 깸).
- [ ] **Step 3: Add asyncio.Lock to KISClient**
`ai_trade/kis_client.py:40` `__init__` 끝에 한 줄 추가:
```python
self._token_cache: tuple[str, float] | None = None # (token, file_mtime)
self._last_throttle_at = 0.0
self._throttle_lock = asyncio.Lock()
```
그리고 `_throttle()` (L58-62)을 lock으로 감쌈:
```python
async def _throttle(self) -> None:
async with self._throttle_lock:
elapsed = time.monotonic() - self._last_throttle_at
if elapsed < _THROTTLE_INTERVAL:
await asyncio.sleep(_THROTTLE_INTERVAL - elapsed)
self._last_throttle_at = time.monotonic()
```
- [ ] **Step 4: Run test to verify it passes**
```
python -m pytest ai_trade/tests/test_kis_client.py::test_throttle_serializes_concurrent_gather -v
```
Expected: PASS — elapsed >= 1.7s.
- [ ] **Step 5: Verify full kis_client suite still passes**
```
python -m pytest ai_trade/tests/test_kis_client.py -v
```
Expected: 모든 test PASS (기존 429 retry 등 영향 없는지 확인).
- [ ] **Step 6: Commit**
```bash
git add ai_trade/kis_client.py ai_trade/tests/test_kis_client.py
git commit -m "fix(ai_trade): KIS throttle을 asyncio.Lock으로 직렬화 (F2)"
```
---
## Task 3: F3 — post-close 트리거를 상태기반으로 변경
**Files:**
- Modify: `ai_trade/scheduler.py:79-84`
- Modify: `ai_trade/pull_worker.py:1-58`
- Test: `ai_trade/tests/test_scheduler.py` (add 3 tests)
**Why state-based:** 16:00:00-16:00:59 윈도우는 5분 sleep + 비결정적 cycle 시작 시각과 충돌. "오늘 아직 post-close 안 돌렸고 현재 시각 ≥ 16:00 이면 trigger 후 today 표시" 로 변경.
- [ ] **Step 1: Write the failing tests**
`ai_trade/tests/test_scheduler.py` 파일 끝에 추가:
```python
from datetime import date as _date
from ai_trade.scheduler import _is_post_close_trigger
def test_post_close_trigger_fires_at_1601_if_not_yet_today():
"""16:01에 깬 cycle도 오늘 아직 안 돌렸으면 trigger (F3)."""
now = _kst(2026, 5, 18, 16, 1)
assert _is_post_close_trigger(now, last_post_close_date=None) is True
def test_post_close_trigger_skips_if_already_today():
"""이미 오늘 돌렸으면 trigger 안 함."""
now = _kst(2026, 5, 18, 16, 5)
today = _date(2026, 5, 18)
assert _is_post_close_trigger(now, last_post_close_date=today) is False
def test_post_close_trigger_skips_before_1600():
"""16:00 전에는 trigger 안 함."""
now = _kst(2026, 5, 18, 15, 59)
assert _is_post_close_trigger(now, last_post_close_date=None) is False
def test_post_close_trigger_fires_next_day_after_reset():
"""다음 영업일이 되면 last_post_close_date < today.date() 이므로 다시 trigger."""
now = _kst(2026, 5, 19, 16, 0)
yesterday = _date(2026, 5, 18)
assert _is_post_close_trigger(now, last_post_close_date=yesterday) is True
def test_post_close_trigger_skips_on_holiday():
"""휴장일에는 trigger 안 함 (2026-05-05 어린이날)."""
now = _kst(2026, 5, 5, 16, 30)
assert _is_post_close_trigger(now, last_post_close_date=None) is False
```
- [ ] **Step 2: Run tests to verify they fail**
```
python -m pytest ai_trade/tests/test_scheduler.py -v -k post_close
```
Expected: FAIL — `_is_post_close_trigger`가 신규 시그니처(`last_post_close_date` 인자) 미지원.
- [ ] **Step 3: Modify scheduler.py:79-84**
```python
def _is_post_close_trigger(now: datetime, last_post_close_date) -> bool:
"""16:00 KST 이후 오늘 아직 post-close cycle 안 돌렸으면 True (F3 상태기반).
Args:
now: 현재 KST datetime.
last_post_close_date: 마지막 post-close 실행 영업일 date 객체 (None=미실행).
"""
if not _is_market_day(now):
return False
if now.time() < time(16, 0):
return False
today = now.date()
return last_post_close_date != today
```
- [ ] **Step 4: Run scheduler tests**
```
python -m pytest ai_trade/tests/test_scheduler.py -v
```
Expected: 신규 5개 PASS. 기존 test도 PASS (다른 함수 영향 없음).
- [ ] **Step 5: Update pull_worker.py to track last_post_close_date**
`ai_trade/pull_worker.py``poll_loop` (L18-58)을 다음으로 교체:
```python
async def poll_loop(
client: StockClient, state: PollState, shutdown: asyncio.Event,
kis_client: KISClient | None = None,
chronos=None,
dedup=None,
settings=None,
) -> None:
"""FastAPI lifespan 에서 asyncio.create_task 로 시작."""
logger.info("poll_loop started")
last_post_close_date = None
while not shutdown.is_set():
now = datetime.now(KST)
if _is_market_day(now) and _is_polling_window(now):
try:
await _run_polling_cycle(client, state, kis_client=kis_client)
except Exception:
logger.exception("poll cycle failed")
# Minute momentum 갱신 (매 cycle)
try:
update_minute_momentum_for_all(state)
except Exception:
logger.exception("minute momentum update failed")
# Post-close trigger (상태기반: 16:00 이후 + 오늘 미실행)
if (
_is_post_close_trigger(now, last_post_close_date)
and chronos is not None and kis_client is not None
):
try:
await _run_post_close_cycle(kis_client, chronos, state)
last_post_close_date = now.date()
except Exception:
logger.exception("post-close cycle failed")
# Phase 4: generate signals
if dedup is not None and settings is not None:
try:
from ai_trade.signal_generator import generate_signals
generate_signals(state, dedup, settings)
except Exception:
logger.exception("generate_signals failed")
interval = _next_interval(now)
try:
await asyncio.wait_for(shutdown.wait(), timeout=interval)
break
except asyncio.TimeoutError:
continue
logger.info("poll_loop ended")
```
- [ ] **Step 6: Add pull_worker test**
`ai_trade/tests/test_pull_worker.py` 파일 끝에 추가:
```python
from unittest.mock import AsyncMock, MagicMock
from datetime import datetime as _dt
from zoneinfo import ZoneInfo as _ZI
import asyncio as _asyncio
async def test_post_close_fires_at_1601_when_not_yet_today(monkeypatch):
"""16:01에 깬 cycle도 post_close 안 돌렸으면 호출됨 (F3 회귀)."""
from ai_trade import pull_worker
_kst = _ZI("Asia/Seoul")
now_at_1601 = _dt(2026, 5, 18, 16, 1, tzinfo=_kst)
real_dt = _dt
class FrozenDateTime:
@staticmethod
def now(tz=None):
return now_at_1601
monkeypatch.setattr(pull_worker, "datetime", FrozenDateTime)
monkeypatch.setattr(
pull_worker, "_is_market_day", lambda n: True,
)
monkeypatch.setattr(
pull_worker, "_is_polling_window", lambda n: True,
)
monkeypatch.setattr(
pull_worker, "_next_interval", lambda n: 0.01,
)
monkeypatch.setattr(
pull_worker, "_run_polling_cycle", AsyncMock(),
)
monkeypatch.setattr(
pull_worker, "update_minute_momentum_for_all", lambda s: None,
)
post_close = AsyncMock()
monkeypatch.setattr(pull_worker, "_run_post_close_cycle", post_close)
state = MagicMock()
chronos = MagicMock()
kis = MagicMock()
shutdown = _asyncio.Event()
async def _stop_soon():
await _asyncio.sleep(0.05)
shutdown.set()
_asyncio.create_task(_stop_soon())
await pull_worker.poll_loop(
client=MagicMock(),
state=state,
shutdown=shutdown,
kis_client=kis,
chronos=chronos,
dedup=None,
settings=None,
)
assert post_close.await_count >= 1, "post-close가 16:01에 호출되지 않음 (F3 회귀)"
```
- [ ] **Step 7: Run pull_worker test**
```
python -m pytest ai_trade/tests/test_pull_worker.py::test_post_close_fires_at_1601_when_not_yet_today -v
```
Expected: PASS.
- [ ] **Step 8: Run full ai_trade suite**
```
python -m pytest ai_trade/tests -q
```
Expected: 모두 PASS.
- [ ] **Step 9: Commit**
```bash
git add ai_trade/scheduler.py ai_trade/pull_worker.py ai_trade/tests/test_scheduler.py ai_trade/tests/test_pull_worker.py
git commit -m "fix(ai_trade): post-close trigger를 상태기반으로 변경 (F3)"
```
---
## Task 4: F4 — Chronos confidence를 absolute spread 기반으로 통일
**Files:**
- Modify: `ai_trade/chronos_predictor.py:106, 127`
- Test: `ai_trade/tests/test_chronos_predictor.py` (add 2 tests)
**Why absolute:** Phase 4 spec amendment (web-ui commit 534ded5)가 absolute spread로 hard gate를 결정. confidence도 같은 철학으로. 새 산식: `conf = max(0, min(1, 1 - spread / SPREAD_THRESHOLD))` — spread가 0.6에 도달하면 conf=0, 0이면 conf=1.
- [ ] **Step 1: Write the failing tests**
기존 `ai_trade/tests/test_chronos_predictor.py` 끝에 추가 (파일이 없거나 비어있으면 신규):
```python
import numpy as np
import pytest
import torch
@pytest.fixture
def fake_pipeline():
"""predict_quantiles만 stub하는 가짜 pipeline."""
class FakePipeline:
def __init__(self, q10_price, q50_price, q90_price):
self._q10, self._q50, self._q90 = q10_price, q50_price, q90_price
def predict_quantiles(self, contexts, prediction_length, quantile_levels):
n = len(contexts)
tensor = torch.tensor(
[[[self._q10, self._q50, self._q90]]] * n,
dtype=torch.float32,
)
return tensor, None
return FakePipeline
def _make_predictor_with(pipeline_obj):
"""ChronosPredictor 인스턴스 (실제 모델 안 부르고 pipeline만 주입)."""
from ai_trade.chronos_predictor import ChronosPredictor
p = ChronosPredictor.__new__(ChronosPredictor)
p._pipeline = pipeline_obj
p._device = "cpu"
return p
def test_confidence_high_when_spread_near_zero(fake_pipeline):
"""median≈0, spread≈0 (q10=q90=last_close)일 때 conf≈1 (F4)."""
last_close = 100000.0
p = _make_predictor_with(fake_pipeline(last_close, last_close, last_close))
ohlcv = {"A": [{"close": last_close}] * 30}
out = p.predict_batch(ohlcv)
assert out["A"].conf > 0.95, (
f"median≈0 + spread≈0인데 conf={out['A'].conf} (F4 회귀: relative spread로 폭증)"
)
def test_confidence_drops_with_spread(fake_pipeline):
"""spread 0.3일 때 conf≈0.5 (1 - 0.3/0.6 = 0.5)."""
last_close = 100000.0
# q10=85000 → -0.15, q90=115000 → 0.15, spread=0.30
p = _make_predictor_with(fake_pipeline(85000.0, 100000.0, 115000.0))
ohlcv = {"A": [{"close": last_close}] * 30}
out = p.predict_batch(ohlcv)
# 1 - 0.30/0.60 = 0.50
assert 0.45 < out["A"].conf < 0.55, (
f"absolute spread 0.30에서 conf={out['A'].conf} (expected ≈0.5)"
)
def test_confidence_zero_at_threshold_spread(fake_pipeline):
"""spread가 threshold(0.6) 이상이면 conf=0."""
last_close = 100000.0
# q10=70000 → -0.30, q90=130000 → 0.30, spread=0.60
p = _make_predictor_with(fake_pipeline(70000.0, 100000.0, 130000.0))
ohlcv = {"A": [{"close": last_close}] * 30}
out = p.predict_batch(ohlcv)
assert out["A"].conf < 0.05, (
f"spread=threshold에서 conf={out['A'].conf} (expected ≈0)"
)
```
- [ ] **Step 2: Run tests to verify they fail**
```
python -m pytest ai_trade/tests/test_chronos_predictor.py -v -k confidence
```
Expected: `test_confidence_high_when_spread_near_zero` 가 FAIL — 현행 relative spread 산식 때문에 median≈0에서 conf가 0으로 폭락.
- [ ] **Step 3: Fix chronos_predictor.py**
`ai_trade/chronos_predictor.py` 상단에 상수 추가 (L13 근처):
```python
_SPREAD_THRESHOLD = 0.6 # F4: signal_generator hard gate와 동일 (absolute return spread)
```
L106 (modern API 경로) 변경:
```python
# shape: [num_series, prediction_length, 3]
for i, ticker in enumerate(tickers):
q10_price, q50_price, q90_price = quantiles_np[i, 0, :]
last_close = daily_ohlcv_dict[ticker][-1]["close"]
median = float((q50_price - last_close) / last_close)
q10 = float((q10_price - last_close) / last_close)
q90 = float((q90_price - last_close) / last_close)
# F4: absolute spread (q90-q10) 기반 — signal_generator hard gate와 통일.
# median≈0 zero-shot 케이스에서 conf가 0으로 폭락하던 relative 산식 제거.
spread = q90 - q10
conf = float(max(0.0, min(1.0, 1.0 - spread / _SPREAD_THRESHOLD)))
results[ticker] = ChronosPrediction(
median=median, q10=q10, q90=q90, conf=conf, as_of=now_iso,
)
return results
```
L127 (legacy API 경로) 동일하게 변경:
```python
spread = q90 - q10
conf = float(max(0.0, min(1.0, 1.0 - spread / _SPREAD_THRESHOLD)))
```
- [ ] **Step 4: Run tests to verify they pass**
```
python -m pytest ai_trade/tests/test_chronos_predictor.py -v
```
Expected: 신규 3개 모두 PASS. 기존 test도 PASS.
- [ ] **Step 5: Run full ai_trade suite**
```
python -m pytest ai_trade/tests -q
```
Expected: 모두 PASS. signal_generator 테스트(`_compute_buy_confidence``pred["conf"]` 사용) 도 영향 받을 수 있으니 주시.
- [ ] **Step 6: Commit**
```bash
git add ai_trade/chronos_predictor.py ai_trade/tests/test_chronos_predictor.py
git commit -m "fix(ai_trade): Chronos confidence를 absolute spread 기반으로 통일 (F4)"
```
---
## Task 5: 전체 회귀 확인 + push
- [ ] **Step 1: Run full ai_trade suite + count**
```
python -m pytest ai_trade/tests -v
```
Expected:
- 기존 56 tests + 신규 (config 2 + kis_client 1 + scheduler 5 + pull_worker 1 + chronos_predictor 3) = **68 tests** 정도 PASS.
- [ ] **Step 2: Quick sanity — server boot smoke test (시간 허용 시)**
```
cd ai_trade && python -c "from main import app; print('app import OK')"
```
Expected: no import errors.
- [ ] **Step 3: Push**
```bash
git push origin main
```
---
## Self-Review Checklist
이 plan을 다 작성한 뒤 다음을 확인:
1. **F1**: config.py default + 2 test (default + env override) ✅
2. **F2**: `_throttle_lock` 추가 + 1 concurrent test ✅
3. **F3**: `_is_post_close_trigger(now, last_post_close_date)` 시그니처 변경 + `poll_loop` 상태 추적 + 5 scheduler test + 1 pull_worker test ✅
4. **F4**: `_SPREAD_THRESHOLD=0.6` 상수 + 두 분기(modern + legacy) 모두 absolute spread 적용 + 3 chronos_predictor test ✅
**누락 가능 항목**:
- `test_main.py``v1_token_path` default를 직접 검증한다면 Task 1에서 같이 갱신. 위 patch는 Settings 객체 통해서만 다루므로 영향 없음(검증 완료).
- Task 3 pull_worker test의 `FrozenDateTime.now``datetime.now(KST)` 호출만 stub함. 다른 datetime 사용 부분 영향 없음 (verified L28).
- Task 4 test는 ChronosPredictor `__new__`로 우회 — 실제 HuggingFace 모델 로딩 안 함.
---
## Execution Handoff
**Plan complete and saved to `docs/superpowers/plans/2026-05-25-ai-trade-hotfix.md`.**
두 가지 실행 옵션:
**1. Subagent-Driven (recommended)** — task 별 fresh subagent dispatch + two-stage review. F2/F3 같이 미묘한 동시성/상태 변경에 유리.
**2. Inline Execution** — 현 세션에서 직접 task별 진행 + checkpoint.
박재오 결정 대기.

View File

@@ -0,0 +1,704 @@
# Render Queue Reliability — Code Review F6 Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** 4개 render worker(insta/music/video/image-render)가 BLPOP 직후 crash 시 작업 손실되는 문제 해결. BLMOVE(또는 BRPOPLPUSH)로 atomic dequeue + processing list 패턴 + startup recovery + retry/dead-letter.
**Architecture:**
1. 각 worker가 unique `worker_id` 보유: `<queue>-<hostname>-<pid>` (env로 override 가능).
2. atomic dequeue: `BLMOVE queue:<x>-render processing:<x>-render:<worker_id> RIGHT LEFT 5` — 5초 timeout. (`BRPOPLPUSH`는 Redis 6.2+ deprecated, `BLMOVE`가 후속).
3. 작업 성공: `LREM processing:<x>-render:<worker_id> 1 <payload>` — 정확 1개 제거.
4. 작업 실패: payload에 `attempts` counter 증가시켜 main queue 끝으로 LPUSH; 한계(기본 3) 초과 시 `dead_letter:<queue>` 로 이동.
5. **Startup recovery**: worker 시작 시 자신의 processing list가 비어있지 않으면 → 모두 main queue로 되돌림 (재시도). attempts 증가.
6. NAS측 producer는 무변경 (LPUSH 그대로). 단, payload schema에 `attempts: int` (optional) 필드 명시 — producer는 안 채워도 worker가 default 0으로.
**Shared module 전략:** 4개 worker가 동일 패턴이므로 `services/_shared/reliable_queue.py` 1개 만들고 각 Dockerfile에서 `COPY services/_shared /app/_shared``from _shared.reliable_queue import ReliableQueue`. compose entry/dockerfile 변경 4건. (DRY > inline 4중복.)
**Tech Stack:** Python 3.12, redis.asyncio 5.x, fakeredis (pytest dep), pytest-asyncio.
**Working directory:** `C:\Users\jaeoh\Desktop\workspace\web-ai`.
---
## File Map
| 파일 | 변경 | 책임 |
|------|------|------|
| `services/_shared/__init__.py` | Create | namespace package |
| `services/_shared/reliable_queue.py` | Create | `ReliableQueue` 클래스 — dequeue, ack, fail, recover |
| `services/_shared/tests/test_reliable_queue.py` | Create | fakeredis 단위 테스트 6개 |
| `services/_shared/requirements.txt` | Create | redis>=5.0, fakeredis (test only) |
| `services/insta-render/Dockerfile` | Modify | `COPY services/_shared /app/_shared` + PYTHONPATH |
| `services/insta-render/worker.py` | Modify L1~ | BLPOP → ReliableQueue 사용 |
| `services/insta-render/tests/test_worker.py` | Append | 1 integration test (recovery) |
| `services/music-render/Dockerfile` | Modify | shared copy |
| `services/music-render/worker.py` | Modify | ReliableQueue 사용 |
| `services/music-render/tests/test_worker.py` | Append | recovery test |
| `services/video-render/Dockerfile` | Modify | shared copy |
| `services/video-render/worker.py` | Modify | ReliableQueue 사용 |
| `services/video-render/tests/test_worker.py` | Append | recovery test |
| `services/image-render/Dockerfile` | Modify | shared copy |
| `services/image-render/worker.py` | Modify | ReliableQueue 사용 |
| `services/image-render/tests/test_worker.py` | Append | recovery test |
| `services/docker-compose.yml` (있다면) | Verify | build context가 services/ 루트 포함하는지 |
---
## Task 1: ReliableQueue 공유 모듈 작성
**Files:**
- Create: `services/_shared/__init__.py`
- Create: `services/_shared/reliable_queue.py`
- Create: `services/_shared/tests/__init__.py`
- Create: `services/_shared/tests/test_reliable_queue.py`
- Create: `services/_shared/requirements.txt`
- [ ] **Step 1: Create namespace package**
```python
# services/_shared/__init__.py
```
(빈 파일)
```python
# services/_shared/tests/__init__.py
```
(빈 파일)
- [ ] **Step 2: Write failing tests first**
```python
# services/_shared/tests/test_reliable_queue.py
"""F6 — ReliableQueue: atomic dequeue + recovery + retry."""
import json
import fakeredis.aioredis
import pytest
from _shared.reliable_queue import ReliableQueue
@pytest.fixture
async def redis():
r = fakeredis.aioredis.FakeRedis(decode_responses=False)
yield r
await r.flushall()
await r.aclose()
async def test_dequeue_atomically_moves_to_processing(redis):
"""BLMOVE: queue → processing 원자적 이동."""
q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
await redis.lpush("queue:test", json.dumps({"task_id": "t1"}).encode())
payload, raw = await q.dequeue(timeout=1)
assert payload["task_id"] == "t1"
# main queue는 비어있고, processing list에 들어있어야 함
assert await redis.llen("queue:test") == 0
assert await redis.llen("processing:queue:test:w1") == 1
async def test_dequeue_returns_none_on_timeout(redis):
q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
result = await q.dequeue(timeout=1)
assert result is None
async def test_ack_removes_from_processing(redis):
q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
await redis.lpush("queue:test", json.dumps({"task_id": "t1"}).encode())
payload, raw = await q.dequeue(timeout=1)
await q.ack(raw)
assert await redis.llen("processing:queue:test:w1") == 0
async def test_recover_returns_orphaned_to_main_queue(redis):
"""startup recovery: 잔존 processing list 항목을 main queue로 되돌림."""
# 이전 crash 시뮬레이션: processing list에 잔존
orphan = json.dumps({"task_id": "t1", "attempts": 0}).encode()
await redis.lpush("processing:queue:test:w1", orphan)
q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
recovered = await q.recover()
assert recovered == 1
assert await redis.llen("processing:queue:test:w1") == 0
# 다시 dequeue 가능
payload, raw = await q.dequeue(timeout=1)
assert payload["task_id"] == "t1"
assert payload["attempts"] == 1 # incremented on recover
async def test_fail_below_max_attempts_returns_to_main_queue(redis):
q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1", max_attempts=3)
await redis.lpush("queue:test", json.dumps({"task_id": "t1", "attempts": 0}).encode())
payload, raw = await q.dequeue(timeout=1)
await q.fail(raw, payload)
assert await redis.llen("processing:queue:test:w1") == 0
assert await redis.llen("queue:test") == 1
# attempts 증가됐는지
requeued_raw = await redis.lindex("queue:test", 0)
requeued = json.loads(requeued_raw)
assert requeued["attempts"] == 1
async def test_fail_at_max_attempts_moves_to_dead_letter(redis):
q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1", max_attempts=3)
await redis.lpush(
"queue:test", json.dumps({"task_id": "t1", "attempts": 2}).encode()
)
payload, raw = await q.dequeue(timeout=1)
await q.fail(raw, payload)
# attempts 2 → 3 (== max) → dead-letter
assert await redis.llen("queue:test") == 0
assert await redis.llen("processing:queue:test:w1") == 0
assert await redis.llen("dead_letter:queue:test") == 1
```
- [ ] **Step 3: Add requirements**
```text
# services/_shared/requirements.txt
redis>=5.0.0
```
별도 dev requirements (test):
```text
# services/_shared/tests/requirements-dev.txt (optional)
fakeredis>=2.20.0
pytest>=8.0.0
pytest-asyncio>=0.23.0
```
- [ ] **Step 4: Run tests to verify they fail**
```
cd services/_shared && python -m pytest tests/ -v
```
Expected: ImportError — reliable_queue.py 미존재.
- [ ] **Step 5: Write reliable_queue.py**
```python
# services/_shared/reliable_queue.py
"""F6 — Reliable Redis queue with processing list + recovery + retry.
Pattern: BLMOVE main → processing (atomic), then either ack (LREM processing) or
fail (LREM processing + re-enqueue or dead-letter).
Startup recovery: any items left in the worker's processing list from a previous
crash are pushed back to main queue (with attempts incremented).
"""
from __future__ import annotations
import json
import logging
import os
import socket
from typing import Optional
logger = logging.getLogger(__name__)
def default_worker_id(queue_key: str) -> str:
"""env > hostname-pid."""
explicit = os.getenv("WORKER_ID")
if explicit:
return explicit
return f"{queue_key}-{socket.gethostname()}-{os.getpid()}"
class ReliableQueue:
"""Wraps a redis client to provide BLMOVE-backed atomic dequeue +
processing list + retry/dead-letter.
Producer side stays unchanged: LPUSH queue:<x> <json payload>.
Worker side: dequeue() → process → ack(raw) on success or fail(raw, payload) on error.
Startup: await queue.recover() to re-enqueue orphans.
"""
def __init__(
self,
redis,
queue_key: str,
worker_id: Optional[str] = None,
max_attempts: int = 3,
):
self._redis = redis
self._queue_key = queue_key
self._worker_id = worker_id or default_worker_id(queue_key)
self._processing_key = f"processing:{queue_key}:{self._worker_id}"
self._dead_letter_key = f"dead_letter:{queue_key}"
self._max_attempts = max_attempts
@property
def processing_key(self) -> str:
return self._processing_key
async def dequeue(self, timeout: int = 5) -> Optional[tuple[dict, bytes]]:
"""Atomically move 1 item from main queue tail to processing head.
Returns (parsed_dict, raw_bytes) or None on timeout.
Caller MUST call ack(raw) on success or fail(raw, payload) on error.
"""
raw = await self._redis.blmove(
self._queue_key, self._processing_key,
timeout=timeout, src="RIGHT", dest="LEFT",
)
if raw is None:
return None
try:
payload = json.loads(raw)
except json.JSONDecodeError:
logger.error("invalid payload on dequeue, moving to dead-letter: %r", raw[:200])
await self._redis.lrem(self._processing_key, 1, raw)
await self._redis.lpush(self._dead_letter_key, raw)
return None
return payload, raw
async def ack(self, raw: bytes) -> None:
"""Successful processing — remove from processing list."""
removed = await self._redis.lrem(self._processing_key, 1, raw)
if removed == 0:
logger.warning("ack on missing payload (already removed?): %r", raw[:100])
async def fail(self, raw: bytes, payload: dict) -> None:
"""Failed processing — remove from processing list and either re-enqueue or dead-letter."""
await self._redis.lrem(self._processing_key, 1, raw)
attempts = int(payload.get("attempts", 0)) + 1
if attempts >= self._max_attempts:
payload["attempts"] = attempts
await self._redis.lpush(self._dead_letter_key, json.dumps(payload).encode())
logger.error(
"task moved to dead-letter after %d attempts: task_id=%s",
attempts, payload.get("task_id"),
)
return
payload["attempts"] = attempts
await self._redis.lpush(self._queue_key, json.dumps(payload).encode())
logger.info(
"task re-enqueued (attempt %d/%d): task_id=%s",
attempts, self._max_attempts, payload.get("task_id"),
)
async def recover(self) -> int:
"""Startup: move all orphans from this worker's processing list back to main queue.
Increments attempts counter (orphan == implicit failure).
Returns count of recovered items.
"""
count = 0
while True:
raw = await self._redis.lpop(self._processing_key)
if raw is None:
break
try:
payload = json.loads(raw)
except json.JSONDecodeError:
await self._redis.lpush(self._dead_letter_key, raw)
count += 1
continue
payload["attempts"] = int(payload.get("attempts", 0)) + 1
if payload["attempts"] >= self._max_attempts:
await self._redis.lpush(self._dead_letter_key, json.dumps(payload).encode())
else:
await self._redis.lpush(self._queue_key, json.dumps(payload).encode())
count += 1
if count:
logger.info("recovered %d orphaned items for worker %s", count, self._worker_id)
return count
```
**참고: redis-py blmove API**: `client.blmove(first_list, second_list, timeout, src=..., dest=...)`. timeout=0 은 block forever. payload는 bytes로 받음 (`decode_responses=False` 가정).
- [ ] **Step 6: Run tests to verify they pass**
```
cd services/_shared && python -m pytest tests/ -v
```
Expected: 6 PASS.
만약 ImportError (`fakeredis` 미설치) 발생 시:
```
python -m pip install fakeredis pytest-asyncio
```
또한 `pytest.ini` 또는 `conftest.py``asyncio_mode = "auto"` 필요. 신규 conftest:
```python
# services/_shared/tests/conftest.py
import pytest
pytest_plugins = ["pytest_asyncio"]
def pytest_collection_modifyitems(config, items):
for item in items:
if "asyncio" in item.fixturenames or item.get_closest_marker("asyncio") is not None:
continue
# auto-mark all async tests
if item.function.__name__.startswith("test_"):
import asyncio, inspect
if inspect.iscoroutinefunction(item.function):
item.add_marker(pytest.mark.asyncio)
```
또는 더 간단히 `services/_shared/pytest.ini`:
```ini
[pytest]
asyncio_mode = auto
```
- [ ] **Step 7: Commit**
```bash
git add services/_shared/
git commit -m "feat(services): _shared/reliable_queue 신설 — BLMOVE + processing list + retry (F6 part 1)"
```
---
## Task 2: insta-render에 ReliableQueue 적용
**Files:**
- Modify: `services/insta-render/Dockerfile`
- Modify: `services/insta-render/worker.py`
- Modify: `services/insta-render/tests/test_worker.py` (append)
- [ ] **Step 1: Update Dockerfile**
`services/insta-render/Dockerfile``_shared` 복사 추가. 기존 Dockerfile 패턴을 먼저 읽고, `COPY services/insta-render /app` 같은 라인이 있다면 그 위 또는 옆에:
```dockerfile
COPY services/_shared /app/_shared
ENV PYTHONPATH=/app:/app/_shared:${PYTHONPATH}
```
build context가 `services/` 루트여야 함. compose에서 `build: { context: ./services, dockerfile: insta-render/Dockerfile }` 인지 확인 — 아니라면 context 조정 필요.
- [ ] **Step 2: Modify worker.py — failing test first**
`services/insta-render/tests/test_worker.py` 끝에 추가:
```python
import json
from unittest.mock import AsyncMock, patch
import pytest
@pytest.mark.asyncio
async def test_worker_calls_ack_on_success():
"""성공 시 ack() 호출 (F6)."""
import worker
fake_payload = {"task_id": "t1", "job_type": "card_generation", "params": {}}
fake_raw = json.dumps(fake_payload).encode()
fake_queue = AsyncMock()
fake_queue.dequeue = AsyncMock(side_effect=[(fake_payload, fake_raw), None])
fake_queue.ack = AsyncMock()
fake_queue.fail = AsyncMock()
fake_queue.recover = AsyncMock(return_value=0)
with patch.object(worker, "ReliableQueue", return_value=fake_queue), \
patch.object(worker, "_dispatch") as disp:
# poll_once로 1 cycle만 실행 (실제 loop 끊기 위해)
await worker.poll_once(fake_queue)
disp.assert_called_once()
fake_queue.ack.assert_called_once_with(fake_raw)
fake_queue.fail.assert_not_called()
@pytest.mark.asyncio
async def test_worker_calls_fail_on_dispatch_exception():
"""dispatch 예외 시 fail() 호출 — 작업 손실 안 됨 (F6)."""
import worker
fake_payload = {"task_id": "t2", "job_type": "card_generation", "params": {}}
fake_raw = json.dumps(fake_payload).encode()
fake_queue = AsyncMock()
fake_queue.dequeue = AsyncMock(return_value=(fake_payload, fake_raw))
fake_queue.ack = AsyncMock()
fake_queue.fail = AsyncMock()
with patch.object(worker, "_dispatch", side_effect=RuntimeError("boom")):
await worker.poll_once(fake_queue)
fake_queue.fail.assert_called_once_with(fake_raw, fake_payload)
fake_queue.ack.assert_not_called()
```
- [ ] **Step 3: Run test to fail**
```
cd services/insta-render && python -m pytest tests/ -v -k "ack_on_success or fail_on_dispatch"
```
Expected: AttributeError (`worker.poll_once` 미존재, `worker.ReliableQueue` 미존재).
- [ ] **Step 4: Rewrite insta-render worker.py**
```python
"""Redis 기반 worker — F6 신뢰성 패턴 적용 (BLMOVE + processing list + retry)."""
from __future__ import annotations
import asyncio
import logging
import os
import sys
import redis.asyncio as aioredis
from _shared.reliable_queue import ReliableQueue
from nas_client import webhook_update_task
# 기존 dispatch 대상 import 유지
from card_renderer import render_card
logger = logging.getLogger(__name__)
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
QUEUE_KEY = "queue:insta-render"
PAUSED_KEY = "queue:paused"
_DISPATCH_TABLE = {
"card_generation": "render_card",
}
def _dispatch(payload: dict) -> None:
job_type = payload.get("job_type", "")
task_id = payload.get("task_id", "")
params = payload.get("params", {})
fn_name = _DISPATCH_TABLE.get(job_type)
if fn_name is None:
logger.error("unknown job_type=%s task=%s", job_type, task_id)
webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}")
return
try:
fn = getattr(sys.modules[__name__], fn_name)
except AttributeError:
webhook_update_task(task_id, "failed", 0, "", error=f"internal dispatch error: {fn_name}")
return
fn(task_id, params)
async def poll_once(queue: ReliableQueue) -> bool:
"""1 cycle: dequeue → dispatch → ack/fail. Returns True if a job was handled."""
result = await queue.dequeue(timeout=5)
if result is None:
return False
payload, raw = result
try:
await asyncio.to_thread(_dispatch, payload)
except Exception:
logger.exception("dispatch failed task_id=%s", payload.get("task_id"))
await queue.fail(raw, payload)
return True
await queue.ack(raw)
return True
async def worker_loop():
redis = aioredis.from_url(REDIS_URL, decode_responses=False)
queue = ReliableQueue(redis, queue_key=QUEUE_KEY)
logger.info("insta-render worker started worker_id=%s", queue._worker_id)
# F6: startup recovery
try:
recovered = await queue.recover()
if recovered:
logger.info("recovered %d orphaned items at startup", recovered)
except Exception:
logger.exception("startup recover failed")
while True:
try:
paused = await redis.get(PAUSED_KEY)
if paused == b"1":
await asyncio.sleep(10)
continue
await poll_once(queue)
except asyncio.CancelledError:
logger.info("worker_loop cancelled")
raise
except Exception:
logger.exception("worker_loop iteration 실패, 5초 후 재시도")
await asyncio.sleep(5)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(worker_loop())
```
**NOTE**: 기존 `insta-render/worker.py`의 dispatch table·import는 실제 파일을 보고 매핑 유지. 위 예시는 minimal — job_type / function 이름은 기존 파일과 맞춰야 함. 변경 전 `Read services/insta-render/worker.py`로 정확한 dispatch table 확인할 것.
- [ ] **Step 5: Run tests**
```
cd services/insta-render && python -m pytest tests/ -v
```
Expected: 신규 2 PASS, 기존 PASS (dispatch table test 등).
- [ ] **Step 6: Commit**
```bash
git add services/insta-render/
git commit -m "fix(insta-render): F6 ReliableQueue 적용 — BLMOVE + ack/fail (F6 part 2)"
```
---
## Task 3: music-render에 동일 적용
**Files:**
- Modify: `services/music-render/Dockerfile`, `worker.py`
- Modify: `services/music-render/tests/test_worker.py` (append)
- [ ] **Step 1: Dockerfile에 `COPY services/_shared` 추가**
- [ ] **Step 2: Test 추가 (Task 2 패턴 동일, 단 dispatch target은 `run_suno_generation` 등 기존 패턴)**
```python
@pytest.mark.asyncio
async def test_music_worker_ack_on_success():
import worker
payload = {"task_id": "t1", "job_type": "suno_generation", "params": {}}
raw = json.dumps(payload).encode()
fake_queue = AsyncMock()
fake_queue.dequeue = AsyncMock(return_value=(payload, raw))
fake_queue.ack = AsyncMock()
with patch.object(worker, "_dispatch"):
await worker.poll_once(fake_queue)
fake_queue.ack.assert_called_once_with(raw)
@pytest.mark.asyncio
async def test_music_worker_fail_on_exception():
import worker
payload = {"task_id": "t2", "job_type": "suno_generation", "params": {}}
raw = json.dumps(payload).encode()
fake_queue = AsyncMock()
fake_queue.dequeue = AsyncMock(return_value=(payload, raw))
fake_queue.fail = AsyncMock()
with patch.object(worker, "_dispatch", side_effect=RuntimeError("x")):
await worker.poll_once(fake_queue)
fake_queue.fail.assert_called_once_with(raw, payload)
```
- [ ] **Step 3: Run test to fail**
- [ ] **Step 4: Rewrite music-render worker.py — `worker_loop` 구조는 insta-render와 동일, `_dispatch` + `_DISPATCH_TABLE`은 기존 12개 함수 그대로 유지**
- [ ] **Step 5: Run tests**
- [ ] **Step 6: Commit**
```bash
git add services/music-render/
git commit -m "fix(music-render): F6 ReliableQueue 적용 (F6 part 3)"
```
---
## Task 4: video-render에 동일 적용
(Task 3와 동일 패턴 — sora/veo/kling/seedance 4 provider table 유지)
- [ ] **Step 1: Dockerfile 수정**
- [ ] **Step 2: 신규 test 2개 추가 (`test_video_worker_ack_on_success`, `test_video_worker_fail_on_exception`) — job_type은 `sora_generation`**
- [ ] **Step 3: Run failing test**
- [ ] **Step 4: Rewrite worker.py — 동일 패턴**
- [ ] **Step 5: Run tests**
- [ ] **Step 6: Commit**
```bash
git add services/video-render/
git commit -m "fix(video-render): F6 ReliableQueue 적용 (F6 part 4)"
```
---
## Task 5: image-render에 동일 적용
(gpt_image / nano_banana / flux 3 provider table 유지)
- [ ] **Step 1-6: Task 3/4 동일 패턴**
```bash
git add services/image-render/
git commit -m "fix(image-render): F6 ReliableQueue 적용 (F6 part 5)"
```
---
## Task 6: 운영 검증 + push
- [ ] **Step 1: 전체 services test 실행**
```
cd services && for d in _shared insta-render music-render video-render image-render; do
echo "--- $d ---"
(cd $d && python -m pytest tests/ -q) || true
done
```
(또는 PowerShell:)
```powershell
foreach ($d in @("_shared","insta-render","music-render","video-render","image-render")) {
Write-Output "--- $d ---"
Push-Location services/$d
python -m pytest tests/ -q
Pop-Location
}
```
Expected: 4개 worker 각 신규 2개 + _shared 6개 + 기존 test 전부 PASS.
- [ ] **Step 2: Docker build 시뮬 (옵션, 시간 허용 시)**
```
cd services && docker compose build insta-render music-render video-render image-render
```
Expected: build context에 `_shared` 포함됨 검증.
- [ ] **Step 3: Push**
```bash
git push origin main
```
- [ ] **Step 4: 운영 deploy 시 주의사항 (수동)**
NAS에서 컨테이너 재배포 시:
1. `redis-cli -h 192.168.45.54 KEYS 'processing:*'` 로 기존 orphan 확인 — 있다면 worker_id 다르면 안 잡힘. 수동으로 `LMOVE` 해야 할 수도 있음.
2. `redis-cli -h 192.168.45.54 KEYS 'dead_letter:*'` 로 dead-letter 모니터 — 누적되면 alerting 필요.
3. WORKER_ID env로 unique 하게 (`WORKER_ID=insta-render-prod-1` 등) 권장 — hostname이 컨테이너 재기동 시 바뀌면 orphan 추적 안 됨.
---
## Self-Review
1. **atomic dequeue**: `BLMOVE` 단일 명령 — Redis 단일 트랜잭션 ✅
2. **ack on success**: `LREM processing 1 raw` — 정확 1개 ✅
3. **fail with retry**: attempts < max → 재큐, attempts >= max → dead-letter ✅
4. **startup recovery**: orphan 자동 재큐 (attempts 증가) ✅
5. **4 worker 적용**: insta/music/video/image 동일 패턴 ✅
6. **NAS producer 호환**: LPUSH 그대로, payload schema에 attempts 선택적 ✅
**미커버 (의도적)**:
- dead-letter monitor/alert — 운영 작업 (CHECK_POINT 백로그)
- worker_id env 미설정 시 hostname 변경 시 orphan 분실 — 운영 가이드에 명시
**가정 검증**:
- `redis-py.aioredis.blmove` 시그니처: `(first_list, second_list, timeout, src='LEFT', dest='RIGHT')`. redis>=5.0 권장.
- fakeredis: `fakeredis.aioredis.FakeRedis` (>=2.20.0) 가 BLMOVE 지원함 — 미지원 시 plan 적용 전 검증.
---
## Execution Handoff
**Plan complete and saved to `docs/superpowers/plans/2026-05-25-render-queue-reliability.md`.**
**1. Subagent-Driven (recommended)** — Task 별 fresh subagent. 4개 worker는 패턴 같으나 dispatch table은 각 worker 고유 — subagent가 정확히 일관성 유지하도록 review checkpoint.
**2. Inline Execution** — 현 세션 실행.
박재오 결정 대기. Plan 1·2 마친 후 진입 권장 (작업량 가장 큼 — 4개 worker × 약 1시간 = 4시간).

View File

@@ -0,0 +1,593 @@
# state.signals Lifecycle — Code Review F5 Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** `state.signals` 가 무한 dict 누적되는 문제를 해결. expires_at + cycle_id 부착해서 Phase 5 consumer (agent-office `/signal`) 가 stale 신호를 안전하게 무시할 수 있게.
**Architecture:**
1. `Signal` dict에 `expires_at: ISO str`, `cycle_id: int` 필드 추가.
2. `PollState.signal_cycle_id: int` (process 단위 auto-increment).
3. `generate_signals(state, dedup, settings)` 진입마다 `cycle_id += 1`.
4. emit하는 모든 signal에 `expires_at = as_of + SIGNAL_TTL_SECONDS`, `cycle_id = state.signal_cycle_id` 부착.
5. `state.purge_expired_signals(now)` helper — 매 cycle 끝에 호출하여 만료된 항목 제거.
6. `state.get_active_signals(now) → list[dict]` — Phase 5 consumer가 호출할 read API. 만료된 것 제외.
**Tech Stack:** Python 3.12, asyncio, pytest. 기존 cycle 흐름과 호환되도록 generate_signals 인터페이스는 그대로.
**Why expires_at + cycle_id (not pop-on-read):** consumer가 polling 실패해도 신호 손실 없음. cycle_id로 "이번 cycle에 새로 emit된 신호" 식별 가능 → Phase 5에서 incremental fetch 가능.
**Working directory:** `C:\Users\jaeoh\Desktop\workspace\web-ai`.
**Test runner:** `python -m pytest ai_trade/tests -q` (또는 `py -3.12 -m`). 환경 부재 시 plan 진행 중단.
---
## File Map
| 파일 | 변경 | 책임 |
|------|------|------|
| `ai_trade/config.py` | Add 1 field | `signal_ttl_seconds: int` (default 300) |
| `ai_trade/state.py` | Modify | `signal_cycle_id: int`, helper 2개 (`get_active_signals`, `purge_expired_signals`) |
| `ai_trade/signal_generator.py` | Modify L22-50, 133, 99-111, 174-186 | cycle_id 증가 + expires_at/cycle_id 부착 |
| `ai_trade/pull_worker.py` | Modify L46-51 근처 | cycle 끝에 purge 호출 |
| `ai_trade/tests/test_state_signals_lifecycle.py` | Create | 5 test (expires, cycle_id, purge, active list) |
| `ai_trade/tests/test_signal_generator.py` | Modify | 기존 emit test에 expires_at/cycle_id 필드 검증 추가 |
---
## Task 1: PollState에 cycle_id + lifecycle helper 추가
**Files:**
- Modify: `ai_trade/state.py`
- Test: `ai_trade/tests/test_state_signals_lifecycle.py` (Create)
- [ ] **Step 1: Write the failing test**
```python
# ai_trade/tests/test_state_signals_lifecycle.py
"""F5 — state.signals lifecycle (expires_at + cycle_id)."""
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import pytest
from ai_trade.state import PollState
KST = ZoneInfo("Asia/Seoul")
def test_initial_signal_cycle_id_is_zero():
state = PollState()
assert state.signal_cycle_id == 0
def test_get_active_signals_excludes_expired():
state = PollState()
now = datetime(2026, 5, 25, 10, 0, tzinfo=KST)
future = (now + timedelta(seconds=300)).isoformat()
past = (now - timedelta(seconds=60)).isoformat()
state.signals = {
"A": {"ticker": "A", "expires_at": future, "cycle_id": 1, "action": "buy"},
"B": {"ticker": "B", "expires_at": past, "cycle_id": 1, "action": "buy"},
}
active = state.get_active_signals(now)
tickers = [s["ticker"] for s in active]
assert "A" in tickers
assert "B" not in tickers
def test_get_active_signals_treats_missing_expires_as_expired():
"""expires_at 없는 legacy 신호는 expired로 간주."""
state = PollState()
now = datetime(2026, 5, 25, 10, 0, tzinfo=KST)
state.signals = {"C": {"ticker": "C", "action": "buy"}}
assert state.get_active_signals(now) == []
def test_purge_expired_signals_removes_expired():
state = PollState()
now = datetime(2026, 5, 25, 10, 0, tzinfo=KST)
future = (now + timedelta(seconds=300)).isoformat()
past = (now - timedelta(seconds=60)).isoformat()
state.signals = {
"A": {"ticker": "A", "expires_at": future, "cycle_id": 1},
"B": {"ticker": "B", "expires_at": past, "cycle_id": 1},
}
state.purge_expired_signals(now)
assert "A" in state.signals
assert "B" not in state.signals
```
- [ ] **Step 2: Run test to verify it fails**
```
python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v
```
Expected: `AttributeError: signal_cycle_id` 또는 `get_active_signals` 미구현.
- [ ] **Step 3: Modify state.py**
```python
"""PollState — process-wide singleton."""
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class PollState:
portfolio: dict | None = None
news_sentiment: dict | None = None
screener_preview: dict | None = None
minute_bars: dict[str, deque] = field(default_factory=dict)
asking_price: dict[str, dict] = field(default_factory=dict)
# Phase 3b additions
daily_ohlcv: dict[str, list[dict]] = field(default_factory=dict)
chronos_predictions: dict[str, dict] = field(default_factory=dict)
minute_momentum: dict[str, str] = field(default_factory=dict)
signals: dict[str, dict] = field(default_factory=dict)
# F5 lifecycle
signal_cycle_id: int = 0
last_updated: dict[str, str] = field(default_factory=dict)
fetch_errors: dict[str, int] = field(default_factory=dict)
def get_active_signals(self, now: datetime) -> list[dict]:
"""expires_at > now 인 신호만 반환. expires_at 없으면 expired 취급."""
active: list[dict] = []
for sig in self.signals.values():
expires_at = sig.get("expires_at")
if not expires_at:
continue
try:
exp_dt = datetime.fromisoformat(expires_at)
except ValueError:
continue
if exp_dt > now:
active.append(sig)
return active
def purge_expired_signals(self, now: datetime) -> int:
"""만료된 signal 제거. 제거된 개수 반환."""
to_drop = []
for ticker, sig in self.signals.items():
expires_at = sig.get("expires_at")
if not expires_at:
to_drop.append(ticker)
continue
try:
exp_dt = datetime.fromisoformat(expires_at)
except ValueError:
to_drop.append(ticker)
continue
if exp_dt <= now:
to_drop.append(ticker)
for t in to_drop:
del self.signals[t]
return len(to_drop)
state = PollState()
```
- [ ] **Step 4: Run test to verify it passes**
```
python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v
```
Expected: 4 PASS.
- [ ] **Step 5: Verify full suite still passes**
```
python -m pytest ai_trade/tests -q
```
Expected: 기존 test 전부 PASS (state.signals dict 인터페이스 그대로).
- [ ] **Step 6: Commit**
```bash
git add ai_trade/state.py ai_trade/tests/test_state_signals_lifecycle.py
git commit -m "feat(ai_trade): state.signals에 expires_at + cycle_id lifecycle 추가 (F5 part 1)"
```
---
## Task 2: config에 SIGNAL_TTL_SECONDS 추가
**Files:**
- Modify: `ai_trade/config.py`
- Test: `ai_trade/tests/test_state_signals_lifecycle.py` (append)
- [ ] **Step 1: Write failing test**
`test_state_signals_lifecycle.py` 끝에 추가:
```python
def test_signal_ttl_seconds_default(monkeypatch):
monkeypatch.delenv("SIGNAL_TTL_SECONDS", raising=False)
from ai_trade.config import Settings
s = Settings()
assert s.signal_ttl_seconds == 300
def test_signal_ttl_seconds_env_override(monkeypatch):
monkeypatch.setenv("SIGNAL_TTL_SECONDS", "60")
from ai_trade.config import Settings
s = Settings()
assert s.signal_ttl_seconds == 60
```
- [ ] **Step 2: Run test to fail**
```
python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v -k signal_ttl
```
Expected: AttributeError.
- [ ] **Step 3: Add field to config.py**
`Settings` 클래스 안에 추가 (다른 *_threshold 옆):
```python
signal_ttl_seconds: int = field(
default_factory=lambda: int(os.getenv("SIGNAL_TTL_SECONDS", "300"))
)
```
- [ ] **Step 4: Run test**
```
python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v -k signal_ttl
```
Expected: 2 PASS.
- [ ] **Step 5: Commit**
```bash
git add ai_trade/config.py ai_trade/tests/test_state_signals_lifecycle.py
git commit -m "feat(ai_trade): SIGNAL_TTL_SECONDS env 추가 (F5 part 2)"
```
---
## Task 3: signal_generator에 cycle_id + expires_at 부착
**Files:**
- Modify: `ai_trade/signal_generator.py`
- Test: `ai_trade/tests/test_signal_generator.py` (append)
- [ ] **Step 1: Write failing tests**
기존 `test_signal_generator.py` 끝에 추가:
```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo as _ZI
_KST_TEST = _ZI("Asia/Seoul")
def test_emit_attaches_cycle_id_and_expires_at(
state_with_buy_setup, dedup_clean, settings_default,
):
"""매 emit 시 cycle_id + expires_at 부착 (F5)."""
from ai_trade.signal_generator import generate_signals
before = datetime.now(_KST_TEST)
generate_signals(state_with_buy_setup, dedup_clean, settings_default)
after = datetime.now(_KST_TEST)
sig = state_with_buy_setup.signals["005930"]
assert sig["cycle_id"] == 1
assert "expires_at" in sig
exp_dt = datetime.fromisoformat(sig["expires_at"])
# as_of + 300s (default) — tolerance 5s
assert before + timedelta(seconds=295) < exp_dt < after + timedelta(seconds=305)
def test_cycle_id_increments_each_call(
state_with_buy_setup, dedup_clean, settings_default,
):
"""generate_signals 호출마다 cycle_id += 1."""
from ai_trade.signal_generator import generate_signals
generate_signals(state_with_buy_setup, dedup_clean, settings_default)
assert state_with_buy_setup.signal_cycle_id == 1
# 2번째 호출 — dedup이 막아도 cycle_id는 증가해야 함
generate_signals(state_with_buy_setup, dedup_clean, settings_default)
assert state_with_buy_setup.signal_cycle_id == 2
```
**NOTE:** 기존 test_signal_generator.py에 `state_with_buy_setup` / `dedup_clean` / `settings_default` 같은 fixture가 있을 것. 만약 이름이 다르면 실제 fixture 이름에 맞춰 조정. 검증: `grep -n "@pytest.fixture" ai_trade/tests/test_signal_generator.py`.
- [ ] **Step 2: Run tests to verify they fail**
```
python -m pytest ai_trade/tests/test_signal_generator.py -v -k "cycle_id or expires"
```
Expected: KeyError 또는 AttributeError.
- [ ] **Step 3: Modify signal_generator.py**
`generate_signals` 함수 (L22-25)를 변경:
```python
def generate_signals(state, dedup, settings) -> None:
"""Phase 4 entry — state-mutating. F5: cycle_id += 1 + expires_at 부착."""
state.signal_cycle_id += 1
_evaluate_sell_signals(state, dedup, settings)
_evaluate_buy_signals(state, dedup, settings)
```
`_build_buy_signal` (L99-111)에 두 필드 추가:
```python
def _build_buy_signal(state, ticker: str, name: str, rank: int | None, confidence: float) -> dict:
ap = state.asking_price[ticker]
as_of_dt = datetime.now(KST)
expires_at = (as_of_dt + timedelta(seconds=getattr(_current_settings(), "signal_ttl_seconds", 300))).isoformat()
return {
"ticker": ticker,
"name": name,
"action": "buy",
"confidence_webai": confidence,
"current_price": ap["current_price"],
"avg_price": None,
"pnl_pct": None,
"context": _build_context(state, ticker, rank),
"as_of": as_of_dt.isoformat(),
"cycle_id": state.signal_cycle_id,
"expires_at": expires_at,
}
```
같이 `_build_sell_signal` (L174-186):
```python
def _build_sell_signal(state, holding: dict, confidence: float, reason: str, settings=None) -> dict:
ticker = holding["ticker"]
as_of_dt = datetime.now(KST)
ttl = getattr(settings, "signal_ttl_seconds", 300) if settings else 300
expires_at = (as_of_dt + timedelta(seconds=ttl)).isoformat()
return {
"ticker": ticker,
"name": holding.get("name", ticker),
"action": "sell",
"confidence_webai": confidence,
"current_price": holding.get("current_price"),
"avg_price": holding.get("avg_price"),
"pnl_pct": holding.get("pnl_pct"),
"context": _build_context(state, ticker, rank=None, sell_reason=reason),
"as_of": as_of_dt.isoformat(),
"cycle_id": state.signal_cycle_id,
"expires_at": expires_at,
}
```
`_build_buy_signal`이 settings를 안 받고 있으니, 호출부도 갱신해야 함. 현실적으로 두 함수에 `settings` 인자를 추가하는 것이 깔끔. 변경:
```python
def _evaluate_buy_signals(state, dedup, settings) -> None:
candidates = _buy_candidates(state)
for ticker, name, rank in candidates:
existing = state.signals.get(ticker)
if existing is not None and existing.get("action") == "sell":
logger.debug("buy %s skipped: same-cycle sell precedence", ticker)
continue
if not _check_buy_hard_gate(state, ticker, settings):
logger.debug("buy %s skipped: hard gate failed", ticker)
continue
confidence = _compute_buy_confidence(state, ticker, rank)
if confidence <= settings.confidence_threshold:
logger.debug("buy %s skipped: confidence %.3f <= %.3f",
ticker, confidence, settings.confidence_threshold)
continue
if dedup.is_recent(ticker, "buy", within_hours=24):
logger.debug("buy %s skipped: dedup 24h", ticker)
continue
state.signals[ticker] = _build_buy_signal(state, ticker, name, rank, confidence, settings)
dedup.record(ticker, "buy", confidence=confidence)
logger.info("signal emit %s buy conf=%.3f rank=%s cycle=%d",
ticker, confidence, rank, state.signal_cycle_id)
def _build_buy_signal(state, ticker, name, rank, confidence, settings) -> dict:
ap = state.asking_price[ticker]
as_of_dt = datetime.now(KST)
ttl = settings.signal_ttl_seconds
expires_at = (as_of_dt + timedelta(seconds=ttl)).isoformat()
return {
"ticker": ticker,
"name": name,
"action": "buy",
"confidence_webai": confidence,
"current_price": ap["current_price"],
"avg_price": None,
"pnl_pct": None,
"context": _build_context(state, ticker, rank),
"as_of": as_of_dt.isoformat(),
"cycle_id": state.signal_cycle_id,
"expires_at": expires_at,
}
```
매도 측도 마찬가지로 `settings`를 통과시킴. `_try_stop_loss` 등은 이미 `settings`를 받으므로 `_build_sell_signal(..., settings=settings)` 로 호출.
import 추가 (signal_generator.py 상단):
```python
from datetime import datetime, timedelta
```
(기존 import에 `timedelta` 만 추가)
`_current_settings()` 같은 헬퍼는 만들지 않음 — settings를 명시적으로 전달.
- [ ] **Step 4: Run tests**
```
python -m pytest ai_trade/tests/test_signal_generator.py -v
```
Expected: 신규 2개 PASS, 기존 PASS.
- [ ] **Step 5: Commit**
```bash
git add ai_trade/signal_generator.py ai_trade/tests/test_signal_generator.py
git commit -m "feat(ai_trade): emit signal에 cycle_id + expires_at 부착 (F5 part 3)"
```
---
## Task 4: pull_worker가 cycle 끝에 purge 호출
**Files:**
- Modify: `ai_trade/pull_worker.py`
- Test: `ai_trade/tests/test_pull_worker.py` (append)
- [ ] **Step 1: Write failing test**
`test_pull_worker.py` 끝에 추가:
```python
async def test_poll_loop_purges_expired_signals(monkeypatch):
"""매 cycle 끝에 expired signal이 제거됨 (F5)."""
from ai_trade import pull_worker
from ai_trade.state import PollState
from datetime import datetime as _dt
from zoneinfo import ZoneInfo as _ZI
from unittest.mock import AsyncMock, MagicMock
import asyncio as _asyncio
_kst = _ZI("Asia/Seoul")
now = _dt(2026, 5, 18, 10, 0, tzinfo=_kst)
class FrozenDT:
@staticmethod
def now(tz=None): return now
state = PollState()
state.signals = {
"OLD": {"ticker": "OLD", "expires_at": _dt(2026, 5, 18, 9, 0, tzinfo=_kst).isoformat(), "cycle_id": 1},
"FRESH": {"ticker": "FRESH", "expires_at": _dt(2026, 5, 18, 10, 30, tzinfo=_kst).isoformat(), "cycle_id": 1},
}
monkeypatch.setattr(pull_worker, "datetime", FrozenDT)
monkeypatch.setattr(pull_worker, "_is_market_day", lambda n: True)
monkeypatch.setattr(pull_worker, "_is_polling_window", lambda n: True)
monkeypatch.setattr(pull_worker, "_next_interval", lambda n: 0.01)
monkeypatch.setattr(pull_worker, "_run_polling_cycle", AsyncMock())
monkeypatch.setattr(pull_worker, "update_minute_momentum_for_all", lambda s: None)
monkeypatch.setattr(pull_worker, "_is_post_close_trigger", lambda *a, **k: False)
shutdown = _asyncio.Event()
async def stop_soon():
await _asyncio.sleep(0.05)
shutdown.set()
_asyncio.create_task(stop_soon())
await pull_worker.poll_loop(
client=MagicMock(), state=state, shutdown=shutdown,
kis_client=MagicMock(), chronos=MagicMock(),
dedup=None, settings=None,
)
assert "OLD" not in state.signals
assert "FRESH" in state.signals
```
- [ ] **Step 2: Run test to fail**
```
python -m pytest ai_trade/tests/test_pull_worker.py::test_poll_loop_purges_expired_signals -v
```
Expected: FAIL — OLD가 남아있음.
- [ ] **Step 3: Add purge call in poll_loop**
`ai_trade/pull_worker.py` `poll_loop` 안, signals 생성 이후 (또는 cycle 끝 직전) 한 줄 추가:
```python
# Phase 4: generate signals
if dedup is not None and settings is not None:
try:
from ai_trade.signal_generator import generate_signals
generate_signals(state, dedup, settings)
except Exception:
logger.exception("generate_signals failed")
# F5: 만료된 signal purge (consumer 미사용 케이스 보호)
try:
state.purge_expired_signals(datetime.now(KST))
except Exception:
logger.exception("purge_expired_signals failed")
```
- [ ] **Step 4: Run test**
```
python -m pytest ai_trade/tests/test_pull_worker.py::test_poll_loop_purges_expired_signals -v
```
Expected: PASS.
- [ ] **Step 5: Run full suite**
```
python -m pytest ai_trade/tests -q
```
Expected: 모두 PASS.
- [ ] **Step 6: Commit**
```bash
git add ai_trade/pull_worker.py ai_trade/tests/test_pull_worker.py
git commit -m "feat(ai_trade): poll_loop가 매 cycle 끝에 expired signal purge (F5 part 4)"
```
---
## Task 5: 전체 회귀 + push
- [ ] **Step 1: Final pytest**
```
python -m pytest ai_trade/tests -v
```
Expected: 모두 PASS (총 신규 약 9개 + 기존 56개).
- [ ] **Step 2: Push**
```bash
git push origin main
```
---
## Self-Review
1. **expires_at + cycle_id 부착**: `_build_buy_signal`, `_build_sell_signal` 양쪽 ✅
2. **cycle_id 증가**: `generate_signals` 진입에서 단 1회 ✅
3. **purge**: poll_loop cycle 마지막에 1회 호출 ✅
4. **get_active_signals**: Phase 5 consumer가 호출할 read API 존재 ✅
5. **legacy 신호 호환**: `expires_at` 없는 신호는 expired 취급 → 안전 ✅
**미커버 항목 (의도적)**:
- Phase 5 consumer가 처리 후 explicit drain하는 API는 이 plan에서 안 다룸 (consumer가 read-only로도 충분 — expires_at + dedup으로 idempotent).
- agent-office `/signal` HTTP endpoint는 Phase 5 plan 영역.
---
## Execution Handoff
**Plan complete and saved to `docs/superpowers/plans/2026-05-25-state-signals-lifecycle.md`.**
**1. Subagent-Driven (recommended)** — Task 별 fresh subagent.
**2. Inline Execution** — 현 세션 실행.
박재오 결정 대기. Plan 1 (hotfix) 마친 뒤 진입 권장.

View File

@@ -5,6 +5,9 @@
<link rel="icon" type="image/svg+xml" href="/main_logo.png" /> <link rel="icon" type="image/svg+xml" href="/main_logo.png" />
<meta name="viewport" content="width=device-width, initial-scale=1.0, viewport-fit=cover" /> <meta name="viewport" content="width=device-width, initial-scale=1.0, viewport-fit=cover" />
<title>가후습 개인기록</title> <title>가후습 개인기록</title>
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=Noto+Serif+KR:wght@500;700&display=swap" rel="stylesheet">
</head> </head>
<body> <body>
<div id="root"></div> <div id="root"></div>

430
src/pages/saju/Saju.css Normal file
View File

@@ -0,0 +1,430 @@
/* saju-page scope — 다른 페이지에 영향 없음 */
.saju-page {
/* 베이스 */
--saju-cream: #FAF6EE;
--saju-paper: #F2EAD8;
--saju-ink: #2E2D45;
--saju-ink-deep: #1F1D38;
/* 액센트 */
--saju-gold: #D4A574;
--saju-gold-deep: #B5874E;
--saju-apricot: #C58F76;
--saju-rose: #D9A2A6;
--saju-jade: #4B7065;
--saju-violet: #6A5285;
/* 카테고리 (3 ActionCard) */
--saju-today-bg: #4B7065;
--saju-gunghab-bg: #A8736E;
--saju-saju-bg: #4F4A78;
/* 점수 카테고리 (4 ScoreCard) */
--saju-wealth: #D4A574;
--saju-romance: #D9A2A6;
--saju-social: #4B7065;
--saju-career: #6A5285;
min-height: 100vh;
background: var(--saju-cream);
color: var(--saju-ink);
font-family: 'Pretendard', sans-serif;
padding: 0;
margin: 0;
}
.saju-page * { box-sizing: border-box; }
.saju-page .saju-h1,
.saju-page .saju-h2,
.saju-page .saju-h3 {
font-family: 'Noto Serif KR', 'Pretendard', serif;
font-weight: 700;
letter-spacing: -0.02em;
color: var(--saju-ink);
margin: 0;
}
.saju-page .saju-h1 { font-size: clamp(2.5rem, 4vw, 3.5rem); line-height: 1.2; }
.saju-page .saju-h2 { font-size: clamp(1.8rem, 3vw, 2.5rem); line-height: 1.3; }
.saju-page .saju-h3 { font-size: clamp(1.2rem, 2vw, 1.5rem); }
/* 호령 마스코트 */
.horyung-mascot { display: block; object-fit: contain; }
.horyung-mascot--sm { width: 80px; height: auto; }
.horyung-mascot--md { width: 180px; height: auto; }
.horyung-mascot--lg { width: 320px; height: auto; }
/* 상단 네비게이션 */
.saju-nav {
display: flex;
align-items: center;
justify-content: space-between;
padding: 1rem 2rem;
background: var(--saju-ink);
color: var(--saju-cream);
}
.saju-nav__logo {
font-family: 'Noto Serif KR', serif;
font-size: 1.25rem;
font-weight: 700;
color: var(--saju-cream);
text-decoration: none;
}
.saju-nav__links {
display: flex;
gap: 1.5rem;
list-style: none;
padding: 0;
margin: 0;
}
.saju-nav__links a {
color: var(--saju-cream);
text-decoration: none;
font-size: 0.95rem;
opacity: 0.85;
}
.saju-nav__links a:hover { opacity: 1; }
.saju-nav__cta {
background: var(--saju-gold);
color: var(--saju-ink);
border: none;
padding: 0.5rem 1.25rem;
border-radius: 999px;
font-weight: 600;
cursor: pointer;
font-family: 'Pretendard', sans-serif;
text-decoration: none;
}
/* Hero */
.saju-hero {
display: grid;
grid-template-columns: 1fr 1.4fr;
gap: 3rem;
padding: 3rem 2rem;
max-width: 1400px;
margin: 0 auto;
}
.saju-hero__left {
display: flex;
flex-direction: column;
align-items: center;
gap: 1.5rem;
}
.saju-quote-box {
background: var(--saju-paper);
padding: 1rem 1.25rem;
border-radius: 12px;
border: 1px solid var(--saju-gold-deep);
color: var(--saju-ink);
font-size: 0.9rem;
line-height: 1.5;
max-width: 280px;
}
.saju-hero__right {
display: flex;
flex-direction: column;
gap: 1.5rem;
justify-content: center;
}
.saju-sub {
color: var(--saju-ink);
opacity: 0.7;
margin: 0;
line-height: 1.6;
}
/* ActionCard */
.saju-action-cards {
display: grid;
grid-template-columns: repeat(3, 1fr);
gap: 1rem;
margin-top: 1rem;
}
.saju-action-card {
background: var(--saju-saju-bg);
color: var(--saju-cream);
padding: 1.5rem 1rem;
border-radius: 16px;
text-decoration: none;
display: flex;
flex-direction: column;
align-items: center;
gap: 0.5rem;
transition: transform 0.2s;
font-family: 'Pretendard', sans-serif;
}
.saju-action-card:hover { transform: translateY(-4px); }
.saju-action-card--today { background: var(--saju-today-bg); }
.saju-action-card--gunghab { background: var(--saju-gunghab-bg); }
.saju-action-card--saju { background: var(--saju-saju-bg); }
.saju-action-card[aria-disabled="true"] { opacity: 0.6; cursor: not-allowed; }
.saju-action-card__icon { font-size: 2rem; }
.saju-action-card__title { font-size: 1.1rem; font-weight: 700; }
.saju-action-card__desc { font-size: 0.85rem; opacity: 0.85; text-align: center; }
/* Bottom */
.saju-bottom {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 3rem;
padding: 3rem 2rem;
max-width: 1400px;
margin: 0 auto;
background: var(--saju-ink);
color: var(--saju-cream);
border-radius: 24px 24px 0 0;
}
.saju-form { display: flex; flex-direction: column; gap: 1rem; }
.saju-form input,
.saju-form select {
padding: 0.75rem;
border-radius: 8px;
border: 1px solid var(--saju-gold-deep);
background: var(--saju-ink-deep);
color: var(--saju-cream);
font-family: inherit;
font-size: 1rem;
}
.saju-form button {
background: var(--saju-gold);
color: var(--saju-ink);
border: none;
padding: 0.875rem;
border-radius: 999px;
font-weight: 700;
cursor: pointer;
font-family: inherit;
font-size: 1rem;
}
.saju-form button:disabled { opacity: 0.6; cursor: not-allowed; }
.saju-form__error {
background: rgba(217, 162, 166, 0.2);
color: var(--saju-rose);
padding: 0.75rem;
border-radius: 8px;
font-size: 0.9rem;
}
/* Fortune ring */
.saju-fortune-ring {
display: flex;
align-items: center;
justify-content: center;
position: relative;
}
.saju-fortune-ring svg { width: 200px; height: 200px; }
.saju-fortune-ring__score {
position: absolute;
font-family: 'Noto Serif KR', serif;
font-size: 2.5rem;
font-weight: 700;
color: var(--saju-ink);
}
.saju-fortune-ring__total { font-size: 0.9rem; color: var(--saju-ink); opacity: 0.6; }
/* ScoreCard */
.saju-score-card {
background: var(--saju-cream);
border-radius: 16px;
padding: 1.25rem;
display: flex;
flex-direction: column;
gap: 0.5rem;
border: 1px solid var(--saju-paper);
}
.saju-score-card__head { display: flex; align-items: center; gap: 0.5rem; }
.saju-score-card__icon { font-size: 1.5rem; }
.saju-score-card__title { font-weight: 700; font-size: 0.95rem; }
.saju-score-card__value {
font-family: 'Noto Serif KR', serif;
font-size: 2rem;
font-weight: 700;
color: var(--saju-ink);
}
.saju-score-card__bar {
height: 6px;
background: var(--saju-paper);
border-radius: 3px;
overflow: hidden;
}
.saju-score-card__bar > div { height: 100%; background: var(--saju-gold); transition: width 0.5s; }
/* Lucky box */
.saju-lucky-box {
background: var(--saju-paper);
border-radius: 16px;
padding: 1.5rem;
display: grid;
grid-template-columns: repeat(3, 1fr);
gap: 1rem;
}
.saju-lucky-box__item { text-align: center; }
.saju-lucky-box__label { font-size: 0.8rem; color: var(--saju-ink); opacity: 0.7; margin-bottom: 0.25rem; }
.saju-lucky-box__value {
font-family: 'Noto Serif KR', serif;
font-size: 1.5rem;
font-weight: 700;
color: var(--saju-ink);
}
/* SajuPillars */
.saju-pillars {
display: grid;
grid-template-columns: repeat(4, 1fr);
gap: 0.75rem;
}
.saju-pillar {
background: var(--saju-paper);
border-radius: 12px;
padding: 1rem;
text-align: center;
}
.saju-pillar__label { font-size: 0.8rem; color: var(--saju-ink); opacity: 0.6; margin-bottom: 0.5rem; }
.saju-pillar__stem,
.saju-pillar__branch {
font-family: 'Noto Serif KR', serif;
font-size: 1.75rem;
font-weight: 700;
display: block;
}
.saju-pillar__stem-kr,
.saju-pillar__branch-kr { font-size: 0.85rem; opacity: 0.7; }
.saju-pillar__ten-god,
.saju-pillar__fortune { font-size: 0.75rem; margin-top: 0.25rem; opacity: 0.7; }
/* Element bars */
.saju-element-bars {
display: flex;
flex-direction: column;
gap: 0.5rem;
padding: 1.5rem;
background: var(--saju-cream);
border-radius: 16px;
}
.saju-element-bar {
display: grid;
grid-template-columns: 60px 1fr 50px;
align-items: center;
gap: 0.75rem;
}
.saju-element-bar__label { font-size: 0.9rem; font-weight: 700; }
.saju-element-bar__track {
height: 12px;
background: var(--saju-paper);
border-radius: 6px;
overflow: hidden;
}
.saju-element-bar__fill {
height: 100%;
border-radius: 6px;
transition: width 0.5s;
}
.saju-element-bar__fill--木 { background: #4B7065; }
.saju-element-bar__fill--火 { background: #C56F5C; }
.saju-element-bar__fill--土 { background: #D4A574; }
.saju-element-bar__fill--金 { background: #B8B5A8; }
.saju-element-bar__fill--水 { background: #4A5878; }
.saju-element-bar__value { text-align: right; font-size: 0.85rem; opacity: 0.7; }
/* Monthly flow */
.saju-monthly-flow {
display: grid;
grid-template-columns: repeat(12, 1fr);
gap: 0.25rem;
padding: 1rem;
background: var(--saju-cream);
border-radius: 16px;
}
.saju-monthly-flow__cell {
display: flex;
flex-direction: column;
align-items: center;
padding: 0.5rem 0.25rem;
border-radius: 8px;
background: var(--saju-paper);
}
.saju-monthly-flow__month { font-size: 0.7rem; opacity: 0.7; }
.saju-monthly-flow__score {
font-family: 'Noto Serif KR', serif;
font-weight: 700;
font-size: 1rem;
}
.saju-monthly-flow__label { font-size: 0.7rem; opacity: 0.8; margin-top: 0.25rem; }
/* Horyung quote */
.saju-horyung-quote {
background: var(--saju-ink);
color: var(--saju-cream);
padding: 1.5rem;
border-radius: 16px;
display: flex;
gap: 1rem;
align-items: flex-start;
}
.saju-horyung-quote__text { font-size: 0.95rem; line-height: 1.6; }
/* Interpret accordion */
.saju-interpret-accordion { display: flex; flex-direction: column; gap: 0.5rem; }
.saju-interpret-item {
background: var(--saju-cream);
border-radius: 12px;
border: 1px solid var(--saju-paper);
overflow: hidden;
}
.saju-interpret-item__header {
padding: 1rem;
background: var(--saju-paper);
cursor: pointer;
display: flex;
justify-content: space-between;
align-items: center;
font-weight: 700;
user-select: none;
}
.saju-interpret-item__body { padding: 1rem; font-size: 0.95rem; line-height: 1.6; }
.saju-interpret-item__evidence {
background: var(--saju-paper);
padding: 0.75rem;
border-radius: 8px;
margin-top: 0.75rem;
font-size: 0.85rem;
opacity: 0.85;
}
/* Stub */
.saju-stub {
max-width: 480px;
margin: 5rem auto;
text-align: center;
padding: 2rem;
background: var(--saju-paper);
border-radius: 24px;
}
.saju-stub a {
display: inline-block;
margin-top: 1.5rem;
background: var(--saju-gold);
color: var(--saju-ink);
padding: 0.75rem 1.5rem;
border-radius: 999px;
text-decoration: none;
font-weight: 700;
}
/* 반응형 */
@media (max-width: 1280px) {
.saju-hero { grid-template-columns: 1fr; text-align: center; }
.saju-hero__left { order: 2; }
.saju-hero__right { order: 1; }
.saju-bottom { grid-template-columns: 1fr; }
}
@media (max-width: 768px) {
.saju-nav { padding: 0.75rem 1rem; flex-wrap: wrap; gap: 0.5rem; }
.saju-nav__links { display: none; }
.saju-action-cards { grid-template-columns: 1fr; }
.saju-pillars { grid-template-columns: repeat(2, 1fr); }
.saju-monthly-flow { grid-template-columns: repeat(4, 1fr); }
.horyung-mascot--lg { width: 220px; }
}