"""오케스트레이션 — monitor-set 조회 → 조건 평가 → report + heartbeat state.""" from __future__ import annotations import asyncio import logging from datetime import datetime from zoneinfo import ZoneInfo from conditions import evaluate_buy, evaluate_sell logger = logging.getLogger(__name__) KST = ZoneInfo("Asia/Seoul") class MonitorState: """monitor_loop가 갱신, heartbeat state_fn이 읽는 공유 상태.""" def __init__(self): self.session_state = "idle" # market_open | market_closed | idle self.last_alert_at: str | None = None def filter_krx(targets: list[dict]) -> list[dict]: """6자리 숫자 티커(KRX)만. 알파벳 티커 skip.""" out = [] for t in targets: tk = str(t.get("ticker", "")) if tk.isdigit() and len(tk) == 6: out.append(t) return out async def _build_ctx(kis, target: dict, settings) -> dict: ticker = target["ticker"] quote = await kis.get_quote(ticker) daily = await kis.get_daily_ohlcv(ticker, 250) return { "ticker": ticker, "name": target.get("name", ""), "price": quote["price"], "day_open": quote["day_open"], "day_high": quote["day_high"], "today_volume": quote["today_volume"], "closes": [b["close"] for b in daily], "highs": [b["high"] for b in daily], "lows": [b["low"] for b in daily], "volumes": [b["volume"] for b in daily], "avg_price": target.get("avg_price"), "qty": target.get("qty"), "holding_high": target.get("holding_high"), "climax_vol_mult": settings.climax_vol_mult, } async def run_cycle(nas, kis, state, stats, settings) -> None: try: ms = await nas.get_monitor_set() except Exception: logger.exception("monitor-set 조회 실패") state.session_state = "idle" stats.jobs_failed += 1 return session = ms.get("session", "closed") if session == "closed": state.session_state = "market_closed" return buy_targets = filter_krx(ms.get("buy_targets", [])) sell_targets = filter_krx(ms.get("sell_targets", [])) buy_params = ms.get("buy_params", {}) exit_params = ms.get("exit_params", {}) firing: list[dict] = [] for t in buy_targets: try: firing += evaluate_buy(await _build_ctx(kis, t, settings), buy_params) except Exception: logger.exception("buy 평가 실패 %s", t.get("ticker")) for t in sell_targets: try: firing += evaluate_sell(await _build_ctx(kis, t, settings), exit_params) except Exception: logger.exception("sell 평가 실패 %s", t.get("ticker")) as_of = datetime.now(KST).isoformat(timespec="seconds") if firing: state.last_alert_at = as_of logger.info("firing %d개: %s", len(firing), [f"{f['ticker']}:{f['condition']}" for f in firing]) try: await nas.post_report(as_of, firing) # 빈 배열도 전송(edge clear) except Exception: logger.exception("report 전송 실패") state.session_state = "market_open" stats.jobs_done += 1 stats.last_job_at = as_of async def monitor_loop(nas, kis, state, stats, settings) -> None: logger.info("trade-monitor loop 시작 interval=%ds", settings.loop_interval) while True: try: await run_cycle(nas, kis, state, stats, settings) except asyncio.CancelledError: logger.info("monitor_loop cancelled") raise except Exception: logger.exception("monitor_loop iteration 실패") await asyncio.sleep(settings.loop_interval) def make_state_fn(state): async def state_fn(redis, stats): return state.session_state, {"last_alert_at": state.last_alert_at} return state_fn