From e0269bae3989ec59b3c04da8a7f60d1785433214 Mon Sep 17 00:00:00 2001 From: gahusb Date: Tue, 19 May 2026 02:06:45 +0900 Subject: [PATCH] feat(services/insta-render): Redis BLPOP worker + NAS webhook (SP-3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit queue:insta-render에서 BLPOP → NAS API에서 slate 조회 → render → internal webhook으로 NAS DB 업데이트. queue:paused 체크 (task-watcher 연동). Plan-B-Insta Phase 2 진행 중. Co-Authored-By: Claude Opus 4.7 (1M context) --- services/insta-render/worker.py | 109 ++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 services/insta-render/worker.py diff --git a/services/insta-render/worker.py b/services/insta-render/worker.py new file mode 100644 index 0000000..08dfe8e --- /dev/null +++ b/services/insta-render/worker.py @@ -0,0 +1,109 @@ +"""Redis BLPOP worker — queue:insta-render → render_slate → NAS webhook. + +queue:paused가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set). +""" +from __future__ import annotations + +import asyncio +import json +import logging +import os +from typing import Any + +import httpx +import redis.asyncio as aioredis + +from card_renderer import render_slate + +logger = logging.getLogger(__name__) + + +REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379") +NAS_BASE_URL = os.getenv("NAS_BASE_URL", "http://192.168.45.54:18700") +INTERNAL_API_KEY = os.getenv("INTERNAL_API_KEY", "") +INSTA_MEDIA_URL_PREFIX = os.getenv("INSTA_MEDIA_URL_PREFIX", "/media/insta") + +QUEUE_KEY = "queue:insta-render" +PAUSED_KEY = "queue:paused" + + +async def _post_update(client: httpx.AsyncClient, task_id: str, status: str, progress: int, + result_path: str | None = None, error: str | None = None) -> None: + """NAS internal webhook 호출.""" + url = f"{NAS_BASE_URL}/api/internal/insta/update" + payload: dict[str, Any] = {"task_id": task_id, "status": status, "progress": progress} + if result_path: + payload["result_path"] = result_path + if error: + payload["error"] = error + try: + r = await client.post( + url, + headers={"X-Internal-Key": INTERNAL_API_KEY}, + json=payload, + timeout=10.0, + ) + if r.status_code != 200: + logger.error("webhook %s returned %d: %s", task_id, r.status_code, r.text[:200]) + except Exception: + logger.exception("webhook %s 호출 실패", task_id) + + +async def _fetch_slate(client: httpx.AsyncClient, slate_id: int) -> dict: + """NAS /api/insta/slates/{id} GET. (인증 불필요 — 기존 endpoint)""" + r = await client.get(f"{NAS_BASE_URL}/api/insta/slates/{slate_id}", timeout=10.0) + r.raise_for_status() + return r.json() + + +async def _process_one(client: httpx.AsyncClient, payload: dict) -> None: + """단일 작업 처리: fetch slate → render → webhook.""" + task_id = payload["task_id"] + params = payload.get("params", {}) + slate_id = params.get("slate_id") + theme = params.get("theme", "default") + template = f"{theme}/card.html.j2" + + try: + await _post_update(client, task_id, "processing", 20) + slate = await _fetch_slate(client, slate_id) + await _post_update(client, task_id, "processing", 50) + paths = await render_slate(slate, slate_id, template=template) + # 결과 URL은 첫 페이지의 nginx 경로 + first_url = f"{INSTA_MEDIA_URL_PREFIX}/{slate_id}/01.png" + await _post_update( + client, task_id, "succeeded", 100, result_path=first_url + ) + logger.info("rendered task=%s slate=%s count=%d", task_id, slate_id, len(paths)) + except Exception as e: + logger.exception("render task=%s 실패", task_id) + await _post_update(client, task_id, "failed", 0, error=str(e)) + + +async def worker_loop(): + """무한 루프 — paused 체크 → BLPOP → process_one.""" + redis = aioredis.from_url(REDIS_URL, decode_responses=False) + async with httpx.AsyncClient() as client: + logger.info("insta-render worker started (queue=%s)", QUEUE_KEY) + while True: + try: + paused = await redis.get(PAUSED_KEY) + if paused == b"1": + await asyncio.sleep(10) + continue + item = await redis.blpop(QUEUE_KEY, timeout=1) + if item is None: + continue + _, raw = item + try: + payload = json.loads(raw) + except json.JSONDecodeError: + logger.error("invalid queue payload: %r", raw[:200]) + continue + await _process_one(client, payload) + except asyncio.CancelledError: + logger.info("worker_loop cancelled") + raise + except Exception: + logger.exception("worker_loop iteration 실패, 5초 후 재시도") + await asyncio.sleep(5)