"""Redis BLPOP worker — queue:insta-render → render_slate → NAS webhook. queue:paused가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set). """ from __future__ import annotations import asyncio import json import logging import os from typing import Any import httpx import redis.asyncio as aioredis from card_renderer import render_slate logger = logging.getLogger(__name__) REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379") NAS_BASE_URL = os.getenv("NAS_BASE_URL", "http://192.168.45.54:18700") INTERNAL_API_KEY = os.getenv("INTERNAL_API_KEY", "") INSTA_MEDIA_URL_PREFIX = os.getenv("INSTA_MEDIA_URL_PREFIX", "/media/insta") QUEUE_KEY = "queue:insta-render" PAUSED_KEY = "queue:paused" async def _post_update(client: httpx.AsyncClient, task_id: str, status: str, progress: int, result_path: str | None = None, error: str | None = None) -> None: """NAS internal webhook 호출.""" url = f"{NAS_BASE_URL}/api/internal/insta/update" payload: dict[str, Any] = {"task_id": task_id, "status": status, "progress": progress} if result_path: payload["result_path"] = result_path if error: payload["error"] = error try: r = await client.post( url, headers={"X-Internal-Key": INTERNAL_API_KEY}, json=payload, timeout=10.0, ) if r.status_code != 200: logger.error("webhook %s returned %d: %s", task_id, r.status_code, r.text[:200]) except Exception: logger.exception("webhook %s 호출 실패", task_id) async def _fetch_slate(client: httpx.AsyncClient, slate_id: int) -> dict: """NAS /api/insta/slates/{id} GET. (인증 불필요 — 기존 endpoint)""" r = await client.get(f"{NAS_BASE_URL}/api/insta/slates/{slate_id}", timeout=10.0) r.raise_for_status() return r.json() async def _process_one(client: httpx.AsyncClient, payload: dict) -> None: """단일 작업 처리: fetch slate → render → webhook.""" task_id = payload["task_id"] params = payload.get("params", {}) slate_id = params.get("slate_id") theme = params.get("theme", "default") template = f"{theme}/card.html.j2" try: await _post_update(client, task_id, "processing", 20) slate = await _fetch_slate(client, slate_id) await _post_update(client, task_id, "processing", 50) paths = await render_slate(slate, slate_id, template=template) # 결과 URL은 첫 페이지의 nginx 경로 first_url = f"{INSTA_MEDIA_URL_PREFIX}/{slate_id}/01.png" await _post_update( client, task_id, "succeeded", 100, result_path=first_url ) logger.info("rendered task=%s slate=%s count=%d", task_id, slate_id, len(paths)) except Exception as e: logger.exception("render task=%s 실패", task_id) await _post_update(client, task_id, "failed", 0, error=str(e)) async def worker_loop(): """무한 루프 — paused 체크 → BLPOP → process_one.""" redis = aioredis.from_url(REDIS_URL, decode_responses=False) async with httpx.AsyncClient() as client: logger.info("insta-render worker started (queue=%s)", QUEUE_KEY) while True: try: paused = await redis.get(PAUSED_KEY) if paused == b"1": await asyncio.sleep(10) continue item = await redis.blpop(QUEUE_KEY, timeout=1) if item is None: continue _, raw = item try: payload = json.loads(raw) except json.JSONDecodeError: logger.error("invalid queue payload: %r", raw[:200]) continue await _process_one(client, payload) except asyncio.CancelledError: logger.info("worker_loop cancelled") raise except Exception: logger.exception("worker_loop iteration 실패, 5초 후 재시도") await asyncio.sleep(5)