Files
ai-trade/services/insta-render/worker.py
gahusb 2ff31b2e76 feat(render-workers): 4 render 워커 heartbeat 배선 + poll_once 카운터
- services/_shared/heartbeat.py (A1) WorkerStats/utc_now_iso/heartbeat_loop 소비
- image-render / video-render / music-render / insta-render 각 worker.py:
  stats = WorkerStats() 모듈 레벨 추가, poll_once에서 dispatch 전 busy=True,
  ack 후 jobs_done+1 / fail 후 jobs_failed+1 + last_job_at + busy=False
- 각 main.py: lifespan에 aioredis(decode_responses=False) + heartbeat_loop 태스크 spawn,
  종료 시 cancel + aclose
- 각 tests/test_worker.py: test_poll_once_increments_jobs_done 추가
  (image:flux / video:sora / music:suno / insta:_process_one mock)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-07-01 00:52:57 +09:00

153 lines
5.7 KiB
Python

"""Redis ReliableQueue worker — F6 신뢰성 패턴 (BLMOVE + ack/fail + recovery).
queue:paused가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set).
"""
from __future__ import annotations
import asyncio
import logging
import os
from typing import Any
import httpx
import redis.asyncio as aioredis
from card_renderer import render_slate
from _shared.reliable_queue import ReliableQueue
from _shared.heartbeat import WorkerStats, utc_now_iso
# Logger for this module, named after the module itself.
logger = logging.getLogger(name=__name__)
# Process-wide worker counters (busy / jobs_done / jobs_failed / last_job_at),
# updated by poll_once for every handled job.
stats = WorkerStats()
# Endpoints and credentials; each one can be overridden through the environment.
REDIS_URL = os.environ.get("REDIS_URL", "redis://192.168.45.54:6379")
NAS_BASE_URL = os.environ.get("NAS_BASE_URL", "http://192.168.45.54:18700")
# Defaults to "", in which case webhooks go out with an empty X-Internal-Key header.
INTERNAL_API_KEY = os.environ.get("INTERNAL_API_KEY", "")
# Prefix used to build the result_path reported for a rendered slate.
INSTA_MEDIA_URL_PREFIX = os.environ.get("INSTA_MEDIA_URL_PREFIX", "/media/insta")

# Redis keys: the job list this worker consumes, and the global pause flag.
QUEUE_KEY = "queue:insta-render"
PAUSED_KEY = "queue:paused"
async def _post_update(client: httpx.AsyncClient, task_id: str, status: str, progress: int,
                       result_path: str | None = None, error: str | None = None) -> None:
    """Report task progress to the NAS internal webhook (best effort).

    Args:
        client: Shared HTTP client used for the POST.
        task_id: Task identifier placed in the payload.
        status: Status string; callers in this module use "processing",
            "succeeded" and "failed".
        progress: Progress percentage; callers in this module pass 0-100.
        result_path: Optional result URL, included whenever it is not None.
        error: Optional error message, included whenever it is not None.

    Non-2xx responses and exceptions raised while sending are logged, never
    raised, so a webhook problem does not abort the render itself.
    """
    url = f"{NAS_BASE_URL}/api/internal/insta/update"
    payload: dict[str, Any] = {"task_id": task_id, "status": status, "progress": progress}
    # Compare against None instead of relying on truthiness: an exception whose
    # str() is empty must still produce an "error" field on a failed update.
    if result_path is not None:
        payload["result_path"] = result_path
    if error is not None:
        payload["error"] = error
    try:
        r = await client.post(
            url,
            headers={"X-Internal-Key": INTERNAL_API_KEY},
            json=payload,
            timeout=10.0,
        )
        # Any 2xx (200/201/204...) means the NAS accepted the update.
        if not r.is_success:
            logger.error("webhook %s returned %d: %s", task_id, r.status_code, r.text[:200])
    except Exception:
        logger.exception("webhook %s 호출 실패", task_id)
async def _fetch_slate(client: httpx.AsyncClient, slate_id: int) -> dict:
    """Fetch slate `slate_id` from the NAS with GET /api/insta/slates/{id}.

    The endpoint is called without any authentication header. An error HTTP
    status is raised via `raise_for_status()`; otherwise the decoded JSON
    body is returned.
    """
    slate_url = f"{NAS_BASE_URL}/api/insta/slates/{slate_id}"
    response = await client.get(slate_url, timeout=10.0)
    response.raise_for_status()
    slate: dict = response.json()
    return slate
