Files
ai-trade/services/music-render/worker.py
gahusb 44bc065796 fix(music-render): handle AttributeError on dispatch typo (T8 follow-up)
Code review found: getattr(sys.modules[__name__], fn_name) raises
AttributeError if a dispatch table string entry is a typo. Now caught
and reported via webhook_update_task as 'internal dispatch error'.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 05:03:48 +09:00

96 lines
3.4 KiB
Python

"""Redis BLPOP worker — queue:music-render → job_type 디스패치 → NAS webhook.
queue:paused 가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set).
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
from typing import Any
import redis.asyncio as aioredis
from nas_client import webhook_update_task
from providers.suno import (
run_suno_generation, run_suno_extend, run_vocal_removal,
run_cover_image, run_wav_convert, run_stem_split,
run_upload_cover, run_upload_extend, run_add_vocals,
run_add_instrumental, run_video_generate,
)
from providers.local import run_local_generation
logger = logging.getLogger(__name__)
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
QUEUE_KEY = "queue:music-render"
PAUSED_KEY = "queue:paused"
# job_type → name of the module-level provider function that handles it.
# The values are names (strings), not function objects: _dispatch looks each
# name up as an attribute of this module at call time, so a
# unittest.mock.patch on "worker.<name>" is picked up.
# Every handler is named "run_<job_type>", so the names are derived from the
# job types instead of being spelled out a second time.
_DISPATCH_TABLE: dict[str, str] = {
    job_type: f"run_{job_type}"
    for job_type in (
        "suno_generation",
        "local_generation",
        "suno_extend",
        "vocal_removal",
        "cover_image",
        "wav_convert",
        "stem_split",
        "upload_cover",
        "upload_extend",
        "add_vocals",
        "add_instrumental",
        "video_generate",
    )
}
def _dispatch(payload: dict) -> None:
    """Route one decoded queue payload to the provider function for its job_type.

    Synchronous; worker_loop runs it through asyncio.to_thread.

    The provider is resolved by name on this module at call time (not bound
    when the table is built), so unittest.mock.patch on "worker.<name>" is
    honoured.

    Args:
        payload: Decoded queue item. The keys read are "job_type", "task_id"
            and "params"; missing keys default to "", "" and {}.

    Returns:
        None. An unknown job_type, or a table entry that names no attribute
        of this module, is logged and reported through webhook_update_task
        with status "failed". A payload that is not a dict is logged and
        dropped, since it carries no task_id to report against.
    """
    import sys

    if not isinstance(payload, dict):
        # json.loads accepts any JSON value (list, string, number, null);
        # only an object has the keys read below. Without this guard the
        # .get() calls raise AttributeError and the caller treats a bad
        # queue item as a generic iteration failure.
        logger.error(
            "queue payload is not a JSON object: type=%s value=%s",
            type(payload).__name__,
            repr(payload)[:200],
        )
        return

    job_type = payload.get("job_type", "")
    task_id = payload.get("task_id", "")
    params = payload.get("params", {})

    fn_name = _DISPATCH_TABLE.get(job_type)
    if fn_name is None:
        logger.error("unknown job_type=%s task=%s", job_type, task_id)
        webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}")
        return

    try:
        # Look the name up on the live module object so patched attributes win.
        fn = getattr(sys.modules[__name__], fn_name)
    except AttributeError:
        # The table maps to a name this module does not define.
        logger.error("dispatch table typo for job_type=%s name=%s task=%s", job_type, fn_name, task_id)
        webhook_update_task(task_id, "failed", 0, "", error=f"internal dispatch error: {fn_name}")
        return

    # NOTE(review): exceptions raised by the provider propagate to the caller
    # without a webhook update from here — confirm providers report their own
    # failures.
    fn(task_id, params)
async def worker_loop() -> None:
    """Consume QUEUE_KEY forever and hand each job to _dispatch.

    Each iteration:
      1. Reads PAUSED_KEY; while its value is b"1", sleeps 10 seconds and
         checks again without touching the queue.
      2. BLPOPs QUEUE_KEY with a 1-second timeout, so the pause flag is
         re-checked about once a second while the queue is empty.
      3. Decodes the item as JSON. Items that cannot be decoded are logged
         (first 200 bytes) and dropped.
      4. Runs _dispatch in a worker thread via asyncio.to_thread.

    Raises:
        asyncio.CancelledError: re-raised after logging so that task
            cancellation stops the loop. Any other exception in an iteration
            is logged with its traceback and the loop resumes after 5 seconds.
    """
    redis = aioredis.from_url(REDIS_URL, decode_responses=False)
    logger.info("music-render worker started (queue=%s)", QUEUE_KEY)
    while True:
        try:
            # decode_responses=False, so values come back as bytes.
            paused = await redis.get(PAUSED_KEY)
            if paused == b"1":
                await asyncio.sleep(10)
                continue

            item = await redis.blpop(QUEUE_KEY, timeout=1)
            if item is None:
                # Timed out with nothing queued.
                continue

            _, raw = item
            try:
                payload = json.loads(raw)
            except (json.JSONDecodeError, UnicodeDecodeError):
                # json.loads decodes bytes input itself; undecodable bytes
                # raise UnicodeDecodeError, which is not a JSONDecodeError.
                # Treat both as a bad item rather than a loop failure.
                logger.error("invalid queue payload: %r", raw[:200])
                continue

            # Provider functions are synchronous; run them in a thread so the
            # event loop is not blocked.
            await asyncio.to_thread(_dispatch, payload)
        except asyncio.CancelledError:
            logger.info("worker_loop cancelled")
            raise
        except Exception:
            logger.exception("worker_loop iteration 실패, 5초 후 재시도")
            await asyncio.sleep(5)