"""Redis BLPOP worker — queue:music-render → job_type 디스패치 → NAS webhook. queue:paused 가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set). """ from __future__ import annotations import asyncio import json import logging import os from typing import Any import redis.asyncio as aioredis from nas_client import webhook_update_task from providers.suno import ( run_suno_generation, run_suno_extend, run_vocal_removal, run_cover_image, run_wav_convert, run_stem_split, run_upload_cover, run_upload_extend, run_add_vocals, run_add_instrumental, run_video_generate, ) from providers.local import run_local_generation logger = logging.getLogger(__name__) REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379") QUEUE_KEY = "queue:music-render" PAUSED_KEY = "queue:paused" # Maps job_type → module-level function name (string). # _dispatch resolves the name via globals() at call time so unittest.mock.patch # on "worker." is correctly intercepted. _DISPATCH_TABLE: dict[str, str] = { "suno_generation": "run_suno_generation", "local_generation": "run_local_generation", "suno_extend": "run_suno_extend", "vocal_removal": "run_vocal_removal", "cover_image": "run_cover_image", "wav_convert": "run_wav_convert", "stem_split": "run_stem_split", "upload_cover": "run_upload_cover", "upload_extend": "run_upload_extend", "add_vocals": "run_add_vocals", "add_instrumental": "run_add_instrumental", "video_generate": "run_video_generate", } def _dispatch(payload: dict) -> None: """payload[job_type] → provider 함수 호출 (sync, asyncio.to_thread로 래핑).""" import sys _self = sys.modules[__name__] job_type = payload.get("job_type", "") task_id = payload.get("task_id", "") params = payload.get("params", {}) fn_name = _DISPATCH_TABLE.get(job_type) if fn_name is None: logger.error("unknown job_type=%s task=%s", job_type, task_id) webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}") return fn = getattr(_self, fn_name) fn(task_id, params) async def worker_loop(): redis = aioredis.from_url(REDIS_URL, decode_responses=False) logger.info("music-render worker started (queue=%s)", QUEUE_KEY) while True: try: paused = await redis.get(PAUSED_KEY) if paused == b"1": await asyncio.sleep(10) continue item = await redis.blpop(QUEUE_KEY, timeout=1) if item is None: continue _, raw = item try: payload = json.loads(raw) except json.JSONDecodeError: logger.error("invalid queue payload: %r", raw[:200]) continue # sync provider 함수 — thread로 실행해서 이벤트 루프 블로킹 방지 await asyncio.to_thread(_dispatch, payload) except asyncio.CancelledError: logger.info("worker_loop cancelled") raise except Exception: logger.exception("worker_loop iteration 실패, 5초 후 재시도") await asyncio.sleep(5)