From f79c5c26dfc6af2316fa8cc447370d13bb83faec Mon Sep 17 00:00:00 2001 From: gahusb Date: Mon, 25 May 2026 20:16:01 +0900 Subject: [PATCH] =?UTF-8?q?fix(video-render):=20F6=20ReliableQueue=20?= =?UTF-8?q?=EC=A0=81=EC=9A=A9=20(F6=20part=204)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - worker.py: poll_once + ReliableQueue + startup recovery - 4 provider (sora/veo/kling/seedance) dispatch table 보존 - Dockerfile: build context=services/, _shared 포함, PYTHONPATH=/app - docker-compose.yml: video-render build context 갱신 Co-Authored-By: Claude Opus 4.7 (1M context) --- services/docker-compose.yml | 3 +- services/video-render/Dockerfile | 7 ++- services/video-render/conftest.py | 5 ++ services/video-render/tests/test_worker.py | 53 ++++++++++++++++++++++ services/video-render/worker.py | 44 ++++++++++++------ 5 files changed, 96 insertions(+), 16 deletions(-) create mode 100644 services/video-render/conftest.py diff --git a/services/docker-compose.yml b/services/docker-compose.yml index ce891da..d14d305 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -54,7 +54,8 @@ services: video-render: build: - context: ./video-render + context: . + dockerfile: video-render/Dockerfile container_name: video-render restart: unless-stopped ports: diff --git a/services/video-render/Dockerfile b/services/video-render/Dockerfile index ab72368..f7bab90 100644 --- a/services/video-render/Dockerfile +++ b/services/video-render/Dockerfile @@ -7,10 +7,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ ca-certificates \ && rm -rf /var/lib/apt/lists/* -COPY requirements.txt . +COPY video-render/requirements.txt /app/ RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt -COPY . . +# F6: 공통 ReliableQueue 모듈 (services/_shared) +COPY _shared /app/_shared +COPY video-render/. /app/ +ENV PYTHONPATH=/app EXPOSE 8000 CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"] diff --git a/services/video-render/conftest.py b/services/video-render/conftest.py new file mode 100644 index 0000000..b2e8533 --- /dev/null +++ b/services/video-render/conftest.py @@ -0,0 +1,5 @@ +"""Make services/ root importable so `from _shared.reliable_queue import ...` works during tests.""" +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) diff --git a/services/video-render/tests/test_worker.py b/services/video-render/tests/test_worker.py index 2a5e762..5c69934 100644 --- a/services/video-render/tests/test_worker.py +++ b/services/video-render/tests/test_worker.py @@ -41,3 +41,56 @@ def test_dispatch_unknown_job_type_logs_error(): args = m.call_args[0] assert args[0] == "t5" assert args[1] == "failed" + + +# ----- F6: ReliableQueue poll_once ----- + +import json +from unittest.mock import AsyncMock, MagicMock + + +@pytest.mark.asyncio +async def test_poll_once_acks_on_success(monkeypatch): + payload = {"task_id": "t1", "job_type": "sora_generation", "params": {}} + raw = json.dumps(payload).encode() + fake_queue = AsyncMock() + fake_queue.dequeue = AsyncMock(return_value=(payload, raw)) + fake_queue.ack = AsyncMock() + fake_queue.fail = AsyncMock() + monkeypatch.setattr(worker, "_dispatch", MagicMock()) + handled = await worker.poll_once(fake_queue) + assert handled is True + fake_queue.ack.assert_awaited_once_with(raw) + fake_queue.fail.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_poll_once_calls_fail_on_dispatch_exception(monkeypatch): + payload = {"task_id": "t2", "job_type": "sora_generation", "params": {}} + raw = json.dumps(payload).encode() + fake_queue = AsyncMock() + fake_queue.dequeue = AsyncMock(return_value=(payload, raw)) + fake_queue.ack = AsyncMock() + fake_queue.fail = AsyncMock() + + def _boom(p): + raise RuntimeError("dispatch crash") + + monkeypatch.setattr(worker, "_dispatch", _boom) + handled = await worker.poll_once(fake_queue) + assert handled is True + fake_queue.fail.assert_awaited_once_with(raw, payload) + fake_queue.ack.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_poll_once_returns_false_on_timeout(monkeypatch): + fake_queue = AsyncMock() + fake_queue.dequeue = AsyncMock(return_value=None) + fake_queue.ack = AsyncMock() + fake_queue.fail = AsyncMock() + monkeypatch.setattr(worker, "_dispatch", MagicMock()) + handled = await worker.poll_once(fake_queue) + assert handled is False + fake_queue.ack.assert_not_awaited() + fake_queue.fail.assert_not_awaited() diff --git a/services/video-render/worker.py b/services/video-render/worker.py index ec32995..cb2683f 100644 --- a/services/video-render/worker.py +++ b/services/video-render/worker.py @@ -1,7 +1,7 @@ -"""Redis BLPOP worker — queue:video-render → job_type 디스패치 → NAS webhook. +"""Redis ReliableQueue worker — F6 신뢰성 패턴 (BLMOVE + ack/fail + recovery). queue:paused 가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set). -Plan-B-Music worker.py 패턴 — string-based dispatch + getattr (테스트 patch 호환). +string-based dispatch + getattr (테스트 patch 호환). """ from __future__ import annotations @@ -18,6 +18,7 @@ from providers.sora import run_sora_generation from providers.veo import run_veo_generation from providers.kling import run_kling_generation from providers.seedance import run_seedance_generation +from _shared.reliable_queue import ReliableQueue logger = logging.getLogger(__name__) @@ -53,25 +54,42 @@ def _dispatch(payload: dict) -> None: fn(task_id, params) +async def poll_once(queue: ReliableQueue) -> bool: + """F6 — 1 cycle: dequeue → _dispatch → ack/fail. Returns True if a job handled.""" + result = await queue.dequeue(timeout=5) + if result is None: + return False + payload, raw = result + try: + await asyncio.to_thread(_dispatch, payload) + except Exception: + logger.exception("dispatch unhandled exception task_id=%s", + payload.get("task_id")) + await queue.fail(raw, payload) + return True + await queue.ack(raw) + return True + + async def worker_loop(): redis = aioredis.from_url(REDIS_URL, decode_responses=False) - logger.info("video-render worker started (queue=%s)", QUEUE_KEY) + queue = ReliableQueue(redis, queue_key=QUEUE_KEY) + logger.info("video-render worker started worker_id=%s queue=%s", + queue.worker_id, QUEUE_KEY) + try: + recovered = await queue.recover() + if recovered: + logger.info("recovered %d orphaned items at startup", recovered) + except Exception: + logger.exception("startup recover failed") + while True: try: paused = await redis.get(PAUSED_KEY) if paused == b"1": await asyncio.sleep(10) continue - item = await redis.blpop(QUEUE_KEY, timeout=1) - if item is None: - continue - _, raw = item - try: - payload = json.loads(raw) - except json.JSONDecodeError: - logger.error("invalid queue payload: %r", raw[:200]) - continue - await asyncio.to_thread(_dispatch, payload) + await poll_once(queue) except asyncio.CancelledError: logger.info("worker_loop cancelled") raise