# Render Queue Reliability — Code Review F6 Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Fix job loss in the 4 render workers (insta/music/video/image-render) when a worker crashes right after `BLPOP`. Approach: atomic dequeue with `BLMOVE` (the successor to `BRPOPLPUSH`) into a per-worker processing list, plus startup recovery and retry/dead-letter.

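**Architecture:**

1. Each worker has a unique `worker_id`: `<queue_key>-<hostname>-<pid>` by default, overridable with the `WORKER_ID` env var.
2. Atomic dequeue: `BLMOVE queue:<x>-render processing:queue:<x>-render:<worker_id> RIGHT LEFT 5` (5-second timeout). `BRPOPLPUSH` is deprecated as of Redis 6.2 and `BLMOVE` replaces it, so the Redis server must be 6.2 or newer.
3. Success: `LREM processing:queue:<x>-render:<worker_id> 1 <payload>` removes exactly one occurrence of the payload.
4. Failure: increment the payload's `attempts` counter and `LPUSH` it back onto the main queue (the head, i.e. the back of the line, since workers consume from the RIGHT). Once the incremented counter reaches `max_attempts` (default 3), move it to `dead_letter:<queue_key>` instead.
5. **Startup recovery**: when a worker starts, any items left in its own processing list are moved back to the main queue for retry, with `attempts` incremented (items that reach the limit go to dead-letter).
6. The NAS-side producer is unchanged (plain `LPUSH`). The payload schema does document an optional `attempts: int` field; producers may omit it and the worker treats it as 0.
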
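The retry accounting in items 4 and 5 can be modelled in a few lines of plain Python (no Redis). This is a toy sketch with hypothetical helper names; it assumes, as the Task 1 code does, that `fail()` and `recover()` increment the counter first and then compare with `>=`, so the limit is *reached*, not exceeded.

```python
"""Toy model of the F6 retry accounting (hypothetical helpers, no Redis)."""


def route_after_failure(attempts: int, max_attempts: int = 3) -> tuple[str, int]:
    """Increment first, then compare with >=, as fail()/recover() do.

    Returns (destination, new_attempts); destination is "requeue" or "dead_letter".
    """
    new_attempts = attempts + 1
    if new_attempts >= max_attempts:
        return "dead_letter", new_attempts
    return "requeue", new_attempts


def runs_until_dead_letter(max_attempts: int = 3) -> int:
    """How many times an always-failing job executes before it lands in dead-letter."""
    attempts = 0  # producers omit the field; the worker treats it as 0
    runs = 0
    while True:
        runs += 1  # one more dequeue + failed execution (or crash + recover)
        destination, attempts = route_after_failure(attempts, max_attempts)
        if destination == "dead_letter":
            return runs
```

With the default `max_attempts=3`, `runs_until_dead_letter(3)` returns 3: the counter goes 0→1 (requeue), 1→2 (requeue), 2→3 (dead-letter). A crash counts the same as a failure, because `recover()` increments the counter too.
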

**Shared module strategy:** All 4 workers use the same pattern, so create a single `services/_shared/reliable_queue.py`. Each Dockerfile does `COPY services/_shared /app/_shared` (path relative to the build context) and the workers use `from _shared.reliable_queue import ReliableQueue`. That is 4 compose entry/Dockerfile changes (DRY beats 4 inline copies).

**Tech Stack:** Python 3.12, redis.asyncio (redis-py 5.x), Redis server 6.2+ (for `BLMOVE`), fakeredis (pytest dependency), pytest-asyncio.

**Working directory:** `C:\Users\jaeoh\Desktop\workspace\web-ai`.

---

## File Map

| File | Change | Responsibility |
|------|------|------|
| `services/_shared/__init__.py` | Create | package marker (empty) |
| `services/_shared/reliable_queue.py` | Create | `ReliableQueue` class: dequeue, ack, fail, recover |
| `services/_shared/tests/__init__.py` | Create | package marker (empty) |
| `services/_shared/tests/test_reliable_queue.py` | Create | 6 fakeredis unit tests |
| `services/_shared/requirements.txt` | Create | `redis>=5.0` (test deps such as fakeredis go in `tests/requirements-dev.txt`) |
| `services/insta-render/Dockerfile` | Modify | copy `_shared` to `/app/_shared` + PYTHONPATH |
| `services/insta-render/worker.py` | Modify (from L1) | BLPOP → ReliableQueue |
| `services/insta-render/tests/test_worker.py` | Append | 2 tests: ack on success, fail on dispatch error |
| `services/music-render/Dockerfile` | Modify | copy `_shared` |
| `services/music-render/worker.py` | Modify | use ReliableQueue |
| `services/music-render/tests/test_worker.py` | Append | 2 tests: ack / fail |
| `services/video-render/Dockerfile` | Modify | copy `_shared` |
| `services/video-render/worker.py` | Modify | use ReliableQueue |
| `services/video-render/tests/test_worker.py` | Append | 2 tests: ack / fail |
| `services/image-render/Dockerfile` | Modify | copy `_shared` |
| `services/image-render/worker.py` | Modify | use ReliableQueue |
| `services/image-render/tests/test_worker.py` | Append | 2 tests: ack / fail |
| `services/docker-compose.yml` (if present) | Verify | build context is broad enough to include `services/_shared` |

---

## Task 1: Write the shared ReliableQueue module

**Files:**
- Create: `services/_shared/__init__.py`
- Create: `services/_shared/reliable_queue.py`
- Create: `services/_shared/tests/__init__.py`
- Create: `services/_shared/tests/test_reliable_queue.py`
- Create: `services/_shared/requirements.txt`

- [ ] **Step 1: Create the package markers**

```python
# services/_shared/__init__.py
```

(empty file)

```python
# services/_shared/tests/__init__.py
```

(empty file)

- [ ] **Step 2: Write failing tests first**

```python
# services/_shared/tests/test_reliable_queue.py
"""F6: ReliableQueue atomic dequeue + recovery + retry.

No @pytest.mark.asyncio markers: these tests rely on asyncio_mode = auto (Step 6).
"""
import json

import fakeredis.aioredis
import pytest

from _shared.reliable_queue import ReliableQueue


@pytest.fixture
async def redis():
    r = fakeredis.aioredis.FakeRedis(decode_responses=False)
    yield r
    await r.flushall()
    await r.aclose()


async def test_dequeue_atomically_moves_to_processing(redis):
    """BLMOVE: atomic move from the main queue to the processing list."""
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
    await redis.lpush("queue:test", json.dumps({"task_id": "t1"}).encode())
    payload, raw = await q.dequeue(timeout=1)
    assert payload["task_id"] == "t1"
    # the main queue is empty and the item is now in the processing list
    assert await redis.llen("queue:test") == 0
    assert await redis.llen("processing:queue:test:w1") == 1


async def test_dequeue_returns_none_on_timeout(redis):
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
    result = await q.dequeue(timeout=1)
    assert result is None


async def test_ack_removes_from_processing(redis):
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
    await redis.lpush("queue:test", json.dumps({"task_id": "t1"}).encode())
    payload, raw = await q.dequeue(timeout=1)
    await q.ack(raw)
    assert await redis.llen("processing:queue:test:w1") == 0


async def test_recover_returns_orphaned_to_main_queue(redis):
    """Startup recovery: leftover processing-list items go back to the main queue."""
    # simulate a previous crash: an item left behind in the processing list
    orphan = json.dumps({"task_id": "t1", "attempts": 0}).encode()
    await redis.lpush("processing:queue:test:w1", orphan)
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
    recovered = await q.recover()
    assert recovered == 1
    assert await redis.llen("processing:queue:test:w1") == 0
    # it can be dequeued again
    payload, raw = await q.dequeue(timeout=1)
    assert payload["task_id"] == "t1"
    assert payload["attempts"] == 1  # incremented on recover


async def test_fail_below_max_attempts_returns_to_main_queue(redis):
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1", max_attempts=3)
    await redis.lpush("queue:test", json.dumps({"task_id": "t1", "attempts": 0}).encode())
    payload, raw = await q.dequeue(timeout=1)
    await q.fail(raw, payload)
    assert await redis.llen("processing:queue:test:w1") == 0
    assert await redis.llen("queue:test") == 1
    # attempts was incremented
    requeued_raw = await redis.lindex("queue:test", 0)
    requeued = json.loads(requeued_raw)
    assert requeued["attempts"] == 1


async def test_fail_at_max_attempts_moves_to_dead_letter(redis):
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1", max_attempts=3)
    await redis.lpush(
        "queue:test", json.dumps({"task_id": "t1", "attempts": 2}).encode()
    )
    payload, raw = await q.dequeue(timeout=1)
    await q.fail(raw, payload)
    # attempts 2 -> 3 (== max) -> dead-letter
    assert await redis.llen("queue:test") == 0
    assert await redis.llen("processing:queue:test:w1") == 0
    assert await redis.llen("dead_letter:queue:test") == 1
```

- [ ] **Step 3: Add requirements**

```text
# services/_shared/requirements.txt
redis>=5.0.0
```

Separate dev requirements (tests):

```text
# services/_shared/tests/requirements-dev.txt (optional)
fakeredis>=2.20.0
pytest>=8.0.0
pytest-asyncio>=0.23.0
```

- [ ] **Step 4: Run tests to verify they fail**

```
cd services/_shared && python -m pytest tests/ -v
```

Expected: collection error `ModuleNotFoundError: No module named '_shared.reliable_queue'` (reliable_queue.py does not exist yet).

- [ ] **Step 5: Write reliable_queue.py**

```python
# services/_shared/reliable_queue.py
"""F6: Reliable Redis queue with processing list + recovery + retry.

Pattern: BLMOVE main -> processing (atomic), then either ack (LREM processing) or
fail (LREM processing + re-enqueue or dead-letter, as one MULTI/EXEC transaction).

Startup recovery: any items left in the worker's processing list from a previous
crash are pushed back to the main queue (with attempts incremented).
"""
from __future__ import annotations

import json
import logging
import os
import socket
from typing import Optional

logger = logging.getLogger(__name__)


def default_worker_id(queue_key: str) -> str:
    """WORKER_ID env var if set, else <queue_key>-<hostname>-<pid>."""
    explicit = os.getenv("WORKER_ID")
    if explicit:
        return explicit
    return f"{queue_key}-{socket.gethostname()}-{os.getpid()}"


def _parse(raw: bytes) -> Optional[dict]:
    """Decode a payload; None if it is not UTF-8 JSON or not a JSON object."""
    try:
        payload = json.loads(raw)
    except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
        return None
    return payload if isinstance(payload, dict) else None


class ReliableQueue:
    """Wraps a redis client to provide BLMOVE-backed atomic dequeue +
    processing list + retry/dead-letter.

    Producer side stays unchanged: LPUSH queue:<x> <json payload>.
    Worker side: dequeue() -> process -> ack(raw) on success or fail(raw, payload) on error.
    Startup: await queue.recover() to re-enqueue orphans.
    """

    def __init__(
        self,
        redis,
        queue_key: str,
        worker_id: Optional[str] = None,
        max_attempts: int = 3,
    ):
        self._redis = redis
        self._queue_key = queue_key
        self._worker_id = worker_id or default_worker_id(queue_key)
        self._processing_key = f"processing:{queue_key}:{self._worker_id}"
        self._dead_letter_key = f"dead_letter:{queue_key}"
        self._max_attempts = max_attempts

    @property
    def processing_key(self) -> str:
        return self._processing_key

    async def _move_from_processing(self, raw: bytes, target_key: str, value: bytes) -> int:
        """LREM `raw` from processing and LPUSH `value` onto `target_key` in one MULTI/EXEC.

        Sending the two commands separately would lose the task if the worker died
        in between. Returns LREM's reply (number of items removed from processing).
        """
        async with self._redis.pipeline(transaction=True) as pipe:
            pipe.lrem(self._processing_key, 1, raw)  # buffered until execute()
            pipe.lpush(target_key, value)
            removed, _ = await pipe.execute()
        return int(removed)

    async def dequeue(self, timeout: int = 5) -> Optional[tuple[dict, bytes]]:
        """Atomically move 1 item from the main queue tail to the processing head.

        Returns (parsed_dict, raw_bytes), or None on timeout or on an unparseable
        payload (which is moved to dead-letter).
        Caller MUST call ack(raw) on success or fail(raw, payload) on error.
        """
        raw = await self._redis.blmove(
            self._queue_key, self._processing_key,
            timeout=timeout, src="RIGHT", dest="LEFT",
        )
        if raw is None:
            return None
        payload = _parse(raw)
        if payload is None:
            logger.error("invalid payload on dequeue, moving to dead-letter: %r", raw[:200])
            await self._move_from_processing(raw, self._dead_letter_key, raw)
            return None
        return payload, raw

    async def ack(self, raw: bytes) -> None:
        """Successful processing: remove from the processing list."""
        removed = await self._redis.lrem(self._processing_key, 1, raw)
        if removed == 0:
            logger.warning("ack on missing payload (already removed?): %r", raw[:100])

    async def fail(self, raw: bytes, payload: dict) -> None:
        """Failed processing: remove from processing and either re-enqueue or dead-letter."""
        attempts = int(payload.get("attempts", 0)) + 1
        payload["attempts"] = attempts
        encoded = json.dumps(payload).encode()
        if attempts >= self._max_attempts:
            await self._move_from_processing(raw, self._dead_letter_key, encoded)
            logger.error(
                "task moved to dead-letter after %d attempts: task_id=%s",
                attempts, payload.get("task_id"),
            )
            return
        await self._move_from_processing(raw, self._queue_key, encoded)
        logger.info(
            "task re-enqueued (attempt %d/%d): task_id=%s",
            attempts, self._max_attempts, payload.get("task_id"),
        )

    async def recover(self) -> int:
        """Startup: move all orphans from this worker's processing list back to the main queue.

        Increments the attempts counter (orphan == implicit failure); items that reach
        max_attempts go to dead-letter instead. Returns the number of items handled.
        """
        count = 0
        while True:
            # Peek at the oldest orphan (tail), then move it transactionally so a crash
            # during recovery cannot drop it either.
            raw = await self._redis.lindex(self._processing_key, -1)
            if raw is None:
                break
            payload = _parse(raw)
            if payload is None:
                target, value = self._dead_letter_key, raw
            else:
                payload["attempts"] = int(payload.get("attempts", 0)) + 1
                target = (
                    self._dead_letter_key
                    if payload["attempts"] >= self._max_attempts
                    else self._queue_key
                )
                value = json.dumps(payload).encode()
            if await self._move_from_processing(raw, target, value) == 0:
                break  # should not happen (only this worker touches its list); avoid looping
            count += 1
        if count:
            logger.info("recovered %d orphaned items for worker %s", count, self._worker_id)
        return count
```

**Note: redis-py `blmove` API**: `client.blmove(first_list, second_list, timeout, src="LEFT", dest="RIGHT")` (those are the defaults; the worker passes `src="RIGHT", dest="LEFT"`). `timeout=0` blocks forever. Payloads come back as `bytes` (assumes `decode_responses=False`).

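The list directions are easy to get backwards, so here is a toy model that uses `collections.deque` in place of Redis lists (index 0 standing for Redis LEFT). It is an illustration only: the helper names are hypothetical and the blocking behaviour of `BLMOVE` is not modelled.

```python
from __future__ import annotations

from collections import deque


def lpush(lst: deque, value: str) -> None:
    """Model of LPUSH: insert at the head (deque index 0 == Redis LEFT)."""
    lst.appendleft(value)


def move_right_left(src: deque, dst: deque) -> str | None:
    """Model of BLMOVE src dst RIGHT LEFT without blocking: pop src's tail, push onto dst's head."""
    if not src:
        return None  # real BLMOVE would wait up to `timeout` seconds, then return nil
    value = src.pop()
    dst.appendleft(value)
    return value


def lrem_one(lst: deque, value: str) -> int:
    """Model of LREM key 1 value: drop the first match scanning from the head."""
    try:
        lst.remove(value)  # deque.remove deletes the leftmost occurrence
    except ValueError:
        return 0
    return 1


def simulate() -> tuple[list[str], list[str]]:
    """Producer pushes t1..t3; the worker acks t1 and t3 but never acks t2."""
    queue: deque = deque()       # stands in for queue:<x>-render
    processing: deque = deque()  # stands in for processing:<queue_key>:<worker_id>
    for task in ("t1", "t2", "t3"):
        lpush(queue, task)
    served = []
    for _ in range(3):
        task = move_right_left(queue, processing)
        served.append(task)
        if task != "t2":  # skipping the ack for t2 models a worker that died mid-job
            lrem_one(processing, task)
    return served, list(processing)
```

`simulate()` serves the tasks in FIFO order (t1, t2, t3) and leaves t2 in `processing`, which is exactly what `recover()` picks up on the next start.
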

- [ ] **Step 6: Run tests to verify they pass**

```
cd services/_shared && python -m pytest tests/ -v
```

Expected: 6 PASS.

If you get an ImportError (`fakeredis` not installed):

```
python -m pip install fakeredis pytest-asyncio
```

The tests also need `asyncio_mode = auto`: the `redis` fixture is an async generator declared with plain `@pytest.fixture`, and the tests carry no `@pytest.mark.asyncio` markers, so in pytest-asyncio's default strict mode they would not run as coroutines. Add `services/_shared/pytest.ini`:

```ini
[pytest]
asyncio_mode = auto
```

A `conftest.py` hook that only auto-marks coroutine tests is not a substitute: in strict mode the `redis` fixture would still not be driven by pytest-asyncio, and declaring `pytest_plugins` in a non-root `conftest.py` is rejected by current pytest. Use the `pytest.ini` setting (or the same `asyncio_mode` key in an existing pytest config).

- [ ] **Step 7: Commit**

```bash
git add services/_shared/
git commit -m "feat(services): add _shared/reliable_queue: BLMOVE + processing list + retry (F6 part 1)"
```

---

## Task 2: Apply ReliableQueue to insta-render

**Files:**
- Modify: `services/insta-render/Dockerfile`
- Modify: `services/insta-render/worker.py`
- Modify: `services/insta-render/tests/test_worker.py` (append)

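- [ ] **Step 1: Update Dockerfile**

Add a copy of `_shared` to `services/insta-render/Dockerfile`. Read the existing Dockerfile first; if it has a line like `COPY services/insta-render /app`, put this above or next to it:

```dockerfile
# Source path is relative to the build context (the repo root in this form).
COPY services/_shared /app/_shared
# /app must be on the path for `import _shared...`; /app/_shared itself is not needed.
ENV PYTHONPATH=/app
```

The `COPY` source path is relative to the build context, and the context must contain `_shared`. The `COPY services/...` form above assumes the repo root is the context. If compose instead uses `build: { context: ./services, dockerfile: insta-render/Dockerfile }`, drop the prefix (`COPY _shared /app/_shared`). A per-service context such as `./insta-render` does not contain `_shared` at all and has to be widened. (Compose resolves `context` relative to the compose file's directory.)
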
- [ ] **Step 2: Modify worker.py: failing test first**

Append to the end of `services/insta-render/tests/test_worker.py`:

```python
import json
from unittest.mock import AsyncMock, patch

import pytest


@pytest.mark.asyncio
async def test_worker_calls_ack_on_success():
    """ack() is called on success (F6)."""
    import worker
    fake_payload = {"task_id": "t1", "job_type": "card_generation", "params": {}}
    fake_raw = json.dumps(fake_payload).encode()

    fake_queue = AsyncMock()
    fake_queue.dequeue = AsyncMock(side_effect=[(fake_payload, fake_raw), None])
    fake_queue.ack = AsyncMock()
    fake_queue.fail = AsyncMock()
    fake_queue.recover = AsyncMock(return_value=0)

    with patch.object(worker, "ReliableQueue", return_value=fake_queue), \
            patch.object(worker, "_dispatch") as disp:
        # poll_once runs a single cycle, so the infinite worker_loop is never entered
        await worker.poll_once(fake_queue)
    disp.assert_called_once()
    fake_queue.ack.assert_called_once_with(fake_raw)
    fake_queue.fail.assert_not_called()


@pytest.mark.asyncio
async def test_worker_calls_fail_on_dispatch_exception():
    """fail() is called when dispatch raises, so the job is not lost (F6)."""
    import worker
    fake_payload = {"task_id": "t2", "job_type": "card_generation", "params": {}}
    fake_raw = json.dumps(fake_payload).encode()

    fake_queue = AsyncMock()
    fake_queue.dequeue = AsyncMock(return_value=(fake_payload, fake_raw))
    fake_queue.ack = AsyncMock()
    fake_queue.fail = AsyncMock()

    with patch.object(worker, "_dispatch", side_effect=RuntimeError("boom")):
        await worker.poll_once(fake_queue)
    fake_queue.fail.assert_called_once_with(fake_raw, fake_payload)
    fake_queue.ack.assert_not_called()
```

- [ ] **Step 3: Run the tests to verify they fail**

```
cd services/insta-render && python -m pytest tests/ -v -k "ack_on_success or fail_on_dispatch"
```

Expected: AttributeError (`worker.poll_once` and `worker.ReliableQueue` do not exist yet).

- [ ] **Step 4: Rewrite insta-render worker.py**

```python
"""Redis-based worker with the F6 reliability pattern (BLMOVE + processing list + retry)."""
from __future__ import annotations

import asyncio
import logging
import os
import sys

import redis.asyncio as aioredis

from _shared.reliable_queue import ReliableQueue
from nas_client import webhook_update_task
# keep the existing dispatch-target imports
from card_renderer import render_card

logger = logging.getLogger(__name__)

REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
QUEUE_KEY = "queue:insta-render"
PAUSED_KEY = "queue:paused"

_DISPATCH_TABLE = {
    "card_generation": "render_card",
}


def _dispatch(payload: dict) -> None:
    job_type = payload.get("job_type", "")
    task_id = payload.get("task_id", "")
    params = payload.get("params", {})
    fn_name = _DISPATCH_TABLE.get(job_type)
    if fn_name is None:
        logger.error("unknown job_type=%s task=%s", job_type, task_id)
        webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}")
        return
    try:
        fn = getattr(sys.modules[__name__], fn_name)
    except AttributeError:
        webhook_update_task(task_id, "failed", 0, "", error=f"internal dispatch error: {fn_name}")
        return
    fn(task_id, params)


async def poll_once(queue: ReliableQueue) -> bool:
    """One cycle: dequeue -> dispatch -> ack/fail. Returns True if a job was handled."""
    result = await queue.dequeue(timeout=5)
    if result is None:
        return False
    payload, raw = result
    try:
        # _dispatch is synchronous; run it in a thread so the event loop stays responsive
        await asyncio.to_thread(_dispatch, payload)
    except Exception:
        logger.exception("dispatch failed task_id=%s", payload.get("task_id"))
        await queue.fail(raw, payload)
        return True
    await queue.ack(raw)
    return True


async def worker_loop():
    redis = aioredis.from_url(REDIS_URL, decode_responses=False)
    queue = ReliableQueue(redis, queue_key=QUEUE_KEY)
    logger.info("insta-render worker started worker_id=%s", queue._worker_id)
    # F6: startup recovery
    try:
        recovered = await queue.recover()
        if recovered:
            logger.info("recovered %d orphaned items at startup", recovered)
    except Exception:
        logger.exception("startup recover failed")
    while True:
        try:
            paused = await redis.get(PAUSED_KEY)
            if paused == b"1":
                await asyncio.sleep(10)
                continue
            await poll_once(queue)
        except asyncio.CancelledError:
            logger.info("worker_loop cancelled")
            raise
        except Exception:
            logger.exception("worker_loop iteration failed, retrying in 5s")
            await asyncio.sleep(5)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(worker_loop())
```

**NOTE**: Keep the dispatch table and imports of the existing `insta-render/worker.py`, mapped from the real file. The example above is minimal: job_type and function names must match the existing file. Before changing anything, run `Read services/insta-render/worker.py` to confirm the exact dispatch table.

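The NOTE above warns that job_type and function names must match the existing file. A minimal check for that, assuming (as in the insta-render example) that `_DISPATCH_TABLE` maps each job_type to a function *name* resolved on the worker module; `unresolved_dispatch_targets` is a hypothetical helper, not part of the plan's code:

```python
from __future__ import annotations

from collections.abc import Mapping
from typing import Any


def unresolved_dispatch_targets(table: Mapping[str, str], module: Any) -> list[str]:
    """Return the job_types whose target name is missing from `module` or not callable."""
    return sorted(
        job_type
        for job_type, fn_name in table.items()
        if not callable(getattr(module, fn_name, None))
    )


# Hypothetical usage inside services/<x>-render/tests/test_worker.py:
#     import worker
#     assert unresolved_dispatch_targets(worker._DISPATCH_TABLE, worker) == []
```

An empty list means every job_type resolves to a callable. Workers whose table maps job_types directly to function objects would need a different check.
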
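
- [ ] **Step 5: Run tests**

```
cd services/insta-render && PYTHONPATH=.. python -m pytest tests/ -v
```

`worker.py` now imports `_shared`, which lives in `services/`; `PYTHONPATH=..` puts that directory on the path for local runs. In PowerShell, set `$env:PYTHONPATH = (Resolve-Path ..).Path` first, then run `python -m pytest tests/ -v`.

Expected: 2 new tests PASS, existing tests PASS (dispatch table tests, etc.).
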
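After the rewrite, `import worker` pulls in `_shared`, which is not on `sys.path` when pytest runs from `services/insta-render`. One way to make local runs work without an environment variable is a per-service `conftest.py` along these lines. It is a hypothetical file (not in the File Map) and assumes the `services/<x>-render/tests/` layout.

```python
# services/insta-render/tests/conftest.py  (hypothetical; one per service)
"""Put services/ on sys.path so `from _shared.reliable_queue import ...` resolves in local runs."""
import sys
from pathlib import Path

# tests/conftest.py -> tests/ -> insta-render/ -> services/
SERVICES_ROOT = Path(__file__).resolve().parent.parent.parent

if str(SERVICES_ROOT) not in sys.path:
    sys.path.insert(0, str(SERVICES_ROOT))
```

pytest imports `conftest.py` before the test modules, so the path is in place by the time a test runs `import worker`. Inside the Docker image the same role is played by `ENV PYTHONPATH=/app`.
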

- [ ] **Step 6: Commit**

```bash
git add services/insta-render/
git commit -m "fix(insta-render): apply F6 ReliableQueue: BLMOVE + ack/fail (F6 part 2)"
```

---

## Task 3: Apply the same change to music-render

**Files:**
- Modify: `services/music-render/Dockerfile`, `worker.py`
- Modify: `services/music-render/tests/test_worker.py` (append)

- [ ] **Step 1: Add the `_shared` COPY to the Dockerfile**
- [ ] **Step 2: Add tests (same pattern as Task 2; the dispatch targets follow the existing pattern, e.g. `run_suno_generation`)**

```python
# Imports needed if music-render's test_worker.py does not already have them.
import json
from unittest.mock import AsyncMock, patch

import pytest


@pytest.mark.asyncio
async def test_music_worker_ack_on_success():
    import worker
    payload = {"task_id": "t1", "job_type": "suno_generation", "params": {}}
    raw = json.dumps(payload).encode()
    fake_queue = AsyncMock()
    fake_queue.dequeue = AsyncMock(return_value=(payload, raw))
    fake_queue.ack = AsyncMock()
    with patch.object(worker, "_dispatch"):
        await worker.poll_once(fake_queue)
    fake_queue.ack.assert_called_once_with(raw)


@pytest.mark.asyncio
async def test_music_worker_fail_on_exception():
    import worker
    payload = {"task_id": "t2", "job_type": "suno_generation", "params": {}}
    raw = json.dumps(payload).encode()
    fake_queue = AsyncMock()
    fake_queue.dequeue = AsyncMock(return_value=(payload, raw))
    fake_queue.fail = AsyncMock()
    with patch.object(worker, "_dispatch", side_effect=RuntimeError("x")):
        await worker.poll_once(fake_queue)
    fake_queue.fail.assert_called_once_with(raw, payload)
```

- [ ] **Step 3: Run the tests to verify they fail**
- [ ] **Step 4: Rewrite music-render worker.py: the `worker_loop` structure is identical to insta-render; keep `_dispatch` and the existing 12 functions in `_DISPATCH_TABLE` unchanged**
- [ ] **Step 5: Run tests (`_shared` must be importable, i.e. `services/` on `PYTHONPATH`)**
- [ ] **Step 6: Commit**

```bash
git add services/music-render/
git commit -m "fix(music-render): apply F6 ReliableQueue (F6 part 3)"
```

---

## Task 4: Apply the same change to video-render

(Same pattern as Task 3; keep the 4-provider table: sora/veo/kling/seedance.)

- [ ] **Step 1: Update the Dockerfile**
- [ ] **Step 2: Add 2 new tests (`test_video_worker_ack_on_success`, `test_video_worker_fail_on_exception`) with job_type `sora_generation`**
- [ ] **Step 3: Run the failing tests**
- [ ] **Step 4: Rewrite worker.py with the same pattern**
- [ ] **Step 5: Run tests**
- [ ] **Step 6: Commit**

```bash
git add services/video-render/
git commit -m "fix(video-render): apply F6 ReliableQueue (F6 part 4)"
```

---

## Task 5: Apply the same change to image-render

(Keep the 3-provider table: gpt_image / nano_banana / flux.)

- [ ] **Step 1-6: Same pattern as Tasks 3/4**

```bash
git add services/image-render/
git commit -m "fix(image-render): apply F6 ReliableQueue (F6 part 5)"
```

---

## Task 6: Production verification + push

- [ ] **Step 1: Run all service tests**

```
cd services && for d in _shared insta-render music-render video-render image-render; do
  echo "--- $d ---"
  (cd "$d" && PYTHONPATH=.. python -m pytest tests/ -q) || echo "FAILED: $d"
done
```

(Or in PowerShell:)

```powershell
# put services/ on the path so every worker can import _shared
$env:PYTHONPATH = (Resolve-Path services).Path
foreach ($d in @("_shared","insta-render","music-render","video-render","image-render")) {
    Write-Output "--- $d ---"
    Push-Location services/$d
    python -m pytest tests/ -q
    Pop-Location
}
```

Expected: 2 new tests in each of the 4 workers + 6 in `_shared` + all existing tests PASS.

- [ ] **Step 2: Docker build check (optional, time permitting)**

```
cd services && docker compose build insta-render music-render video-render image-render
```

Expected: all 4 images build, confirming `_shared` is inside the build context (a `COPY` of a path outside the context fails the build).

- [ ] **Step 3: Push**

```bash
git push origin main
```

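- [ ] **Step 4: Production deploy notes (manual)**

When redeploying the containers on the NAS:
1. Check for leftover orphans with `redis-cli -h 192.168.45.54 --scan --pattern 'processing:*'` (prefer `--scan` over `KEYS`, which blocks the server). A processing list whose `worker_id` no longer matches a running worker is never picked up by startup recovery; move its items back by hand (e.g. `LMOVE <processing_key> <queue_key> RIGHT LEFT`, repeated until it returns nil). This does not increment `attempts`.
2. Monitor the dead-letter lists with `redis-cli -h 192.168.45.54 --scan --pattern 'dead_letter:*'` (and `LLEN` on each); a growing backlog needs alerting.
3. Set a stable, unique `WORKER_ID` per container (e.g. `WORKER_ID=insta-render-prod-1`). The default id embeds the hostname, and a Docker container's hostname defaults to its container ID, which changes whenever the container is recreated; orphans left under the old id are then never recovered. Never give two running workers the same `WORKER_ID`: a restart of one would re-enqueue jobs the other is still processing.
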
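Steps 1 and 2 above can also be scripted. The sketch below assumes a synchronous redis-py client (`redis.Redis`), whose `scan_iter`, `llen` and `lmove` methods it uses; the helper names are hypothetical. Key names come back as `bytes` unless the client was created with `decode_responses=True`.

```python
from __future__ import annotations

from typing import Any, Iterable, Protocol


class ListClient(Protocol):
    """The three redis-py (sync) client methods used below."""

    def scan_iter(self, match: str | None = None) -> Iterable[Any]: ...
    def llen(self, name: Any) -> int: ...
    def lmove(self, first_list: Any, second_list: Any, src: str = "LEFT", dest: str = "RIGHT") -> Any: ...


def backlog(client: ListClient) -> dict[Any, int]:
    """Length of every processing:* and dead_letter:* list, discovered with SCAN (not KEYS)."""
    sizes: dict[Any, int] = {}
    for pattern in ("processing:*", "dead_letter:*"):
        # SCAN may return a key more than once; the dict de-duplicates it
        for key in client.scan_iter(match=pattern):
            sizes[key] = client.llen(key)
    return sizes


def requeue_orphans(client: ListClient, processing_key: str, queue_key: str) -> int:
    """Drain a stale processing list back onto the main queue, oldest item first.

    LMOVE <processing> <queue> RIGHT LEFT mirrors the worker's BLMOVE direction.
    Unlike ReliableQueue.recover(), this does NOT increment `attempts`.
    """
    moved = 0
    while client.lmove(processing_key, queue_key, src="RIGHT", dest="LEFT") is not None:
        moved += 1
    return moved
```

For example, `backlog(redis.Redis(host="192.168.45.54", decode_responses=True))` lists every in-flight and dead-letter list with its length; `requeue_orphans` is only safe once no running worker owns that processing list.
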

---

## Self-Review

1. **Atomic dequeue**: `BLMOVE` is a single command, so the move is atomic on the Redis server ✅
2. **Ack on success**: `LREM processing 1 raw` removes exactly one occurrence ✅
3. **Fail with retry**: incremented attempts < max → requeue; attempts >= max → dead-letter ✅
4. **Startup recovery**: orphans are requeued automatically (attempts incremented) ✅
5. **Applied to 4 workers**: insta/music/video/image use the same pattern ✅
6. **NAS producer compatibility**: plain `LPUSH` unchanged; `attempts` is optional in the payload schema ✅

**Intentionally not covered**:
- Dead-letter monitoring/alerting: an ops task (CHECK_POINT backlog)
- When `WORKER_ID` is unset and the hostname changes, orphans are stranded (never recovered): documented in the ops guide

**Assumptions to verify**:
- redis-py `redis.asyncio.Redis.blmove` signature: `(first_list, second_list, timeout, src="LEFT", dest="RIGHT")`; redis>=5.0 recommended, and the Redis server must be 6.2+.
- fakeredis: `fakeredis.aioredis.FakeRedis` (>=2.20.0) is assumed to support BLMOVE; verify this before executing the plan.

---

## Execution Handoff

**Plan complete and saved to `docs/superpowers/plans/2026-05-25-render-queue-reliability.md`.**

**1. Subagent-Driven (recommended)**: a fresh subagent per task. The 4 workers share the pattern, but each has its own dispatch table, so add a review checkpoint to make sure the subagents keep them consistent.

**2. Inline Execution**: execute in the current session.

Awaiting a decision from 박재오. Recommended to start after Plans 1 and 2 are done (this plan is the largest: 4 workers × about 1 hour = about 4 hours).