From 4d837fdd31fba72d4f9f5d00c32400adae91e455 Mon Sep 17 00:00:00 2001 From: gahusb Date: Tue, 19 May 2026 08:41:15 +0900 Subject: [PATCH] =?UTF-8?q?feat(video-render):=20worker.py=20=E2=80=94=20R?= =?UTF-8?q?edis=20BLPOP=20+=204=20job=5Ftype=20dispatch=20(SP-7)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit queue:video-render BLPOP, queue:paused 체크 후 dispatch. string-based _DISPATCH_TABLE + getattr (테스트 patch 호환, Plan-B-Music 패턴). AttributeError 가드 포함. asyncio.to_thread로 sync provider wrap. 4 job_type: sora/veo/kling/seedance _generation. Plan-B-Video Phase 2. Co-Authored-By: Claude Opus 4.7 (1M context) --- services/video-render/tests/test_worker.py | 43 ++++++++++++ services/video-render/worker.py | 80 ++++++++++++++++++++++ 2 files changed, 123 insertions(+) create mode 100644 services/video-render/tests/test_worker.py create mode 100644 services/video-render/worker.py diff --git a/services/video-render/tests/test_worker.py b/services/video-render/tests/test_worker.py new file mode 100644 index 0000000..2a5e762 --- /dev/null +++ b/services/video-render/tests/test_worker.py @@ -0,0 +1,43 @@ +"""worker.py — job_type 디스패처 (4 provider).""" +import pytest +from unittest.mock import patch + +import worker + + +def test_dispatch_sora_calls_run_sora_generation(): + payload = {"task_id": "t1", "job_type": "sora_generation", "params": {"prompt": "x"}} + with patch("worker.run_sora_generation") as m: + worker._dispatch(payload) + m.assert_called_once_with("t1", {"prompt": "x"}) + + +def test_dispatch_veo_calls_run_veo_generation(): + payload = {"task_id": "t2", "job_type": "veo_generation", "params": {"prompt": "x"}} + with patch("worker.run_veo_generation") as m: + worker._dispatch(payload) + m.assert_called_once_with("t2", {"prompt": "x"}) + + +def test_dispatch_kling_calls_run_kling_generation(): + payload = {"task_id": "t3", "job_type": "kling_generation", "params": {"prompt": "x"}} + with patch("worker.run_kling_generation") as m: + worker._dispatch(payload) + m.assert_called_once_with("t3", {"prompt": "x"}) + + +def test_dispatch_seedance_calls_run_seedance_generation(): + payload = {"task_id": "t4", "job_type": "seedance_generation", "params": {"prompt": "x"}} + with patch("worker.run_seedance_generation") as m: + worker._dispatch(payload) + m.assert_called_once_with("t4", {"prompt": "x"}) + + +def test_dispatch_unknown_job_type_logs_error(): + payload = {"task_id": "t5", "job_type": "weird_type", "params": {}} + with patch("worker.webhook_update_task") as m: + worker._dispatch(payload) + m.assert_called_once() + args = m.call_args[0] + assert args[0] == "t5" + assert args[1] == "failed" diff --git a/services/video-render/worker.py b/services/video-render/worker.py new file mode 100644 index 0000000..ec32995 --- /dev/null +++ b/services/video-render/worker.py @@ -0,0 +1,80 @@ +"""Redis BLPOP worker — queue:video-render → job_type 디스패치 → NAS webhook. + +queue:paused 가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set). +Plan-B-Music worker.py 패턴 — string-based dispatch + getattr (테스트 patch 호환). +""" +from __future__ import annotations + +import asyncio +import json +import logging +import os +import sys + +import redis.asyncio as aioredis + +from nas_client import webhook_update_task +from providers.sora import run_sora_generation +from providers.veo import run_veo_generation +from providers.kling import run_kling_generation +from providers.seedance import run_seedance_generation + +logger = logging.getLogger(__name__) + +REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379") +QUEUE_KEY = "queue:video-render" +PAUSED_KEY = "queue:paused" + +# string names so `unittest.mock.patch` on `worker.` is correctly intercepted +_DISPATCH_TABLE = { + "sora_generation": "run_sora_generation", + "veo_generation": "run_veo_generation", + "kling_generation": "run_kling_generation", + "seedance_generation": "run_seedance_generation", +} + + +def _dispatch(payload: dict) -> None: + """payload[job_type] → provider 함수 호출 (sync, worker_loop에서 asyncio.to_thread로 wrap).""" + job_type = payload.get("job_type", "") + task_id = payload.get("task_id", "") + params = payload.get("params", {}) + fn_name = _DISPATCH_TABLE.get(job_type) + if fn_name is None: + logger.error("unknown job_type=%s task=%s", job_type, task_id) + webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}") + return + try: + fn = getattr(sys.modules[__name__], fn_name) + except AttributeError: + logger.error("dispatch table typo for job_type=%s name=%s task=%s", job_type, fn_name, task_id) + webhook_update_task(task_id, "failed", 0, "", error=f"internal dispatch error: {fn_name}") + return + fn(task_id, params) + + +async def worker_loop(): + redis = aioredis.from_url(REDIS_URL, decode_responses=False) + logger.info("video-render worker started (queue=%s)", QUEUE_KEY) + while True: + try: + paused = await redis.get(PAUSED_KEY) + if paused == b"1": + await asyncio.sleep(10) + continue + item = await redis.blpop(QUEUE_KEY, timeout=1) + if item is None: + continue + _, raw = item + try: + payload = json.loads(raw) + except json.JSONDecodeError: + logger.error("invalid queue payload: %r", raw[:200]) + continue + await asyncio.to_thread(_dispatch, payload) + except asyncio.CancelledError: + logger.info("worker_loop cancelled") + raise + except Exception: + logger.exception("worker_loop iteration 실패, 5초 후 재시도") + await asyncio.sleep(5)