feat(trade-monitor): monitor 오케스트레이션 (run_cycle/loop/state_fn)
This commit is contained in:
113
services/trade-monitor/monitor.py
Normal file
113
services/trade-monitor/monitor.py
Normal file
@@ -0,0 +1,113 @@
|
|||||||
|
"""오케스트레이션 — monitor-set 조회 → 조건 평가 → report + heartbeat state."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from datetime import datetime
|
||||||
|
from zoneinfo import ZoneInfo
|
||||||
|
|
||||||
|
from conditions import evaluate_buy, evaluate_sell
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
KST = ZoneInfo("Asia/Seoul")
|
||||||
|
|
||||||
|
|
||||||
|
class MonitorState:
|
||||||
|
"""monitor_loop가 갱신, heartbeat state_fn이 읽는 공유 상태."""
|
||||||
|
def __init__(self):
|
||||||
|
self.session_state = "idle" # market_open | market_closed | idle
|
||||||
|
self.last_alert_at: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def filter_krx(targets: list[dict]) -> list[dict]:
|
||||||
|
"""6자리 숫자 티커(KRX)만. 알파벳 티커 skip."""
|
||||||
|
out = []
|
||||||
|
for t in targets:
|
||||||
|
tk = str(t.get("ticker", ""))
|
||||||
|
if tk.isdigit() and len(tk) == 6:
|
||||||
|
out.append(t)
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
async def _build_ctx(kis, target: dict, settings) -> dict:
|
||||||
|
ticker = target["ticker"]
|
||||||
|
quote = await kis.get_quote(ticker)
|
||||||
|
daily = await kis.get_daily_ohlcv(ticker, 250)
|
||||||
|
return {
|
||||||
|
"ticker": ticker, "name": target.get("name", ""),
|
||||||
|
"price": quote["price"], "day_open": quote["day_open"],
|
||||||
|
"today_volume": quote["today_volume"],
|
||||||
|
"closes": [b["close"] for b in daily],
|
||||||
|
"highs": [b["high"] for b in daily],
|
||||||
|
"lows": [b["low"] for b in daily],
|
||||||
|
"volumes": [b["volume"] for b in daily],
|
||||||
|
"avg_price": target.get("avg_price"),
|
||||||
|
"qty": target.get("qty"),
|
||||||
|
"holding_high": target.get("holding_high"),
|
||||||
|
"climax_vol_mult": settings.climax_vol_mult,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def run_cycle(nas, kis, state, stats, settings) -> None:
|
||||||
|
try:
|
||||||
|
ms = await nas.get_monitor_set()
|
||||||
|
except Exception:
|
||||||
|
logger.exception("monitor-set 조회 실패")
|
||||||
|
state.session_state = "idle"
|
||||||
|
stats.jobs_failed += 1
|
||||||
|
return
|
||||||
|
|
||||||
|
session = ms.get("session", "closed")
|
||||||
|
if session == "closed":
|
||||||
|
state.session_state = "market_closed"
|
||||||
|
return
|
||||||
|
|
||||||
|
buy_targets = filter_krx(ms.get("buy_targets", []))
|
||||||
|
sell_targets = filter_krx(ms.get("sell_targets", []))
|
||||||
|
buy_params = ms.get("buy_params", {})
|
||||||
|
exit_params = ms.get("exit_params", {})
|
||||||
|
|
||||||
|
firing: list[dict] = []
|
||||||
|
for t in buy_targets:
|
||||||
|
try:
|
||||||
|
firing += evaluate_buy(await _build_ctx(kis, t, settings), buy_params)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("buy 평가 실패 %s", t.get("ticker"))
|
||||||
|
for t in sell_targets:
|
||||||
|
try:
|
||||||
|
firing += evaluate_sell(await _build_ctx(kis, t, settings), exit_params)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("sell 평가 실패 %s", t.get("ticker"))
|
||||||
|
|
||||||
|
as_of = datetime.now(KST).isoformat(timespec="seconds")
|
||||||
|
if firing:
|
||||||
|
state.last_alert_at = as_of
|
||||||
|
logger.info("firing %d개: %s", len(firing),
|
||||||
|
[f"{f['ticker']}:{f['condition']}" for f in firing])
|
||||||
|
try:
|
||||||
|
await nas.post_report(as_of, firing) # 빈 배열도 전송(edge clear)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("report 전송 실패")
|
||||||
|
|
||||||
|
state.session_state = "market_open"
|
||||||
|
stats.jobs_done += 1
|
||||||
|
stats.last_job_at = as_of
|
||||||
|
|
||||||
|
|
||||||
|
async def monitor_loop(nas, kis, state, stats, settings) -> None:
|
||||||
|
logger.info("trade-monitor loop 시작 interval=%ds", settings.loop_interval)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
await run_cycle(nas, kis, state, stats, settings)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
logger.info("monitor_loop cancelled")
|
||||||
|
raise
|
||||||
|
except Exception:
|
||||||
|
logger.exception("monitor_loop iteration 실패")
|
||||||
|
await asyncio.sleep(settings.loop_interval)
|
||||||
|
|
||||||
|
|
||||||
|
def make_state_fn(state):
|
||||||
|
async def state_fn(redis, stats):
|
||||||
|
return state.session_state, {"last_alert_at": state.last_alert_at}
|
||||||
|
return state_fn
|
||||||
95
services/trade-monitor/tests/test_monitor.py
Normal file
95
services/trade-monitor/tests/test_monitor.py
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
"""monitor.run_cycle — 게이트/필터/조립/격리."""
|
||||||
|
from monitor import MonitorState, filter_krx, run_cycle
|
||||||
|
from config import load_settings
|
||||||
|
from _shared.heartbeat import WorkerStats
|
||||||
|
|
||||||
|
|
||||||
|
def test_filter_krx_keeps_only_numeric6():
|
||||||
|
targets = [{"ticker": "005930"}, {"ticker": "AAPL"}, {"ticker": "00660"},
|
||||||
|
{"ticker": "000660"}, {"ticker": "0059301"}]
|
||||||
|
kept = {t["ticker"] for t in filter_krx(targets)}
|
||||||
|
assert kept == {"005930", "000660"}
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeNAS:
|
||||||
|
def __init__(self, ms):
|
||||||
|
self._ms = ms
|
||||||
|
self.reported = None
|
||||||
|
|
||||||
|
async def get_monitor_set(self):
|
||||||
|
return self._ms
|
||||||
|
|
||||||
|
async def post_report(self, as_of, firing):
|
||||||
|
self.reported = {"as_of": as_of, "firing": firing}
|
||||||
|
return {"new_alerts": len(firing), "cleared": 0}
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeKIS:
|
||||||
|
def __init__(self, price=100, fail_on=None):
|
||||||
|
self._price = price
|
||||||
|
self._fail_on = fail_on or set()
|
||||||
|
|
||||||
|
async def get_quote(self, ticker):
|
||||||
|
if ticker in self._fail_on:
|
||||||
|
raise RuntimeError("KIS down")
|
||||||
|
return {"price": self._price, "day_open": 99, "today_volume": 1000,
|
||||||
|
"as_of": "x"}
|
||||||
|
|
||||||
|
async def get_daily_ohlcv(self, ticker, days=250):
|
||||||
|
# 정배열 + 저가 근접 → ma20_pullback 발화 유도
|
||||||
|
return [{"open": 90, "high": 90, "low": 90, "close": 90, "volume": 1}] * 200 \
|
||||||
|
+ [{"open": 100, "high": 100, "low": 100, "close": 100, "volume": 1}] * 20
|
||||||
|
|
||||||
|
|
||||||
|
async def test_closed_session_skips_kis():
|
||||||
|
nas = _FakeNAS({"session": "closed"})
|
||||||
|
state, stats = MonitorState(), WorkerStats()
|
||||||
|
await run_cycle(nas, _FakeKIS(), state, stats, load_settings())
|
||||||
|
assert state.session_state == "market_closed"
|
||||||
|
assert nas.reported is None # report도 안 함
|
||||||
|
|
||||||
|
|
||||||
|
async def test_non_krx_skipped_and_report_sent():
|
||||||
|
nas = _FakeNAS({"session": "regular",
|
||||||
|
"buy_targets": [{"ticker": "AAPL", "name": "Apple"}],
|
||||||
|
"sell_targets": [], "buy_params": {}, "exit_params": {}})
|
||||||
|
state, stats = MonitorState(), WorkerStats()
|
||||||
|
await run_cycle(nas, _FakeKIS(), state, stats, load_settings())
|
||||||
|
assert state.session_state == "market_open"
|
||||||
|
assert nas.reported is not None
|
||||||
|
assert nas.reported["firing"] == [] # 알파벳 티커 skip → 빈 발화
|
||||||
|
|
||||||
|
|
||||||
|
async def test_firing_assembled_and_last_alert_set():
|
||||||
|
nas = _FakeNAS({"session": "regular",
|
||||||
|
"buy_targets": [{"ticker": "005930", "name": "삼성전자"}],
|
||||||
|
"sell_targets": [], "buy_params": {"pullback_pct": 0.02},
|
||||||
|
"exit_params": {}})
|
||||||
|
state, stats = MonitorState(), WorkerStats()
|
||||||
|
await run_cycle(nas, _FakeKIS(price=101), state, stats, load_settings())
|
||||||
|
conds = {f["condition"] for f in nas.reported["firing"]}
|
||||||
|
assert "buy_ma20_pullback" in conds
|
||||||
|
assert state.last_alert_at is not None
|
||||||
|
|
||||||
|
|
||||||
|
async def test_per_ticker_failure_isolated():
|
||||||
|
nas = _FakeNAS({"session": "regular",
|
||||||
|
"buy_targets": [{"ticker": "005930"}, {"ticker": "000660"}],
|
||||||
|
"sell_targets": [], "buy_params": {}, "exit_params": {}})
|
||||||
|
state, stats = MonitorState(), WorkerStats()
|
||||||
|
# 005930은 실패, 000660은 성공 → 루프가 죽지 않고 report 전송
|
||||||
|
await run_cycle(nas, _FakeKIS(fail_on={"005930"}), state, stats, load_settings())
|
||||||
|
assert nas.reported is not None
|
||||||
|
assert state.session_state == "market_open"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_monitor_set_failure_sets_idle():
|
||||||
|
class _BadNAS(_FakeNAS):
|
||||||
|
async def get_monitor_set(self):
|
||||||
|
raise RuntimeError("NAS down")
|
||||||
|
|
||||||
|
nas = _BadNAS({})
|
||||||
|
state, stats = MonitorState(), WorkerStats()
|
||||||
|
await run_cycle(nas, _FakeKIS(), state, stats, load_settings())
|
||||||
|
assert state.session_state == "idle"
|
||||||
|
assert nas.reported is None
|
||||||
Reference in New Issue
Block a user