REDIS_URL의 socket_timeout(<5s)이 ReliableQueue BLMOVE 5초 블록보다 짧아 idle dequeue마다 "Timeout reading"으로 잡을 못 꺼내 슬레이트가 draft에 정지(2026-05-22경부터). 큐 연결을 socket_timeout=None + socket_keepalive로 생성(make_queue_redis)해 정상화. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
143 lines
5.3 KiB
Python
143 lines
5.3 KiB
Python
"""Redis ReliableQueue worker — F6 reliability pattern (BLMOVE + ack/fail + recovery).

Consumes render jobs from ``queue:insta-render``, renders the slate's cards via
``card_renderer.render_slate`` and reports progress to the NAS internal webhook.

While ``queue:paused`` is set to ``"1"`` the worker waits instead of dequeuing
(task-watcher sets the flag when it detects activity by 박재오).
"""
|
|
from __future__ import annotations

import asyncio
import logging
import os
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

import httpx
import redis.asyncio as aioredis

from card_renderer import render_slate
from _shared.reliable_queue import ReliableQueue
|
|
|
|
logger = logging.getLogger(__name__)

# Endpoints and credentials; each can be overridden through the environment.
REDIS_URL = os.environ.get("REDIS_URL", "redis://192.168.45.54:6379")
NAS_BASE_URL = os.environ.get("NAS_BASE_URL", "http://192.168.45.54:18700")
INTERNAL_API_KEY = os.environ.get("INTERNAL_API_KEY", "")  # sent as X-Internal-Key
INSTA_MEDIA_URL_PREFIX = os.environ.get("INSTA_MEDIA_URL_PREFIX", "/media/insta")

# Redis keys: the job list this worker consumes, and the pause flag set by task-watcher.
QUEUE_KEY = "queue:insta-render"
PAUSED_KEY = "queue:paused"
|
|
|
|
|
|
async def _post_update(client: httpx.AsyncClient, task_id: str, status: str, progress: int,
                       result_path: str | None = None, error: str | None = None) -> None:
    """Report a task's status to the NAS internal webhook (best effort).

    The JSON body always carries ``task_id``, ``status`` and ``progress``.
    ``result_path`` and ``error`` are added only when they are non-empty.

    A non-200 reply is logged as an error. Any exception raised while calling the
    webhook is logged and swallowed, so a webhook outage never aborts the caller.
    """
    body: dict[str, Any] = {"task_id": task_id, "status": status, "progress": progress}
    optional_fields = {"result_path": result_path, "error": error}
    body.update({name: value for name, value in optional_fields.items() if value})

    try:
        resp = await client.post(
            f"{NAS_BASE_URL}/api/internal/insta/update",
            headers={"X-Internal-Key": INTERNAL_API_KEY},
            json=body,
            timeout=10.0,
        )
        if resp.status_code != 200:
            logger.error("webhook %s returned %d: %s", task_id, resp.status_code, resp.text[:200])
    except Exception:
        logger.exception("webhook %s 호출 실패", task_id)
|
|
|
|
|
|
async def _fetch_slate(client: httpx.AsyncClient, slate_id: int) -> dict:
    """Fetch slate ``slate_id`` from the NAS API and return its decoded JSON body.

    The endpoint is the existing public one, so no auth header is sent. An HTTP error
    status surfaces as the exception raised by ``raise_for_status()``.
    """
    endpoint = f"{NAS_BASE_URL}/api/insta/slates/{slate_id}"
    response = await client.get(endpoint, timeout=10.0)
    response.raise_for_status()
    slate = response.json()
    return slate
