fix(music-render): F6 ReliableQueue 적용 (F6 part 3)
- worker.py: poll_once 신설, BLPOP → ReliableQueue.dequeue/ack/fail + startup recovery
- 12 job_type dispatch table 보존 (기존 13 tests 그대로 PASS)
- Dockerfile: build context=services/, _shared 포함, PYTHONPATH=/app
- docker-compose.yml: music-render build context 갱신
dispatch 자체 unhandled exception 발생 시 fail(raw, payload)로 retry/dead-letter.
provider 함수가 webhook("failed")를 잡고 있는 정상 케이스는 ack (멱등 webhook).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -27,7 +27,8 @@ services:
|
||||
|
||||
music-render:
|
||||
build:
|
||||
context: ./music-render
|
||||
context: .
|
||||
dockerfile: music-render/Dockerfile
|
||||
container_name: music-render
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
|
||||
@@ -8,10 +8,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
ca-certificates \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
COPY requirements.txt .
|
||||
COPY music-render/requirements.txt /app/
|
||||
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt
|
||||
|
||||
COPY . .
|
||||
# F6: 공통 ReliableQueue 모듈 (services/_shared)
|
||||
COPY _shared /app/_shared
|
||||
COPY music-render/. /app/
|
||||
ENV PYTHONPATH=/app
|
||||
|
||||
EXPOSE 8000
|
||||
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
|
||||
|
||||
5
services/music-render/conftest.py
Normal file
5
services/music-render/conftest.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""Make services/ root importable so `from _shared.reliable_queue import ...` works during tests."""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
@@ -107,3 +107,63 @@ def test_dispatch_add_instrumental_calls_run_add_instrumental():
|
||||
with patch("worker.run_add_instrumental") as m:
|
||||
worker._dispatch(payload)
|
||||
m.assert_called_once_with("t13", {"upload_url": "u"})
|
||||
|
||||
|
||||
# ----- F6: ReliableQueue poll_once -----
|
||||
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_poll_once_acks_on_success(monkeypatch):
|
||||
"""F6 — _dispatch 정상 return → queue.ack(raw)."""
|
||||
payload = {"task_id": "t1", "job_type": "suno_generation", "params": {}}
|
||||
raw = json.dumps(payload).encode()
|
||||
fake_queue = AsyncMock()
|
||||
fake_queue.dequeue = AsyncMock(return_value=(payload, raw))
|
||||
fake_queue.ack = AsyncMock()
|
||||
fake_queue.fail = AsyncMock()
|
||||
|
||||
monkeypatch.setattr(worker, "_dispatch", MagicMock())
|
||||
|
||||
handled = await worker.poll_once(fake_queue)
|
||||
assert handled is True
|
||||
fake_queue.ack.assert_awaited_once_with(raw)
|
||||
fake_queue.fail.assert_not_awaited()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_poll_once_calls_fail_on_dispatch_exception(monkeypatch):
|
||||
"""F6 — _dispatch unhandled exception → queue.fail(raw, payload)."""
|
||||
payload = {"task_id": "t2", "job_type": "suno_generation", "params": {}}
|
||||
raw = json.dumps(payload).encode()
|
||||
fake_queue = AsyncMock()
|
||||
fake_queue.dequeue = AsyncMock(return_value=(payload, raw))
|
||||
fake_queue.ack = AsyncMock()
|
||||
fake_queue.fail = AsyncMock()
|
||||
|
||||
def _boom(p):
|
||||
raise RuntimeError("dispatch crash")
|
||||
|
||||
monkeypatch.setattr(worker, "_dispatch", _boom)
|
||||
|
||||
handled = await worker.poll_once(fake_queue)
|
||||
assert handled is True
|
||||
fake_queue.fail.assert_awaited_once_with(raw, payload)
|
||||
fake_queue.ack.assert_not_awaited()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_poll_once_returns_false_on_timeout(monkeypatch):
|
||||
fake_queue = AsyncMock()
|
||||
fake_queue.dequeue = AsyncMock(return_value=None)
|
||||
fake_queue.ack = AsyncMock()
|
||||
fake_queue.fail = AsyncMock()
|
||||
dispatch_mock = MagicMock()
|
||||
monkeypatch.setattr(worker, "_dispatch", dispatch_mock)
|
||||
|
||||
handled = await worker.poll_once(fake_queue)
|
||||
assert handled is False
|
||||
dispatch_mock.assert_not_called()
|
||||
fake_queue.ack.assert_not_awaited()
|
||||
fake_queue.fail.assert_not_awaited()
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
"""Redis BLPOP worker — queue:music-render → job_type 디스패치 → NAS webhook.
|
||||
"""Redis ReliableQueue worker — F6 신뢰성 패턴 (BLMOVE + ack/fail + recovery).
|
||||
|
||||
queue:paused 가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set).
|
||||
"""
|
||||
@@ -20,6 +20,7 @@ from providers.suno import (
|
||||
run_add_instrumental, run_video_generate,
|
||||
)
|
||||
from providers.local import run_local_generation
|
||||
from _shared.reliable_queue import ReliableQueue
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -67,26 +68,44 @@ def _dispatch(payload: dict) -> None:
|
||||
fn(task_id, params)
|
||||
|
||||
|
||||
async def poll_once(queue: ReliableQueue) -> bool:
|
||||
"""F6 — 1 cycle: dequeue → _dispatch → ack/fail. Returns True if a job handled."""
|
||||
result = await queue.dequeue(timeout=5)
|
||||
if result is None:
|
||||
return False
|
||||
payload, raw = result
|
||||
try:
|
||||
# sync provider 함수 — thread로 실행해서 이벤트 루프 블로킹 방지
|
||||
await asyncio.to_thread(_dispatch, payload)
|
||||
except Exception:
|
||||
logger.exception("dispatch unhandled exception task_id=%s",
|
||||
payload.get("task_id"))
|
||||
await queue.fail(raw, payload)
|
||||
return True
|
||||
await queue.ack(raw)
|
||||
return True
|
||||
|
||||
|
||||
async def worker_loop():
|
||||
redis = aioredis.from_url(REDIS_URL, decode_responses=False)
|
||||
logger.info("music-render worker started (queue=%s)", QUEUE_KEY)
|
||||
queue = ReliableQueue(redis, queue_key=QUEUE_KEY)
|
||||
logger.info("music-render worker started worker_id=%s queue=%s",
|
||||
queue.worker_id, QUEUE_KEY)
|
||||
# F6: startup recovery
|
||||
try:
|
||||
recovered = await queue.recover()
|
||||
if recovered:
|
||||
logger.info("recovered %d orphaned items at startup", recovered)
|
||||
except Exception:
|
||||
logger.exception("startup recover failed")
|
||||
|
||||
while True:
|
||||
try:
|
||||
paused = await redis.get(PAUSED_KEY)
|
||||
if paused == b"1":
|
||||
await asyncio.sleep(10)
|
||||
continue
|
||||
item = await redis.blpop(QUEUE_KEY, timeout=1)
|
||||
if item is None:
|
||||
continue
|
||||
_, raw = item
|
||||
try:
|
||||
payload = json.loads(raw)
|
||||
except json.JSONDecodeError:
|
||||
logger.error("invalid queue payload: %r", raw[:200])
|
||||
continue
|
||||
# sync provider 함수 — thread로 실행해서 이벤트 루프 블로킹 방지
|
||||
await asyncio.to_thread(_dispatch, payload)
|
||||
await poll_once(queue)
|
||||
except asyncio.CancelledError:
|
||||
logger.info("worker_loop cancelled")
|
||||
raise
|
||||
|
||||
Reference in New Issue
Block a user