async def _process_one(client: httpx.AsyncClient, payload: dict) -> None:
    """Process one job: fetch the slate, render its cards, report via webhook.

    Args:
        client: Shared HTTP client for NAS calls.
        payload: Queue item. Must contain "task_id"; its optional "params"
            dict must carry "slate_id" and may carry "theme" (falls back to
            "default").

    Raises:
        KeyError: if "task_id" is missing (no webhook can be sent without it).
        Exception: any later failure is reported to the NAS as "failed" and
            then re-raised, so poll_once hands the item to
            ReliableQueue.fail for retry / dead-lettering (F6).
    """
    task_id = payload["task_id"]
    try:
        # Parse inside the try so malformed params still yield a "failed" webhook.
        # `or` also covers keys that are present but null.
        params = payload.get("params") or {}
        slate_id = params.get("slate_id")
        if slate_id is None:
            # Without this the job would request /slates/None and fail with an opaque 404.
            raise ValueError("payload params missing slate_id")
        theme = params.get("theme") or "default"
        template = f"{theme}/card.html.j2"

        await _post_update(client, task_id, "processing", 20)
        slate = await _fetch_slate(client, slate_id)
        await _post_update(client, task_id, "processing", 50)
        paths = await render_slate(slate, slate_id, template=template)
        if not paths:
            # result_path below points at card 01; never report success without it.
            raise RuntimeError(f"render_slate produced no images for slate {slate_id}")
        first_url = f"{INSTA_MEDIA_URL_PREFIX}/{slate_id}/01.png"
        await _post_update(
            client, task_id, "succeeded", 100, result_path=first_url
        )
        logger.info("rendered task=%s slate=%s count=%d", task_id, slate_id, len(paths))
    except Exception as e:
        logger.exception("render task=%s 실패", task_id)
        # Some exceptions stringify to ""; fall back to the type name so the NAS gets a reason.
        await _post_update(client, task_id, "failed", 0, error=str(e) or type(e).__name__)
        raise
async def poll_once(queue: ReliableQueue, client: httpx.AsyncClient) -> bool:
    """Handle at most one queued job: dequeue -> _process_one -> ack/fail.

    Args:
        queue: ReliableQueue to consume from; dequeue blocks for up to 5 s.
        client: Shared HTTP client passed through to _process_one.

    Returns:
        True if a job was dequeued and handled (succeeded or failed), False if
        the dequeue returned nothing.

    Side effects on the module-level `stats`: busy is True while a job is in
    flight, jobs_done / jobs_failed count outcomes, and last_job_at records
    when a job finished.
    """
    result = await queue.dequeue(timeout=5)
    if result is None:
        return False
    payload, raw = result
    stats.busy = True
    try:
        try:
            await _process_one(client, payload)
        except Exception:
            # Count the failure before fail(): the job failed even if the
            # queue call that handles retry/dead-letter raises too.
            stats.jobs_failed += 1
            stats.last_job_at = utc_now_iso()
            await queue.fail(raw, payload)
            return True
        await queue.ack(raw)
        stats.jobs_done += 1
        stats.last_job_at = utc_now_iso()
        return True
    finally:
        # Reset unconditionally: an exception from ack()/fail(), or task
        # cancellation, must not leave the heartbeat reporting busy forever.
        stats.busy = False
# The blocking dequeue uses BLMOVE with a 5 s block. With redis-py blocking
# reads, a socket_timeout at or below the block time (5 s), or None, lets the
# read timeout race the block boundary. That intermittently raises
# "Timeout reading", so jobs are never dequeued (slate drafts stall).
# Experimentally, a socket_timeout well above the block (10 or 30) was always
# stable, so set one explicitly with a generous margin.
QUEUE_SOCKET_TIMEOUT = 30  # must exceed the dequeue BLMOVE block (5 s)


def make_queue_redis():
    """Build the redis client used for the blocking BLMOVE dequeue.

    Keeps raw bytes (decode_responses=False), enables TCP keepalive, and uses
    QUEUE_SOCKET_TIMEOUT so the socket timeout stays above the 5 s block.
    """
    client_options = {
        "decode_responses": False,
        "socket_timeout": QUEUE_SOCKET_TIMEOUT,
        "socket_keepalive": True,
    }
    return aioredis.from_url(REDIS_URL, **client_options)
async def worker_loop():
    """Run forever: pause check -> ReliableQueue.dequeue -> _process_one -> ack/fail.

    On start, re-queues items orphaned by a previous crash (F6 recovery);
    a failing recovery is logged and does not stop the worker. While
    PAUSED_KEY holds b"1", the loop sleeps 10 s between checks. Any other
    exception in an iteration is logged and retried after 5 s. Cancellation
    is logged and re-raised, and the queue Redis client is closed on the way
    out.
    """
    redis_client = make_queue_redis()
    try:
        queue = ReliableQueue(redis_client, queue_key=QUEUE_KEY)
        async with httpx.AsyncClient() as client:
            logger.info("insta-render worker started worker_id=%s queue=%s",
                        queue.worker_id, QUEUE_KEY)
            # F6: startup recovery — re-queue orphans left behind by a previous crash.
            try:
                recovered = await queue.recover()
                if recovered:
                    logger.info("recovered %d orphaned items at startup", recovered)
            except Exception:
                logger.exception("startup recover failed")
            while True:
                try:
                    # decode_responses=False, so the flag comes back as bytes.
                    paused = await redis_client.get(PAUSED_KEY)
                    if paused == b"1":
                        await asyncio.sleep(10)
                        continue
                    await poll_once(queue, client)
                except asyncio.CancelledError:
                    logger.info("worker_loop cancelled")
                    raise
                except Exception:
                    logger.exception("worker_loop iteration 실패, 5초 후 재시도")
                    await asyncio.sleep(5)
    finally:
        # The loop only exits by raising (normally cancellation); release the
        # Redis connection pool so shutdown does not leak sockets. A close
        # failure is logged rather than masking the original exception.
        try:
            await redis_client.aclose()
        except Exception:
            logger.exception("queue redis aclose failed")