114 lines
3.7 KiB
Python
114 lines
3.7 KiB
Python
"""오케스트레이션 — monitor-set 조회 → 조건 평가 → report + heartbeat state."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from conditions import evaluate_buy, evaluate_sell
|
|
|
|
logger = logging.getLogger(__name__)
|
|
KST = ZoneInfo("Asia/Seoul")
|
|
|
|
|
|
class MonitorState:
|
|
"""monitor_loop가 갱신, heartbeat state_fn이 읽는 공유 상태."""
|
|
def __init__(self):
|
|
self.session_state = "idle" # market_open | market_closed | idle
|
|
self.last_alert_at: str | None = None
|
|
|
|
|
|
def filter_krx(targets: list[dict]) -> list[dict]:
|
|
"""6자리 숫자 티커(KRX)만. 알파벳 티커 skip."""
|
|
out = []
|
|
for t in targets:
|
|
tk = str(t.get("ticker", ""))
|
|
if tk.isdigit() and len(tk) == 6:
|
|
out.append(t)
|
|
return out
|
|
|
|
|
|
async def _build_ctx(kis, target: dict, settings) -> dict:
|
|
ticker = target["ticker"]
|
|
quote = await kis.get_quote(ticker)
|
|
daily = await kis.get_daily_ohlcv(ticker, 250)
|
|
return {
|
|
"ticker": ticker, "name": target.get("name", ""),
|
|
"price": quote["price"], "day_open": quote["day_open"],
|
|
"today_volume": quote["today_volume"],
|
|
"closes": [b["close"] for b in daily],
|
|
"highs": [b["high"] for b in daily],
|
|
"lows": [b["low"] for b in daily],
|
|
"volumes": [b["volume"] for b in daily],
|
|
"avg_price": target.get("avg_price"),
|
|
"qty": target.get("qty"),
|
|
"holding_high": target.get("holding_high"),
|
|
"climax_vol_mult": settings.climax_vol_mult,
|
|
}
|
|
|
|
|
|
async def run_cycle(nas, kis, state, stats, settings) -> None:
|
|
try:
|
|
ms = await nas.get_monitor_set()
|
|
except Exception:
|
|
logger.exception("monitor-set 조회 실패")
|
|
state.session_state = "idle"
|
|
stats.jobs_failed += 1
|
|
return
|
|
|
|
session = ms.get("session", "closed")
|
|
if session == "closed":
|
|
state.session_state = "market_closed"
|
|
return
|
|
|
|
buy_targets = filter_krx(ms.get("buy_targets", []))
|
|
sell_targets = filter_krx(ms.get("sell_targets", []))
|
|
buy_params = ms.get("buy_params", {})
|
|
exit_params = ms.get("exit_params", {})
|
|
|
|
firing: list[dict] = []
|
|
for t in buy_targets:
|
|
try:
|
|
firing += evaluate_buy(await _build_ctx(kis, t, settings), buy_params)
|
|
except Exception:
|
|
logger.exception("buy 평가 실패 %s", t.get("ticker"))
|
|
for t in sell_targets:
|
|
try:
|
|
firing += evaluate_sell(await _build_ctx(kis, t, settings), exit_params)
|
|
except Exception:
|
|
logger.exception("sell 평가 실패 %s", t.get("ticker"))
|
|
|
|
as_of = datetime.now(KST).isoformat(timespec="seconds")
|
|
if firing:
|
|
state.last_alert_at = as_of
|
|
logger.info("firing %d개: %s", len(firing),
|
|
[f"{f['ticker']}:{f['condition']}" for f in firing])
|
|
try:
|
|
await nas.post_report(as_of, firing) # 빈 배열도 전송(edge clear)
|
|
except Exception:
|
|
logger.exception("report 전송 실패")
|
|
|
|
state.session_state = "market_open"
|
|
stats.jobs_done += 1
|
|
stats.last_job_at = as_of
|
|
|
|
|
|
async def monitor_loop(nas, kis, state, stats, settings) -> None:
|
|
logger.info("trade-monitor loop 시작 interval=%ds", settings.loop_interval)
|
|
while True:
|
|
try:
|
|
await run_cycle(nas, kis, state, stats, settings)
|
|
except asyncio.CancelledError:
|
|
logger.info("monitor_loop cancelled")
|
|
raise
|
|
except Exception:
|
|
logger.exception("monitor_loop iteration 실패")
|
|
await asyncio.sleep(settings.loop_interval)
|
|
|
|
|
|
def make_state_fn(state):
|
|
async def state_fn(redis, stats):
|
|
return state.session_state, {"last_alert_at": state.last_alert_at}
|
|
return state_fn
|