"""NAS webhook adapter.

The Windows worker cannot access the NAS database directly, so task updates
are delegated to the NAS over HTTP.

Calls to ``update_task`` / ``add_track`` in the existing NAS
suno_provider/local_provider are replaced by ``webhook_update_task`` /
``webhook_add_track`` from this module.

A webhook failure is never raised; it is only logged, so the provider's
control flow is preserved.
"""
from __future__ import annotations

import logging
import os
from typing import Any, Dict, Optional

import httpx

logger = logging.getLogger(__name__)

# Timeout handed to ``httpx.post`` for every webhook call.
_TIMEOUT = 10.0


def _post(payload: Dict[str, Any]) -> None:
    """POST ``payload`` as JSON to the NAS internal music-update endpoint.

    The target host is read from the ``NAS_BASE_URL`` environment variable and
    the shared secret from ``INTERNAL_API_KEY``; both are looked up on every
    call. The secret is sent in the ``X-Internal-Key`` header.

    This function is best-effort: a non-200 response is logged with
    ``logger.error`` and any exception is logged with ``logger.exception``.
    Nothing is raised to the caller.

    Args:
        payload: JSON-serialisable body. Its ``task_id`` entry (if any) is
            used to label log messages.
    """
    # Strip trailing slashes so a value such as "http://host:18600/" does not
    # yield a double slash ("//api/...") in the final URL.
    nas_base_url = os.getenv("NAS_BASE_URL", "http://192.168.45.54:18600").rstrip("/")
    internal_api_key = os.getenv("INTERNAL_API_KEY", "")
    url = f"{nas_base_url}/api/internal/music/update"

    # The response inspection stays inside the ``try`` on purpose: nothing in
    # this function may propagate an exception to the provider.
    try:
        response = httpx.post(
            url,
            headers={"X-Internal-Key": internal_api_key},
            json=payload,
            timeout=_TIMEOUT,
        )
        if response.status_code != 200:
            logger.error(
                "webhook %s returned %d: %s",
                payload.get("task_id"),
                response.status_code,
                response.text[:200],  # cap the logged body at 200 characters
            )
    except Exception:
        logger.exception("webhook %s 호출 실패", payload.get("task_id"))


def _build_payload(
    task_id: str,
    status: str,
    progress: int,
    message: str,
    **optional: Any,
) -> Dict[str, Any]:
    """Build the webhook body shared by both public entry points.

    The four required fields are always present. Each keyword in ``optional``
    is added under its own name only when its value is not ``None``, in the
    order the keywords were passed.
    """
    payload: Dict[str, Any] = {
        "task_id": task_id,
        "status": status,
        "progress": progress,
        "message": message,
    }
    payload.update(
        {key: value for key, value in optional.items() if value is not None}
    )
    return payload


def webhook_update_task(
    task_id: str,
    status: str,
    progress: int,
    message: str = "",
    audio_url: Optional[str] = None,
    error: Optional[str] = None,
) -> None:
    """Replacement for ``update_task(task_id, status, progress, message, audio_url=None, error=None)``.

    Sends the task fields to the NAS through the webhook. ``audio_url`` and
    ``error`` are included in the body only when they are not ``None``.
    Failures are logged by ``_post`` and never raised.
    """
    _post(
        _build_payload(
            task_id,
            status,
            progress,
            message,
            audio_url=audio_url,
            error=error,
        )
    )


def webhook_add_track(
    task_id: str,
    status: str,
    progress: int,
    message: str = "",
    audio_url: Optional[str] = None,
    track: Optional[Dict[str, Any]] = None,
) -> None:
    """Combine update + add_track into a single webhook call.

    Per the original author's note, the NAS internal_router handles both parts
    of the request. ``audio_url`` and ``track`` are included in the body only
    when they are not ``None``. Failures are logged by ``_post`` and never
    raised.
    """
    _post(
        _build_payload(
            task_id,
            status,
            progress,
            message,
            audio_url=audio_url,
            track=track,
        )
    )