"""F6 — Reliable Redis queue with processing list + recovery + retry. Pattern: - BLMOVE main → processing (atomic dequeue) - ack: LREM processing (1 occurrence) - fail: LREM processing + (re-enqueue with attempts++ OR move to dead-letter) - recover: startup-time orphan recovery (worker's processing list → main queue) Producer side stays unchanged: LPUSH queue: . Worker side: dequeue() → process → ack(raw) on success or fail(raw, payload) on error. Startup: await queue.recover() to re-enqueue orphans. """ from __future__ import annotations import json import logging import os import socket from typing import Optional logger = logging.getLogger(__name__) def default_worker_id(queue_key: str) -> str: """env WORKER_ID > hostname-pid.""" explicit = os.getenv("WORKER_ID") if explicit: return explicit return f"{queue_key}-{socket.gethostname()}-{os.getpid()}" class ReliableQueue: """BLMOVE-backed atomic dequeue + processing list + retry/dead-letter.""" def __init__( self, redis, queue_key: str, worker_id: Optional[str] = None, max_attempts: int = 3, ): self._redis = redis self._queue_key = queue_key self._worker_id = worker_id or default_worker_id(queue_key) self._processing_key = f"processing:{queue_key}:{self._worker_id}" self._dead_letter_key = f"dead_letter:{queue_key}" self._max_attempts = max_attempts @property def worker_id(self) -> str: return self._worker_id @property def processing_key(self) -> str: return self._processing_key async def dequeue(self, timeout: int = 5) -> Optional[tuple[dict, bytes]]: """Atomically move 1 item from main queue tail to processing head. Returns (parsed_dict, raw_bytes) or None on timeout/parse-error. Caller MUST call ack(raw) on success or fail(raw, payload) on error. """ raw = await self._redis.blmove( self._queue_key, self._processing_key, timeout, "RIGHT", "LEFT", ) if raw is None: return None try: payload = json.loads(raw) except json.JSONDecodeError: logger.error( "invalid payload on dequeue, moving to dead-letter: %r", raw[:200] ) await self._redis.lrem(self._processing_key, 1, raw) await self._redis.lpush(self._dead_letter_key, raw) return None return payload, raw async def ack(self, raw: bytes) -> None: """Successful processing — remove from processing list.""" removed = await self._redis.lrem(self._processing_key, 1, raw) if removed == 0: logger.warning("ack on missing payload (already removed?): %r", raw[:100]) async def fail(self, raw: bytes, payload: dict) -> None: """Failed processing — remove from processing list and re-enqueue or dead-letter.""" await self._redis.lrem(self._processing_key, 1, raw) attempts = int(payload.get("attempts", 0)) + 1 if attempts >= self._max_attempts: payload["attempts"] = attempts await self._redis.lpush(self._dead_letter_key, json.dumps(payload).encode()) logger.error( "task moved to dead-letter after %d attempts: task_id=%s", attempts, payload.get("task_id"), ) return payload["attempts"] = attempts await self._redis.lpush(self._queue_key, json.dumps(payload).encode()) logger.info( "task re-enqueued (attempt %d/%d): task_id=%s", attempts, self._max_attempts, payload.get("task_id"), ) async def recover(self) -> int: """Startup: move all orphans from this worker's processing list back to main queue. Increments attempts counter (orphan == implicit failure). Returns count. """ count = 0 while True: raw = await self._redis.lpop(self._processing_key) if raw is None: break try: payload = json.loads(raw) except json.JSONDecodeError: await self._redis.lpush(self._dead_letter_key, raw) count += 1 continue payload["attempts"] = int(payload.get("attempts", 0)) + 1 if payload["attempts"] >= self._max_attempts: await self._redis.lpush( self._dead_letter_key, json.dumps(payload).encode() ) else: await self._redis.lpush( self._queue_key, json.dumps(payload).encode() ) count += 1 if count: logger.info( "recovered %d orphaned items for worker %s", count, self._worker_id ) return count