From 574b5712c308d89222f2913c345401ac0cf15411 Mon Sep 17 00:00:00 2001 From: gahusb Date: Wed, 1 Jul 2026 00:59:28 +0900 Subject: [PATCH] =?UTF-8?q?feat(task-watcher):=20heartbeat=20=EB=B0=9C?= =?UTF-8?q?=EC=8B=A0=20(state=3Dmode,=20paused=20=EC=9D=B4=EC=9C=A0=20?= =?UTF-8?q?=EB=85=B8=EC=B6=9C)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - watcher_loop 에서 mode 판정 직후 worker:task-watcher:heartbeat SET EX 45 - payload: build_payload(state=mode, extra={"mode": mode}) - LOOP_INTERVAL 30s < TTL 45s → 만료 전 주기적 갱신 - conftest.py 추가: services/ 를 sys.path에 주입해 _shared import 가능 - tests/test_watcher.py: payload kind/state/mode 필드 검증 (1 passed) Co-Authored-By: Claude Sonnet 4.6 --- services/task-watcher/conftest.py | 5 +++++ services/task-watcher/tests/test_watcher.py | 16 ++++++++++++++++ services/task-watcher/watcher.py | 12 ++++++++++++ 3 files changed, 33 insertions(+) create mode 100644 services/task-watcher/conftest.py create mode 100644 services/task-watcher/tests/test_watcher.py diff --git a/services/task-watcher/conftest.py b/services/task-watcher/conftest.py new file mode 100644 index 0000000..1f1d5de --- /dev/null +++ b/services/task-watcher/conftest.py @@ -0,0 +1,5 @@ +"""Make services/ root importable so `from _shared.heartbeat import ...` works during tests.""" +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) diff --git a/services/task-watcher/tests/test_watcher.py b/services/task-watcher/tests/test_watcher.py new file mode 100644 index 0000000..64fed08 --- /dev/null +++ b/services/task-watcher/tests/test_watcher.py @@ -0,0 +1,16 @@ +"""task-watcher heartbeat payload — state=mode + mode 필드 검증.""" +import json + +from _shared.heartbeat import build_payload, WorkerStats + + +def test_watcher_heartbeat_payload_carries_mode(): + payload = json.loads( + build_payload( + "task-watcher", "watcher", "trading", + WorkerStats(), extra={"mode": "trading"}, + ) + ) + assert payload["kind"] == "watcher" + assert payload["state"] == "trading" + assert payload["mode"] == "trading" diff --git a/services/task-watcher/watcher.py b/services/task-watcher/watcher.py index 15accc1..f3f9463 100644 --- a/services/task-watcher/watcher.py +++ b/services/task-watcher/watcher.py @@ -15,6 +15,7 @@ from zoneinfo import ZoneInfo import redis.asyncio as aioredis from mode import current_mode, fetch_holidays, KST +from _shared.heartbeat import build_payload, WorkerStats logger = logging.getLogger(__name__) @@ -23,6 +24,10 @@ PAUSED_KEY = "queue:paused" LOOP_INTERVAL = 30 # 초 HOLIDAYS_REFRESH = 3600 # 1시간 PAUSED_TTL = 600 # 10분 (watcher 죽어도 자동 해제) +HEARTBEAT_KEY = "worker:task-watcher:heartbeat" +HEARTBEAT_TTL = 45 # LOOP_INTERVAL 30s < TTL 45s → 만료 전 갱신 + +_HB_STATS = WorkerStats() async def watcher_loop(): @@ -46,6 +51,13 @@ async def watcher_loop(): else: await redis.delete(PAUSED_KEY) + # heartbeat (LOOP_INTERVAL=30s < TTL 45s → 만료 전 갱신) + await redis.set( + HEARTBEAT_KEY, + build_payload("task-watcher", "watcher", mode, _HB_STATS, extra={"mode": mode}), + ex=HEARTBEAT_TTL, + ) + if mode != last_mode: logger.info("mode 전환: %s → %s (paused=%s)", last_mode, mode, mode == "trading") last_mode = mode