20 KiB
state.signals Lifecycle — Code Review F5 Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: state.signals 가 무한 dict 누적되는 문제를 해결. expires_at + cycle_id 부착해서 Phase 5 consumer (agent-office /signal) 가 stale 신호를 안전하게 무시할 수 있게.
Architecture:
Signaldict에expires_at: ISO str,cycle_id: int필드 추가.PollState.signal_cycle_id: int(process 단위 auto-increment).generate_signals(state, dedup, settings)진입마다cycle_id += 1.- emit하는 모든 signal에
expires_at = as_of + SIGNAL_TTL_SECONDS,cycle_id = state.signal_cycle_id부착. state.purge_expired_signals(now)helper — 매 cycle 끝에 호출하여 만료된 항목 제거.state.get_active_signals(now) → list[dict]— Phase 5 consumer가 호출할 read API. 만료된 것 제외.
Tech Stack: Python 3.12, asyncio, pytest. 기존 cycle 흐름과 호환되도록 generate_signals 인터페이스는 그대로.
Why expires_at + cycle_id (not pop-on-read): consumer가 polling 실패해도 신호 손실 없음. cycle_id로 "이번 cycle에 새로 emit된 신호" 식별 가능 → Phase 5에서 incremental fetch 가능.
Working directory: C:\Users\jaeoh\Desktop\workspace\web-ai.
Test runner: python -m pytest ai_trade/tests -q (또는 py -3.12 -m). 환경 부재 시 plan 진행 중단.
File Map
| 파일 | 변경 | 책임 |
|---|---|---|
ai_trade/config.py |
Add 1 field | signal_ttl_seconds: int (default 300) |
ai_trade/state.py |
Modify | signal_cycle_id: int, helper 2개 (get_active_signals, purge_expired_signals) |
ai_trade/signal_generator.py |
Modify L22-50, 133, 99-111, 174-186 | cycle_id 증가 + expires_at/cycle_id 부착 |
ai_trade/pull_worker.py |
Modify L46-51 근처 | cycle 끝에 purge 호출 |
ai_trade/tests/test_state_signals_lifecycle.py |
Create | 5 test (expires, cycle_id, purge, active list) |
ai_trade/tests/test_signal_generator.py |
Modify | 기존 emit test에 expires_at/cycle_id 필드 검증 추가 |
Task 1: PollState에 cycle_id + lifecycle helper 추가
Files:
-
Modify:
ai_trade/state.py -
Test:
ai_trade/tests/test_state_signals_lifecycle.py(Create) -
Step 1: Write the failing test
# ai_trade/tests/test_state_signals_lifecycle.py
"""F5 — state.signals lifecycle (expires_at + cycle_id)."""
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import pytest
from ai_trade.state import PollState
KST = ZoneInfo("Asia/Seoul")
def test_initial_signal_cycle_id_is_zero():
state = PollState()
assert state.signal_cycle_id == 0
def test_get_active_signals_excludes_expired():
state = PollState()
now = datetime(2026, 5, 25, 10, 0, tzinfo=KST)
future = (now + timedelta(seconds=300)).isoformat()
past = (now - timedelta(seconds=60)).isoformat()
state.signals = {
"A": {"ticker": "A", "expires_at": future, "cycle_id": 1, "action": "buy"},
"B": {"ticker": "B", "expires_at": past, "cycle_id": 1, "action": "buy"},
}
active = state.get_active_signals(now)
tickers = [s["ticker"] for s in active]
assert "A" in tickers
assert "B" not in tickers
def test_get_active_signals_treats_missing_expires_as_expired():
"""expires_at 없는 legacy 신호는 expired로 간주."""
state = PollState()
now = datetime(2026, 5, 25, 10, 0, tzinfo=KST)
state.signals = {"C": {"ticker": "C", "action": "buy"}}
assert state.get_active_signals(now) == []
def test_purge_expired_signals_removes_expired():
state = PollState()
now = datetime(2026, 5, 25, 10, 0, tzinfo=KST)
future = (now + timedelta(seconds=300)).isoformat()
past = (now - timedelta(seconds=60)).isoformat()
state.signals = {
"A": {"ticker": "A", "expires_at": future, "cycle_id": 1},
"B": {"ticker": "B", "expires_at": past, "cycle_id": 1},
}
state.purge_expired_signals(now)
assert "A" in state.signals
assert "B" not in state.signals
- Step 2: Run test to verify it fails
python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v
Expected: AttributeError: signal_cycle_id 또는 get_active_signals 미구현.
- Step 3: Modify state.py
"""PollState — process-wide singleton."""
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class PollState:
portfolio: dict | None = None
news_sentiment: dict | None = None
screener_preview: dict | None = None
minute_bars: dict[str, deque] = field(default_factory=dict)
asking_price: dict[str, dict] = field(default_factory=dict)
# Phase 3b additions
daily_ohlcv: dict[str, list[dict]] = field(default_factory=dict)
chronos_predictions: dict[str, dict] = field(default_factory=dict)
minute_momentum: dict[str, str] = field(default_factory=dict)
signals: dict[str, dict] = field(default_factory=dict)
# F5 lifecycle
signal_cycle_id: int = 0
last_updated: dict[str, str] = field(default_factory=dict)
fetch_errors: dict[str, int] = field(default_factory=dict)
def get_active_signals(self, now: datetime) -> list[dict]:
"""expires_at > now 인 신호만 반환. expires_at 없으면 expired 취급."""
active: list[dict] = []
for sig in self.signals.values():
expires_at = sig.get("expires_at")
if not expires_at:
continue
try:
exp_dt = datetime.fromisoformat(expires_at)
except ValueError:
continue
if exp_dt > now:
active.append(sig)
return active
def purge_expired_signals(self, now: datetime) -> int:
"""만료된 signal 제거. 제거된 개수 반환."""
to_drop = []
for ticker, sig in self.signals.items():
expires_at = sig.get("expires_at")
if not expires_at:
to_drop.append(ticker)
continue
try:
exp_dt = datetime.fromisoformat(expires_at)
except ValueError:
to_drop.append(ticker)
continue
if exp_dt <= now:
to_drop.append(ticker)
for t in to_drop:
del self.signals[t]
return len(to_drop)
state = PollState()
- Step 4: Run test to verify it passes
python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v
Expected: 4 PASS.
- Step 5: Verify full suite still passes
python -m pytest ai_trade/tests -q
Expected: 기존 test 전부 PASS (state.signals dict 인터페이스 그대로).
- Step 6: Commit
git add ai_trade/state.py ai_trade/tests/test_state_signals_lifecycle.py
git commit -m "feat(ai_trade): state.signals에 expires_at + cycle_id lifecycle 추가 (F5 part 1)"
Task 2: config에 SIGNAL_TTL_SECONDS 추가
Files:
-
Modify:
ai_trade/config.py -
Test:
ai_trade/tests/test_state_signals_lifecycle.py(append) -
Step 1: Write failing test
test_state_signals_lifecycle.py 끝에 추가:
def test_signal_ttl_seconds_default(monkeypatch):
monkeypatch.delenv("SIGNAL_TTL_SECONDS", raising=False)
from ai_trade.config import Settings
s = Settings()
assert s.signal_ttl_seconds == 300
def test_signal_ttl_seconds_env_override(monkeypatch):
monkeypatch.setenv("SIGNAL_TTL_SECONDS", "60")
from ai_trade.config import Settings
s = Settings()
assert s.signal_ttl_seconds == 60
- Step 2: Run test to fail
python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v -k signal_ttl
Expected: AttributeError.
- Step 3: Add field to config.py
Settings 클래스 안에 추가 (다른 *_threshold 옆):
signal_ttl_seconds: int = field(
default_factory=lambda: int(os.getenv("SIGNAL_TTL_SECONDS", "300"))
)
- Step 4: Run test
python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v -k signal_ttl
Expected: 2 PASS.
- Step 5: Commit
git add ai_trade/config.py ai_trade/tests/test_state_signals_lifecycle.py
git commit -m "feat(ai_trade): SIGNAL_TTL_SECONDS env 추가 (F5 part 2)"
Task 3: signal_generator에 cycle_id + expires_at 부착
Files:
-
Modify:
ai_trade/signal_generator.py -
Test:
ai_trade/tests/test_signal_generator.py(append) -
Step 1: Write failing tests
기존 test_signal_generator.py 끝에 추가:
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo as _ZI
_KST_TEST = _ZI("Asia/Seoul")
def test_emit_attaches_cycle_id_and_expires_at(
state_with_buy_setup, dedup_clean, settings_default,
):
"""매 emit 시 cycle_id + expires_at 부착 (F5)."""
from ai_trade.signal_generator import generate_signals
before = datetime.now(_KST_TEST)
generate_signals(state_with_buy_setup, dedup_clean, settings_default)
after = datetime.now(_KST_TEST)
sig = state_with_buy_setup.signals["005930"]
assert sig["cycle_id"] == 1
assert "expires_at" in sig
exp_dt = datetime.fromisoformat(sig["expires_at"])
# as_of + 300s (default) — tolerance 5s
assert before + timedelta(seconds=295) < exp_dt < after + timedelta(seconds=305)
def test_cycle_id_increments_each_call(
state_with_buy_setup, dedup_clean, settings_default,
):
"""generate_signals 호출마다 cycle_id += 1."""
from ai_trade.signal_generator import generate_signals
generate_signals(state_with_buy_setup, dedup_clean, settings_default)
assert state_with_buy_setup.signal_cycle_id == 1
# 2번째 호출 — dedup이 막아도 cycle_id는 증가해야 함
generate_signals(state_with_buy_setup, dedup_clean, settings_default)
assert state_with_buy_setup.signal_cycle_id == 2
NOTE: 기존 test_signal_generator.py에 state_with_buy_setup / dedup_clean / settings_default 같은 fixture가 있을 것. 만약 이름이 다르면 실제 fixture 이름에 맞춰 조정. 검증: grep -n "@pytest.fixture" ai_trade/tests/test_signal_generator.py.
- Step 2: Run tests to verify they fail
python -m pytest ai_trade/tests/test_signal_generator.py -v -k "cycle_id or expires"
Expected: KeyError 또는 AttributeError.
- Step 3: Modify signal_generator.py
generate_signals 함수 (L22-25)를 변경:
def generate_signals(state, dedup, settings) -> None:
"""Phase 4 entry — state-mutating. F5: cycle_id += 1 + expires_at 부착."""
state.signal_cycle_id += 1
_evaluate_sell_signals(state, dedup, settings)
_evaluate_buy_signals(state, dedup, settings)
_build_buy_signal (L99-111)에 두 필드 추가:
def _build_buy_signal(state, ticker: str, name: str, rank: int | None, confidence: float) -> dict:
ap = state.asking_price[ticker]
as_of_dt = datetime.now(KST)
expires_at = (as_of_dt + timedelta(seconds=getattr(_current_settings(), "signal_ttl_seconds", 300))).isoformat()
return {
"ticker": ticker,
"name": name,
"action": "buy",
"confidence_webai": confidence,
"current_price": ap["current_price"],
"avg_price": None,
"pnl_pct": None,
"context": _build_context(state, ticker, rank),
"as_of": as_of_dt.isoformat(),
"cycle_id": state.signal_cycle_id,
"expires_at": expires_at,
}
같이 _build_sell_signal (L174-186):
def _build_sell_signal(state, holding: dict, confidence: float, reason: str, settings=None) -> dict:
ticker = holding["ticker"]
as_of_dt = datetime.now(KST)
ttl = getattr(settings, "signal_ttl_seconds", 300) if settings else 300
expires_at = (as_of_dt + timedelta(seconds=ttl)).isoformat()
return {
"ticker": ticker,
"name": holding.get("name", ticker),
"action": "sell",
"confidence_webai": confidence,
"current_price": holding.get("current_price"),
"avg_price": holding.get("avg_price"),
"pnl_pct": holding.get("pnl_pct"),
"context": _build_context(state, ticker, rank=None, sell_reason=reason),
"as_of": as_of_dt.isoformat(),
"cycle_id": state.signal_cycle_id,
"expires_at": expires_at,
}
_build_buy_signal이 settings를 안 받고 있으니, 호출부도 갱신해야 함. 현실적으로 두 함수에 settings 인자를 추가하는 것이 깔끔. 변경:
def _evaluate_buy_signals(state, dedup, settings) -> None:
candidates = _buy_candidates(state)
for ticker, name, rank in candidates:
existing = state.signals.get(ticker)
if existing is not None and existing.get("action") == "sell":
logger.debug("buy %s skipped: same-cycle sell precedence", ticker)
continue
if not _check_buy_hard_gate(state, ticker, settings):
logger.debug("buy %s skipped: hard gate failed", ticker)
continue
confidence = _compute_buy_confidence(state, ticker, rank)
if confidence <= settings.confidence_threshold:
logger.debug("buy %s skipped: confidence %.3f <= %.3f",
ticker, confidence, settings.confidence_threshold)
continue
if dedup.is_recent(ticker, "buy", within_hours=24):
logger.debug("buy %s skipped: dedup 24h", ticker)
continue
state.signals[ticker] = _build_buy_signal(state, ticker, name, rank, confidence, settings)
dedup.record(ticker, "buy", confidence=confidence)
logger.info("signal emit %s buy conf=%.3f rank=%s cycle=%d",
ticker, confidence, rank, state.signal_cycle_id)
def _build_buy_signal(state, ticker, name, rank, confidence, settings) -> dict:
ap = state.asking_price[ticker]
as_of_dt = datetime.now(KST)
ttl = settings.signal_ttl_seconds
expires_at = (as_of_dt + timedelta(seconds=ttl)).isoformat()
return {
"ticker": ticker,
"name": name,
"action": "buy",
"confidence_webai": confidence,
"current_price": ap["current_price"],
"avg_price": None,
"pnl_pct": None,
"context": _build_context(state, ticker, rank),
"as_of": as_of_dt.isoformat(),
"cycle_id": state.signal_cycle_id,
"expires_at": expires_at,
}
매도 측도 마찬가지로 settings를 통과시킴. _try_stop_loss 등은 이미 settings를 받으므로 _build_sell_signal(..., settings=settings) 로 호출.
import 추가 (signal_generator.py 상단):
from datetime import datetime, timedelta
(기존 import에 timedelta 만 추가)
_current_settings() 같은 헬퍼는 만들지 않음 — settings를 명시적으로 전달.
- Step 4: Run tests
python -m pytest ai_trade/tests/test_signal_generator.py -v
Expected: 신규 2개 PASS, 기존 PASS.
- Step 5: Commit
git add ai_trade/signal_generator.py ai_trade/tests/test_signal_generator.py
git commit -m "feat(ai_trade): emit signal에 cycle_id + expires_at 부착 (F5 part 3)"
Task 4: pull_worker가 cycle 끝에 purge 호출
Files:
-
Modify:
ai_trade/pull_worker.py -
Test:
ai_trade/tests/test_pull_worker.py(append) -
Step 1: Write failing test
test_pull_worker.py 끝에 추가:
async def test_poll_loop_purges_expired_signals(monkeypatch):
"""매 cycle 끝에 expired signal이 제거됨 (F5)."""
from ai_trade import pull_worker
from ai_trade.state import PollState
from datetime import datetime as _dt
from zoneinfo import ZoneInfo as _ZI
from unittest.mock import AsyncMock, MagicMock
import asyncio as _asyncio
_kst = _ZI("Asia/Seoul")
now = _dt(2026, 5, 18, 10, 0, tzinfo=_kst)
class FrozenDT:
@staticmethod
def now(tz=None): return now
state = PollState()
state.signals = {
"OLD": {"ticker": "OLD", "expires_at": _dt(2026, 5, 18, 9, 0, tzinfo=_kst).isoformat(), "cycle_id": 1},
"FRESH": {"ticker": "FRESH", "expires_at": _dt(2026, 5, 18, 10, 30, tzinfo=_kst).isoformat(), "cycle_id": 1},
}
monkeypatch.setattr(pull_worker, "datetime", FrozenDT)
monkeypatch.setattr(pull_worker, "_is_market_day", lambda n: True)
monkeypatch.setattr(pull_worker, "_is_polling_window", lambda n: True)
monkeypatch.setattr(pull_worker, "_next_interval", lambda n: 0.01)
monkeypatch.setattr(pull_worker, "_run_polling_cycle", AsyncMock())
monkeypatch.setattr(pull_worker, "update_minute_momentum_for_all", lambda s: None)
monkeypatch.setattr(pull_worker, "_is_post_close_trigger", lambda *a, **k: False)
shutdown = _asyncio.Event()
async def stop_soon():
await _asyncio.sleep(0.05)
shutdown.set()
_asyncio.create_task(stop_soon())
await pull_worker.poll_loop(
client=MagicMock(), state=state, shutdown=shutdown,
kis_client=MagicMock(), chronos=MagicMock(),
dedup=None, settings=None,
)
assert "OLD" not in state.signals
assert "FRESH" in state.signals
- Step 2: Run test to fail
python -m pytest ai_trade/tests/test_pull_worker.py::test_poll_loop_purges_expired_signals -v
Expected: FAIL — OLD가 남아있음.
- Step 3: Add purge call in poll_loop
ai_trade/pull_worker.py poll_loop 안, signals 생성 이후 (또는 cycle 끝 직전) 한 줄 추가:
# Phase 4: generate signals
if dedup is not None and settings is not None:
try:
from ai_trade.signal_generator import generate_signals
generate_signals(state, dedup, settings)
except Exception:
logger.exception("generate_signals failed")
# F5: 만료된 signal purge (consumer 미사용 케이스 보호)
try:
state.purge_expired_signals(datetime.now(KST))
except Exception:
logger.exception("purge_expired_signals failed")
- Step 4: Run test
python -m pytest ai_trade/tests/test_pull_worker.py::test_poll_loop_purges_expired_signals -v
Expected: PASS.
- Step 5: Run full suite
python -m pytest ai_trade/tests -q
Expected: 모두 PASS.
- Step 6: Commit
git add ai_trade/pull_worker.py ai_trade/tests/test_pull_worker.py
git commit -m "feat(ai_trade): poll_loop가 매 cycle 끝에 expired signal purge (F5 part 4)"
Task 5: 전체 회귀 + push
- Step 1: Final pytest
python -m pytest ai_trade/tests -v
Expected: 모두 PASS (총 신규 약 9개 + 기존 56개).
- Step 2: Push
git push origin main
Self-Review
- expires_at + cycle_id 부착:
_build_buy_signal,_build_sell_signal양쪽 ✅ - cycle_id 증가:
generate_signals진입에서 단 1회 ✅ - purge: poll_loop cycle 마지막에 1회 호출 ✅
- get_active_signals: Phase 5 consumer가 호출할 read API 존재 ✅
- legacy 신호 호환:
expires_at없는 신호는 expired 취급 → 안전 ✅
미커버 항목 (의도적):
- Phase 5 consumer가 처리 후 explicit drain하는 API는 이 plan에서 안 다룸 (consumer가 read-only로도 충분 — expires_at + dedup으로 idempotent).
- agent-office
/signalHTTP endpoint는 Phase 5 plan 영역.
Execution Handoff
Plan complete and saved to docs/superpowers/plans/2026-05-25-state-signals-lifecycle.md.
1. Subagent-Driven (recommended) — Task 별 fresh subagent. 2. Inline Execution — 현 세션 실행.
박재오 결정 대기. Plan 1 (hotfix) 마친 뒤 진입 권장.