fix(insta-render): F6 ReliableQueue 적용 — BLMOVE + ack/fail (F6 part 2)
- worker.py: BLPOP → ReliableQueue.dequeue / ack / fail / startup recovery - _process_one: 예외 시 webhook(failed) 후 raise — poll_once가 fail(raw, payload) 로 retry/dead-letter 처리 - poll_once 함수 추가 (테스트 단위) - Dockerfile: build context=services/ 로 올리고 _shared 포함, PYTHONPATH=/app - docker-compose.yml: insta-render build context 갱신 기존 webhook 호출 동작은 그대로 (멱등) — retry 시 매번 NAS에 failed 통보되어도 마지막 상태만 보임. dead-letter는 운영 모니터링으로 별도 처리. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -3,7 +3,8 @@ name: web-ai-services
|
||||
services:
|
||||
insta-render:
|
||||
build:
|
||||
context: ./insta-render
|
||||
context: .
|
||||
dockerfile: insta-render/Dockerfile
|
||||
container_name: insta-render
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
|
||||
@@ -12,11 +12,14 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
libcairo2 libasound2 libatspi2.0-0 \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
COPY requirements.txt .
|
||||
COPY insta-render/requirements.txt /app/
|
||||
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt
|
||||
RUN playwright install chromium
|
||||
|
||||
COPY . .
|
||||
# F6: 공통 ReliableQueue 모듈 (services/_shared)
|
||||
COPY _shared /app/_shared
|
||||
COPY insta-render/. /app/
|
||||
ENV PYTHONPATH=/app
|
||||
|
||||
EXPOSE 8000
|
||||
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
|
||||
|
||||
5
services/insta-render/conftest.py
Normal file
5
services/insta-render/conftest.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""Make services/ root importable so `from _shared.reliable_queue import ...` works during tests."""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
@@ -112,11 +112,88 @@ async def test_process_one_render_failure_reports_failed(monkeypatch, fake_slate
|
||||
worker.NAS_BASE_URL = "http://nas.test"
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
await worker._process_one(client, {
|
||||
"task_id": "t-3",
|
||||
"params": {"slate_id": 99},
|
||||
})
|
||||
# F6: _process_one은 webhook(failed) 호출 후 raise — poll_once가 fail(raw)로 retry/dead-letter.
|
||||
with pytest.raises(RuntimeError, match="Chromium"):
|
||||
await worker._process_one(client, {
|
||||
"task_id": "t-3",
|
||||
"params": {"slate_id": 99},
|
||||
})
|
||||
|
||||
last = calls[-1]
|
||||
assert last["status"] == "failed"
|
||||
assert "Chromium" in last["error"]
|
||||
|
||||
|
||||
# ----- F6: ReliableQueue (ack on success, fail on exception) -----
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_poll_once_acks_on_success(monkeypatch):
|
||||
"""F6 — 성공 시 queue.ack(raw) 호출 + fail 안 부름."""
|
||||
fake_payload = {
|
||||
"task_id": "t-ok",
|
||||
"params": {"slate_id": 7, "theme": "default"},
|
||||
}
|
||||
fake_raw = json.dumps(fake_payload).encode()
|
||||
|
||||
fake_queue = AsyncMock()
|
||||
fake_queue.dequeue = AsyncMock(return_value=(fake_payload, fake_raw))
|
||||
fake_queue.ack = AsyncMock()
|
||||
fake_queue.fail = AsyncMock()
|
||||
|
||||
process_mock = AsyncMock()
|
||||
monkeypatch.setattr(worker, "_process_one", process_mock)
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
handled = await worker.poll_once(fake_queue, client)
|
||||
|
||||
assert handled is True
|
||||
process_mock.assert_awaited_once()
|
||||
fake_queue.ack.assert_awaited_once_with(fake_raw)
|
||||
fake_queue.fail.assert_not_awaited()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_poll_once_calls_fail_on_exception(monkeypatch):
|
||||
"""F6 — _process_one 예외 시 queue.fail(raw, payload) 호출."""
|
||||
fake_payload = {
|
||||
"task_id": "t-err",
|
||||
"params": {"slate_id": 9, "theme": "default"},
|
||||
}
|
||||
fake_raw = json.dumps(fake_payload).encode()
|
||||
|
||||
fake_queue = AsyncMock()
|
||||
fake_queue.dequeue = AsyncMock(return_value=(fake_payload, fake_raw))
|
||||
fake_queue.ack = AsyncMock()
|
||||
fake_queue.fail = AsyncMock()
|
||||
|
||||
async def boom(client, payload):
|
||||
raise RuntimeError("simulated dispatch failure")
|
||||
|
||||
monkeypatch.setattr(worker, "_process_one", boom)
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
handled = await worker.poll_once(fake_queue, client)
|
||||
|
||||
assert handled is True
|
||||
fake_queue.fail.assert_awaited_once_with(fake_raw, fake_payload)
|
||||
fake_queue.ack.assert_not_awaited()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_poll_once_returns_false_on_timeout(monkeypatch):
|
||||
"""F6 — dequeue가 None 반환(타임아웃)이면 False 리턴, ack/fail 안 부름."""
|
||||
fake_queue = AsyncMock()
|
||||
fake_queue.dequeue = AsyncMock(return_value=None)
|
||||
fake_queue.ack = AsyncMock()
|
||||
fake_queue.fail = AsyncMock()
|
||||
|
||||
process_mock = AsyncMock()
|
||||
monkeypatch.setattr(worker, "_process_one", process_mock)
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
handled = await worker.poll_once(fake_queue, client)
|
||||
|
||||
assert handled is False
|
||||
process_mock.assert_not_awaited()
|
||||
fake_queue.ack.assert_not_awaited()
|
||||
fake_queue.fail.assert_not_awaited()
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
"""Redis BLPOP worker — queue:insta-render → render_slate → NAS webhook.
|
||||
"""Redis ReliableQueue worker — F6 신뢰성 패턴 (BLMOVE + ack/fail + recovery).
|
||||
|
||||
queue:paused가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from typing import Any
|
||||
@@ -14,6 +13,7 @@ import httpx
|
||||
import redis.asyncio as aioredis
|
||||
|
||||
from card_renderer import render_slate
|
||||
from _shared.reliable_queue import ReliableQueue
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -57,7 +57,10 @@ async def _fetch_slate(client: httpx.AsyncClient, slate_id: int) -> dict:
|
||||
|
||||
|
||||
async def _process_one(client: httpx.AsyncClient, payload: dict) -> None:
|
||||
"""단일 작업 처리: fetch slate → render → webhook."""
|
||||
"""단일 작업 처리: fetch slate → render → webhook. 예외 발생 시 webhook(failed) 호출 후 raise.
|
||||
|
||||
F6: webhook 통신 외 예외는 poll_once가 fail(raw, payload)로 retry/dead-letter 처리.
|
||||
"""
|
||||
task_id = payload["task_id"]
|
||||
params = payload.get("params", {})
|
||||
slate_id = params.get("slate_id")
|
||||
@@ -69,7 +72,6 @@ async def _process_one(client: httpx.AsyncClient, payload: dict) -> None:
|
||||
slate = await _fetch_slate(client, slate_id)
|
||||
await _post_update(client, task_id, "processing", 50)
|
||||
paths = await render_slate(slate, slate_id, template=template)
|
||||
# 결과 URL은 첫 페이지의 nginx 경로
|
||||
first_url = f"{INSTA_MEDIA_URL_PREFIX}/{slate_id}/01.png"
|
||||
await _post_update(
|
||||
client, task_id, "succeeded", 100, result_path=first_url
|
||||
@@ -78,29 +80,46 @@ async def _process_one(client: httpx.AsyncClient, payload: dict) -> None:
|
||||
except Exception as e:
|
||||
logger.exception("render task=%s 실패", task_id)
|
||||
await _post_update(client, task_id, "failed", 0, error=str(e))
|
||||
raise
|
||||
|
||||
|
||||
async def poll_once(queue: ReliableQueue, client: httpx.AsyncClient) -> bool:
|
||||
"""1 cycle: dequeue → _process_one → ack/fail. Returns True if a job handled."""
|
||||
result = await queue.dequeue(timeout=5)
|
||||
if result is None:
|
||||
return False
|
||||
payload, raw = result
|
||||
try:
|
||||
await _process_one(client, payload)
|
||||
except Exception:
|
||||
await queue.fail(raw, payload)
|
||||
return True
|
||||
await queue.ack(raw)
|
||||
return True
|
||||
|
||||
|
||||
async def worker_loop():
|
||||
"""무한 루프 — paused 체크 → BLPOP → process_one."""
|
||||
"""무한 루프 — paused 체크 → ReliableQueue.dequeue → process_one → ack/fail."""
|
||||
redis = aioredis.from_url(REDIS_URL, decode_responses=False)
|
||||
queue = ReliableQueue(redis, queue_key=QUEUE_KEY)
|
||||
async with httpx.AsyncClient() as client:
|
||||
logger.info("insta-render worker started (queue=%s)", QUEUE_KEY)
|
||||
logger.info("insta-render worker started worker_id=%s queue=%s",
|
||||
queue.worker_id, QUEUE_KEY)
|
||||
# F6: startup recovery — 이전 crash 시 잔존 orphan 재큐
|
||||
try:
|
||||
recovered = await queue.recover()
|
||||
if recovered:
|
||||
logger.info("recovered %d orphaned items at startup", recovered)
|
||||
except Exception:
|
||||
logger.exception("startup recover failed")
|
||||
|
||||
while True:
|
||||
try:
|
||||
paused = await redis.get(PAUSED_KEY)
|
||||
if paused == b"1":
|
||||
await asyncio.sleep(10)
|
||||
continue
|
||||
item = await redis.blpop(QUEUE_KEY, timeout=1)
|
||||
if item is None:
|
||||
continue
|
||||
_, raw = item
|
||||
try:
|
||||
payload = json.loads(raw)
|
||||
except json.JSONDecodeError:
|
||||
logger.error("invalid queue payload: %r", raw[:200])
|
||||
continue
|
||||
await _process_one(client, payload)
|
||||
await poll_once(queue, client)
|
||||
except asyncio.CancelledError:
|
||||
logger.info("worker_loop cancelled")
|
||||
raise
|
||||
|
||||
Reference in New Issue
Block a user