diff --git a/services/docker-compose.yml b/services/docker-compose.yml index 165982f..ce891da 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -27,7 +27,8 @@ services: music-render: build: - context: ./music-render + context: . + dockerfile: music-render/Dockerfile container_name: music-render restart: unless-stopped ports: diff --git a/services/music-render/Dockerfile b/services/music-render/Dockerfile index d643423..0d2c4b4 100644 --- a/services/music-render/Dockerfile +++ b/services/music-render/Dockerfile @@ -8,10 +8,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ ca-certificates \ && rm -rf /var/lib/apt/lists/* -COPY requirements.txt . +COPY music-render/requirements.txt /app/ RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt -COPY . . +# F6: 공통 ReliableQueue 모듈 (services/_shared) +COPY _shared /app/_shared +COPY music-render/. /app/ +ENV PYTHONPATH=/app EXPOSE 8000 CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"] diff --git a/services/music-render/conftest.py b/services/music-render/conftest.py new file mode 100644 index 0000000..b2e8533 --- /dev/null +++ b/services/music-render/conftest.py @@ -0,0 +1,5 @@ +"""Make services/ root importable so `from _shared.reliable_queue import ...` works during tests.""" +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) diff --git a/services/music-render/tests/test_worker.py b/services/music-render/tests/test_worker.py index d1fd189..8c8193b 100644 --- a/services/music-render/tests/test_worker.py +++ b/services/music-render/tests/test_worker.py @@ -107,3 +107,63 @@ def test_dispatch_add_instrumental_calls_run_add_instrumental(): with patch("worker.run_add_instrumental") as m: worker._dispatch(payload) m.assert_called_once_with("t13", {"upload_url": "u"}) + + +# ----- F6: ReliableQueue poll_once ----- + +from unittest.mock import AsyncMock + + +@pytest.mark.asyncio +async def test_poll_once_acks_on_success(monkeypatch): + """F6 — _dispatch 정상 return → queue.ack(raw).""" + payload = {"task_id": "t1", "job_type": "suno_generation", "params": {}} + raw = json.dumps(payload).encode() + fake_queue = AsyncMock() + fake_queue.dequeue = AsyncMock(return_value=(payload, raw)) + fake_queue.ack = AsyncMock() + fake_queue.fail = AsyncMock() + + monkeypatch.setattr(worker, "_dispatch", MagicMock()) + + handled = await worker.poll_once(fake_queue) + assert handled is True + fake_queue.ack.assert_awaited_once_with(raw) + fake_queue.fail.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_poll_once_calls_fail_on_dispatch_exception(monkeypatch): + """F6 — _dispatch unhandled exception → queue.fail(raw, payload).""" + payload = {"task_id": "t2", "job_type": "suno_generation", "params": {}} + raw = json.dumps(payload).encode() + fake_queue = AsyncMock() + fake_queue.dequeue = AsyncMock(return_value=(payload, raw)) + fake_queue.ack = AsyncMock() + fake_queue.fail = AsyncMock() + + def _boom(p): + raise RuntimeError("dispatch crash") + + monkeypatch.setattr(worker, "_dispatch", _boom) + + handled = await worker.poll_once(fake_queue) + assert handled is True + fake_queue.fail.assert_awaited_once_with(raw, payload) + fake_queue.ack.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_poll_once_returns_false_on_timeout(monkeypatch): + fake_queue = AsyncMock() + fake_queue.dequeue = AsyncMock(return_value=None) + fake_queue.ack = AsyncMock() + fake_queue.fail = AsyncMock() + dispatch_mock = MagicMock() + monkeypatch.setattr(worker, "_dispatch", dispatch_mock) + + handled = await worker.poll_once(fake_queue) + assert handled is False + dispatch_mock.assert_not_called() + fake_queue.ack.assert_not_awaited() + fake_queue.fail.assert_not_awaited() diff --git a/services/music-render/worker.py b/services/music-render/worker.py index c75b8bc..b57807f 100644 --- a/services/music-render/worker.py +++ b/services/music-render/worker.py @@ -1,4 +1,4 @@ -"""Redis BLPOP worker — queue:music-render → job_type 디스패치 → NAS webhook. +"""Redis ReliableQueue worker — F6 신뢰성 패턴 (BLMOVE + ack/fail + recovery). queue:paused 가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set). """ @@ -20,6 +20,7 @@ from providers.suno import ( run_add_instrumental, run_video_generate, ) from providers.local import run_local_generation +from _shared.reliable_queue import ReliableQueue logger = logging.getLogger(__name__) @@ -67,26 +68,44 @@ def _dispatch(payload: dict) -> None: fn(task_id, params) +async def poll_once(queue: ReliableQueue) -> bool: + """F6 — 1 cycle: dequeue → _dispatch → ack/fail. Returns True if a job handled.""" + result = await queue.dequeue(timeout=5) + if result is None: + return False + payload, raw = result + try: + # sync provider 함수 — thread로 실행해서 이벤트 루프 블로킹 방지 + await asyncio.to_thread(_dispatch, payload) + except Exception: + logger.exception("dispatch unhandled exception task_id=%s", + payload.get("task_id")) + await queue.fail(raw, payload) + return True + await queue.ack(raw) + return True + + async def worker_loop(): redis = aioredis.from_url(REDIS_URL, decode_responses=False) - logger.info("music-render worker started (queue=%s)", QUEUE_KEY) + queue = ReliableQueue(redis, queue_key=QUEUE_KEY) + logger.info("music-render worker started worker_id=%s queue=%s", + queue.worker_id, QUEUE_KEY) + # F6: startup recovery + try: + recovered = await queue.recover() + if recovered: + logger.info("recovered %d orphaned items at startup", recovered) + except Exception: + logger.exception("startup recover failed") + while True: try: paused = await redis.get(PAUSED_KEY) if paused == b"1": await asyncio.sleep(10) continue - item = await redis.blpop(QUEUE_KEY, timeout=1) - if item is None: - continue - _, raw = item - try: - payload = json.loads(raw) - except json.JSONDecodeError: - logger.error("invalid queue payload: %r", raw[:200]) - continue - # sync provider 함수 — thread로 실행해서 이벤트 루프 블로킹 방지 - await asyncio.to_thread(_dispatch, payload) + await poll_once(queue) except asyncio.CancelledError: logger.info("worker_loop cancelled") raise