state.signals Lifecycle — Code Review F5 Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Fix the unbounded growth of the state.signals dict. Attach expires_at and cycle_id to every signal so that the Phase 5 consumer (agent-office /signal) can safely ignore stale signals.

Architecture:

  1. Add expires_at: ISO str and cycle_id: int fields to the signal dict.
  2. PollState.signal_cycle_id: int (auto-incremented, scoped to the process).
  3. Each call to generate_signals(state, dedup, settings) increments cycle_id by 1 on entry.
  4. Every emitted signal gets expires_at = as_of + SIGNAL_TTL_SECONDS and cycle_id = state.signal_cycle_id.
  5. state.purge_expired_signals(now) helper: called at the end of every cycle to remove expired entries.
  6. state.get_active_signals(now) → list[dict]: the read API the Phase 5 consumer will call. Expired signals are excluded.
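
The six points above can be sketched end to end in a few lines. This is only an illustration under stated assumptions: MiniState and run_cycle are hypothetical stand-ins (the real code lives in PollState, generate_signals and poll_loop), and UTC is used instead of KST so the snippet runs without tz data installed.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone

SIGNAL_TTL_SECONDS = 300  # assumed default, matching the plan's config value


@dataclass
class MiniState:
    """Hypothetical stand-in for PollState, reduced to the lifecycle fields."""
    signals: dict[str, dict] = field(default_factory=dict)
    signal_cycle_id: int = 0


def run_cycle(state: MiniState, tickers: list[str], now: datetime) -> None:
    """One cycle: bump cycle_id, stamp new signals, then purge expired ones."""
    state.signal_cycle_id += 1
    expires_at = (now + timedelta(seconds=SIGNAL_TTL_SECONDS)).isoformat()
    for ticker in tickers:
        state.signals[ticker] = {
            "ticker": ticker,
            "as_of": now.isoformat(),
            "cycle_id": state.signal_cycle_id,
            "expires_at": expires_at,
        }
    # End of cycle: drop anything whose expires_at is not in the future.
    expired = [
        t for t, s in state.signals.items()
        if datetime.fromisoformat(s["expires_at"]) <= now
    ]
    for t in expired:
        del state.signals[t]


t0 = datetime(2026, 5, 25, 10, 0, tzinfo=timezone.utc)
demo = MiniState()
run_cycle(demo, ["A"], t0)                           # A emitted in cycle 1
run_cycle(demo, ["B"], t0 + timedelta(seconds=200))  # A still live, B emitted
run_cycle(demo, [], t0 + timedelta(seconds=400))     # A expired at t0 + 300s
print(sorted(demo.signals), demo.signal_cycle_id)    # ['B'] 3
```

The dict stays bounded because every entry leaves within one TTL of being emitted, even if nothing ever reads it.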

Tech Stack: Python 3.12, asyncio, pytest. The generate_signals interface stays unchanged so that it remains compatible with the existing cycle flow.

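Why expires_at + cycle_id (not pop-on-read): a signal is not lost when a consumer's poll fails, because reading does not remove it; it stays readable until it expires. cycle_id identifies the signals newly emitted in the current cycle, which makes incremental fetch possible in Phase 5.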
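
As a rough illustration of the incremental fetch mentioned above, a consumer could keep a high-water mark of the last cycle_id it has seen. The function name fetch_new_signals and the sample data are assumptions for this sketch; the actual Phase 5 consumer is out of scope for this plan.

```python
from datetime import datetime, timedelta, timezone


def fetch_new_signals(
    active: list[dict], last_seen_cycle_id: int
) -> tuple[list[dict], int]:
    """Return signals emitted after last_seen_cycle_id plus the new high-water mark."""
    fresh = [s for s in active if s["cycle_id"] > last_seen_cycle_id]
    high_water = max((s["cycle_id"] for s in fresh), default=last_seen_cycle_id)
    return fresh, high_water


now = datetime(2026, 5, 25, 10, 0, tzinfo=timezone.utc)
exp = (now + timedelta(seconds=300)).isoformat()
# Shape of what get_active_signals(now) is expected to return.
active_signals = [
    {"ticker": "A", "cycle_id": 3, "expires_at": exp},
    {"ticker": "B", "cycle_id": 5, "expires_at": exp},
]

# A consumer that last saw cycle 3 and then missed a poll still picks up B later.
fresh, mark = fetch_new_signals(active_signals, last_seen_cycle_id=3)
print([s["ticker"] for s in fresh], mark)  # ['B'] 5

# Polling again with the updated mark returns nothing new.
again, mark2 = fetch_new_signals(active_signals, last_seen_cycle_id=mark)
print(again, mark2)  # [] 5
```

Because signal_cycle_id is scoped to the process, it starts again from zero after a worker restart; a consumer built this way would presumably need to reset its mark when it sees cycle ids go backwards.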

Working directory: C:\Users\jaeoh\Desktop\workspace\web-ai.

Test runner: python -m pytest ai_trade/tests -q (or with py -3.12 -m in place of python -m). If the environment is not available, stop executing the plan.


File Map

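| File | Change | Responsibility |
| --- | --- | --- |
| ai_trade/config.py | Add 1 field | signal_ttl_seconds: int (default 300) |
| ai_trade/state.py | Modify | signal_cycle_id: int, 2 helpers (get_active_signals, purge_expired_signals) |
| ai_trade/signal_generator.py | Modify L22-50, 99-111, 133, 174-186 | Increment cycle_id; attach expires_at/cycle_id |
| ai_trade/pull_worker.py | Modify near L46-51 | Call purge at the end of the cycle |
| ai_trade/tests/test_state_signals_lifecycle.py | Create | 4 lifecycle tests in Task 1 (cycle_id, active list, legacy signal, purge), plus 2 TTL config tests appended in Task 2 |
| ai_trade/tests/test_signal_generator.py | Modify | Add expires_at/cycle_id field checks to the emit tests |
| ai_trade/tests/test_pull_worker.py | Modify | Add the poll_loop purge test (Task 4) |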

Task 1: Add cycle_id + lifecycle helpers to PollState

Files:

  • Modify: ai_trade/state.py

  • Test: ai_trade/tests/test_state_signals_lifecycle.py (Create)

  • Step 1: Write the failing test

# ai_trade/tests/test_state_signals_lifecycle.py
"""F5 — state.signals lifecycle (expires_at + cycle_id)."""
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

import pytest

from ai_trade.state import PollState

KST = ZoneInfo("Asia/Seoul")


def test_initial_signal_cycle_id_is_zero():
    state = PollState()
    assert state.signal_cycle_id == 0


def test_get_active_signals_excludes_expired():
    state = PollState()
    now = datetime(2026, 5, 25, 10, 0, tzinfo=KST)
    future = (now + timedelta(seconds=300)).isoformat()
    past = (now - timedelta(seconds=60)).isoformat()
    state.signals = {
        "A": {"ticker": "A", "expires_at": future, "cycle_id": 1, "action": "buy"},
        "B": {"ticker": "B", "expires_at": past,   "cycle_id": 1, "action": "buy"},
    }
    active = state.get_active_signals(now)
    tickers = [s["ticker"] for s in active]
    assert "A" in tickers
    assert "B" not in tickers


def test_get_active_signals_treats_missing_expires_as_expired():
    """Legacy signals without expires_at are treated as expired."""
    state = PollState()
    now = datetime(2026, 5, 25, 10, 0, tzinfo=KST)
    state.signals = {"C": {"ticker": "C", "action": "buy"}}
    assert state.get_active_signals(now) == []


def test_purge_expired_signals_removes_expired():
    state = PollState()
    now = datetime(2026, 5, 25, 10, 0, tzinfo=KST)
    future = (now + timedelta(seconds=300)).isoformat()
    past = (now - timedelta(seconds=60)).isoformat()
    state.signals = {
        "A": {"ticker": "A", "expires_at": future, "cycle_id": 1},
        "B": {"ticker": "B", "expires_at": past, "cycle_id": 1},
    }
    state.purge_expired_signals(now)
    assert "A" in state.signals
    assert "B" not in state.signals
  • Step 2: Run test to verify it fails
python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v

Expected: AttributeError, because signal_cycle_id and get_active_signals are not implemented yet.

  • Step 3: Modify state.py
"""PollState — process-wide singleton."""
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class PollState:
    portfolio: dict | None = None
    news_sentiment: dict | None = None
    screener_preview: dict | None = None
    minute_bars: dict[str, deque] = field(default_factory=dict)
    asking_price: dict[str, dict] = field(default_factory=dict)
    # Phase 3b additions
    daily_ohlcv: dict[str, list[dict]] = field(default_factory=dict)
    chronos_predictions: dict[str, dict] = field(default_factory=dict)
    minute_momentum: dict[str, str] = field(default_factory=dict)
    signals: dict[str, dict] = field(default_factory=dict)
    # F5 lifecycle
    signal_cycle_id: int = 0
    last_updated: dict[str, str] = field(default_factory=dict)
    fetch_errors: dict[str, int] = field(default_factory=dict)

    def get_active_signals(self, now: datetime) -> list[dict]:
        """Return only signals with expires_at > now. A missing expires_at counts as expired."""
        active: list[dict] = []
        for sig in self.signals.values():
            expires_at = sig.get("expires_at")
            if not expires_at:
                continue
            try:
                exp_dt = datetime.fromisoformat(expires_at)
            except ValueError:
                continue
            if exp_dt > now:
                active.append(sig)
        return active

    def purge_expired_signals(self, now: datetime) -> int:
        """Remove expired signals and return how many were removed."""
        to_drop = []
        for ticker, sig in self.signals.items():
            expires_at = sig.get("expires_at")
            if not expires_at:
                to_drop.append(ticker)
                continue
            try:
                exp_dt = datetime.fromisoformat(expires_at)
            except ValueError:
                to_drop.append(ticker)
                continue
            if exp_dt <= now:
                to_drop.append(ticker)
        for t in to_drop:
            del self.signals[t]
        return len(to_drop)


state = PollState()
  • Step 4: Run test to verify it passes
python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v

Expected: 4 PASS.
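
One edge case the tests above do not exercise: both helpers catch only ValueError, while Python raises TypeError when an offset-naive datetime is compared with an offset-aware one. If a signal ever carried an expires_at without a UTC offset, the comparison against datetime.now(KST) would raise. A minimal sketch of a hardened check follows; is_active is a hypothetical name and is not part of this plan.

```python
from datetime import datetime, timezone

now = datetime(2026, 5, 25, 10, 0, tzinfo=timezone.utc)
naive_exp = datetime.fromisoformat("2026-05-25T10:05:00")  # no UTC offset


def is_active(expires_at: object, now: datetime) -> bool:
    """Hypothetical hardened check: malformed or naive values count as expired."""
    if not isinstance(expires_at, str):
        return False
    try:
        exp_dt = datetime.fromisoformat(expires_at)
    except ValueError:
        return False
    if exp_dt.tzinfo is None or now.tzinfo is None:
        return False  # cannot be compared safely, so treat as expired
    return exp_dt > now


# Demonstrate the failure mode the current helpers would hit.
try:
    _ = naive_exp > now
    mixed_comparison_raises = False
except TypeError:
    mixed_comparison_raises = True

print(mixed_comparison_raises)                              # True
print(is_active("2026-05-25T10:05:00", now))                # False (naive)
print(is_active("2026-05-25T10:05:00+00:00", now))          # True
print(is_active("not-a-date", now))                         # False
```

Since the signal builders in this plan always stamp expires_at from datetime.now(KST), the naive case should not arise in practice; the sketch only matters if signals can enter state.signals from another source.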

  • Step 5: Verify full suite still passes
python -m pytest ai_trade/tests -q

Expected: all existing tests PASS (the state.signals dict interface is unchanged).

  • Step 6: Commit
git add ai_trade/state.py ai_trade/tests/test_state_signals_lifecycle.py
git commit -m "feat(ai_trade): add expires_at + cycle_id lifecycle to state.signals (F5 part 1)"

Task 2: Add SIGNAL_TTL_SECONDS to config

Files:

  • Modify: ai_trade/config.py

  • Test: ai_trade/tests/test_state_signals_lifecycle.py (append)

  • Step 1: Write failing test

Append to the end of test_state_signals_lifecycle.py:

def test_signal_ttl_seconds_default(monkeypatch):
    monkeypatch.delenv("SIGNAL_TTL_SECONDS", raising=False)
    from ai_trade.config import Settings
    s = Settings()
    assert s.signal_ttl_seconds == 300


def test_signal_ttl_seconds_env_override(monkeypatch):
    monkeypatch.setenv("SIGNAL_TTL_SECONDS", "60")
    from ai_trade.config import Settings
    s = Settings()
    assert s.signal_ttl_seconds == 60
  • Step 2: Run test to fail
python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v -k signal_ttl

Expected: AttributeError.

  • Step 3: Add field to config.py

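Add inside the Settings class, next to the other *_threshold fields (this assumes Settings is a dataclass and that config.py already imports os and dataclasses.field):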

    signal_ttl_seconds: int = field(
        default_factory=lambda: int(os.getenv("SIGNAL_TTL_SECONDS", "300"))
    )
  • Step 4: Run test
python -m pytest ai_trade/tests/test_state_signals_lifecycle.py -v -k signal_ttl

Expected: 2 PASS.
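
The two tests pass because default_factory runs each time Settings() is constructed, so the environment variable is read at instantiation rather than at import time. A small self-contained sketch of that behavior, where SettingsSketch is a hypothetical stand-in for ai_trade.config.Settings:

```python
import os
from dataclasses import dataclass, field


@dataclass
class SettingsSketch:
    """Hypothetical stand-in for ai_trade.config.Settings."""
    signal_ttl_seconds: int = field(
        default_factory=lambda: int(os.getenv("SIGNAL_TTL_SECONDS", "300"))
    )


# Without the variable set, the default of 300 applies.
os.environ.pop("SIGNAL_TTL_SECONDS", None)
default_ttl = SettingsSketch().signal_ttl_seconds

# Setting the variable afterwards is picked up by the next instance.
os.environ["SIGNAL_TTL_SECONDS"] = "60"
override_ttl = SettingsSketch().signal_ttl_seconds
os.environ.pop("SIGNAL_TTL_SECONDS", None)  # leave the environment clean

print(default_ttl, override_ttl)  # 300 60
```

A plain class-level default such as int(os.getenv(...)) would instead be evaluated once when the module is imported, and the monkeypatch-based override test would then depend on import order.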

  • Step 5: Commit
git add ai_trade/config.py ai_trade/tests/test_state_signals_lifecycle.py
git commit -m "feat(ai_trade): add SIGNAL_TTL_SECONDS env setting (F5 part 2)"

Task 3: Attach cycle_id + expires_at in signal_generator

Files:

  • Modify: ai_trade/signal_generator.py

  • Test: ai_trade/tests/test_signal_generator.py (append)

  • Step 1: Write failing tests

Append to the end of the existing test_signal_generator.py:

from datetime import datetime, timedelta
from zoneinfo import ZoneInfo as _ZI

_KST_TEST = _ZI("Asia/Seoul")


def test_emit_attaches_cycle_id_and_expires_at(
    state_with_buy_setup, dedup_clean, settings_default,
):
    """Every emit attaches cycle_id + expires_at (F5)."""
    from ai_trade.signal_generator import generate_signals
    before = datetime.now(_KST_TEST)
    generate_signals(state_with_buy_setup, dedup_clean, settings_default)
    after = datetime.now(_KST_TEST)
    sig = state_with_buy_setup.signals["005930"]
    assert sig["cycle_id"] == 1
    assert "expires_at" in sig
    exp_dt = datetime.fromisoformat(sig["expires_at"])
    # as_of + 300s (default) — tolerance 5s
    assert before + timedelta(seconds=295) < exp_dt < after + timedelta(seconds=305)


def test_cycle_id_increments_each_call(
    state_with_buy_setup, dedup_clean, settings_default,
):
    """Each generate_signals call increments cycle_id by 1."""
    from ai_trade.signal_generator import generate_signals
    generate_signals(state_with_buy_setup, dedup_clean, settings_default)
    assert state_with_buy_setup.signal_cycle_id == 1
    # Second call: cycle_id must still increase even if dedup blocks the emit
    generate_signals(state_with_buy_setup, dedup_clean, settings_default)
    assert state_with_buy_setup.signal_cycle_id == 2

NOTE: The existing test_signal_generator.py presumably has fixtures such as state_with_buy_setup / dedup_clean / settings_default. If the names differ, adjust to the actual fixture names. To check: grep -n "@pytest.fixture" ai_trade/tests/test_signal_generator.py.

  • Step 2: Run tests to verify they fail
python -m pytest ai_trade/tests/test_signal_generator.py -v -k "cycle_id or expires"

Expected: KeyError or AttributeError.

  • Step 3: Modify signal_generator.py

Change the generate_signals function (L22-25):

def generate_signals(state, dedup, settings) -> None:
    """Phase 4 entry, state-mutating. F5: increments cycle_id; emitted signals carry expires_at."""
    state.signal_cycle_id += 1
    _evaluate_sell_signals(state, dedup, settings)
    _evaluate_buy_signals(state, dedup, settings)

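First draft: add the two fields to _build_buy_signal (L99-111). This draft relies on a _current_settings() helper that does not exist, so it is superseded by the version further down that takes settings explicitly: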

def _build_buy_signal(state, ticker: str, name: str, rank: int | None, confidence: float) -> dict:
    ap = state.asking_price[ticker]
    as_of_dt = datetime.now(KST)
    expires_at = (as_of_dt + timedelta(seconds=getattr(_current_settings(), "signal_ttl_seconds", 300))).isoformat()
    return {
        "ticker": ticker,
        "name": name,
        "action": "buy",
        "confidence_webai": confidence,
        "current_price": ap["current_price"],
        "avg_price": None,
        "pnl_pct": None,
        "context": _build_context(state, ticker, rank),
        "as_of": as_of_dt.isoformat(),
        "cycle_id": state.signal_cycle_id,
        "expires_at": expires_at,
    }

Likewise for _build_sell_signal (L174-186):

def _build_sell_signal(state, holding: dict, confidence: float, reason: str, settings=None) -> dict:
    ticker = holding["ticker"]
    as_of_dt = datetime.now(KST)
    ttl = getattr(settings, "signal_ttl_seconds", 300) if settings else 300
    expires_at = (as_of_dt + timedelta(seconds=ttl)).isoformat()
    return {
        "ticker": ticker,
        "name": holding.get("name", ticker),
        "action": "sell",
        "confidence_webai": confidence,
        "current_price": holding.get("current_price"),
        "avg_price": holding.get("avg_price"),
        "pnl_pct": holding.get("pnl_pct"),
        "context": _build_context(state, ticker, rank=None, sell_reason=reason),
        "as_of": as_of_dt.isoformat(),
        "cycle_id": state.signal_cycle_id,
        "expires_at": expires_at,
    }

_build_buy_signal does not currently receive settings, so its call site has to be updated as well. Adding a settings argument to both builder functions is the cleaner approach. Revised:

def _evaluate_buy_signals(state, dedup, settings) -> None:
    candidates = _buy_candidates(state)
    for ticker, name, rank in candidates:
        existing = state.signals.get(ticker)
        if existing is not None and existing.get("action") == "sell":
            logger.debug("buy %s skipped: same-cycle sell precedence", ticker)
            continue
        if not _check_buy_hard_gate(state, ticker, settings):
            logger.debug("buy %s skipped: hard gate failed", ticker)
            continue
        confidence = _compute_buy_confidence(state, ticker, rank)
        if confidence <= settings.confidence_threshold:
            logger.debug("buy %s skipped: confidence %.3f <= %.3f",
                         ticker, confidence, settings.confidence_threshold)
            continue
        if dedup.is_recent(ticker, "buy", within_hours=24):
            logger.debug("buy %s skipped: dedup 24h", ticker)
            continue
        state.signals[ticker] = _build_buy_signal(state, ticker, name, rank, confidence, settings)
        dedup.record(ticker, "buy", confidence=confidence)
        logger.info("signal emit %s buy conf=%.3f rank=%s cycle=%d",
                    ticker, confidence, rank, state.signal_cycle_id)


def _build_buy_signal(state, ticker, name, rank, confidence, settings) -> dict:
    ap = state.asking_price[ticker]
    as_of_dt = datetime.now(KST)
    ttl = settings.signal_ttl_seconds
    expires_at = (as_of_dt + timedelta(seconds=ttl)).isoformat()
    return {
        "ticker": ticker,
        "name": name,
        "action": "buy",
        "confidence_webai": confidence,
        "current_price": ap["current_price"],
        "avg_price": None,
        "pnl_pct": None,
        "context": _build_context(state, ticker, rank),
        "as_of": as_of_dt.isoformat(),
        "cycle_id": state.signal_cycle_id,
        "expires_at": expires_at,
    }

The sell side passes settings through in the same way. _try_stop_loss and the other sell helpers already receive settings, so call _build_sell_signal(..., settings=settings).

Add the import (top of signal_generator.py):

from datetime import datetime, timedelta

(only timedelta is added to the existing import)

Do not create a helper like _current_settings(); pass settings explicitly.
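
Since both builders compute the same three values, one optional refinement would be a small shared helper. The sketch below is only a suggestion under assumptions: lifecycle_fields is a hypothetical name that this plan does not define, and SimpleNamespace objects stand in for PollState and Settings.

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace


def lifecycle_fields(state, settings, as_of_dt: datetime) -> dict:
    """Hypothetical helper: the three fields both builders stamp on a signal."""
    ttl = settings.signal_ttl_seconds
    return {
        "as_of": as_of_dt.isoformat(),
        "cycle_id": state.signal_cycle_id,
        "expires_at": (as_of_dt + timedelta(seconds=ttl)).isoformat(),
    }


# Stand-ins for PollState and Settings, just enough for the demo.
state = SimpleNamespace(signal_cycle_id=7)
settings = SimpleNamespace(signal_ttl_seconds=300)
as_of = datetime(2026, 5, 25, 10, 0, tzinfo=timezone.utc)

signal = {
    "ticker": "005930",
    "action": "buy",
    **lifecycle_fields(state, settings, as_of),
}
print(signal["cycle_id"], signal["expires_at"])  # 7 2026-05-25T10:05:00+00:00
```

Passing settings in as an argument keeps the builders free of hidden global lookups, which is the same reasoning behind not introducing _current_settings().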

  • Step 4: Run tests
python -m pytest ai_trade/tests/test_signal_generator.py -v

Expected: the 2 new tests PASS, and the existing tests still PASS.

  • Step 5: Commit
git add ai_trade/signal_generator.py ai_trade/tests/test_signal_generator.py
git commit -m "feat(ai_trade): attach cycle_id + expires_at to emitted signals (F5 part 3)"

Task 4: pull_worker calls purge at the end of each cycle

Files:

  • Modify: ai_trade/pull_worker.py

  • Test: ai_trade/tests/test_pull_worker.py (append)

  • Step 1: Write failing test

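Append to the end of test_pull_worker.py. The test is a coroutine, so it assumes the suite already runs async tests (for example via pytest-asyncio); if the existing async tests in that file carry a marker, add the same one here:

async def test_poll_loop_purges_expired_signals(monkeypatch):
    """Expired signals are removed at the end of every cycle (F5)."""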
    from ai_trade import pull_worker
    from ai_trade.state import PollState
    from datetime import datetime as _dt
    from zoneinfo import ZoneInfo as _ZI
    from unittest.mock import AsyncMock, MagicMock
    import asyncio as _asyncio

    _kst = _ZI("Asia/Seoul")
    now = _dt(2026, 5, 18, 10, 0, tzinfo=_kst)

    class FrozenDT:
        @staticmethod
        def now(tz=None): return now

    state = PollState()
    state.signals = {
        "OLD": {"ticker": "OLD", "expires_at": _dt(2026, 5, 18, 9, 0, tzinfo=_kst).isoformat(), "cycle_id": 1},
        "FRESH": {"ticker": "FRESH", "expires_at": _dt(2026, 5, 18, 10, 30, tzinfo=_kst).isoformat(), "cycle_id": 1},
    }

    monkeypatch.setattr(pull_worker, "datetime", FrozenDT)
    monkeypatch.setattr(pull_worker, "_is_market_day", lambda n: True)
    monkeypatch.setattr(pull_worker, "_is_polling_window", lambda n: True)
    monkeypatch.setattr(pull_worker, "_next_interval", lambda n: 0.01)
    monkeypatch.setattr(pull_worker, "_run_polling_cycle", AsyncMock())
    monkeypatch.setattr(pull_worker, "update_minute_momentum_for_all", lambda s: None)
    monkeypatch.setattr(pull_worker, "_is_post_close_trigger", lambda *a, **k: False)

    shutdown = _asyncio.Event()
    async def stop_soon():
        await _asyncio.sleep(0.05)
        shutdown.set()
    stopper = _asyncio.create_task(stop_soon())  # keep a reference so the task is not garbage-collected

    await pull_worker.poll_loop(
        client=MagicMock(), state=state, shutdown=shutdown,
        kis_client=MagicMock(), chronos=MagicMock(),
        dedup=None, settings=None,
    )
    assert "OLD" not in state.signals
    assert "FRESH" in state.signals
  • Step 2: Run test to fail
python -m pytest ai_trade/tests/test_pull_worker.py::test_poll_loop_purges_expired_signals -v

Expected: FAIL, because OLD is still present.

  • Step 3: Add purge call in poll_loop

Inside poll_loop in ai_trade/pull_worker.py, add the purge block after signal generation (or just before the end of the cycle):

            # Phase 4: generate signals
            if dedup is not None and settings is not None:
                try:
                    from ai_trade.signal_generator import generate_signals
                    generate_signals(state, dedup, settings)
                except Exception:
                    logger.exception("generate_signals failed")
            # F5: purge expired signals (covers the case where no consumer is reading)
            try:
                state.purge_expired_signals(datetime.now(KST))
            except Exception:
                logger.exception("purge_expired_signals failed")
  • Step 4: Run test
python -m pytest ai_trade/tests/test_pull_worker.py::test_poll_loop_purges_expired_signals -v

Expected: PASS.
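
The placement matters: the purge runs on every pass regardless of whether signal generation ran, and a failure inside it must not stop the loop. A stripped-down, runnable sketch of that pattern is below; mini_poll_loop and purge_expired are hypothetical simplifications, not the real poll_loop or PollState method.

```python
import asyncio
from datetime import datetime, timedelta, timezone


def purge_expired(signals: dict[str, dict], now: datetime) -> int:
    """Simplified version of purge_expired_signals for a plain dict."""
    expired = [
        t for t, s in signals.items()
        if datetime.fromisoformat(s["expires_at"]) <= now
    ]
    for t in expired:
        del signals[t]
    return len(expired)


async def mini_poll_loop(
    signals: dict[str, dict], shutdown: asyncio.Event, now: datetime
) -> int:
    """Hypothetical loop: each pass ends with a purge that cannot crash the loop."""
    cycles = 0
    while not shutdown.is_set():
        # ... fetch data and generate signals here ...
        try:
            purge_expired(signals, now)
        except Exception:
            pass  # the real loop logs via logger.exception instead
        cycles += 1
        await asyncio.sleep(0.01)
    return cycles


async def main() -> tuple[dict, int]:
    now = datetime(2026, 5, 18, 10, 0, tzinfo=timezone.utc)
    signals = {
        "OLD": {"expires_at": (now - timedelta(hours=1)).isoformat()},
        "FRESH": {"expires_at": (now + timedelta(minutes=30)).isoformat()},
    }
    shutdown = asyncio.Event()
    # Request shutdown shortly after start so the demo terminates.
    stopper = asyncio.get_running_loop().call_later(0.05, shutdown.set)
    cycles = await mini_poll_loop(signals, shutdown, now)
    stopper.cancel()
    return signals, cycles


remaining, cycles_run = asyncio.run(main())
print(sorted(remaining), cycles_run >= 1)  # ['FRESH'] True
```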

  • Step 5: Run full suite
python -m pytest ai_trade/tests -q

Expected: all PASS.

  • Step 6: Commit
git add ai_trade/pull_worker.py ai_trade/tests/test_pull_worker.py
git commit -m "feat(ai_trade): poll_loop purges expired signals at the end of every cycle (F5 part 4)"

Task 5: Full regression + push

  • Step 1: Final pytest
python -m pytest ai_trade/tests -v

Expected: all PASS (about 9 new tests + 56 existing).

  • Step 2: Push
git push origin main

Self-Review

  1. expires_at + cycle_id attached: in both _build_buy_signal and _build_sell_signal
  2. cycle_id increment: exactly once, on entry to generate_signals
  3. purge: called once at the end of each poll_loop cycle
  4. get_active_signals: the read API for the Phase 5 consumer exists
  5. Legacy signal compatibility: signals without expires_at are treated as expired, which is the safe default

Intentionally not covered:

  • An API that lets the Phase 5 consumer explicitly drain signals after processing is not part of this plan (read-only consumption is sufficient, since expires_at + dedup make it idempotent).
  • The agent-office /signal HTTP endpoint belongs to the Phase 5 plan.

Execution Handoff

Plan complete and saved to docs/superpowers/plans/2026-05-25-state-signals-lifecycle.md.

  1. Subagent-Driven (recommended): a fresh subagent per task.
  2. Inline Execution: run in the current session.

Awaiting 박재오's decision. Recommended to start only after Plan 1 (hotfix) is finished.