feat(video-render): worker.py — Redis BLPOP + 4 job_type dispatch (SP-7)
queue:video-render BLPOP, queue:paused 체크 후 dispatch. string-based _DISPATCH_TABLE + getattr (테스트 patch 호환, Plan-B-Music 패턴). AttributeError 가드 포함. asyncio.to_thread로 sync provider wrap. 4 job_type: sora/veo/kling/seedance _generation. Plan-B-Video Phase 2. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
43
services/video-render/tests/test_worker.py
Normal file
43
services/video-render/tests/test_worker.py
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
"""worker.py — job_type 디스패처 (4 provider)."""
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import worker
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_sora_calls_run_sora_generation():
|
||||||
|
payload = {"task_id": "t1", "job_type": "sora_generation", "params": {"prompt": "x"}}
|
||||||
|
with patch("worker.run_sora_generation") as m:
|
||||||
|
worker._dispatch(payload)
|
||||||
|
m.assert_called_once_with("t1", {"prompt": "x"})
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_veo_calls_run_veo_generation():
|
||||||
|
payload = {"task_id": "t2", "job_type": "veo_generation", "params": {"prompt": "x"}}
|
||||||
|
with patch("worker.run_veo_generation") as m:
|
||||||
|
worker._dispatch(payload)
|
||||||
|
m.assert_called_once_with("t2", {"prompt": "x"})
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_kling_calls_run_kling_generation():
|
||||||
|
payload = {"task_id": "t3", "job_type": "kling_generation", "params": {"prompt": "x"}}
|
||||||
|
with patch("worker.run_kling_generation") as m:
|
||||||
|
worker._dispatch(payload)
|
||||||
|
m.assert_called_once_with("t3", {"prompt": "x"})
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_seedance_calls_run_seedance_generation():
|
||||||
|
payload = {"task_id": "t4", "job_type": "seedance_generation", "params": {"prompt": "x"}}
|
||||||
|
with patch("worker.run_seedance_generation") as m:
|
||||||
|
worker._dispatch(payload)
|
||||||
|
m.assert_called_once_with("t4", {"prompt": "x"})
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_unknown_job_type_logs_error():
|
||||||
|
payload = {"task_id": "t5", "job_type": "weird_type", "params": {}}
|
||||||
|
with patch("worker.webhook_update_task") as m:
|
||||||
|
worker._dispatch(payload)
|
||||||
|
m.assert_called_once()
|
||||||
|
args = m.call_args[0]
|
||||||
|
assert args[0] == "t5"
|
||||||
|
assert args[1] == "failed"
|
||||||
80
services/video-render/worker.py
Normal file
80
services/video-render/worker.py
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
"""Redis BLPOP worker — queue:video-render → job_type 디스패치 → NAS webhook.
|
||||||
|
|
||||||
|
queue:paused 가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set).
|
||||||
|
Plan-B-Music worker.py 패턴 — string-based dispatch + getattr (테스트 patch 호환).
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import redis.asyncio as aioredis
|
||||||
|
|
||||||
|
from nas_client import webhook_update_task
|
||||||
|
from providers.sora import run_sora_generation
|
||||||
|
from providers.veo import run_veo_generation
|
||||||
|
from providers.kling import run_kling_generation
|
||||||
|
from providers.seedance import run_seedance_generation
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
|
||||||
|
QUEUE_KEY = "queue:video-render"
|
||||||
|
PAUSED_KEY = "queue:paused"
|
||||||
|
|
||||||
|
# string names so `unittest.mock.patch` on `worker.<name>` is correctly intercepted
|
||||||
|
_DISPATCH_TABLE = {
|
||||||
|
"sora_generation": "run_sora_generation",
|
||||||
|
"veo_generation": "run_veo_generation",
|
||||||
|
"kling_generation": "run_kling_generation",
|
||||||
|
"seedance_generation": "run_seedance_generation",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _dispatch(payload: dict) -> None:
|
||||||
|
"""payload[job_type] → provider 함수 호출 (sync, worker_loop에서 asyncio.to_thread로 wrap)."""
|
||||||
|
job_type = payload.get("job_type", "")
|
||||||
|
task_id = payload.get("task_id", "")
|
||||||
|
params = payload.get("params", {})
|
||||||
|
fn_name = _DISPATCH_TABLE.get(job_type)
|
||||||
|
if fn_name is None:
|
||||||
|
logger.error("unknown job_type=%s task=%s", job_type, task_id)
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}")
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
fn = getattr(sys.modules[__name__], fn_name)
|
||||||
|
except AttributeError:
|
||||||
|
logger.error("dispatch table typo for job_type=%s name=%s task=%s", job_type, fn_name, task_id)
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=f"internal dispatch error: {fn_name}")
|
||||||
|
return
|
||||||
|
fn(task_id, params)
|
||||||
|
|
||||||
|
|
||||||
|
async def worker_loop():
|
||||||
|
redis = aioredis.from_url(REDIS_URL, decode_responses=False)
|
||||||
|
logger.info("video-render worker started (queue=%s)", QUEUE_KEY)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
paused = await redis.get(PAUSED_KEY)
|
||||||
|
if paused == b"1":
|
||||||
|
await asyncio.sleep(10)
|
||||||
|
continue
|
||||||
|
item = await redis.blpop(QUEUE_KEY, timeout=1)
|
||||||
|
if item is None:
|
||||||
|
continue
|
||||||
|
_, raw = item
|
||||||
|
try:
|
||||||
|
payload = json.loads(raw)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
logger.error("invalid queue payload: %r", raw[:200])
|
||||||
|
continue
|
||||||
|
await asyncio.to_thread(_dispatch, payload)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
logger.info("worker_loop cancelled")
|
||||||
|
raise
|
||||||
|
except Exception:
|
||||||
|
logger.exception("worker_loop iteration 실패, 5초 후 재시도")
|
||||||
|
await asyncio.sleep(5)
|
||||||
Reference in New Issue
Block a user