feat(task-watcher): heartbeat 발신 (state=mode, paused 이유 노출)
- watcher_loop 에서 mode 판정 직후 worker:task-watcher:heartbeat SET EX 45
- payload: build_payload(state=mode, extra={"mode": mode})
- LOOP_INTERVAL 30s < TTL 45s → 만료 전 주기적 갱신
- conftest.py 추가: services/ 를 sys.path에 주입해 _shared import 가능
- tests/test_watcher.py: payload kind/state/mode 필드 검증 (1 passed)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
5
services/task-watcher/conftest.py
Normal file
5
services/task-watcher/conftest.py
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
"""Make services/ root importable so `from _shared.heartbeat import ...` works during tests."""
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||||
16
services/task-watcher/tests/test_watcher.py
Normal file
16
services/task-watcher/tests/test_watcher.py
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
"""task-watcher heartbeat payload — state=mode + mode 필드 검증."""
|
||||||
|
import json
|
||||||
|
|
||||||
|
from _shared.heartbeat import build_payload, WorkerStats
|
||||||
|
|
||||||
|
|
||||||
|
def test_watcher_heartbeat_payload_carries_mode():
|
||||||
|
payload = json.loads(
|
||||||
|
build_payload(
|
||||||
|
"task-watcher", "watcher", "trading",
|
||||||
|
WorkerStats(), extra={"mode": "trading"},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
assert payload["kind"] == "watcher"
|
||||||
|
assert payload["state"] == "trading"
|
||||||
|
assert payload["mode"] == "trading"
|
||||||
@@ -15,6 +15,7 @@ from zoneinfo import ZoneInfo
|
|||||||
import redis.asyncio as aioredis
|
import redis.asyncio as aioredis
|
||||||
|
|
||||||
from mode import current_mode, fetch_holidays, KST
|
from mode import current_mode, fetch_holidays, KST
|
||||||
|
from _shared.heartbeat import build_payload, WorkerStats
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -23,6 +24,10 @@ PAUSED_KEY = "queue:paused"
|
|||||||
LOOP_INTERVAL = 30 # 초
|
LOOP_INTERVAL = 30 # 초
|
||||||
HOLIDAYS_REFRESH = 3600 # 1시간
|
HOLIDAYS_REFRESH = 3600 # 1시간
|
||||||
PAUSED_TTL = 600 # 10분 (watcher 죽어도 자동 해제)
|
PAUSED_TTL = 600 # 10분 (watcher 죽어도 자동 해제)
|
||||||
|
HEARTBEAT_KEY = "worker:task-watcher:heartbeat"
|
||||||
|
HEARTBEAT_TTL = 45 # LOOP_INTERVAL 30s < TTL 45s → 만료 전 갱신
|
||||||
|
|
||||||
|
_HB_STATS = WorkerStats()
|
||||||
|
|
||||||
|
|
||||||
async def watcher_loop():
|
async def watcher_loop():
|
||||||
@@ -46,6 +51,13 @@ async def watcher_loop():
|
|||||||
else:
|
else:
|
||||||
await redis.delete(PAUSED_KEY)
|
await redis.delete(PAUSED_KEY)
|
||||||
|
|
||||||
|
# heartbeat (LOOP_INTERVAL=30s < TTL 45s → 만료 전 갱신)
|
||||||
|
await redis.set(
|
||||||
|
HEARTBEAT_KEY,
|
||||||
|
build_payload("task-watcher", "watcher", mode, _HB_STATS, extra={"mode": mode}),
|
||||||
|
ex=HEARTBEAT_TTL,
|
||||||
|
)
|
||||||
|
|
||||||
if mode != last_mode:
|
if mode != last_mode:
|
||||||
logger.info("mode 전환: %s → %s (paused=%s)", last_mode, mode, mode == "trading")
|
logger.info("mode 전환: %s → %s (paused=%s)", last_mode, mode, mode == "trading")
|
||||||
last_mode = mode
|
last_mode = mode
|
||||||
|
|||||||
Reference in New Issue
Block a user