diff --git a/services/docker-compose.yml b/services/docker-compose.yml index 03f6fcf..165982f 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -3,7 +3,8 @@ name: web-ai-services services: insta-render: build: - context: ./insta-render + context: . + dockerfile: insta-render/Dockerfile container_name: insta-render restart: unless-stopped ports: diff --git a/services/insta-render/Dockerfile b/services/insta-render/Dockerfile index e474347..60b80e6 100644 --- a/services/insta-render/Dockerfile +++ b/services/insta-render/Dockerfile @@ -12,11 +12,14 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libcairo2 libasound2 libatspi2.0-0 \ && rm -rf /var/lib/apt/lists/* -COPY requirements.txt . +COPY insta-render/requirements.txt /app/ RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt RUN playwright install chromium -COPY . . +# F6: 공통 ReliableQueue 모듈 (services/_shared) +COPY _shared /app/_shared +COPY insta-render/. /app/ +ENV PYTHONPATH=/app EXPOSE 8000 CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"] diff --git a/services/insta-render/conftest.py b/services/insta-render/conftest.py new file mode 100644 index 0000000..b2e8533 --- /dev/null +++ b/services/insta-render/conftest.py @@ -0,0 +1,5 @@ +"""Make services/ root importable so `from _shared.reliable_queue import ...` works during tests.""" +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) diff --git a/services/insta-render/tests/test_worker.py b/services/insta-render/tests/test_worker.py index 1fa4a26..4c4f89a 100644 --- a/services/insta-render/tests/test_worker.py +++ b/services/insta-render/tests/test_worker.py @@ -112,11 +112,88 @@ async def test_process_one_render_failure_reports_failed(monkeypatch, fake_slate worker.NAS_BASE_URL = "http://nas.test" async with httpx.AsyncClient() as client: - await worker._process_one(client, { - "task_id": "t-3", - "params": {"slate_id": 99}, - }) + # F6: _process_one은 webhook(failed) 호출 후 raise — poll_once가 fail(raw)로 retry/dead-letter. + with pytest.raises(RuntimeError, match="Chromium"): + await worker._process_one(client, { + "task_id": "t-3", + "params": {"slate_id": 99}, + }) last = calls[-1] assert last["status"] == "failed" assert "Chromium" in last["error"] + + +# ----- F6: ReliableQueue (ack on success, fail on exception) ----- + +@pytest.mark.asyncio +async def test_poll_once_acks_on_success(monkeypatch): + """F6 — 성공 시 queue.ack(raw) 호출 + fail 안 부름.""" + fake_payload = { + "task_id": "t-ok", + "params": {"slate_id": 7, "theme": "default"}, + } + fake_raw = json.dumps(fake_payload).encode() + + fake_queue = AsyncMock() + fake_queue.dequeue = AsyncMock(return_value=(fake_payload, fake_raw)) + fake_queue.ack = AsyncMock() + fake_queue.fail = AsyncMock() + + process_mock = AsyncMock() + monkeypatch.setattr(worker, "_process_one", process_mock) + + async with httpx.AsyncClient() as client: + handled = await worker.poll_once(fake_queue, client) + + assert handled is True + process_mock.assert_awaited_once() + fake_queue.ack.assert_awaited_once_with(fake_raw) + fake_queue.fail.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_poll_once_calls_fail_on_exception(monkeypatch): + """F6 — _process_one 예외 시 queue.fail(raw, payload) 호출.""" + fake_payload = { + "task_id": "t-err", + "params": {"slate_id": 9, "theme": "default"}, + } + fake_raw = json.dumps(fake_payload).encode() + + fake_queue = AsyncMock() + fake_queue.dequeue = AsyncMock(return_value=(fake_payload, fake_raw)) + fake_queue.ack = AsyncMock() + fake_queue.fail = AsyncMock() + + async def boom(client, payload): + raise RuntimeError("simulated dispatch failure") + + monkeypatch.setattr(worker, "_process_one", boom) + + async with httpx.AsyncClient() as client: + handled = await worker.poll_once(fake_queue, client) + + assert handled is True + fake_queue.fail.assert_awaited_once_with(fake_raw, fake_payload) + fake_queue.ack.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_poll_once_returns_false_on_timeout(monkeypatch): + """F6 — dequeue가 None 반환(타임아웃)이면 False 리턴, ack/fail 안 부름.""" + fake_queue = AsyncMock() + fake_queue.dequeue = AsyncMock(return_value=None) + fake_queue.ack = AsyncMock() + fake_queue.fail = AsyncMock() + + process_mock = AsyncMock() + monkeypatch.setattr(worker, "_process_one", process_mock) + + async with httpx.AsyncClient() as client: + handled = await worker.poll_once(fake_queue, client) + + assert handled is False + process_mock.assert_not_awaited() + fake_queue.ack.assert_not_awaited() + fake_queue.fail.assert_not_awaited() diff --git a/services/insta-render/worker.py b/services/insta-render/worker.py index 08dfe8e..b151a66 100644 --- a/services/insta-render/worker.py +++ b/services/insta-render/worker.py @@ -1,11 +1,10 @@ -"""Redis BLPOP worker — queue:insta-render → render_slate → NAS webhook. +"""Redis ReliableQueue worker — F6 신뢰성 패턴 (BLMOVE + ack/fail + recovery). queue:paused가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set). """ from __future__ import annotations import asyncio -import json import logging import os from typing import Any @@ -14,6 +13,7 @@ import httpx import redis.asyncio as aioredis from card_renderer import render_slate +from _shared.reliable_queue import ReliableQueue logger = logging.getLogger(__name__) @@ -57,7 +57,10 @@ async def _fetch_slate(client: httpx.AsyncClient, slate_id: int) -> dict: async def _process_one(client: httpx.AsyncClient, payload: dict) -> None: - """단일 작업 처리: fetch slate → render → webhook.""" + """단일 작업 처리: fetch slate → render → webhook. 예외 발생 시 webhook(failed) 호출 후 raise. + + F6: webhook 통신 외 예외는 poll_once가 fail(raw, payload)로 retry/dead-letter 처리. + """ task_id = payload["task_id"] params = payload.get("params", {}) slate_id = params.get("slate_id") @@ -69,7 +72,6 @@ async def _process_one(client: httpx.AsyncClient, payload: dict) -> None: slate = await _fetch_slate(client, slate_id) await _post_update(client, task_id, "processing", 50) paths = await render_slate(slate, slate_id, template=template) - # 결과 URL은 첫 페이지의 nginx 경로 first_url = f"{INSTA_MEDIA_URL_PREFIX}/{slate_id}/01.png" await _post_update( client, task_id, "succeeded", 100, result_path=first_url @@ -78,29 +80,46 @@ async def _process_one(client: httpx.AsyncClient, payload: dict) -> None: except Exception as e: logger.exception("render task=%s 실패", task_id) await _post_update(client, task_id, "failed", 0, error=str(e)) + raise + + +async def poll_once(queue: ReliableQueue, client: httpx.AsyncClient) -> bool: + """1 cycle: dequeue → _process_one → ack/fail. Returns True if a job handled.""" + result = await queue.dequeue(timeout=5) + if result is None: + return False + payload, raw = result + try: + await _process_one(client, payload) + except Exception: + await queue.fail(raw, payload) + return True + await queue.ack(raw) + return True async def worker_loop(): - """무한 루프 — paused 체크 → BLPOP → process_one.""" + """무한 루프 — paused 체크 → ReliableQueue.dequeue → process_one → ack/fail.""" redis = aioredis.from_url(REDIS_URL, decode_responses=False) + queue = ReliableQueue(redis, queue_key=QUEUE_KEY) async with httpx.AsyncClient() as client: - logger.info("insta-render worker started (queue=%s)", QUEUE_KEY) + logger.info("insta-render worker started worker_id=%s queue=%s", + queue.worker_id, QUEUE_KEY) + # F6: startup recovery — 이전 crash 시 잔존 orphan 재큐 + try: + recovered = await queue.recover() + if recovered: + logger.info("recovered %d orphaned items at startup", recovered) + except Exception: + logger.exception("startup recover failed") + while True: try: paused = await redis.get(PAUSED_KEY) if paused == b"1": await asyncio.sleep(10) continue - item = await redis.blpop(QUEUE_KEY, timeout=1) - if item is None: - continue - _, raw = item - try: - payload = json.loads(raw) - except json.JSONDecodeError: - logger.error("invalid queue payload: %r", raw[:200]) - continue - await _process_one(client, payload) + await poll_once(queue, client) except asyncio.CancelledError: logger.info("worker_loop cancelled") raise