diff --git a/ai_trade/pull_worker.py b/ai_trade/pull_worker.py index 147bcb1..39940a0 100644 --- a/ai_trade/pull_worker.py +++ b/ai_trade/pull_worker.py @@ -54,6 +54,11 @@ async def poll_loop( generate_signals(state, dedup, settings) except Exception: logger.exception("generate_signals failed") + # F5: cycle 끝에 expired signal purge (consumer 미사용 케이스 보호) + try: + state.purge_expired_signals(datetime.now(KST)) + except Exception: + logger.exception("purge_expired_signals failed") interval = _next_interval(now) try: await asyncio.wait_for(shutdown.wait(), timeout=interval) diff --git a/ai_trade/tests/test_pull_worker.py b/ai_trade/tests/test_pull_worker.py index 27d49d4..d34044e 100644 --- a/ai_trade/tests/test_pull_worker.py +++ b/ai_trade/tests/test_pull_worker.py @@ -178,3 +178,64 @@ async def test_post_close_fires_at_1601_when_not_yet_today(monkeypatch): ) assert post_close.await_count >= 1, "post-close가 16:01에 호출되지 않음 (F3 회귀)" + + +async def test_poll_loop_purges_expired_signals(monkeypatch): + """F5 — 매 cycle 끝에 expired signal이 제거됨.""" + from datetime import datetime as _dt + from zoneinfo import ZoneInfo as _ZI + import asyncio as _asyncio + + from ai_trade import pull_worker + from ai_trade.state import PollState + + _kst = _ZI("Asia/Seoul") + now = _dt(2026, 5, 18, 10, 0, tzinfo=_kst) + + class FrozenDT: + @staticmethod + def now(tz=None): + return now + + state = PollState() + state.signals = { + "OLD": { + "ticker": "OLD", + "expires_at": _dt(2026, 5, 18, 9, 0, tzinfo=_kst).isoformat(), + "cycle_id": 1, + }, + "FRESH": { + "ticker": "FRESH", + "expires_at": _dt(2026, 5, 18, 10, 30, tzinfo=_kst).isoformat(), + "cycle_id": 1, + }, + } + + monkeypatch.setattr(pull_worker, "datetime", FrozenDT) + monkeypatch.setattr(pull_worker, "_is_market_day", lambda n: True) + monkeypatch.setattr(pull_worker, "_is_polling_window", lambda n: True) + monkeypatch.setattr(pull_worker, "_next_interval", lambda n: 0.01) + monkeypatch.setattr(pull_worker, "_run_polling_cycle", AsyncMock()) + monkeypatch.setattr(pull_worker, "update_minute_momentum_for_all", lambda s: None) + monkeypatch.setattr(pull_worker, "_is_post_close_trigger", lambda *a, **k: False) + + shutdown = _asyncio.Event() + + async def stop_soon(): + await _asyncio.sleep(0.05) + shutdown.set() + + _asyncio.create_task(stop_soon()) + + await pull_worker.poll_loop( + client=MagicMock(), + state=state, + shutdown=shutdown, + kis_client=MagicMock(), + chronos=MagicMock(), + dedup=None, + settings=None, + ) + + assert "OLD" not in state.signals + assert "FRESH" in state.signals