# state.signals Lifecycle — Code Review F5 Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Fix the unbounded growth of the `state.signals` dict. Attach `expires_at` and `cycle_id` to every signal so the Phase 5 consumer (agent-office `/signal`) can safely ignore stale signals.
**Architecture:**
1. Add `expires_at: ISO str` and `cycle_id: int` fields to the `Signal` dict.
2. `PollState.signal_cycle_id: int` (auto-incremented, scoped to the process).
3. Every entry into `generate_signals(state, dedup, settings)` increments `state.signal_cycle_id` by 1.
4. Every emitted signal gets `expires_at = as_of + SIGNAL_TTL_SECONDS` and `cycle_id = state.signal_cycle_id`.
5. `state.purge_expired_signals(now)` helper: called at the end of every cycle to remove expired entries.
6. `state.get_active_signals(now) → list[dict]`: the read API the Phase 5 consumer calls. Expired signals are excluded.
**Tech Stack:** Python 3.12, asyncio, pytest. The `generate_signals` interface stays unchanged so it remains compatible with the existing cycle flow.
**Why expires_at + cycle_id (not pop-on-read):** No signal is lost when a consumer poll fails. `cycle_id` identifies the signals newly emitted in the current cycle, which makes incremental fetch possible in Phase 5.
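
The incremental-fetch idea can be sketched as follows. This is a minimal illustration, not the Phase 5 implementation: `fetch_new_signals` and `last_seen_cycle` are hypothetical names, and a fixed-offset timezone stands in for `ZoneInfo("Asia/Seoul")` so the block runs without tzdata.

```python
from datetime import datetime, timedelta, timezone

KST = timezone(timedelta(hours=9))  # fixed-offset stand-in for Asia/Seoul


def fetch_new_signals(signals: dict, now: datetime, last_seen_cycle: int) -> list:
    """Hypothetical consumer helper: unexpired signals emitted after last_seen_cycle."""
    fresh = []
    for sig in signals.values():
        expires_at = sig.get("expires_at")
        if not expires_at:
            continue  # legacy signal without a TTL: treated as expired
        if datetime.fromisoformat(expires_at) <= now:
            continue  # stale signal: safe to ignore
        if sig.get("cycle_id", 0) > last_seen_cycle:
            fresh.append(sig)
    return fresh


now = datetime(2026, 5, 25, 10, 0, tzinfo=KST)
live = (now + timedelta(seconds=300)).isoformat()
dead = (now - timedelta(seconds=60)).isoformat()
signals = {
    "A": {"ticker": "A", "cycle_id": 1, "expires_at": live},
    "B": {"ticker": "B", "cycle_id": 2, "expires_at": live},
    "C": {"ticker": "C", "cycle_id": 2, "expires_at": dead},
}

new = fetch_new_signals(signals, now, last_seen_cycle=1)
print([s["ticker"] for s in new])  # ['B']
```

Because nothing is popped on read, a consumer that crashes after this call can repeat it with the same `last_seen_cycle` and receive the same signals again until they expire.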
**Working directory:** `C:\Users\jaeoh\Desktop\workspace\web-ai`.
**Test runner:** `python -m pytest ai_trade/tests -q` (or with `py -3.12 -m`). If the environment is unavailable, stop executing the plan.
---
## File Map
| File | Change | Responsibility |
|------|------|------|
| `ai_trade/config.py` | Add 1 field | `signal_ttl_seconds: int` (default 300) |
| `ai_trade/state.py` | Modify | `signal_cycle_id: int`, 2 helpers (`get_active_signals`, `purge_expired_signals`) |
| `ai_trade/signal_generator.py` | Modify L22-50, 133, 99-111, 174-186 | Increment cycle_id + attach expires_at/cycle_id |
| `ai_trade/pull_worker.py` | Modify near L46-51 | Call purge at the end of each cycle |
| `ai_trade/tests/test_state_signals_lifecycle.py` | Create | 6 tests (expires, cycle_id, purge, active list, TTL config) |
| `ai_trade/tests/test_signal_generator.py` | Modify | Add expires_at/cycle_id field checks to the existing emit tests |
---
## Task 1: Add cycle_id + lifecycle helpers to PollState
**Files:**
- Modify: `ai_trade/state.py`
- Test: `ai_trade/tests/test_state_signals_lifecycle.py` (Create)
- [ ] **Step 1: Write the failing test**
```python
# ai_trade/tests/test_state_signals_lifecycle.py
"""F5: state.signals lifecycle (expires_at + cycle_id)."""
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

import pytest

from ai_trade.state import PollState

KST = ZoneInfo("Asia/Seoul")


def test_initial_signal_cycle_id_is_zero():
    state = PollState()
    assert state.signal_cycle_id == 0


def test_get_active_signals_excludes_expired():
    state = PollState()
    now = datetime(2026, 5, 25, 10, 0, tzinfo=KST)
    future = (now + timedelta(seconds=300)).isoformat()
    past = (now - timedelta(seconds=60)).isoformat()
    state.signals = {
        "A": {"ticker": "A", "expires_at": future, "cycle_id": 1, "action": "buy"},
        "B": {"ticker": "B", "expires_at": past, "cycle_id": 1, "action": "buy"},
    }
    active = state.get_active_signals(now)
    tickers = [s["ticker"] for s in active]
    assert "A" in tickers
    assert "B" not in tickers


def test_get_active_signals_treats_missing_expires_as_expired():
    """A legacy signal without expires_at is treated as expired."""
    state = PollState()
    now = datetime(2026, 5, 25, 10, 0, tzinfo=KST)
    state.signals = {"C": {"ticker": "C", "action": "buy"}}
    assert state.get_active_signals(now) == []


def test_purge_expired_signals_removes_expired():
    state = PollState()
    now = datetime(2026, 5, 25, 10, 0, tzinfo=KST)
    future = (now + timedelta(seconds=300)).isoformat()
    past = (now - timedelta(seconds=60)).isoformat()
    state.signals = {
        "A": {"ticker": "A", "expires_at": future, "cycle_id": 1},
        "B": {"ticker": "B", "expires_at": past, "cycle_id": 1},
    }
    state.purge_expired_signals(now)
    assert "A" in state.signals
    assert "B" not in state.signals
```
- [ ] **Step 2: Run test to verify it fails**
```
python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v
```
Expected: `AttributeError` on `signal_cycle_id`, or a failure because `get_active_signals` is not implemented yet.
- [ ] **Step 3: Modify state.py**
```python
"""PollState — process-wide singleton."""
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class PollState:
    portfolio: dict | None = None
    news_sentiment: dict | None = None
    screener_preview: dict | None = None
    minute_bars: dict[str, deque] = field(default_factory=dict)
    asking_price: dict[str, dict] = field(default_factory=dict)
    # Phase 3b additions
    daily_ohlcv: dict[str, list[dict]] = field(default_factory=dict)
    chronos_predictions: dict[str, dict] = field(default_factory=dict)
    minute_momentum: dict[str, str] = field(default_factory=dict)
    signals: dict[str, dict] = field(default_factory=dict)
    # F5 lifecycle
    signal_cycle_id: int = 0
    last_updated: dict[str, str] = field(default_factory=dict)
    fetch_errors: dict[str, int] = field(default_factory=dict)

    def get_active_signals(self, now: datetime) -> list[dict]:
        """Return only signals with expires_at > now. A missing expires_at counts as expired."""
        active: list[dict] = []
        for sig in self.signals.values():
            expires_at = sig.get("expires_at")
            if not expires_at:
                continue
            try:
                exp_dt = datetime.fromisoformat(expires_at)
            except ValueError:
                continue
            if exp_dt > now:
                active.append(sig)
        return active

    def purge_expired_signals(self, now: datetime) -> int:
        """Remove expired signals. Returns the number removed."""
        to_drop = []
        for ticker, sig in self.signals.items():
            expires_at = sig.get("expires_at")
            if not expires_at:
                to_drop.append(ticker)
                continue
            try:
                exp_dt = datetime.fromisoformat(expires_at)
            except ValueError:
                to_drop.append(ticker)
                continue
            if exp_dt <= now:
                to_drop.append(ticker)
        for t in to_drop:
            del self.signals[t]
        return len(to_drop)


state = PollState()
```
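
Both helpers repeat the same parse-and-compare steps. A minimal sketch of one shared predicate follows, assuming a hypothetical `is_expired` function that is not part of this plan and assuming `now` is always timezone-aware. It also treats a timezone-naive `expires_at` as expired, because ordering a naive `datetime` against an aware one raises `TypeError`, which an `except ValueError` clause does not catch.

```python
from datetime import datetime, timedelta, timezone

KST = timezone(timedelta(hours=9))  # fixed-offset stand-in for Asia/Seoul


def is_expired(sig: dict, now: datetime) -> bool:
    """Hypothetical shared predicate: True if the signal must not be served."""
    expires_at = sig.get("expires_at")
    if not expires_at:
        return True  # legacy signal without expires_at
    try:
        exp_dt = datetime.fromisoformat(expires_at)
    except (TypeError, ValueError):
        return True  # not a string, or not an ISO 8601 timestamp
    if exp_dt.tzinfo is None:
        return True  # naive timestamp cannot be ordered against an aware `now`
    return exp_dt <= now


now = datetime(2026, 5, 25, 10, 0, tzinfo=KST)
signals = {
    "A": {"expires_at": (now + timedelta(seconds=300)).isoformat()},
    "B": {"expires_at": (now - timedelta(seconds=60)).isoformat()},
    "C": {},                                      # legacy
    "D": {"expires_at": "not-a-timestamp"},       # malformed
    "E": {"expires_at": "2026-05-25T11:00:00"},   # naive
}

active = [ticker for ticker, sig in signals.items() if not is_expired(sig, now)]
print(active)  # ['A']
```

With such a predicate, `get_active_signals` would keep entries where it returns `False` and `purge_expired_signals` would drop entries where it returns `True`, so the two can never disagree about a signal.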
- [ ] **Step 4: Run test to verify it passes**
```
python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v
```
Expected: 4 PASS.
- [ ] **Step 5: Verify full suite still passes**
```
python -m pytest ai_trade/tests -q
```
Expected: all existing tests PASS (the `state.signals` dict interface is unchanged).
- [ ] **Step 6: Commit**
```bash
git add ai_trade/state.py ai_trade/tests/test_state_signals_lifecycle.py
git commit -m "feat(ai_trade): add expires_at + cycle_id lifecycle to state.signals (F5 part 1)"
```
---
## Task 2: Add SIGNAL_TTL_SECONDS to config
**Files:**
- Modify: `ai_trade/config.py`
- Test: `ai_trade/tests/test_state_signals_lifecycle.py` (append)
- [ ] **Step 1: Write failing test**
Append to the end of `test_state_signals_lifecycle.py`:
```python
def test_signal_ttl_seconds_default(monkeypatch):
    monkeypatch.delenv("SIGNAL_TTL_SECONDS", raising=False)
    from ai_trade.config import Settings
    s = Settings()
    assert s.signal_ttl_seconds == 300


def test_signal_ttl_seconds_env_override(monkeypatch):
    monkeypatch.setenv("SIGNAL_TTL_SECONDS", "60")
    from ai_trade.config import Settings
    s = Settings()
    assert s.signal_ttl_seconds == 60
```
- [ ] **Step 2: Run test to fail**
```
python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v -k signal_ttl
```
Expected: AttributeError.
- [ ] **Step 3: Add field to config.py**
Add inside the `Settings` class (next to the other `*_threshold` fields):
```python
signal_ttl_seconds: int = field(
    default_factory=lambda: int(os.getenv("SIGNAL_TTL_SECONDS", "300"))
)
```
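
The field relies on `os` and `dataclasses.field` being imported in `config.py`, and on `Settings` being a dataclass. A self-contained sketch of the pattern, using a hypothetical `SettingsSketch` class as a stand-in for the real `Settings`:

```python
import os
from dataclasses import dataclass, field


@dataclass
class SettingsSketch:
    """Hypothetical stand-in for ai_trade.config.Settings (one field only)."""

    # default_factory runs on every instantiation, so the environment
    # variable is read when the object is created, not at import time.
    signal_ttl_seconds: int = field(
        default_factory=lambda: int(os.getenv("SIGNAL_TTL_SECONDS", "300"))
    )


os.environ.pop("SIGNAL_TTL_SECONDS", None)
default_ttl = SettingsSketch().signal_ttl_seconds

os.environ["SIGNAL_TTL_SECONDS"] = "60"
override_ttl = SettingsSketch().signal_ttl_seconds
os.environ.pop("SIGNAL_TTL_SECONDS", None)  # leave the environment clean

print(default_ttl, override_ttl)  # 300 60
```

Reading the variable inside `default_factory` is what lets the two tests above change the value with `monkeypatch` and simply construct a new `Settings()`, without reloading the module.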
- [ ] **Step 4: Run test**
```
python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v -k signal_ttl
```
Expected: 2 PASS.
- [ ] **Step 5: Commit**
```bash
git add ai_trade/config.py ai_trade/tests/test_state_signals_lifecycle.py
git commit -m "feat(ai_trade): add SIGNAL_TTL_SECONDS env setting (F5 part 2)"
```
---
## Task 3: Attach cycle_id + expires_at in signal_generator
**Files:**
- Modify: `ai_trade/signal_generator.py`
- Test: `ai_trade/tests/test_signal_generator.py` (append)
- [ ] **Step 1: Write failing tests**
Append to the end of the existing `test_signal_generator.py`:
```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo as _ZI

_KST_TEST = _ZI("Asia/Seoul")


def test_emit_attaches_cycle_id_and_expires_at(
    state_with_buy_setup, dedup_clean, settings_default,
):
    """Every emit attaches cycle_id + expires_at (F5)."""
    from ai_trade.signal_generator import generate_signals
    before = datetime.now(_KST_TEST)
    generate_signals(state_with_buy_setup, dedup_clean, settings_default)
    after = datetime.now(_KST_TEST)
    sig = state_with_buy_setup.signals["005930"]
    assert sig["cycle_id"] == 1
    assert "expires_at" in sig
    exp_dt = datetime.fromisoformat(sig["expires_at"])
    # as_of + 300s (default), with a 5s tolerance
    assert before + timedelta(seconds=295) < exp_dt < after + timedelta(seconds=305)


def test_cycle_id_increments_each_call(
    state_with_buy_setup, dedup_clean, settings_default,
):
    """Each generate_signals call performs cycle_id += 1."""
    from ai_trade.signal_generator import generate_signals
    generate_signals(state_with_buy_setup, dedup_clean, settings_default)
    assert state_with_buy_setup.signal_cycle_id == 1
    # Second call: cycle_id must still increase even if dedup blocks the emit
    generate_signals(state_with_buy_setup, dedup_clean, settings_default)
    assert state_with_buy_setup.signal_cycle_id == 2
```
**NOTE:** The existing test_signal_generator.py probably already has fixtures such as `state_with_buy_setup` / `dedup_clean` / `settings_default`. If the names differ, adjust the tests to the actual fixture names. To check: `grep -n "@pytest.fixture" ai_trade/tests/test_signal_generator.py`.
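
If no suitable dedup fixture turns up, an in-memory fake is one option. The sketch below is an assumption, not the project's real dedup class: `FakeDedup` is a hypothetical name, and its two methods only mirror the `is_recent(ticker, action, within_hours=...)` and `record(ticker, action, confidence=...)` calls that `_evaluate_buy_signals` makes later in this plan.

```python
class FakeDedup:
    """Hypothetical in-memory stand-in for the dedup store (test use only)."""

    def __init__(self) -> None:
        self.recorded = set()  # set of (ticker, action) pairs

    def is_recent(self, ticker: str, action: str, within_hours: int = 24) -> bool:
        # The time window is ignored here: anything recorded counts as recent.
        return (ticker, action) in self.recorded

    def record(self, ticker: str, action: str, confidence: float = 0.0) -> None:
        self.recorded.add((ticker, action))


dedup = FakeDedup()
first_check = dedup.is_recent("005930", "buy", within_hours=24)
dedup.record("005930", "buy", confidence=0.8)
second_check = dedup.is_recent("005930", "buy", within_hours=24)
print(first_check, second_check)  # False True
```

A `dedup_clean` fixture could return a fresh `FakeDedup()` per test, which keeps the second `generate_signals` call in `test_cycle_id_increments_each_call` blocked by dedup while `cycle_id` still advances.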
- [ ] **Step 2: Run tests to verify they fail**
```
python -m pytest ai_trade/tests/test_signal_generator.py -v -k "cycle_id or expires"
```
Expected: KeyError or AttributeError.
- [ ] **Step 3: Modify signal_generator.py**
Change the `generate_signals` function (L22-25):
```python
def generate_signals(state, dedup, settings) -> None:
    """Phase 4 entry (state-mutating). F5: cycle_id += 1 + attach expires_at."""
    state.signal_cycle_id += 1
    _evaluate_sell_signals(state, dedup, settings)
    _evaluate_buy_signals(state, dedup, settings)
```
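Add the two fields to `_build_buy_signal` (L99-111). This first draft shows field placement only; its `_current_settings()` lookup is replaced by an explicit `settings` parameter in the final version further down: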
```python
def _build_buy_signal(state, ticker: str, name: str, rank: int | None, confidence: float) -> dict:
    ap = state.asking_price[ticker]
    as_of_dt = datetime.now(KST)
    # Draft only: _current_settings() is a placeholder that must not be implemented.
    expires_at = (as_of_dt + timedelta(seconds=getattr(_current_settings(), "signal_ttl_seconds", 300))).isoformat()
    return {
        "ticker": ticker,
        "name": name,
        "action": "buy",
        "confidence_webai": confidence,
        "current_price": ap["current_price"],
        "avg_price": None,
        "pnl_pct": None,
        "context": _build_context(state, ticker, rank),
        "as_of": as_of_dt.isoformat(),
        "cycle_id": state.signal_cycle_id,
        "expires_at": expires_at,
    }
```
Likewise for `_build_sell_signal` (L174-186):
```python
def _build_sell_signal(state, holding: dict, confidence: float, reason: str, settings=None) -> dict:
    ticker = holding["ticker"]
    as_of_dt = datetime.now(KST)
    ttl = getattr(settings, "signal_ttl_seconds", 300) if settings else 300
    expires_at = (as_of_dt + timedelta(seconds=ttl)).isoformat()
    return {
        "ticker": ticker,
        "name": holding.get("name", ticker),
        "action": "sell",
        "confidence_webai": confidence,
        "current_price": holding.get("current_price"),
        "avg_price": holding.get("avg_price"),
        "pnl_pct": holding.get("pnl_pct"),
        "context": _build_context(state, ticker, rank=None, sell_reason=reason),
        "as_of": as_of_dt.isoformat(),
        "cycle_id": state.signal_cycle_id,
        "expires_at": expires_at,
    }
```
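`_build_buy_signal` does not receive `settings` today, so its call site must be updated as well. In practice the cleanest fix is to add a `settings` argument to both builder functions. The change: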
```python
def _evaluate_buy_signals(state, dedup, settings) -> None:
    candidates = _buy_candidates(state)
    for ticker, name, rank in candidates:
        existing = state.signals.get(ticker)
        if existing is not None and existing.get("action") == "sell":
            logger.debug("buy %s skipped: same-cycle sell precedence", ticker)
            continue
        if not _check_buy_hard_gate(state, ticker, settings):
            logger.debug("buy %s skipped: hard gate failed", ticker)
            continue
        confidence = _compute_buy_confidence(state, ticker, rank)
        if confidence <= settings.confidence_threshold:
            logger.debug("buy %s skipped: confidence %.3f <= %.3f",
                         ticker, confidence, settings.confidence_threshold)
            continue
        if dedup.is_recent(ticker, "buy", within_hours=24):
            logger.debug("buy %s skipped: dedup 24h", ticker)
            continue
        state.signals[ticker] = _build_buy_signal(state, ticker, name, rank, confidence, settings)
        dedup.record(ticker, "buy", confidence=confidence)
        logger.info("signal emit %s buy conf=%.3f rank=%s cycle=%d",
                    ticker, confidence, rank, state.signal_cycle_id)


def _build_buy_signal(state, ticker, name, rank, confidence, settings) -> dict:
    ap = state.asking_price[ticker]
    as_of_dt = datetime.now(KST)
    ttl = settings.signal_ttl_seconds
    expires_at = (as_of_dt + timedelta(seconds=ttl)).isoformat()
    return {
        "ticker": ticker,
        "name": name,
        "action": "buy",
        "confidence_webai": confidence,
        "current_price": ap["current_price"],
        "avg_price": None,
        "pnl_pct": None,
        "context": _build_context(state, ticker, rank),
        "as_of": as_of_dt.isoformat(),
        "cycle_id": state.signal_cycle_id,
        "expires_at": expires_at,
    }
```
The sell side passes `settings` through in the same way. `_try_stop_loss` and the other sell helpers already receive `settings`, so they call `_build_sell_signal(..., settings=settings)`.
Add the import (top of signal_generator.py):
```python
from datetime import datetime, timedelta
```
(add only `timedelta` to the existing import)
Do not create a helper such as `_current_settings()`; pass `settings` explicitly.
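
Both builders compute the same three lifecycle fields from one timestamp, a cycle id, and a TTL. The arithmetic can be sketched in isolation as below; `lifecycle_fields` is a hypothetical helper used only for illustration, and a fixed-offset timezone stands in for `ZoneInfo("Asia/Seoul")`.

```python
from datetime import datetime, timedelta, timezone

KST = timezone(timedelta(hours=9))  # fixed-offset stand-in for Asia/Seoul


def lifecycle_fields(as_of: datetime, cycle_id: int, ttl_seconds: int) -> dict:
    """Hypothetical helper: the lifecycle fields attached to every emitted signal."""
    return {
        "as_of": as_of.isoformat(),
        "cycle_id": cycle_id,
        # expires_at = as_of + TTL, serialized with its UTC offset
        "expires_at": (as_of + timedelta(seconds=ttl_seconds)).isoformat(),
    }


as_of = datetime(2026, 5, 25, 10, 0, tzinfo=KST)
fields = lifecycle_fields(as_of, cycle_id=3, ttl_seconds=300)
print(fields["expires_at"])  # 2026-05-25T10:05:00+09:00
```

Deriving `as_of` and `expires_at` from the same `datetime` object keeps the two fields exactly `ttl_seconds` apart, and keeping the offset in the string lets `datetime.fromisoformat` return an aware value that compares cleanly against `datetime.now(KST)`.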
- [ ] **Step 4: Run tests**
```
python -m pytest ai_trade/tests/test_signal_generator.py -v
```
Expected: the 2 new tests PASS, existing tests PASS.
- [ ] **Step 5: Commit**
```bash
git add ai_trade/signal_generator.py ai_trade/tests/test_signal_generator.py
git commit -m "feat(ai_trade): attach cycle_id + expires_at to emitted signals (F5 part 3)"
```
---
## Task 4: pull_worker calls purge at the end of each cycle
**Files:**
- Modify: `ai_trade/pull_worker.py`
- Test: `ai_trade/tests/test_pull_worker.py` (append)
- [ ] **Step 1: Write failing test**
Append to the end of `test_pull_worker.py`:
```python
# If the existing async tests in this file carry a marker such as
# @pytest.mark.asyncio, add the same marker here.
async def test_poll_loop_purges_expired_signals(monkeypatch):
    """Expired signals are removed at the end of every cycle (F5)."""
    from ai_trade import pull_worker
    from ai_trade.state import PollState
    from datetime import datetime as _dt
    from zoneinfo import ZoneInfo as _ZI
    from unittest.mock import AsyncMock, MagicMock
    import asyncio as _asyncio

    _kst = _ZI("Asia/Seoul")
    now = _dt(2026, 5, 18, 10, 0, tzinfo=_kst)

    class FrozenDT:
        @staticmethod
        def now(tz=None):
            return now

    state = PollState()
    state.signals = {
        "OLD": {"ticker": "OLD", "expires_at": _dt(2026, 5, 18, 9, 0, tzinfo=_kst).isoformat(), "cycle_id": 1},
        "FRESH": {"ticker": "FRESH", "expires_at": _dt(2026, 5, 18, 10, 30, tzinfo=_kst).isoformat(), "cycle_id": 1},
    }
    monkeypatch.setattr(pull_worker, "datetime", FrozenDT)
    monkeypatch.setattr(pull_worker, "_is_market_day", lambda n: True)
    monkeypatch.setattr(pull_worker, "_is_polling_window", lambda n: True)
    monkeypatch.setattr(pull_worker, "_next_interval", lambda n: 0.01)
    monkeypatch.setattr(pull_worker, "_run_polling_cycle", AsyncMock())
    monkeypatch.setattr(pull_worker, "update_minute_momentum_for_all", lambda s: None)
    monkeypatch.setattr(pull_worker, "_is_post_close_trigger", lambda *a, **k: False)
    shutdown = _asyncio.Event()

    async def stop_soon():
        await _asyncio.sleep(0.05)
        shutdown.set()

    _asyncio.create_task(stop_soon())
    await pull_worker.poll_loop(
        client=MagicMock(), state=state, shutdown=shutdown,
        kis_client=MagicMock(), chronos=MagicMock(),
        dedup=None, settings=None,
    )
    assert "OLD" not in state.signals
    assert "FRESH" in state.signals
```
- [ ] **Step 2: Run test to fail**
```
python -m pytest ai_trade/tests/test_pull_worker.py::test_poll_loop_purges_expired_signals -v
```
Expected: FAIL (OLD is still present).
- [ ] **Step 3: Add purge call in poll_loop**
In `poll_loop` in `ai_trade/pull_worker.py`, add the purge call after signal generation (or right before the end of the cycle):
```python
# Phase 4: generate signals
if dedup is not None and settings is not None:
    try:
        from ai_trade.signal_generator import generate_signals
        generate_signals(state, dedup, settings)
    except Exception:
        logger.exception("generate_signals failed")
# F5: purge expired signals (protects the case where no consumer reads them).
# Kept outside the `if` above so it also runs when dedup/settings are None.
try:
    state.purge_expired_signals(datetime.now(KST))
except Exception:
    logger.exception("purge_expired_signals failed")
```
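
The ordering matters: the purge must run even when signal generation is skipped or raises. A stripped-down sketch of that control flow, with a hypothetical `finish_cycle` wrapper and plain callables standing in for `generate_signals` and `state.purge_expired_signals`:

```python
import logging

logger = logging.getLogger("cycle_sketch")


def finish_cycle(generate, purge) -> int:
    """Hypothetical wrapper: run generate, then always attempt purge."""
    try:
        generate()
    except Exception:
        logger.exception("generate_signals failed")
    try:
        return purge()  # number of entries removed
    except Exception:
        logger.exception("purge_expired_signals failed")
        return 0


signals = {"OLD": {}, "FRESH": {}}


def failing_generate() -> None:
    raise RuntimeError("simulated generator failure")


def purge_old() -> int:
    signals.pop("OLD")
    return 1


removed = finish_cycle(failing_generate, purge_old)
print(removed, sorted(signals))  # 1 ['FRESH']
```

Each step has its own `try` block, so a failure in one is logged without preventing the other or ending the polling loop.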
- [ ] **Step 4: Run test**
```
python -m pytest ai_trade/tests/test_pull_worker.py::test_poll_loop_purges_expired_signals -v
```
Expected: PASS.
- [ ] **Step 5: Run full suite**
```
python -m pytest ai_trade/tests -q
```
Expected: all PASS.
- [ ] **Step 6: Commit**
```bash
git add ai_trade/pull_worker.py ai_trade/tests/test_pull_worker.py
git commit -m "feat(ai_trade): poll_loop purges expired signals at the end of each cycle (F5 part 4)"
```
---
## Task 5: Full regression + push
- [ ] **Step 1: Final pytest**
```
python -m pytest ai_trade/tests -v
```
Expected: all PASS (about 9 new tests + 56 existing).
- [ ] **Step 2: Push**
```bash
git push origin main
```
---
## Self-Review
1. **Attach expires_at + cycle_id**: both `_build_buy_signal` and `_build_sell_signal` ✅
2. **cycle_id increment**: exactly once, on entry to `generate_signals` ✅
3. **purge**: called once at the end of each poll_loop cycle ✅
4. **get_active_signals**: the read API for the Phase 5 consumer exists ✅
5. **Legacy signal compatibility**: signals without `expires_at` are treated as expired, which is the safe default ✅
**Intentionally not covered**:
- An API for the Phase 5 consumer to explicitly drain signals after processing is out of scope for this plan (a read-only consumer is sufficient: expires_at + dedup make it idempotent).
- The agent-office `/signal` HTTP endpoint belongs to the Phase 5 plan.
---
## Execution Handoff
**Plan complete and saved to `docs/superpowers/plans/2026-05-25-state-signals-lifecycle.md`.**
**1. Subagent-Driven (recommended)**: a fresh subagent per task.
**2. Inline Execution**: run in the current session.
Awaiting 박재오's decision. Starting after Plan 1 (hotfix) is finished is recommended.