- watcher_loop 에서 mode 판정 직후 worker:task-watcher:heartbeat SET EX 45
- payload: build_payload(state=mode, extra={"mode": mode})
- LOOP_INTERVAL 30s < TTL 45s → 만료 전 주기적 갱신
- conftest.py 추가: services/ 를 sys.path에 주입해 _shared import 가능
- tests/test_watcher.py: payload kind/state/mode 필드 검증 (1 passed)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
17 lines
487 B
Python
17 lines
487 B
Python
"""task-watcher heartbeat payload — state=mode + mode 필드 검증."""
|
|
import json
|
|
|
|
from _shared.heartbeat import build_payload, WorkerStats
|
|
|
|
|
|
def test_watcher_heartbeat_payload_carries_mode():
|
|
payload = json.loads(
|
|
build_payload(
|
|
"task-watcher", "watcher", "trading",
|
|
WorkerStats(), extra={"mode": "trading"},
|
|
)
|
|
)
|
|
assert payload["kind"] == "watcher"
|
|
assert payload["state"] == "trading"
|
|
assert payload["mode"] == "trading"
|