diff --git a/services/trade-monitor/monitor.py b/services/trade-monitor/monitor.py new file mode 100644 index 0000000..d34e9db --- /dev/null +++ b/services/trade-monitor/monitor.py @@ -0,0 +1,113 @@ +"""오케스트레이션 — monitor-set 조회 → 조건 평가 → report + heartbeat state.""" +from __future__ import annotations + +import asyncio +import logging +from datetime import datetime +from zoneinfo import ZoneInfo + +from conditions import evaluate_buy, evaluate_sell + +logger = logging.getLogger(__name__) +KST = ZoneInfo("Asia/Seoul") + + +class MonitorState: + """monitor_loop가 갱신, heartbeat state_fn이 읽는 공유 상태.""" + def __init__(self): + self.session_state = "idle" # market_open | market_closed | idle + self.last_alert_at: str | None = None + + +def filter_krx(targets: list[dict]) -> list[dict]: + """6자리 숫자 티커(KRX)만. 알파벳 티커 skip.""" + out = [] + for t in targets: + tk = str(t.get("ticker", "")) + if tk.isdigit() and len(tk) == 6: + out.append(t) + return out + + +async def _build_ctx(kis, target: dict, settings) -> dict: + ticker = target["ticker"] + quote = await kis.get_quote(ticker) + daily = await kis.get_daily_ohlcv(ticker, 250) + return { + "ticker": ticker, "name": target.get("name", ""), + "price": quote["price"], "day_open": quote["day_open"], + "today_volume": quote["today_volume"], + "closes": [b["close"] for b in daily], + "highs": [b["high"] for b in daily], + "lows": [b["low"] for b in daily], + "volumes": [b["volume"] for b in daily], + "avg_price": target.get("avg_price"), + "qty": target.get("qty"), + "holding_high": target.get("holding_high"), + "climax_vol_mult": settings.climax_vol_mult, + } + + +async def run_cycle(nas, kis, state, stats, settings) -> None: + try: + ms = await nas.get_monitor_set() + except Exception: + logger.exception("monitor-set 조회 실패") + state.session_state = "idle" + stats.jobs_failed += 1 + return + + session = ms.get("session", "closed") + if session == "closed": + state.session_state = "market_closed" + return + + buy_targets = filter_krx(ms.get("buy_targets", [])) + sell_targets = filter_krx(ms.get("sell_targets", [])) + buy_params = ms.get("buy_params", {}) + exit_params = ms.get("exit_params", {}) + + firing: list[dict] = [] + for t in buy_targets: + try: + firing += evaluate_buy(await _build_ctx(kis, t, settings), buy_params) + except Exception: + logger.exception("buy 평가 실패 %s", t.get("ticker")) + for t in sell_targets: + try: + firing += evaluate_sell(await _build_ctx(kis, t, settings), exit_params) + except Exception: + logger.exception("sell 평가 실패 %s", t.get("ticker")) + + as_of = datetime.now(KST).isoformat(timespec="seconds") + if firing: + state.last_alert_at = as_of + logger.info("firing %d개: %s", len(firing), + [f"{f['ticker']}:{f['condition']}" for f in firing]) + try: + await nas.post_report(as_of, firing) # 빈 배열도 전송(edge clear) + except Exception: + logger.exception("report 전송 실패") + + state.session_state = "market_open" + stats.jobs_done += 1 + stats.last_job_at = as_of + + +async def monitor_loop(nas, kis, state, stats, settings) -> None: + logger.info("trade-monitor loop 시작 interval=%ds", settings.loop_interval) + while True: + try: + await run_cycle(nas, kis, state, stats, settings) + except asyncio.CancelledError: + logger.info("monitor_loop cancelled") + raise + except Exception: + logger.exception("monitor_loop iteration 실패") + await asyncio.sleep(settings.loop_interval) + + +def make_state_fn(state): + async def state_fn(redis, stats): + return state.session_state, {"last_alert_at": state.last_alert_at} + return state_fn diff --git a/services/trade-monitor/tests/test_monitor.py b/services/trade-monitor/tests/test_monitor.py new file mode 100644 index 0000000..f4843dd --- /dev/null +++ b/services/trade-monitor/tests/test_monitor.py @@ -0,0 +1,95 @@ +"""monitor.run_cycle — 게이트/필터/조립/격리.""" +from monitor import MonitorState, filter_krx, run_cycle +from config import load_settings +from _shared.heartbeat import WorkerStats + + +def test_filter_krx_keeps_only_numeric6(): + targets = [{"ticker": "005930"}, {"ticker": "AAPL"}, {"ticker": "00660"}, + {"ticker": "000660"}, {"ticker": "0059301"}] + kept = {t["ticker"] for t in filter_krx(targets)} + assert kept == {"005930", "000660"} + + +class _FakeNAS: + def __init__(self, ms): + self._ms = ms + self.reported = None + + async def get_monitor_set(self): + return self._ms + + async def post_report(self, as_of, firing): + self.reported = {"as_of": as_of, "firing": firing} + return {"new_alerts": len(firing), "cleared": 0} + + +class _FakeKIS: + def __init__(self, price=100, fail_on=None): + self._price = price + self._fail_on = fail_on or set() + + async def get_quote(self, ticker): + if ticker in self._fail_on: + raise RuntimeError("KIS down") + return {"price": self._price, "day_open": 99, "today_volume": 1000, + "as_of": "x"} + + async def get_daily_ohlcv(self, ticker, days=250): + # 정배열 + 저가 근접 → ma20_pullback 발화 유도 + return [{"open": 90, "high": 90, "low": 90, "close": 90, "volume": 1}] * 200 \ + + [{"open": 100, "high": 100, "low": 100, "close": 100, "volume": 1}] * 20 + + +async def test_closed_session_skips_kis(): + nas = _FakeNAS({"session": "closed"}) + state, stats = MonitorState(), WorkerStats() + await run_cycle(nas, _FakeKIS(), state, stats, load_settings()) + assert state.session_state == "market_closed" + assert nas.reported is None # report도 안 함 + + +async def test_non_krx_skipped_and_report_sent(): + nas = _FakeNAS({"session": "regular", + "buy_targets": [{"ticker": "AAPL", "name": "Apple"}], + "sell_targets": [], "buy_params": {}, "exit_params": {}}) + state, stats = MonitorState(), WorkerStats() + await run_cycle(nas, _FakeKIS(), state, stats, load_settings()) + assert state.session_state == "market_open" + assert nas.reported is not None + assert nas.reported["firing"] == [] # 알파벳 티커 skip → 빈 발화 + + +async def test_firing_assembled_and_last_alert_set(): + nas = _FakeNAS({"session": "regular", + "buy_targets": [{"ticker": "005930", "name": "삼성전자"}], + "sell_targets": [], "buy_params": {"pullback_pct": 0.02}, + "exit_params": {}}) + state, stats = MonitorState(), WorkerStats() + await run_cycle(nas, _FakeKIS(price=101), state, stats, load_settings()) + conds = {f["condition"] for f in nas.reported["firing"]} + assert "buy_ma20_pullback" in conds + assert state.last_alert_at is not None + + +async def test_per_ticker_failure_isolated(): + nas = _FakeNAS({"session": "regular", + "buy_targets": [{"ticker": "005930"}, {"ticker": "000660"}], + "sell_targets": [], "buy_params": {}, "exit_params": {}}) + state, stats = MonitorState(), WorkerStats() + # 005930은 실패, 000660은 성공 → 루프가 죽지 않고 report 전송 + await run_cycle(nas, _FakeKIS(fail_on={"005930"}), state, stats, load_settings()) + assert nas.reported is not None + assert state.session_state == "market_open" + + +async def test_monitor_set_failure_sets_idle(): + class _BadNAS(_FakeNAS): + async def get_monitor_set(self): + raise RuntimeError("NAS down") + + nas = _BadNAS({}) + state, stats = MonitorState(), WorkerStats() + await run_cycle(nas, _FakeKIS(), state, stats, load_settings()) + assert state.session_state == "idle" + assert nas.reported is None