# state.signals Lifecycle — Code Review F5 Implementation Plan > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** `state.signals` 가 무한 dict 누적되는 문제를 해결. expires_at + cycle_id 부착해서 Phase 5 consumer (agent-office `/signal`) 가 stale 신호를 안전하게 무시할 수 있게. **Architecture:** 1. `Signal` dict에 `expires_at: ISO str`, `cycle_id: int` 필드 추가. 2. `PollState.signal_cycle_id: int` (process 단위 auto-increment). 3. `generate_signals(state, dedup, settings)` 진입마다 `cycle_id += 1`. 4. emit하는 모든 signal에 `expires_at = as_of + SIGNAL_TTL_SECONDS`, `cycle_id = state.signal_cycle_id` 부착. 5. `state.purge_expired_signals(now)` helper — 매 cycle 끝에 호출하여 만료된 항목 제거. 6. `state.get_active_signals(now) → list[dict]` — Phase 5 consumer가 호출할 read API. 만료된 것 제외. **Tech Stack:** Python 3.12, asyncio, pytest. 기존 cycle 흐름과 호환되도록 generate_signals 인터페이스는 그대로. **Why expires_at + cycle_id (not pop-on-read):** consumer가 polling 실패해도 신호 손실 없음. cycle_id로 "이번 cycle에 새로 emit된 신호" 식별 가능 → Phase 5에서 incremental fetch 가능. **Working directory:** `C:\Users\jaeoh\Desktop\workspace\web-ai`. **Test runner:** `python -m pytest ai_trade/tests -q` (또는 `py -3.12 -m`). 환경 부재 시 plan 진행 중단. --- ## File Map | 파일 | 변경 | 책임 | |------|------|------| | `ai_trade/config.py` | Add 1 field | `signal_ttl_seconds: int` (default 300) | | `ai_trade/state.py` | Modify | `signal_cycle_id: int`, helper 2개 (`get_active_signals`, `purge_expired_signals`) | | `ai_trade/signal_generator.py` | Modify L22-50, 133, 99-111, 174-186 | cycle_id 증가 + expires_at/cycle_id 부착 | | `ai_trade/pull_worker.py` | Modify L46-51 근처 | cycle 끝에 purge 호출 | | `ai_trade/tests/test_state_signals_lifecycle.py` | Create | 5 test (expires, cycle_id, purge, active list) | | `ai_trade/tests/test_signal_generator.py` | Modify | 기존 emit test에 expires_at/cycle_id 필드 검증 추가 | --- ## Task 1: PollState에 cycle_id + lifecycle helper 추가 **Files:** - Modify: `ai_trade/state.py` - Test: `ai_trade/tests/test_state_signals_lifecycle.py` (Create) - [ ] **Step 1: Write the failing test** ```python # ai_trade/tests/test_state_signals_lifecycle.py """F5 — state.signals lifecycle (expires_at + cycle_id).""" from datetime import datetime, timedelta from zoneinfo import ZoneInfo import pytest from ai_trade.state import PollState KST = ZoneInfo("Asia/Seoul") def test_initial_signal_cycle_id_is_zero(): state = PollState() assert state.signal_cycle_id == 0 def test_get_active_signals_excludes_expired(): state = PollState() now = datetime(2026, 5, 25, 10, 0, tzinfo=KST) future = (now + timedelta(seconds=300)).isoformat() past = (now - timedelta(seconds=60)).isoformat() state.signals = { "A": {"ticker": "A", "expires_at": future, "cycle_id": 1, "action": "buy"}, "B": {"ticker": "B", "expires_at": past, "cycle_id": 1, "action": "buy"}, } active = state.get_active_signals(now) tickers = [s["ticker"] for s in active] assert "A" in tickers assert "B" not in tickers def test_get_active_signals_treats_missing_expires_as_expired(): """expires_at 없는 legacy 신호는 expired로 간주.""" state = PollState() now = datetime(2026, 5, 25, 10, 0, tzinfo=KST) state.signals = {"C": {"ticker": "C", "action": "buy"}} assert state.get_active_signals(now) == [] def test_purge_expired_signals_removes_expired(): state = PollState() now = datetime(2026, 5, 25, 10, 0, tzinfo=KST) future = (now + timedelta(seconds=300)).isoformat() past = (now - timedelta(seconds=60)).isoformat() state.signals = { "A": {"ticker": "A", "expires_at": future, "cycle_id": 1}, "B": {"ticker": "B", "expires_at": past, "cycle_id": 1}, } state.purge_expired_signals(now) assert "A" in state.signals assert "B" not in state.signals ``` - [ ] **Step 2: Run test to verify it fails** ``` python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v ``` Expected: `AttributeError: signal_cycle_id` 또는 `get_active_signals` 미구현. - [ ] **Step 3: Modify state.py** ```python """PollState — process-wide singleton.""" from collections import deque from dataclasses import dataclass, field from datetime import datetime @dataclass class PollState: portfolio: dict | None = None news_sentiment: dict | None = None screener_preview: dict | None = None minute_bars: dict[str, deque] = field(default_factory=dict) asking_price: dict[str, dict] = field(default_factory=dict) # Phase 3b additions daily_ohlcv: dict[str, list[dict]] = field(default_factory=dict) chronos_predictions: dict[str, dict] = field(default_factory=dict) minute_momentum: dict[str, str] = field(default_factory=dict) signals: dict[str, dict] = field(default_factory=dict) # F5 lifecycle signal_cycle_id: int = 0 last_updated: dict[str, str] = field(default_factory=dict) fetch_errors: dict[str, int] = field(default_factory=dict) def get_active_signals(self, now: datetime) -> list[dict]: """expires_at > now 인 신호만 반환. expires_at 없으면 expired 취급.""" active: list[dict] = [] for sig in self.signals.values(): expires_at = sig.get("expires_at") if not expires_at: continue try: exp_dt = datetime.fromisoformat(expires_at) except ValueError: continue if exp_dt > now: active.append(sig) return active def purge_expired_signals(self, now: datetime) -> int: """만료된 signal 제거. 제거된 개수 반환.""" to_drop = [] for ticker, sig in self.signals.items(): expires_at = sig.get("expires_at") if not expires_at: to_drop.append(ticker) continue try: exp_dt = datetime.fromisoformat(expires_at) except ValueError: to_drop.append(ticker) continue if exp_dt <= now: to_drop.append(ticker) for t in to_drop: del self.signals[t] return len(to_drop) state = PollState() ``` - [ ] **Step 4: Run test to verify it passes** ``` python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v ``` Expected: 4 PASS. - [ ] **Step 5: Verify full suite still passes** ``` python -m pytest ai_trade/tests -q ``` Expected: 기존 test 전부 PASS (state.signals dict 인터페이스 그대로). - [ ] **Step 6: Commit** ```bash git add ai_trade/state.py ai_trade/tests/test_state_signals_lifecycle.py git commit -m "feat(ai_trade): state.signals에 expires_at + cycle_id lifecycle 추가 (F5 part 1)" ``` --- ## Task 2: config에 SIGNAL_TTL_SECONDS 추가 **Files:** - Modify: `ai_trade/config.py` - Test: `ai_trade/tests/test_state_signals_lifecycle.py` (append) - [ ] **Step 1: Write failing test** `test_state_signals_lifecycle.py` 끝에 추가: ```python def test_signal_ttl_seconds_default(monkeypatch): monkeypatch.delenv("SIGNAL_TTL_SECONDS", raising=False) from ai_trade.config import Settings s = Settings() assert s.signal_ttl_seconds == 300 def test_signal_ttl_seconds_env_override(monkeypatch): monkeypatch.setenv("SIGNAL_TTL_SECONDS", "60") from ai_trade.config import Settings s = Settings() assert s.signal_ttl_seconds == 60 ``` - [ ] **Step 2: Run test to fail** ``` python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v -k signal_ttl ``` Expected: AttributeError. - [ ] **Step 3: Add field to config.py** `Settings` 클래스 안에 추가 (다른 *_threshold 옆): ```python signal_ttl_seconds: int = field( default_factory=lambda: int(os.getenv("SIGNAL_TTL_SECONDS", "300")) ) ``` - [ ] **Step 4: Run test** ``` python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v -k signal_ttl ``` Expected: 2 PASS. - [ ] **Step 5: Commit** ```bash git add ai_trade/config.py ai_trade/tests/test_state_signals_lifecycle.py git commit -m "feat(ai_trade): SIGNAL_TTL_SECONDS env 추가 (F5 part 2)" ``` --- ## Task 3: signal_generator에 cycle_id + expires_at 부착 **Files:** - Modify: `ai_trade/signal_generator.py` - Test: `ai_trade/tests/test_signal_generator.py` (append) - [ ] **Step 1: Write failing tests** 기존 `test_signal_generator.py` 끝에 추가: ```python from datetime import datetime, timedelta from zoneinfo import ZoneInfo as _ZI _KST_TEST = _ZI("Asia/Seoul") def test_emit_attaches_cycle_id_and_expires_at( state_with_buy_setup, dedup_clean, settings_default, ): """매 emit 시 cycle_id + expires_at 부착 (F5).""" from ai_trade.signal_generator import generate_signals before = datetime.now(_KST_TEST) generate_signals(state_with_buy_setup, dedup_clean, settings_default) after = datetime.now(_KST_TEST) sig = state_with_buy_setup.signals["005930"] assert sig["cycle_id"] == 1 assert "expires_at" in sig exp_dt = datetime.fromisoformat(sig["expires_at"]) # as_of + 300s (default) — tolerance 5s assert before + timedelta(seconds=295) < exp_dt < after + timedelta(seconds=305) def test_cycle_id_increments_each_call( state_with_buy_setup, dedup_clean, settings_default, ): """generate_signals 호출마다 cycle_id += 1.""" from ai_trade.signal_generator import generate_signals generate_signals(state_with_buy_setup, dedup_clean, settings_default) assert state_with_buy_setup.signal_cycle_id == 1 # 2번째 호출 — dedup이 막아도 cycle_id는 증가해야 함 generate_signals(state_with_buy_setup, dedup_clean, settings_default) assert state_with_buy_setup.signal_cycle_id == 2 ``` **NOTE:** 기존 test_signal_generator.py에 `state_with_buy_setup` / `dedup_clean` / `settings_default` 같은 fixture가 있을 것. 만약 이름이 다르면 실제 fixture 이름에 맞춰 조정. 검증: `grep -n "@pytest.fixture" ai_trade/tests/test_signal_generator.py`. - [ ] **Step 2: Run tests to verify they fail** ``` python -m pytest ai_trade/tests/test_signal_generator.py -v -k "cycle_id or expires" ``` Expected: KeyError 또는 AttributeError. - [ ] **Step 3: Modify signal_generator.py** `generate_signals` 함수 (L22-25)를 변경: ```python def generate_signals(state, dedup, settings) -> None: """Phase 4 entry — state-mutating. F5: cycle_id += 1 + expires_at 부착.""" state.signal_cycle_id += 1 _evaluate_sell_signals(state, dedup, settings) _evaluate_buy_signals(state, dedup, settings) ``` `_build_buy_signal` (L99-111)에 두 필드 추가: ```python def _build_buy_signal(state, ticker: str, name: str, rank: int | None, confidence: float) -> dict: ap = state.asking_price[ticker] as_of_dt = datetime.now(KST) expires_at = (as_of_dt + timedelta(seconds=getattr(_current_settings(), "signal_ttl_seconds", 300))).isoformat() return { "ticker": ticker, "name": name, "action": "buy", "confidence_webai": confidence, "current_price": ap["current_price"], "avg_price": None, "pnl_pct": None, "context": _build_context(state, ticker, rank), "as_of": as_of_dt.isoformat(), "cycle_id": state.signal_cycle_id, "expires_at": expires_at, } ``` 같이 `_build_sell_signal` (L174-186): ```python def _build_sell_signal(state, holding: dict, confidence: float, reason: str, settings=None) -> dict: ticker = holding["ticker"] as_of_dt = datetime.now(KST) ttl = getattr(settings, "signal_ttl_seconds", 300) if settings else 300 expires_at = (as_of_dt + timedelta(seconds=ttl)).isoformat() return { "ticker": ticker, "name": holding.get("name", ticker), "action": "sell", "confidence_webai": confidence, "current_price": holding.get("current_price"), "avg_price": holding.get("avg_price"), "pnl_pct": holding.get("pnl_pct"), "context": _build_context(state, ticker, rank=None, sell_reason=reason), "as_of": as_of_dt.isoformat(), "cycle_id": state.signal_cycle_id, "expires_at": expires_at, } ``` `_build_buy_signal`이 settings를 안 받고 있으니, 호출부도 갱신해야 함. 현실적으로 두 함수에 `settings` 인자를 추가하는 것이 깔끔. 변경: ```python def _evaluate_buy_signals(state, dedup, settings) -> None: candidates = _buy_candidates(state) for ticker, name, rank in candidates: existing = state.signals.get(ticker) if existing is not None and existing.get("action") == "sell": logger.debug("buy %s skipped: same-cycle sell precedence", ticker) continue if not _check_buy_hard_gate(state, ticker, settings): logger.debug("buy %s skipped: hard gate failed", ticker) continue confidence = _compute_buy_confidence(state, ticker, rank) if confidence <= settings.confidence_threshold: logger.debug("buy %s skipped: confidence %.3f <= %.3f", ticker, confidence, settings.confidence_threshold) continue if dedup.is_recent(ticker, "buy", within_hours=24): logger.debug("buy %s skipped: dedup 24h", ticker) continue state.signals[ticker] = _build_buy_signal(state, ticker, name, rank, confidence, settings) dedup.record(ticker, "buy", confidence=confidence) logger.info("signal emit %s buy conf=%.3f rank=%s cycle=%d", ticker, confidence, rank, state.signal_cycle_id) def _build_buy_signal(state, ticker, name, rank, confidence, settings) -> dict: ap = state.asking_price[ticker] as_of_dt = datetime.now(KST) ttl = settings.signal_ttl_seconds expires_at = (as_of_dt + timedelta(seconds=ttl)).isoformat() return { "ticker": ticker, "name": name, "action": "buy", "confidence_webai": confidence, "current_price": ap["current_price"], "avg_price": None, "pnl_pct": None, "context": _build_context(state, ticker, rank), "as_of": as_of_dt.isoformat(), "cycle_id": state.signal_cycle_id, "expires_at": expires_at, } ``` 매도 측도 마찬가지로 `settings`를 통과시킴. `_try_stop_loss` 등은 이미 `settings`를 받으므로 `_build_sell_signal(..., settings=settings)` 로 호출. import 추가 (signal_generator.py 상단): ```python from datetime import datetime, timedelta ``` (기존 import에 `timedelta` 만 추가) `_current_settings()` 같은 헬퍼는 만들지 않음 — settings를 명시적으로 전달. - [ ] **Step 4: Run tests** ``` python -m pytest ai_trade/tests/test_signal_generator.py -v ``` Expected: 신규 2개 PASS, 기존 PASS. - [ ] **Step 5: Commit** ```bash git add ai_trade/signal_generator.py ai_trade/tests/test_signal_generator.py git commit -m "feat(ai_trade): emit signal에 cycle_id + expires_at 부착 (F5 part 3)" ``` --- ## Task 4: pull_worker가 cycle 끝에 purge 호출 **Files:** - Modify: `ai_trade/pull_worker.py` - Test: `ai_trade/tests/test_pull_worker.py` (append) - [ ] **Step 1: Write failing test** `test_pull_worker.py` 끝에 추가: ```python async def test_poll_loop_purges_expired_signals(monkeypatch): """매 cycle 끝에 expired signal이 제거됨 (F5).""" from ai_trade import pull_worker from ai_trade.state import PollState from datetime import datetime as _dt from zoneinfo import ZoneInfo as _ZI from unittest.mock import AsyncMock, MagicMock import asyncio as _asyncio _kst = _ZI("Asia/Seoul") now = _dt(2026, 5, 18, 10, 0, tzinfo=_kst) class FrozenDT: @staticmethod def now(tz=None): return now state = PollState() state.signals = { "OLD": {"ticker": "OLD", "expires_at": _dt(2026, 5, 18, 9, 0, tzinfo=_kst).isoformat(), "cycle_id": 1}, "FRESH": {"ticker": "FRESH", "expires_at": _dt(2026, 5, 18, 10, 30, tzinfo=_kst).isoformat(), "cycle_id": 1}, } monkeypatch.setattr(pull_worker, "datetime", FrozenDT) monkeypatch.setattr(pull_worker, "_is_market_day", lambda n: True) monkeypatch.setattr(pull_worker, "_is_polling_window", lambda n: True) monkeypatch.setattr(pull_worker, "_next_interval", lambda n: 0.01) monkeypatch.setattr(pull_worker, "_run_polling_cycle", AsyncMock()) monkeypatch.setattr(pull_worker, "update_minute_momentum_for_all", lambda s: None) monkeypatch.setattr(pull_worker, "_is_post_close_trigger", lambda *a, **k: False) shutdown = _asyncio.Event() async def stop_soon(): await _asyncio.sleep(0.05) shutdown.set() _asyncio.create_task(stop_soon()) await pull_worker.poll_loop( client=MagicMock(), state=state, shutdown=shutdown, kis_client=MagicMock(), chronos=MagicMock(), dedup=None, settings=None, ) assert "OLD" not in state.signals assert "FRESH" in state.signals ``` - [ ] **Step 2: Run test to fail** ``` python -m pytest ai_trade/tests/test_pull_worker.py::test_poll_loop_purges_expired_signals -v ``` Expected: FAIL — OLD가 남아있음. - [ ] **Step 3: Add purge call in poll_loop** `ai_trade/pull_worker.py` `poll_loop` 안, signals 생성 이후 (또는 cycle 끝 직전) 한 줄 추가: ```python # Phase 4: generate signals if dedup is not None and settings is not None: try: from ai_trade.signal_generator import generate_signals generate_signals(state, dedup, settings) except Exception: logger.exception("generate_signals failed") # F5: 만료된 signal purge (consumer 미사용 케이스 보호) try: state.purge_expired_signals(datetime.now(KST)) except Exception: logger.exception("purge_expired_signals failed") ``` - [ ] **Step 4: Run test** ``` python -m pytest ai_trade/tests/test_pull_worker.py::test_poll_loop_purges_expired_signals -v ``` Expected: PASS. - [ ] **Step 5: Run full suite** ``` python -m pytest ai_trade/tests -q ``` Expected: 모두 PASS. - [ ] **Step 6: Commit** ```bash git add ai_trade/pull_worker.py ai_trade/tests/test_pull_worker.py git commit -m "feat(ai_trade): poll_loop가 매 cycle 끝에 expired signal purge (F5 part 4)" ``` --- ## Task 5: 전체 회귀 + push - [ ] **Step 1: Final pytest** ``` python -m pytest ai_trade/tests -v ``` Expected: 모두 PASS (총 신규 약 9개 + 기존 56개). - [ ] **Step 2: Push** ```bash git push origin main ``` --- ## Self-Review 1. **expires_at + cycle_id 부착**: `_build_buy_signal`, `_build_sell_signal` 양쪽 ✅ 2. **cycle_id 증가**: `generate_signals` 진입에서 단 1회 ✅ 3. **purge**: poll_loop cycle 마지막에 1회 호출 ✅ 4. **get_active_signals**: Phase 5 consumer가 호출할 read API 존재 ✅ 5. **legacy 신호 호환**: `expires_at` 없는 신호는 expired 취급 → 안전 ✅ **미커버 항목 (의도적)**: - Phase 5 consumer가 처리 후 explicit drain하는 API는 이 plan에서 안 다룸 (consumer가 read-only로도 충분 — expires_at + dedup으로 idempotent). - agent-office `/signal` HTTP endpoint는 Phase 5 plan 영역. --- ## Execution Handoff **Plan complete and saved to `docs/superpowers/plans/2026-05-25-state-signals-lifecycle.md`.** **1. Subagent-Driven (recommended)** — Task 별 fresh subagent. **2. Inline Execution** — 현 세션 실행. 박재오 결정 대기. Plan 1 (hotfix) 마친 뒤 진입 권장.