- watcher_loop 에서 mode 판정 직후 worker:task-watcher:heartbeat SET EX 45
- payload: build_payload(state=mode, extra={"mode": mode})
- LOOP_INTERVAL 30s < TTL 45s → 만료 전 주기적 갱신
- conftest.py 추가: services/ 를 sys.path에 주입해 _shared import 가능
- tests/test_watcher.py: payload kind/state/mode 필드 검증 (1 passed)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
72 lines
2.4 KiB
Python
72 lines
2.4 KiB
Python
"""Evaluate current_mode every 30 seconds and toggle queue:paused.

trading → SET queue:paused 1 EX 600 (10-minute TTL, so the pause is released
          automatically even if the watcher dies)
free    → DEL queue:paused

Holidays are refreshed once per hour to avoid the cost of fetching them on
every loop iteration.

Each iteration also writes the worker:task-watcher:heartbeat key with a
45-second TTL.
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import datetime as dt
|
|
import logging
|
|
import os
|
|
from zoneinfo import ZoneInfo
|
|
|
|
import redis.asyncio as aioredis
|
|
|
|
from mode import current_mode, fetch_holidays, KST
|
|
from _shared.heartbeat import build_payload, WorkerStats
|
|
|
|
logger = logging.getLogger(__name__)

# Redis endpoint; the REDIS_URL environment variable overrides the default.
REDIS_URL = os.environ.get("REDIS_URL", "redis://192.168.45.54:6379")

# Loop cadence.
LOOP_INTERVAL = 30  # seconds
HOLIDAYS_REFRESH = 60 * 60  # seconds (1 hour)

# Pause flag toggled by the watcher loop.
PAUSED_KEY = "queue:paused"
PAUSED_TTL = 10 * 60  # seconds (10 minutes) — auto-released even if the watcher dies

# Heartbeat key; LOOP_INTERVAL (30s) < TTL (45s), so it is renewed before expiry.
HEARTBEAT_KEY = "worker:task-watcher:heartbeat"
HEARTBEAT_TTL = 45  # seconds

# Stats object passed to build_payload() for every heartbeat.
_HB_STATS = WorkerStats()
|
|
|
|
|
|
async def watcher_loop():
    """Run the trading-window watcher until the task is cancelled.

    Every ``LOOP_INTERVAL`` seconds the loop:

    1. refreshes the holiday set once it is older than ``HOLIDAYS_REFRESH``
       seconds (a failed refresh is logged and the previous set is kept, so
       the pause toggle and heartbeat below still run);
    2. evaluates ``current_mode(now, holidays)``;
    3. sets ``PAUSED_KEY`` with ``PAUSED_TTL`` when the mode is ``"trading"``,
       otherwise deletes it;
    4. writes the heartbeat payload to ``HEARTBEAT_KEY`` with
       ``HEARTBEAT_TTL``;
    5. logs a message whenever the mode differs from the previous iteration.

    Any other failure inside an iteration is logged and the loop retries after
    ``LOOP_INTERVAL`` seconds.

    Raises:
        asyncio.CancelledError: re-raised after logging when the task is
            cancelled; the Redis client is closed on the way out.
    """
    # ``async with`` guarantees the client (and its connection pool) is closed
    # on every exit path, including cancellation and a failing initial fetch.
    async with aioredis.from_url(REDIS_URL, decode_responses=False) as redis:
        # NOTE(review): fetch_holidays() is called synchronously; if it does
        # network I/O it blocks the event loop while it runs — confirm.
        holidays = fetch_holidays()
        last_holiday_refresh = dt.datetime.now(KST)
        last_mode = None
        logger.info("task-watcher started (trading window 토글)")

        while True:
            try:
                now = dt.datetime.now(KST)

                # Periodic holiday refresh. A refresh failure must not block
                # the pause toggle: keep the stale set and retry next loop
                # (last_holiday_refresh is only advanced on success).
                if (now - last_holiday_refresh).total_seconds() >= HOLIDAYS_REFRESH:
                    try:
                        holidays = fetch_holidays()
                    except Exception:
                        logger.exception(
                            "holidays refresh failed; keeping previous holiday set"
                        )
                    else:
                        last_holiday_refresh = now

                mode = current_mode(now, holidays)
                if mode == "trading":
                    await redis.set(PAUSED_KEY, b"1", ex=PAUSED_TTL)
                else:
                    await redis.delete(PAUSED_KEY)

                # Heartbeat (LOOP_INTERVAL=30s < TTL 45s → renewed before expiry).
                await redis.set(
                    HEARTBEAT_KEY,
                    build_payload("task-watcher", "watcher", mode, _HB_STATS, extra={"mode": mode}),
                    ex=HEARTBEAT_TTL,
                )

                if mode != last_mode:
                    logger.info("mode 전환: %s → %s (paused=%s)", last_mode, mode, mode == "trading")
                    last_mode = mode

                await asyncio.sleep(LOOP_INTERVAL)
            except asyncio.CancelledError:
                logger.info("watcher_loop cancelled")
                raise
            except Exception:
                logger.exception("watcher_loop iteration 실패, 30초 후 재시도")
                await asyncio.sleep(LOOP_INTERVAL)
|