feat(signal_v2-phase3b): post-close cycle + minute momentum update
scheduler._is_post_close_trigger: 16:00 KST ±1min detection (market day). pull_worker: - _run_post_close_cycle: daily fetch (60일) + chronos batch predict → state.chronos_predictions + state.daily_ohlcv. - update_minute_momentum_for_all: 매 cycle 마다 state.minute_momentum 갱신. - poll_loop signature 확장 (chronos optional). 45 tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -7,7 +7,7 @@ from datetime import datetime
|
|||||||
|
|
||||||
from signal_v2.kis_client import KISClient
|
from signal_v2.kis_client import KISClient
|
||||||
from signal_v2.scheduler import (
|
from signal_v2.scheduler import (
|
||||||
KST, _is_market_day, _is_polling_window, _next_interval,
|
KST, _is_market_day, _is_polling_window, _next_interval, _is_post_close_trigger,
|
||||||
)
|
)
|
||||||
from signal_v2.state import PollState
|
from signal_v2.state import PollState
|
||||||
from signal_v2.stock_client import StockClient
|
from signal_v2.stock_client import StockClient
|
||||||
@@ -18,6 +18,7 @@ logger = logging.getLogger(__name__)
|
|||||||
async def poll_loop(
|
async def poll_loop(
|
||||||
client: StockClient, state: PollState, shutdown: asyncio.Event,
|
client: StockClient, state: PollState, shutdown: asyncio.Event,
|
||||||
kis_client: KISClient | None = None,
|
kis_client: KISClient | None = None,
|
||||||
|
chronos=None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""FastAPI lifespan 에서 asyncio.create_task 로 시작."""
|
"""FastAPI lifespan 에서 asyncio.create_task 로 시작."""
|
||||||
logger.info("poll_loop started")
|
logger.info("poll_loop started")
|
||||||
@@ -28,6 +29,17 @@ async def poll_loop(
|
|||||||
await _run_polling_cycle(client, state, kis_client=kis_client)
|
await _run_polling_cycle(client, state, kis_client=kis_client)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("poll cycle failed")
|
logger.exception("poll cycle failed")
|
||||||
|
# Minute momentum 갱신 (매 cycle)
|
||||||
|
try:
|
||||||
|
update_minute_momentum_for_all(state)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("minute momentum update failed")
|
||||||
|
# Post-close trigger (16:00 KST)
|
||||||
|
if _is_post_close_trigger(now) and chronos is not None and kis_client is not None:
|
||||||
|
try:
|
||||||
|
await _run_post_close_cycle(kis_client, chronos, state)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("post-close cycle failed")
|
||||||
interval = _next_interval(now)
|
interval = _next_interval(now)
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(shutdown.wait(), timeout=interval)
|
await asyncio.wait_for(shutdown.wait(), timeout=interval)
|
||||||
@@ -125,3 +137,48 @@ def _screener_tickers(state: PollState) -> list[str]:
|
|||||||
if state.screener_preview is None:
|
if state.screener_preview is None:
|
||||||
return []
|
return []
|
||||||
return [i["ticker"] for i in state.screener_preview.get("items", []) if "ticker" in i]
|
return [i["ticker"] for i in state.screener_preview.get("items", []) if "ticker" in i]
|
||||||
|
|
||||||
|
|
||||||
|
async def _run_post_close_cycle(kis_client, chronos, state) -> None:
|
||||||
|
"""16:00 KST 종가 후 1회: daily fetch + chronos predict."""
|
||||||
|
tickers = list(set(_portfolio_tickers(state)) | set(_screener_tickers(state)))
|
||||||
|
if not tickers:
|
||||||
|
return
|
||||||
|
|
||||||
|
daily_results = await asyncio.gather(*[
|
||||||
|
kis_client.get_daily_ohlcv(t, days=60) for t in tickers
|
||||||
|
], return_exceptions=True)
|
||||||
|
daily_dict = {}
|
||||||
|
for ticker, result in zip(tickers, daily_results):
|
||||||
|
if isinstance(result, list) and len(result) >= 30:
|
||||||
|
daily_dict[ticker] = result
|
||||||
|
state.daily_ohlcv[ticker] = result
|
||||||
|
elif isinstance(result, Exception):
|
||||||
|
state.fetch_errors[f"daily_ohlcv/{ticker}"] = (
|
||||||
|
state.fetch_errors.get(f"daily_ohlcv/{ticker}", 0) + 1
|
||||||
|
)
|
||||||
|
|
||||||
|
if daily_dict and chronos is not None:
|
||||||
|
try:
|
||||||
|
predictions = chronos.predict_batch(daily_dict)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("chronos predict_batch failed")
|
||||||
|
return
|
||||||
|
for ticker, pred in predictions.items():
|
||||||
|
state.chronos_predictions[ticker] = {
|
||||||
|
"median": pred.median,
|
||||||
|
"q10": pred.q10,
|
||||||
|
"q90": pred.q90,
|
||||||
|
"conf": pred.conf,
|
||||||
|
"as_of": pred.as_of,
|
||||||
|
}
|
||||||
|
state.last_updated[f"chronos/{ticker}"] = pred.as_of
|
||||||
|
|
||||||
|
|
||||||
|
def update_minute_momentum_for_all(state) -> None:
|
||||||
|
"""매 분봉 cycle 후 호출 — 모든 종목 모멘텀 갱신."""
|
||||||
|
from signal_v2.momentum_classifier import classify_minute_momentum
|
||||||
|
now_iso = datetime.now(KST).isoformat()
|
||||||
|
for ticker, bars in state.minute_bars.items():
|
||||||
|
state.minute_momentum[ticker] = classify_minute_momentum(bars)
|
||||||
|
state.last_updated[f"momentum/{ticker}"] = now_iso
|
||||||
|
|||||||
@@ -76,6 +76,14 @@ def _seconds_until_nxt_or_market_open(now: datetime) -> float:
|
|||||||
return 86400.0
|
return 86400.0
|
||||||
|
|
||||||
|
|
||||||
|
def _is_post_close_trigger(now: datetime) -> bool:
|
||||||
|
"""16:00 KST ±1분 (post-close cycle 트리거). 평일/영업일만."""
|
||||||
|
if not _is_market_day(now):
|
||||||
|
return False
|
||||||
|
t = now.time()
|
||||||
|
return time(16, 0) <= t < time(16, 1)
|
||||||
|
|
||||||
|
|
||||||
def _seconds_until_next_market_open(now: datetime) -> float:
|
def _seconds_until_next_market_open(now: datetime) -> float:
|
||||||
"""다음 영업일의 07:00 KST 까지 초수 (휴장일/주말용)."""
|
"""다음 영업일의 07:00 KST 까지 초수 (휴장일/주말용)."""
|
||||||
candidate = now.replace(hour=7, minute=0, second=0, microsecond=0)
|
candidate = now.replace(hour=7, minute=0, second=0, microsecond=0)
|
||||||
|
|||||||
@@ -53,3 +53,45 @@ def test_websocket_message_updates_state_asking_price():
|
|||||||
"current_price": 78500, "as_of": "2026-05-18T10:00:00+09:00"})
|
"current_price": 78500, "as_of": "2026-05-18T10:00:00+09:00"})
|
||||||
assert state.asking_price["005930"]["bid_total"] == 1000
|
assert state.asking_price["005930"]["bid_total"] == 1000
|
||||||
assert "asking_price/005930" in state.last_updated
|
assert "asking_price/005930" in state.last_updated
|
||||||
|
|
||||||
|
|
||||||
|
async def test_post_close_cycle_updates_chronos_predictions():
|
||||||
|
"""mock kis + mock chronos → state.chronos_predictions + state.daily_ohlcv 갱신."""
|
||||||
|
from unittest.mock import AsyncMock, MagicMock
|
||||||
|
from signal_v2.pull_worker import _run_post_close_cycle
|
||||||
|
from signal_v2.chronos_predictor import ChronosPrediction
|
||||||
|
from signal_v2.state import PollState
|
||||||
|
|
||||||
|
state = PollState()
|
||||||
|
state.portfolio = {"holdings": [{"ticker": "005930"}]}
|
||||||
|
state.screener_preview = {"items": [{"ticker": "000660"}]}
|
||||||
|
|
||||||
|
kis_mock = MagicMock()
|
||||||
|
daily_005930 = [{"datetime": f"2026-05-{i+1:02d}", "open": 100, "high": 105,
|
||||||
|
"low": 95, "close": 100 + i, "volume": 1000} for i in range(60)]
|
||||||
|
daily_000660 = [{"datetime": f"2026-05-{i+1:02d}", "open": 200, "high": 210,
|
||||||
|
"low": 190, "close": 200 + i, "volume": 2000} for i in range(60)]
|
||||||
|
# _run_post_close_cycle iterates tickers and calls get_daily_ohlcv per ticker.
|
||||||
|
# Order depends on set() so use side_effect mapping if possible, otherwise list.
|
||||||
|
async def fake_daily(ticker, days=60):
|
||||||
|
if ticker == "005930":
|
||||||
|
return daily_005930
|
||||||
|
if ticker == "000660":
|
||||||
|
return daily_000660
|
||||||
|
return []
|
||||||
|
kis_mock.get_daily_ohlcv = AsyncMock(side_effect=fake_daily)
|
||||||
|
|
||||||
|
chronos_mock = MagicMock()
|
||||||
|
chronos_mock.predict_batch = MagicMock(return_value={
|
||||||
|
"005930": ChronosPrediction(0.02, -0.01, 0.04, 0.85, "2026-05-18T16:00:00+09:00"),
|
||||||
|
"000660": ChronosPrediction(0.03, -0.02, 0.06, 0.75, "2026-05-18T16:00:00+09:00"),
|
||||||
|
})
|
||||||
|
|
||||||
|
await _run_post_close_cycle(kis_mock, chronos_mock, state)
|
||||||
|
|
||||||
|
assert "005930" in state.chronos_predictions
|
||||||
|
assert "000660" in state.chronos_predictions
|
||||||
|
assert state.chronos_predictions["005930"]["median"] == 0.02
|
||||||
|
assert state.chronos_predictions["005930"]["conf"] == 0.85
|
||||||
|
assert "005930" in state.daily_ohlcv
|
||||||
|
assert "chronos/005930" in state.last_updated
|
||||||
|
|||||||
Reference in New Issue
Block a user