From 9e5fecb3693ac47bc7455fcd6e5588881ff85923 Mon Sep 17 00:00:00 2001 From: gahusb Date: Sat, 16 May 2026 18:04:32 +0900 Subject: [PATCH] feat(signal_v2-phase3b): post-close cycle + minute momentum update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit scheduler._is_post_close_trigger: 16:00 KST ±1min detection (market day). pull_worker: - _run_post_close_cycle: daily fetch (60일) + chronos batch predict → state.chronos_predictions + state.daily_ohlcv. - update_minute_momentum_for_all: 매 cycle 마다 state.minute_momentum 갱신. - poll_loop signature 확장 (chronos optional). 45 tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- signal_v2/pull_worker.py | 59 ++++++++++++++++++++++++++++- signal_v2/scheduler.py | 8 ++++ signal_v2/tests/test_pull_worker.py | 42 ++++++++++++++++++++ 3 files changed, 108 insertions(+), 1 deletion(-) diff --git a/signal_v2/pull_worker.py b/signal_v2/pull_worker.py index ed57636..8b09041 100644 --- a/signal_v2/pull_worker.py +++ b/signal_v2/pull_worker.py @@ -7,7 +7,7 @@ from datetime import datetime from signal_v2.kis_client import KISClient from signal_v2.scheduler import ( - KST, _is_market_day, _is_polling_window, _next_interval, + KST, _is_market_day, _is_polling_window, _next_interval, _is_post_close_trigger, ) from signal_v2.state import PollState from signal_v2.stock_client import StockClient @@ -18,6 +18,7 @@ logger = logging.getLogger(__name__) async def poll_loop( client: StockClient, state: PollState, shutdown: asyncio.Event, kis_client: KISClient | None = None, + chronos=None, ) -> None: """FastAPI lifespan 에서 asyncio.create_task 로 시작.""" logger.info("poll_loop started") @@ -28,6 +29,17 @@ async def poll_loop( await _run_polling_cycle(client, state, kis_client=kis_client) except Exception: logger.exception("poll cycle failed") + # Minute momentum 갱신 (매 cycle) + try: + update_minute_momentum_for_all(state) + except Exception: + logger.exception("minute momentum update failed") + # Post-close trigger (16:00 KST) + if _is_post_close_trigger(now) and chronos is not None and kis_client is not None: + try: + await _run_post_close_cycle(kis_client, chronos, state) + except Exception: + logger.exception("post-close cycle failed") interval = _next_interval(now) try: await asyncio.wait_for(shutdown.wait(), timeout=interval) @@ -125,3 +137,48 @@ def _screener_tickers(state: PollState) -> list[str]: if state.screener_preview is None: return [] return [i["ticker"] for i in state.screener_preview.get("items", []) if "ticker" in i] + + +async def _run_post_close_cycle(kis_client, chronos, state) -> None: + """16:00 KST 종가 후 1회: daily fetch + chronos predict.""" + tickers = list(set(_portfolio_tickers(state)) | set(_screener_tickers(state))) + if not tickers: + return + + daily_results = await asyncio.gather(*[ + kis_client.get_daily_ohlcv(t, days=60) for t in tickers + ], return_exceptions=True) + daily_dict = {} + for ticker, result in zip(tickers, daily_results): + if isinstance(result, list) and len(result) >= 30: + daily_dict[ticker] = result + state.daily_ohlcv[ticker] = result + elif isinstance(result, Exception): + state.fetch_errors[f"daily_ohlcv/{ticker}"] = ( + state.fetch_errors.get(f"daily_ohlcv/{ticker}", 0) + 1 + ) + + if daily_dict and chronos is not None: + try: + predictions = chronos.predict_batch(daily_dict) + except Exception: + logger.exception("chronos predict_batch failed") + return + for ticker, pred in predictions.items(): + state.chronos_predictions[ticker] = { + "median": pred.median, + "q10": pred.q10, + "q90": pred.q90, + "conf": pred.conf, + "as_of": pred.as_of, + } + state.last_updated[f"chronos/{ticker}"] = pred.as_of + + +def update_minute_momentum_for_all(state) -> None: + """매 분봉 cycle 후 호출 — 모든 종목 모멘텀 갱신.""" + from signal_v2.momentum_classifier import classify_minute_momentum + now_iso = datetime.now(KST).isoformat() + for ticker, bars in state.minute_bars.items(): + state.minute_momentum[ticker] = classify_minute_momentum(bars) + state.last_updated[f"momentum/{ticker}"] = now_iso diff --git a/signal_v2/scheduler.py b/signal_v2/scheduler.py index 23265c5..dcece00 100644 --- a/signal_v2/scheduler.py +++ b/signal_v2/scheduler.py @@ -76,6 +76,14 @@ def _seconds_until_nxt_or_market_open(now: datetime) -> float: return 86400.0 +def _is_post_close_trigger(now: datetime) -> bool: + """16:00 KST ±1분 (post-close cycle 트리거). 평일/영업일만.""" + if not _is_market_day(now): + return False + t = now.time() + return time(16, 0) <= t < time(16, 1) + + def _seconds_until_next_market_open(now: datetime) -> float: """다음 영업일의 07:00 KST 까지 초수 (휴장일/주말용).""" candidate = now.replace(hour=7, minute=0, second=0, microsecond=0) diff --git a/signal_v2/tests/test_pull_worker.py b/signal_v2/tests/test_pull_worker.py index 81e8265..fe76954 100644 --- a/signal_v2/tests/test_pull_worker.py +++ b/signal_v2/tests/test_pull_worker.py @@ -53,3 +53,45 @@ def test_websocket_message_updates_state_asking_price(): "current_price": 78500, "as_of": "2026-05-18T10:00:00+09:00"}) assert state.asking_price["005930"]["bid_total"] == 1000 assert "asking_price/005930" in state.last_updated + + +async def test_post_close_cycle_updates_chronos_predictions(): + """mock kis + mock chronos → state.chronos_predictions + state.daily_ohlcv 갱신.""" + from unittest.mock import AsyncMock, MagicMock + from signal_v2.pull_worker import _run_post_close_cycle + from signal_v2.chronos_predictor import ChronosPrediction + from signal_v2.state import PollState + + state = PollState() + state.portfolio = {"holdings": [{"ticker": "005930"}]} + state.screener_preview = {"items": [{"ticker": "000660"}]} + + kis_mock = MagicMock() + daily_005930 = [{"datetime": f"2026-05-{i+1:02d}", "open": 100, "high": 105, + "low": 95, "close": 100 + i, "volume": 1000} for i in range(60)] + daily_000660 = [{"datetime": f"2026-05-{i+1:02d}", "open": 200, "high": 210, + "low": 190, "close": 200 + i, "volume": 2000} for i in range(60)] + # _run_post_close_cycle iterates tickers and calls get_daily_ohlcv per ticker. + # Order depends on set() so use side_effect mapping if possible, otherwise list. + async def fake_daily(ticker, days=60): + if ticker == "005930": + return daily_005930 + if ticker == "000660": + return daily_000660 + return [] + kis_mock.get_daily_ohlcv = AsyncMock(side_effect=fake_daily) + + chronos_mock = MagicMock() + chronos_mock.predict_batch = MagicMock(return_value={ + "005930": ChronosPrediction(0.02, -0.01, 0.04, 0.85, "2026-05-18T16:00:00+09:00"), + "000660": ChronosPrediction(0.03, -0.02, 0.06, 0.75, "2026-05-18T16:00:00+09:00"), + }) + + await _run_post_close_cycle(kis_mock, chronos_mock, state) + + assert "005930" in state.chronos_predictions + assert "000660" in state.chronos_predictions + assert state.chronos_predictions["005930"]["median"] == 0.02 + assert state.chronos_predictions["005930"]["conf"] == 0.85 + assert "005930" in state.daily_ohlcv + assert "chronos/005930" in state.last_updated