diff --git a/music-lab/app/suno_provider.py b/music-lab/app/suno_provider.py index d2446c6..a3abb57 100644 --- a/music-lab/app/suno_provider.py +++ b/music-lab/app/suno_provider.py @@ -3,6 +3,7 @@ Suno API Provider — sunoapi.org 래퍼를 통한 음악 생성 https://docs.sunoapi.org/suno-api/quickstart """ +import json import os import time import logging @@ -29,6 +30,7 @@ SUNO_MODELS = [ {"id": "V4_5PLUS", "name": "V4.5+", "max_duration": "8분", "description": "강화된 음악성"}, {"id": "V4_5ALL", "name": "V4.5 All", "max_duration": "8분", "description": "더 나은 곡 구조"}, {"id": "V5", "name": "V5", "max_duration": "8분", "description": "최신, 빠른 생성 + 뛰어난 음악성"}, + {"id": "V5_5", "name": "V5.5", "max_duration": "8분", "description": "커스텀 모델, 최신 음악성"}, ] @@ -140,9 +142,13 @@ def run_suno_generation(task_id: str, params: dict) -> None: logger.info("Suno task created: %s (internal: %s)", suno_task_id, task_id) # ── 2단계: 상태 폴링 ── - completed_tracks = _poll_until_complete(task_id, suno_task_id) + response = _poll_suno_record("/generate/record-info", suno_task_id, task_id) + if not response: + return + completed_tracks = response.get("sunoData") or [] if not completed_tracks: - return # 에러는 _poll_until_complete 내부에서 처리 + update_task(task_id, "failed", 0, "", error="Suno 생성 완료했으나 트랙 데이터 없음") + return update_task(task_id, "processing", 80, "오디오 파일 다운로드 중...") @@ -231,63 +237,73 @@ def _build_suno_payload(params: dict) -> dict: parts.append(", ".join(params["moods"])) payload["prompt"] = " ".join(parts)[:500] if parts else "instrumental music" + if params.get("vocal_gender"): + payload["vocalGender"] = params["vocal_gender"] + if params.get("negative_tags"): + payload["negativeTags"] = params["negative_tags"] + if params.get("style_weight") is not None: + payload["styleWeight"] = params["style_weight"] + if params.get("audio_weight") is not None: + payload["audioWeight"] = params["audio_weight"] + return payload -def _poll_until_complete(task_id: str, suno_task_id: str) -> Optional[list]: - """sunoapi.org record-info를 폴링하여 SUCCESS가 될 때까지 대기.""" +def _poll_suno_record( + record_info_path: str, + suno_task_id: str, + task_id: str, + max_attempts: int = POLL_MAX_ATTEMPTS, + interval: int = POLL_INTERVAL, + progress_msg_map: dict = None, +) -> Optional[dict]: + """범용 Suno 작업 폴링. SUCCESS 시 response 객체 반환.""" error_statuses = { "CREATE_TASK_FAILED", "GENERATE_AUDIO_FAILED", "CALLBACK_EXCEPTION", "SENSITIVE_WORD_ERROR", } + default_msgs = { + "PENDING": "대기열에서 대기 중...", + "TEXT_SUCCESS": "가사 생성 완료, 음악 생성 중...", + "FIRST_SUCCESS": "첫 번째 트랙 완료, 두 번째 생성 중...", + "GENERATING": "생성 중...", + } + msgs = {**default_msgs, **(progress_msg_map or {})} - for attempt in range(POLL_MAX_ATTEMPTS): - time.sleep(POLL_INTERVAL) - + for attempt in range(max_attempts): + time.sleep(interval) try: resp = requests.get( - f"{SUNO_BASE_URL}/generate/record-info", + f"{SUNO_BASE_URL}{record_info_path}", headers=_headers(), params={"taskId": suno_task_id}, timeout=15, ) if resp.status_code != 200: continue - body = resp.json() if body.get("code") != 200: continue - data = body.get("data", {}) status = data.get("status", "") - progress = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79) + progress = min(15 + int((attempt / max_attempts) * 65), 79) - if status == "PENDING": - update_task(task_id, "processing", progress, "대기열에서 대기 중...") - elif status == "TEXT_SUCCESS": - update_task(task_id, "processing", progress, "가사 생성 완료, 음악 생성 중...") - elif status == "FIRST_SUCCESS": - update_task(task_id, "processing", max(progress, 60), "첫 번째 트랙 완료, 두 번째 생성 중...") - elif status == "SUCCESS": - # data.response.sunoData 에 트랙 배열이 들어있음 - response_obj = data.get("response", {}) - tracks = response_obj.get("sunoData") or [] - if tracks: - return tracks - update_task(task_id, "failed", 0, "", error="Suno 생성 완료했으나 트랙 데이터 없음") - return None + if status == "SUCCESS": + return data.get("response", data) elif status in error_statuses: - error_msg = data.get("errorMessage") or data.get("msg") or f"Suno 생성 실패 ({status})" + error_msg = data.get("errorMessage") or data.get("msg") or f"Suno 작업 실패 ({status})" update_task(task_id, "failed", 0, "", error=error_msg) return None else: - update_task(task_id, "processing", progress, f"처리 중... ({status})") - + msg = msgs.get(status, f"처리 중... ({status})") + if status == "FIRST_SUCCESS": + progress = max(progress, 60) + update_task(task_id, "processing", progress, msg) except Exception as e: logger.warning("Suno poll error (attempt %d): %s", attempt, e) continue - update_task(task_id, "failed", 0, "", error="Suno 생성 타임아웃 (5분 초과)") + update_task(task_id, "failed", 0, "", error="Suno 작업 타임아웃") return None @@ -351,20 +367,20 @@ def _download_and_register( # ── 크레딧 조회 ────────────────────────────────────────────────────────────── def get_credits() -> Optional[dict]: - """Suno API 잔여 크레딧 조회.""" + """Suno API 잔여 크레딧 조회. 두 엔드포인트 폴백.""" if not SUNO_API_KEY: return None - try: - resp = requests.get( - f"{SUNO_BASE_URL}/get-credits", - headers=_headers(), - timeout=15, - ) - if resp.status_code == 200: - body = resp.json() - return body.get("data", body) - except Exception as e: - logger.warning("Suno credits API error: %s", e) + for path in ["/generate/credit", "/get-credits"]: + try: + resp = requests.get(f"{SUNO_BASE_URL}{path}", headers=_headers(), timeout=15) + if resp.status_code == 200: + body = resp.json() + data = body.get("data", body) + if isinstance(data, (int, float)): + return {"credits_left": int(data)} + return data + except Exception as e: + logger.warning("Suno credits API error (%s): %s", path, e) return None @@ -418,8 +434,12 @@ def run_suno_extend(task_id: str, params: dict) -> None: update_task(task_id, "processing", 15, "곡 연장 대기열에 등록됨...") - completed_tracks = _poll_until_complete(task_id, suno_task_id) + response = _poll_suno_record("/generate/record-info", suno_task_id, task_id) + if not response: + return + completed_tracks = response.get("sunoData") or [] if not completed_tracks: + update_task(task_id, "failed", 0, "", error="Suno 연장 완료했으나 트랙 데이터 없음") return update_task(task_id, "processing", 80, "연장된 오디오 다운로드 중...") @@ -483,8 +503,12 @@ def run_vocal_removal(task_id: str, params: dict) -> None: update_task(task_id, "processing", 15, "보컬 분리 처리 중...") # 보컬 분리 결과 폴링 - completed_tracks = _poll_until_complete(task_id, suno_task_id) + response = _poll_suno_record("/vocal-removal/record-info", suno_task_id, task_id) + if not response: + return + completed_tracks = response.get("sunoData") or [] if not completed_tracks: + update_task(task_id, "failed", 0, "", error="보컬 분리 완료했으나 트랙 데이터 없음") return update_task(task_id, "processing", 80, "분리된 오디오 다운로드 중...") @@ -512,3 +536,51 @@ def run_vocal_removal(task_id: str, params: dict) -> None: except Exception as e: logger.exception("Suno vocal removal error for task %s", task_id) update_task(task_id, "failed", 0, "", error=str(e)) + + +# ── 커버 이미지 생성 ──────────────────────────────────────────────────────── + +def run_cover_image(task_id: str, params: dict) -> None: + """Suno 곡의 커버 이미지 2장을 생성.""" + try: + if not SUNO_API_KEY: + update_task(task_id, "failed", 0, "", error="SUNO_API_KEY가 설정되지 않았습니다.") + return + update_task(task_id, "processing", 5, "커버 이미지 생성 요청 중...") + suno_task_id = params.get("suno_task_id", "") + if not suno_task_id: + update_task(task_id, "failed", 0, "", error="suno_task_id가 필요합니다") + return + payload = { + "taskId": suno_task_id, + "callBackUrl": "https://example.com/noop", + } + resp = requests.post(f"{SUNO_BASE_URL}/suno/cover/generate", headers=_headers(), json=payload, timeout=30) + if resp.status_code != 200: + update_task(task_id, "failed", 0, "", error=f"커버 이미지 API 오류: {resp.text[:300]}") + return + body = resp.json() + if body.get("code") != 200: + update_task(task_id, "failed", 0, "", error=f"커버 이미지 거부: {body.get('msg', 'unknown')}") + return + cover_task_id = body.get("data", {}).get("taskId", suno_task_id) + update_task(task_id, "processing", 15, "커버 이미지 생성 중...") + response = _poll_suno_record( + "/suno/cover/record-info", cover_task_id, task_id, + max_attempts=30, interval=5, + progress_msg_map={"PENDING": "이미지 생성 대기 중...", "GENERATING": "이미지 생성 중..."}, + ) + if not response: + return + images = response.get("images") or response.get("sunoData") or [] + image_urls = [] + if isinstance(images, list): + for img in images: + if isinstance(img, str): + image_urls.append(img) + elif isinstance(img, dict): + image_urls.append(img.get("imageUrl") or img.get("image_url", "")) + update_task(task_id, "succeeded", 100, "커버 이미지 생성 완료", audio_url=json.dumps(image_urls)) + except Exception as e: + logger.exception("Cover image generation error for task %s", task_id) + update_task(task_id, "failed", 0, "", error=str(e))