Files
ai-trade/services/task-watcher/watcher.py
gahusb 574b5712c3 feat(task-watcher): heartbeat 발신 (state=mode, paused 이유 노출)
- watcher_loop 에서 mode 판정 직후 worker:task-watcher:heartbeat SET EX 45
- payload: build_payload(state=mode, extra={"mode": mode})
- LOOP_INTERVAL 30s < TTL 45s → 만료 전 주기적 갱신
- conftest.py 추가: services/ 를 sys.path에 주입해 _shared import 가능
- tests/test_watcher.py: payload kind/state/mode 필드 검증 (1 passed)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-07-01 00:59:28 +09:00

72 lines
2.4 KiB
Python

"""30초마다 current_mode 판정 → queue:paused 토글.
trading → SET queue:paused 1 EX 600 (10분 TTL — watcher 죽어도 자동 해제)
free → DEL queue:paused
holidays는 1시간마다 refresh (매 loop fetch 부하 회피).
"""
from __future__ import annotations
import asyncio
import datetime as dt
import logging
import os
from zoneinfo import ZoneInfo
import redis.asyncio as aioredis
from mode import current_mode, fetch_holidays, KST
from _shared.heartbeat import build_payload, WorkerStats
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Redis connection URL; taken from the REDIS_URL environment variable, with a
# hard-coded fallback when the variable is unset.
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")

# Key that watcher_loop sets while the mode is "trading" and deletes otherwise.
PAUSED_KEY = "queue:paused"

# Seconds slept between watcher_loop iterations.
LOOP_INTERVAL = 30

# Minimum age, in seconds (1 hour), of the holiday list before it is re-fetched.
HOLIDAYS_REFRESH = 3600

# Expiry, in seconds (10 minutes), applied to PAUSED_KEY so the pause flag is
# released automatically even if the watcher dies.
PAUSED_TTL = 600

# Key under which the heartbeat payload is written on every iteration.
HEARTBEAT_KEY = "worker:task-watcher:heartbeat"

# Heartbeat expiry in seconds. It is longer than LOOP_INTERVAL (30s < 45s) so
# the key is refreshed before it can expire while the loop is healthy.
HEARTBEAT_TTL = 45

# Stats object handed to build_payload() for every heartbeat.
# NOTE(review): its fields are defined in _shared.heartbeat and are never
# updated in this module -- confirm the defaults are what consumers expect.
_HB_STATS = WorkerStats()
async def watcher_loop():
    """Toggle the queue pause flag and publish a heartbeat, forever.

    Every ``LOOP_INTERVAL`` seconds one iteration:

    1. Re-fetches the holiday list when at least ``HOLIDAYS_REFRESH`` seconds
       have passed since the last successful fetch. A failed refresh is
       logged, the previous list stays in use, and the refresh is retried on
       the next iteration.
    2. Evaluates ``current_mode(now, holidays)`` with the current KST time.
    3. Sets ``PAUSED_KEY`` (expiring after ``PAUSED_TTL``) when the mode is
       ``"trading"``; deletes it for any other mode.
    4. Writes the heartbeat payload to ``HEARTBEAT_KEY`` (expiring after
       ``HEARTBEAT_TTL``), using the mode as the reported state.
    5. Logs a line whenever the mode differs from the previous iteration.

    Any other exception raised inside an iteration is logged and the loop
    carries on after the usual sleep.

    Raises:
        asyncio.CancelledError: re-raised after being logged, so the task can
            be cancelled normally. The Redis client is closed on the way out.
    """
    client = aioredis.from_url(REDIS_URL, decode_responses=False)
    try:
        # The initial fetch is deliberately unguarded: without any holiday
        # list there is nothing sensible to fall back to.
        holidays = fetch_holidays()
        last_holiday_refresh = dt.datetime.now(KST)
        last_mode = None
        logger.info("task-watcher started (trading window 토글)")

        while True:
            try:
                now = dt.datetime.now(KST)

                # Periodic holiday refresh. A failure here must not abort the
                # iteration: skipping the pause/heartbeat writes would let
                # both keys expire while the holiday source is unavailable.
                if (now - last_holiday_refresh).total_seconds() >= HOLIDAYS_REFRESH:
                    try:
                        holidays = fetch_holidays()
                    except Exception:
                        logger.exception(
                            "holidays refresh failed; keeping previous holiday list"
                        )
                    else:
                        last_holiday_refresh = now

                mode = current_mode(now, holidays)

                if mode == "trading":
                    await client.set(PAUSED_KEY, b"1", ex=PAUSED_TTL)
                else:
                    await client.delete(PAUSED_KEY)

                # Heartbeat: LOOP_INTERVAL (30s) < HEARTBEAT_TTL (45s), so the
                # key is renewed before it expires.
                await client.set(
                    HEARTBEAT_KEY,
                    build_payload(
                        "task-watcher", "watcher", mode, _HB_STATS, extra={"mode": mode}
                    ),
                    ex=HEARTBEAT_TTL,
                )

                if mode != last_mode:
                    logger.info(
                        "mode 전환: %s → %s (paused=%s)",
                        last_mode,
                        mode,
                        mode == "trading",
                    )
                    last_mode = mode
            except Exception:
                # CancelledError derives from BaseException (Python 3.8+), so
                # cancellation is not swallowed by this handler.
                logger.exception("watcher_loop iteration 실패, 30초 후 재시도")

            await asyncio.sleep(LOOP_INTERVAL)
    except asyncio.CancelledError:
        logger.info("watcher_loop cancelled")
        raise
    finally:
        # Release the connection pool. Newer redis-py releases name the
        # coroutine aclose(); older ones only provide close().
        closer = getattr(client, "aclose", None) or getattr(client, "close", None)
        if closer is not None:
            await closer()