diff --git a/services/_shared/__init__.py b/services/_shared/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/_shared/pytest.ini b/services/_shared/pytest.ini new file mode 100644 index 0000000..2f4c80e --- /dev/null +++ b/services/_shared/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +asyncio_mode = auto diff --git a/services/_shared/reliable_queue.py b/services/_shared/reliable_queue.py new file mode 100644 index 0000000..3f600ba --- /dev/null +++ b/services/_shared/reliable_queue.py @@ -0,0 +1,135 @@ +"""F6 — Reliable Redis queue with processing list + recovery + retry. + +Pattern: +- BLMOVE main → processing (atomic dequeue) +- ack: LREM processing (1 occurrence) +- fail: LREM processing + (re-enqueue with attempts++ OR move to dead-letter) +- recover: startup-time orphan recovery (worker's processing list → main queue) + +Producer side stays unchanged: LPUSH queue: . +Worker side: dequeue() → process → ack(raw) on success or fail(raw, payload) on error. +Startup: await queue.recover() to re-enqueue orphans. +""" +from __future__ import annotations + +import json +import logging +import os +import socket +from typing import Optional + +logger = logging.getLogger(__name__) + + +def default_worker_id(queue_key: str) -> str: + """env WORKER_ID > hostname-pid.""" + explicit = os.getenv("WORKER_ID") + if explicit: + return explicit + return f"{queue_key}-{socket.gethostname()}-{os.getpid()}" + + +class ReliableQueue: + """BLMOVE-backed atomic dequeue + processing list + retry/dead-letter.""" + + def __init__( + self, + redis, + queue_key: str, + worker_id: Optional[str] = None, + max_attempts: int = 3, + ): + self._redis = redis + self._queue_key = queue_key + self._worker_id = worker_id or default_worker_id(queue_key) + self._processing_key = f"processing:{queue_key}:{self._worker_id}" + self._dead_letter_key = f"dead_letter:{queue_key}" + self._max_attempts = max_attempts + + @property + def worker_id(self) -> str: + return self._worker_id + + @property + def processing_key(self) -> str: + return self._processing_key + + async def dequeue(self, timeout: int = 5) -> Optional[tuple[dict, bytes]]: + """Atomically move 1 item from main queue tail to processing head. + + Returns (parsed_dict, raw_bytes) or None on timeout/parse-error. + Caller MUST call ack(raw) on success or fail(raw, payload) on error. + """ + raw = await self._redis.blmove( + self._queue_key, self._processing_key, + timeout, "RIGHT", "LEFT", + ) + if raw is None: + return None + try: + payload = json.loads(raw) + except json.JSONDecodeError: + logger.error( + "invalid payload on dequeue, moving to dead-letter: %r", raw[:200] + ) + await self._redis.lrem(self._processing_key, 1, raw) + await self._redis.lpush(self._dead_letter_key, raw) + return None + return payload, raw + + async def ack(self, raw: bytes) -> None: + """Successful processing — remove from processing list.""" + removed = await self._redis.lrem(self._processing_key, 1, raw) + if removed == 0: + logger.warning("ack on missing payload (already removed?): %r", raw[:100]) + + async def fail(self, raw: bytes, payload: dict) -> None: + """Failed processing — remove from processing list and re-enqueue or dead-letter.""" + await self._redis.lrem(self._processing_key, 1, raw) + attempts = int(payload.get("attempts", 0)) + 1 + if attempts >= self._max_attempts: + payload["attempts"] = attempts + await self._redis.lpush(self._dead_letter_key, json.dumps(payload).encode()) + logger.error( + "task moved to dead-letter after %d attempts: task_id=%s", + attempts, payload.get("task_id"), + ) + return + payload["attempts"] = attempts + await self._redis.lpush(self._queue_key, json.dumps(payload).encode()) + logger.info( + "task re-enqueued (attempt %d/%d): task_id=%s", + attempts, self._max_attempts, payload.get("task_id"), + ) + + async def recover(self) -> int: + """Startup: move all orphans from this worker's processing list back to main queue. + + Increments attempts counter (orphan == implicit failure). Returns count. + """ + count = 0 + while True: + raw = await self._redis.lpop(self._processing_key) + if raw is None: + break + try: + payload = json.loads(raw) + except json.JSONDecodeError: + await self._redis.lpush(self._dead_letter_key, raw) + count += 1 + continue + payload["attempts"] = int(payload.get("attempts", 0)) + 1 + if payload["attempts"] >= self._max_attempts: + await self._redis.lpush( + self._dead_letter_key, json.dumps(payload).encode() + ) + else: + await self._redis.lpush( + self._queue_key, json.dumps(payload).encode() + ) + count += 1 + if count: + logger.info( + "recovered %d orphaned items for worker %s", count, self._worker_id + ) + return count diff --git a/services/_shared/requirements.txt b/services/_shared/requirements.txt new file mode 100644 index 0000000..294a12f --- /dev/null +++ b/services/_shared/requirements.txt @@ -0,0 +1 @@ +redis>=5.0.0 diff --git a/services/_shared/tests/__init__.py b/services/_shared/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/_shared/tests/test_reliable_queue.py b/services/_shared/tests/test_reliable_queue.py new file mode 100644 index 0000000..28a4434 --- /dev/null +++ b/services/_shared/tests/test_reliable_queue.py @@ -0,0 +1,84 @@ +"""F6 — ReliableQueue: atomic dequeue + recovery + retry.""" +import json +import sys +from pathlib import Path + +import fakeredis.aioredis +import pytest + +# Make `_shared` importable when tests run from services/_shared +sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent)) + +from _shared.reliable_queue import ReliableQueue + + +@pytest.fixture +async def redis(): + r = fakeredis.aioredis.FakeRedis(decode_responses=False) + yield r + await r.flushall() + await r.aclose() + + +async def test_dequeue_atomically_moves_to_processing(redis): + """BLMOVE: queue → processing 원자적 이동.""" + q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1") + await redis.lpush("queue:test", json.dumps({"task_id": "t1"}).encode()) + result = await q.dequeue(timeout=1) + assert result is not None + payload, raw = result + assert payload["task_id"] == "t1" + assert await redis.llen("queue:test") == 0 + assert await redis.llen("processing:queue:test:w1") == 1 + + +async def test_dequeue_returns_none_on_timeout(redis): + q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1") + result = await q.dequeue(timeout=1) + assert result is None + + +async def test_ack_removes_from_processing(redis): + q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1") + await redis.lpush("queue:test", json.dumps({"task_id": "t1"}).encode()) + _, raw = await q.dequeue(timeout=1) + await q.ack(raw) + assert await redis.llen("processing:queue:test:w1") == 0 + + +async def test_recover_returns_orphaned_to_main_queue(redis): + """startup recovery: 잔존 processing list 항목을 main queue로 되돌림.""" + orphan = json.dumps({"task_id": "t1", "attempts": 0}).encode() + await redis.lpush("processing:queue:test:w1", orphan) + q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1") + recovered = await q.recover() + assert recovered == 1 + assert await redis.llen("processing:queue:test:w1") == 0 + payload, _ = await q.dequeue(timeout=1) + assert payload["task_id"] == "t1" + assert payload["attempts"] == 1 # incremented on recover + + +async def test_fail_below_max_attempts_returns_to_main_queue(redis): + q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1", max_attempts=3) + await redis.lpush("queue:test", json.dumps({"task_id": "t1", "attempts": 0}).encode()) + payload, raw = await q.dequeue(timeout=1) + await q.fail(raw, payload) + assert await redis.llen("processing:queue:test:w1") == 0 + assert await redis.llen("queue:test") == 1 + requeued_raw = await redis.lindex("queue:test", 0) + requeued = json.loads(requeued_raw) + assert requeued["attempts"] == 1 + + +async def test_fail_at_max_attempts_moves_to_dead_letter(redis): + q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1", max_attempts=3) + await redis.lpush( + "queue:test", json.dumps({"task_id": "t1", "attempts": 2}).encode() + ) + payload, raw = await q.dequeue(timeout=1) + await q.fail(raw, payload) + # attempts 2 → 3 (== max) → dead-letter + assert await redis.llen("queue:test") == 0 + assert await redis.llen("processing:queue:test:w1") == 0 + assert await redis.llen("dead_letter:queue:test") == 1