Files
ai-trade/services/_shared/tests/test_reliable_queue.py
gahusb 32308bede6 feat(services): _shared/reliable_queue 신설 — BLMOVE + processing list + retry (F6 part 1)
코드 리뷰 F6: render worker(insta/music/video/image)가 BLPOP 직후 crash 시
작업 손실. 공통 ReliableQueue 클래스를 services/_shared/에 신설:

- dequeue: BLMOVE main → processing (atomic, 원자적)
- ack: LREM processing 1 (성공 시 1개 제거)
- fail: attempts++ 후 main queue로 재큐, max_attempts 도달 시 dead_letter:* 이동
- recover: startup 시 자신의 processing list orphan을 main queue로 (attempts 증가)

producer side 무변경. NAS 짝 워커(insta-lab/music-lab/video-lab/image-render NAS측)는
LPUSH 그대로. payload schema에 optional attempts 필드 추가.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 20:10:47 +09:00

85 lines
3.2 KiB
Python

"""F6 — ReliableQueue: atomic dequeue + recovery + retry."""
import json
import sys
from pathlib import Path
import fakeredis.aioredis
import pytest
# Make `_shared` importable when tests run from services/_shared
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))
from _shared.reliable_queue import ReliableQueue
@pytest.fixture
async def redis():
r = fakeredis.aioredis.FakeRedis(decode_responses=False)
yield r
await r.flushall()
await r.aclose()
async def test_dequeue_atomically_moves_to_processing(redis):
"""BLMOVE: queue → processing 원자적 이동."""
q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
await redis.lpush("queue:test", json.dumps({"task_id": "t1"}).encode())
result = await q.dequeue(timeout=1)
assert result is not None
payload, raw = result
assert payload["task_id"] == "t1"
assert await redis.llen("queue:test") == 0
assert await redis.llen("processing:queue:test:w1") == 1
async def test_dequeue_returns_none_on_timeout(redis):
q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
result = await q.dequeue(timeout=1)
assert result is None
async def test_ack_removes_from_processing(redis):
q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
await redis.lpush("queue:test", json.dumps({"task_id": "t1"}).encode())
_, raw = await q.dequeue(timeout=1)
await q.ack(raw)
assert await redis.llen("processing:queue:test:w1") == 0
async def test_recover_returns_orphaned_to_main_queue(redis):
"""startup recovery: 잔존 processing list 항목을 main queue로 되돌림."""
orphan = json.dumps({"task_id": "t1", "attempts": 0}).encode()
await redis.lpush("processing:queue:test:w1", orphan)
q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
recovered = await q.recover()
assert recovered == 1
assert await redis.llen("processing:queue:test:w1") == 0
payload, _ = await q.dequeue(timeout=1)
assert payload["task_id"] == "t1"
assert payload["attempts"] == 1 # incremented on recover
async def test_fail_below_max_attempts_returns_to_main_queue(redis):
q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1", max_attempts=3)
await redis.lpush("queue:test", json.dumps({"task_id": "t1", "attempts": 0}).encode())
payload, raw = await q.dequeue(timeout=1)
await q.fail(raw, payload)
assert await redis.llen("processing:queue:test:w1") == 0
assert await redis.llen("queue:test") == 1
requeued_raw = await redis.lindex("queue:test", 0)
requeued = json.loads(requeued_raw)
assert requeued["attempts"] == 1
async def test_fail_at_max_attempts_moves_to_dead_letter(redis):
q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1", max_attempts=3)
await redis.lpush(
"queue:test", json.dumps({"task_id": "t1", "attempts": 2}).encode()
)
payload, raw = await q.dequeue(timeout=1)
await q.fail(raw, payload)
# attempts 2 → 3 (== max) → dead-letter
assert await redis.llen("queue:test") == 0
assert await redis.llen("processing:queue:test:w1") == 0
assert await redis.llen("dead_letter:queue:test") == 1