|
|
|
|
|
|
async def _process_one(client: httpx.AsyncClient, payload: dict) -> None:
    """Handle one job: fetch slate → render → webhook.

    On any exception inside the pipeline a ``failed`` webhook is posted and the
    exception is re-raised. F6: ``poll_once`` then hands the item to
    ``queue.fail(raw, payload)`` for retry / dead-lettering.

    Args:
        client: shared HTTP client used for the NAS calls.
        payload: dequeued job. ``task_id`` is required. ``params.slate_id`` and
            ``params.theme`` are read; theme defaults to ``"default"``.

    Raises:
        KeyError: ``task_id`` is missing. Raised before any webhook, since there is
            no id to report against.
        ValueError: ``params.slate_id`` is missing.
        RuntimeError: the renderer returned no images.
        Exception: anything raised while fetching or rendering.
    """
    task_id = payload["task_id"]
    # `or {}` also covers an explicit `"params": null` in the job payload.
    params = payload.get("params") or {}
    slate_id = params.get("slate_id")
    theme = params.get("theme", "default")
    template = f"{theme}/card.html.j2"

    try:
        if slate_id is None:
            # Fail fast with a clear reason instead of requesting /slates/None.
            raise ValueError(f"task {task_id}: params.slate_id is missing")
        await _post_update(client, task_id, "processing", 20)
        slate = await _fetch_slate(client, slate_id)
        await _post_update(client, task_id, "processing", 50)
        paths = await render_slate(slate, slate_id, template=template)
        if not paths:
            # The success URL below points at 01.png; never report success without it.
            raise RuntimeError(f"render produced no images for slate {slate_id}")
        first_url = f"{INSTA_MEDIA_URL_PREFIX}/{slate_id}/01.png"
        await _post_update(
            client, task_id, "succeeded", 100, result_path=first_url
        )
        logger.info("rendered task=%s slate=%s count=%d", task_id, slate_id, len(paths))
    except Exception as e:
        logger.exception("render task=%s 실패", task_id)
        # str(e) is "" for some exceptions (e.g. a bare TimeoutError), and _post_update
        # drops empty errors. Fall back to the exception type name so the NAS always
        # gets a reason.
        await _post_update(client, task_id, "failed", 0, error=str(e) or type(e).__name__)
        raise
|
|
|
|
|
|
async def poll_once(queue: ReliableQueue, client: httpx.AsyncClient) -> bool:
    """Run a single dequeue → process → ack/fail cycle.

    Dequeues with ``timeout=5``. When an item arrives it is processed, then:
    - acked on success;
    - passed to ``queue.fail`` if processing raises an ``Exception``.

    ``asyncio.CancelledError`` is not an ``Exception`` subclass, so a cancelled
    in-flight item is neither acked nor failed. Presumably the startup
    ``recover()`` re-queues it — verify against ReliableQueue.

    Returns:
        ``False`` if nothing was dequeued, ``True`` if a job was handled (whether it
        succeeded or failed).
    """
    item = await queue.dequeue(timeout=5)
    if item is not None:
        job, raw_item = item
        try:
            await _process_one(client, job)
        except Exception:
            await queue.fail(raw_item, job)
        else:
            await queue.ack(raw_item)
    return item is not None
|
|
|
|
|
|
def make_queue_redis():
|
|
"""블로킹 dequeue(BLMOVE 5s)용 redis 클라이언트.
|
|
|
|
BLMOVE 블록보다 짧은 socket_timeout(예: REDIS_URL ?socket_timeout=)이 걸려 있으면
|
|
idle 폴링마다 "Timeout reading"으로 dequeue가 실패해 잡을 영영 못 꺼낸다(슬레이트 draft 정지).
|
|
→ read-timeout을 두지 않는다(socket_timeout=None). 죽은 연결은 socket_keepalive +
|
|
worker_loop 재시도로 감지/복구. (explicit kwarg가 URL의 socket_timeout을 override)
|
|
"""
|
|
return aioredis.from_url(
|
|
REDIS_URL, decode_responses=False,
|
|
socket_timeout=None, socket_keepalive=True,
|
|
)
|
|
|
|
|
|
async def worker_loop():
    """Run forever: check the pause flag, then poll the queue (dequeue → process → ack/fail).

    Startup: ``queue.recover()`` re-queues items orphaned by a previous crash. A
    failure there is logged and does not stop the worker.

    Loop:
    - While ``PAUSED_KEY`` holds ``b"1"`` the loop sleeps 10 s per check.
    - Any ``Exception`` in an iteration is logged and retried after 5 s.
    - ``asyncio.CancelledError`` is logged and re-raised.

    Both the Redis client and the HTTP client are closed when the loop exits
    (e.g. on cancellation).
    """
    async with make_queue_redis() as redis, httpx.AsyncClient() as client:
        queue = ReliableQueue(redis, queue_key=QUEUE_KEY)
        logger.info("insta-render worker started worker_id=%s queue=%s",
                    queue.worker_id, QUEUE_KEY)

        # F6: startup recovery — re-queue orphans left by a previous crash.
        try:
            recovered = await queue.recover()
            if recovered:
                logger.info("recovered %d orphaned items at startup", recovered)
        except Exception:
            logger.exception("startup recover failed")

        while True:
            try:
                # The client uses decode_responses=False, so the flag compares as bytes.
                paused = await redis.get(PAUSED_KEY)
                if paused == b"1":
                    await asyncio.sleep(10)
                    continue
                await poll_once(queue, client)
            except asyncio.CancelledError:
                logger.info("worker_loop cancelled")
                raise
            except Exception:
                logger.exception("worker_loop iteration 실패, 5초 후 재시도")
                await asyncio.sleep(5)
|