feat(services/insta-render): Redis BLPOP worker + NAS webhook (SP-3)
queue:insta-render에서 BLPOP → NAS API에서 slate 조회 → render → internal webhook으로 NAS DB 업데이트. queue:paused 체크 (task-watcher 연동). Plan-B-Insta Phase 2 진행 중. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
109
services/insta-render/worker.py
Normal file
109
services/insta-render/worker.py
Normal file
@@ -0,0 +1,109 @@
|
||||
"""Redis BLPOP worker — queue:insta-render → render_slate → NAS webhook.
|
||||
|
||||
queue:paused가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import redis.asyncio as aioredis
|
||||
|
||||
from card_renderer import render_slate
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
|
||||
NAS_BASE_URL = os.getenv("NAS_BASE_URL", "http://192.168.45.54:18700")
|
||||
INTERNAL_API_KEY = os.getenv("INTERNAL_API_KEY", "")
|
||||
INSTA_MEDIA_URL_PREFIX = os.getenv("INSTA_MEDIA_URL_PREFIX", "/media/insta")
|
||||
|
||||
QUEUE_KEY = "queue:insta-render"
|
||||
PAUSED_KEY = "queue:paused"
|
||||
|
||||
|
||||
async def _post_update(client: httpx.AsyncClient, task_id: str, status: str, progress: int,
|
||||
result_path: str | None = None, error: str | None = None) -> None:
|
||||
"""NAS internal webhook 호출."""
|
||||
url = f"{NAS_BASE_URL}/api/internal/insta/update"
|
||||
payload: dict[str, Any] = {"task_id": task_id, "status": status, "progress": progress}
|
||||
if result_path:
|
||||
payload["result_path"] = result_path
|
||||
if error:
|
||||
payload["error"] = error
|
||||
try:
|
||||
r = await client.post(
|
||||
url,
|
||||
headers={"X-Internal-Key": INTERNAL_API_KEY},
|
||||
json=payload,
|
||||
timeout=10.0,
|
||||
)
|
||||
if r.status_code != 200:
|
||||
logger.error("webhook %s returned %d: %s", task_id, r.status_code, r.text[:200])
|
||||
except Exception:
|
||||
logger.exception("webhook %s 호출 실패", task_id)
|
||||
|
||||
|
||||
async def _fetch_slate(client: httpx.AsyncClient, slate_id: int) -> dict:
|
||||
"""NAS /api/insta/slates/{id} GET. (인증 불필요 — 기존 endpoint)"""
|
||||
r = await client.get(f"{NAS_BASE_URL}/api/insta/slates/{slate_id}", timeout=10.0)
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
|
||||
async def _process_one(client: httpx.AsyncClient, payload: dict) -> None:
|
||||
"""단일 작업 처리: fetch slate → render → webhook."""
|
||||
task_id = payload["task_id"]
|
||||
params = payload.get("params", {})
|
||||
slate_id = params.get("slate_id")
|
||||
theme = params.get("theme", "default")
|
||||
template = f"{theme}/card.html.j2"
|
||||
|
||||
try:
|
||||
await _post_update(client, task_id, "processing", 20)
|
||||
slate = await _fetch_slate(client, slate_id)
|
||||
await _post_update(client, task_id, "processing", 50)
|
||||
paths = await render_slate(slate, slate_id, template=template)
|
||||
# 결과 URL은 첫 페이지의 nginx 경로
|
||||
first_url = f"{INSTA_MEDIA_URL_PREFIX}/{slate_id}/01.png"
|
||||
await _post_update(
|
||||
client, task_id, "succeeded", 100, result_path=first_url
|
||||
)
|
||||
logger.info("rendered task=%s slate=%s count=%d", task_id, slate_id, len(paths))
|
||||
except Exception as e:
|
||||
logger.exception("render task=%s 실패", task_id)
|
||||
await _post_update(client, task_id, "failed", 0, error=str(e))
|
||||
|
||||
|
||||
async def worker_loop():
|
||||
"""무한 루프 — paused 체크 → BLPOP → process_one."""
|
||||
redis = aioredis.from_url(REDIS_URL, decode_responses=False)
|
||||
async with httpx.AsyncClient() as client:
|
||||
logger.info("insta-render worker started (queue=%s)", QUEUE_KEY)
|
||||
while True:
|
||||
try:
|
||||
paused = await redis.get(PAUSED_KEY)
|
||||
if paused == b"1":
|
||||
await asyncio.sleep(10)
|
||||
continue
|
||||
item = await redis.blpop(QUEUE_KEY, timeout=1)
|
||||
if item is None:
|
||||
continue
|
||||
_, raw = item
|
||||
try:
|
||||
payload = json.loads(raw)
|
||||
except json.JSONDecodeError:
|
||||
logger.error("invalid queue payload: %r", raw[:200])
|
||||
continue
|
||||
await _process_one(client, payload)
|
||||
except asyncio.CancelledError:
|
||||
logger.info("worker_loop cancelled")
|
||||
raise
|
||||
except Exception:
|
||||
logger.exception("worker_loop iteration 실패, 5초 후 재시도")
|
||||
await asyncio.sleep(5)
|
||||
Reference in New Issue
Block a user