- VIDEO_DATA_DIR 기본값을 /app/data/videos로 수정 (기존 /app/data에 videos 서브디렉토리를 중복 붙이던 버그 수정) - delete_project, export_project의 경로에서 중복된 "videos" 서브디렉토리 제거 - create_project 내부의 get_track_by_id lazy import를 파일 상단 import 블록으로 이동
814 lines
30 KiB
Python
814 lines
30 KiB
Python
import json
|
|
import os
|
|
import shutil
|
|
import uuid
|
|
from typing import List, Optional
|
|
from fastapi import FastAPI, HTTPException, BackgroundTasks
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel
|
|
|
|
from .db import (
|
|
init_db,
|
|
create_task, get_task,
|
|
get_all_tracks, add_track, delete_track, get_track_file_path, get_track_by_task_id, get_track_by_id,
|
|
update_track_duration, update_track_file_info, update_track_hash,
|
|
get_all_lyrics, add_lyrics, update_lyrics, delete_lyrics,
|
|
create_video_project, get_video_project, get_all_video_projects,
|
|
update_video_project_status, delete_video_project,
|
|
create_revenue_record, get_all_revenue_records,
|
|
update_revenue_record, delete_revenue_record, get_revenue_dashboard,
|
|
)
|
|
from .local_provider import run_local_generation
|
|
from .suno_provider import (
|
|
run_suno_generation, run_suno_extend, run_vocal_removal,
|
|
run_cover_image, run_wav_convert, run_stem_split,
|
|
run_upload_cover, run_upload_extend, run_add_vocals, run_add_instrumental, run_video_generate,
|
|
generate_lyrics, get_credits, get_timestamped_lyrics, generate_style_boost,
|
|
SUNO_API_KEY, SUNO_MODELS,
|
|
)
|
|
|
|
app = FastAPI()
|
|
|
|
_cors_origins = os.getenv("CORS_ALLOW_ORIGINS", "http://localhost:3007,http://localhost:8080").split(",")
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=[o.strip() for o in _cors_origins],
|
|
allow_credentials=False,
|
|
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
|
|
allow_headers=["Content-Type"],
|
|
)
|
|
|
|
MUSIC_DATA_DIR = "/app/data"
|
|
VIDEO_DATA_DIR = os.getenv("VIDEO_DATA_DIR", "/app/data/videos")
|
|
|
|
|
|
def _get_mp3_duration(file_path: str) -> Optional[int]:
|
|
"""MP3 파일에서 실제 재생 시간(초) 추출."""
|
|
try:
|
|
from mutagen.mp3 import MP3
|
|
audio = MP3(file_path)
|
|
return int(audio.info.length)
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _backfill_durations():
|
|
"""duration_sec이 없는 기존 트랙에 MP3 메타데이터에서 길이 채우기."""
|
|
for t in get_all_tracks():
|
|
if t["duration_sec"] is None and t.get("file_path"):
|
|
dur = _get_mp3_duration(t["file_path"])
|
|
if dur:
|
|
update_track_duration(t["id"], dur)
|
|
|
|
|
|
@app.on_event("startup")
|
|
def on_startup():
|
|
init_db()
|
|
os.makedirs(MUSIC_DATA_DIR, exist_ok=True)
|
|
_backfill_durations()
|
|
|
|
|
|
@app.get("/health")
|
|
def health():
|
|
return {"ok": True}
|
|
|
|
|
|
@app.get("/api/music/providers")
|
|
def get_providers():
|
|
"""사용 가능한 음악 생성 프로바이더 목록 반환."""
|
|
providers = []
|
|
if os.getenv("MUSIC_AI_SERVER_URL"):
|
|
providers.append({
|
|
"id": "local",
|
|
"name": "MusicGen",
|
|
"description": "로컬 AI 서버 (인스트루멘탈 전용)",
|
|
"features": ["instrumental"],
|
|
})
|
|
if SUNO_API_KEY:
|
|
providers.append({
|
|
"id": "suno",
|
|
"name": "Suno",
|
|
"description": "Suno AI (보컬·가사·인스트루멘탈)",
|
|
"features": ["vocals", "lyrics", "instrumental"],
|
|
})
|
|
return {"providers": providers}
|
|
|
|
|
|
# ── 음악 생성 API ─────────────────────────────────────────────────────────────
|
|
|
|
class GenerateRequest(BaseModel):
|
|
provider: str = "suno" # "suno" | "local"
|
|
model: str = "V4" # Suno 모델 (V4, V4_5, V5 등)
|
|
title: str = ""
|
|
genre: str = ""
|
|
moods: List[str] = []
|
|
instruments: List[str] = []
|
|
duration_sec: Optional[int] = None
|
|
bpm: Optional[int] = None
|
|
key: str = ""
|
|
scale: str = ""
|
|
prompt: str = ""
|
|
# Suno 전용
|
|
lyrics: str = "" # 커스텀 가사 ([Verse], [Chorus] 등)
|
|
instrumental: bool = False # True면 보컬 없이 인스트루멘탈만
|
|
# Phase 1 신규
|
|
vocal_gender: Optional[str] = None # "m" | "f"
|
|
negative_tags: Optional[str] = None # 제외 스타일
|
|
style_weight: Optional[float] = None # 0.0~1.0
|
|
audio_weight: Optional[float] = None # 0.0~1.0
|
|
|
|
|
|
@app.post("/api/music/generate")
|
|
def generate_music(req: GenerateRequest, background_tasks: BackgroundTasks):
|
|
"""
|
|
음악 생성 작업 시작. task_id 즉시 반환 후 백그라운드에서 AI 서버 호출.
|
|
provider: "suno" (Suno API) 또는 "local" (MusicGen)
|
|
"""
|
|
provider = req.provider
|
|
if provider == "suno" and not SUNO_API_KEY:
|
|
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
|
|
if provider == "local" and not os.getenv("MUSIC_AI_SERVER_URL"):
|
|
raise HTTPException(status_code=400, detail="로컬 AI 서버 URL이 설정되지 않았습니다")
|
|
if provider not in ("suno", "local"):
|
|
raise HTTPException(status_code=400, detail=f"지원하지 않는 provider: {provider}")
|
|
|
|
task_id = str(uuid.uuid4())
|
|
params = req.model_dump()
|
|
create_task(task_id, params, provider=provider)
|
|
|
|
if provider == "suno":
|
|
background_tasks.add_task(run_suno_generation, task_id, params)
|
|
else:
|
|
background_tasks.add_task(run_local_generation, task_id, params)
|
|
|
|
return {"task_id": task_id, "provider": provider}
|
|
|
|
|
|
@app.get("/api/music/status/{task_id}")
|
|
def get_status(task_id: str):
|
|
"""
|
|
생성 작업 상태 조회. 프론트는 succeeded 또는 failed가 될 때까지 폴링.
|
|
status: queued | processing | succeeded | failed
|
|
succeeded 시 track 메타데이터 포함.
|
|
"""
|
|
task = get_task(task_id)
|
|
if not task:
|
|
raise HTTPException(status_code=404, detail="Task not found")
|
|
|
|
resp = {
|
|
"status": task["status"],
|
|
"progress": task["progress"],
|
|
"message": task["message"],
|
|
"audio_url": task["audio_url"],
|
|
"error": task["error"],
|
|
"provider": task["provider"],
|
|
}
|
|
|
|
if task["status"] == "succeeded":
|
|
track = get_track_by_task_id(task_id)
|
|
resp["track"] = track
|
|
|
|
return resp
|
|
|
|
|
|
# ── 가사 생성 API (Suno 전용) ────────────────────────────────────────────────
|
|
|
|
class LyricsRequest(BaseModel):
|
|
prompt: str
|
|
|
|
|
|
@app.post("/api/music/lyrics")
|
|
def gen_lyrics(req: LyricsRequest):
|
|
"""Suno AI로 가사를 생성합니다. 곡 생성 전 가사 미리보기용."""
|
|
if not SUNO_API_KEY:
|
|
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
|
|
result = generate_lyrics(req.prompt)
|
|
if not result:
|
|
raise HTTPException(status_code=502, detail="가사 생성에 실패했습니다")
|
|
return result
|
|
|
|
|
|
# ── 라이브러리 API ────────────────────────────────────────────────────────────
|
|
|
|
class TrackCreate(BaseModel):
|
|
title: str = ""
|
|
genre: str = ""
|
|
moods: List[str] = []
|
|
instruments: List[str] = []
|
|
duration_sec: Optional[int] = None
|
|
bpm: Optional[int] = None
|
|
key: str = ""
|
|
scale: str = ""
|
|
prompt: str = ""
|
|
audio_url: str = ""
|
|
file_path: str = ""
|
|
task_id: Optional[str] = None
|
|
tags: List[str] = []
|
|
provider: str = "local"
|
|
lyrics: str = ""
|
|
image_url: str = ""
|
|
suno_id: str = ""
|
|
|
|
|
|
@app.get("/api/music/library")
|
|
def list_library():
|
|
"""저장된 트랙 목록 전체 조회 (생성일 내림차순). 파일시스템과 자동 동기화."""
|
|
_sync_library_with_disk()
|
|
return {"tracks": get_all_tracks()}
|
|
|
|
|
|
def _calc_file_hash(file_path: str) -> str:
|
|
"""MD5 해시 계산 (파일 동일성 체크용)."""
|
|
import hashlib
|
|
h = hashlib.md5()
|
|
try:
|
|
with open(file_path, "rb") as f:
|
|
for chunk in iter(lambda: f.read(8192), b""):
|
|
h.update(chunk)
|
|
return h.hexdigest()
|
|
except OSError:
|
|
return ""
|
|
|
|
|
|
def _sync_library_with_disk():
|
|
"""파일시스템의 .mp3 파일과 DB를 동기화 (해시 기반 rename 감지).
|
|
|
|
1단계: 파일명 매칭 (빠른 경로)
|
|
2단계: 미매칭 파일/레코드를 해시로 비교 → rename 감지 → 메타데이터 보존 업데이트
|
|
3단계: 나머지 → 삭제/추가
|
|
"""
|
|
tracks = get_all_tracks()
|
|
media_base = os.getenv("MUSIC_MEDIA_BASE", "/media/music")
|
|
|
|
# 디스크의 .mp3 파일 목록
|
|
disk_files = set()
|
|
try:
|
|
for f in os.listdir(MUSIC_DATA_DIR):
|
|
if f.lower().endswith(".mp3"):
|
|
disk_files.add(f)
|
|
except OSError:
|
|
return
|
|
|
|
# ── 1단계: 파일명 매칭 ──────────────────────────────────────
|
|
db_by_filename = {} # filename → track
|
|
for t in tracks:
|
|
if t.get("audio_url"):
|
|
fname = t["audio_url"].split("/")[-1]
|
|
db_by_filename[fname] = t
|
|
|
|
matched_disk = set()
|
|
matched_db_ids = set()
|
|
|
|
for f in disk_files:
|
|
if f in db_by_filename:
|
|
matched_disk.add(f)
|
|
track = db_by_filename[f]
|
|
matched_db_ids.add(track["id"])
|
|
# 기존 트랙에 file_hash 없으면 채우기
|
|
if not track.get("file_hash"):
|
|
file_hash = _calc_file_hash(os.path.join(MUSIC_DATA_DIR, f))
|
|
if file_hash:
|
|
update_track_hash(track["id"], file_hash)
|
|
|
|
unmatched_disk = disk_files - matched_disk
|
|
unmatched_db = [t for t in tracks if t["id"] not in matched_db_ids]
|
|
|
|
# ── 2단계: 해시 기반 rename 감지 ────────────────────────────
|
|
if unmatched_disk and unmatched_db:
|
|
# DB 미매칭 레코드의 해시 맵
|
|
db_hash_map = {} # hash → track
|
|
for t in unmatched_db:
|
|
h = t.get("file_hash", "")
|
|
if h:
|
|
db_hash_map[h] = t
|
|
|
|
resolved_disk = set()
|
|
resolved_db_ids = set()
|
|
|
|
for f in unmatched_disk:
|
|
file_path = os.path.join(MUSIC_DATA_DIR, f)
|
|
file_hash = _calc_file_hash(file_path)
|
|
if not file_hash:
|
|
continue
|
|
|
|
if file_hash in db_hash_map:
|
|
# rename 감지 — 기존 레코드 업데이트 (태그·메타데이터 보존)
|
|
track = db_hash_map[file_hash]
|
|
new_title = os.path.splitext(f)[0].replace("-", " ").replace("_", " ")
|
|
update_track_file_info(
|
|
track["id"],
|
|
title=new_title,
|
|
audio_url=f"{media_base}/{f}",
|
|
file_path=file_path,
|
|
)
|
|
resolved_disk.add(f)
|
|
resolved_db_ids.add(track["id"])
|
|
|
|
unmatched_disk -= resolved_disk
|
|
unmatched_db = [t for t in unmatched_db if t["id"] not in resolved_db_ids]
|
|
|
|
# ── 3단계: 나머지 처리 ──────────────────────────────────────
|
|
# DB에만 남은 레코드 → 파일 삭제됨 → DB 삭제
|
|
for t in unmatched_db:
|
|
delete_track(t["id"])
|
|
|
|
# 디스크에만 남은 파일 → 신규 → DB 추가 (해시 포함)
|
|
for f in unmatched_disk:
|
|
file_path = os.path.join(MUSIC_DATA_DIR, f)
|
|
title = os.path.splitext(f)[0].replace("-", " ").replace("_", " ")
|
|
file_hash = _calc_file_hash(file_path)
|
|
add_track({
|
|
"title": title,
|
|
"audio_url": f"{media_base}/{f}",
|
|
"file_path": file_path,
|
|
"provider": "suno",
|
|
"duration_sec": _get_mp3_duration(file_path),
|
|
"file_hash": file_hash,
|
|
})
|
|
|
|
|
|
@app.post("/api/music/library", status_code=201)
|
|
def save_to_library(req: TrackCreate):
|
|
"""트랙 수동 추가 (외부 파일 등록 또는 프론트 직접 저장용)"""
|
|
track = add_track(req.model_dump())
|
|
return track
|
|
|
|
|
|
@app.delete("/api/music/library/{track_id}")
|
|
def remove_from_library(track_id: int):
|
|
"""라이브러리에서 트랙 삭제. 로컬 파일도 함께 삭제."""
|
|
file_path = get_track_file_path(track_id)
|
|
ok = delete_track(track_id)
|
|
if not ok:
|
|
raise HTTPException(status_code=404, detail="Track not found")
|
|
|
|
if file_path and os.path.isfile(file_path):
|
|
try:
|
|
os.remove(file_path)
|
|
except OSError:
|
|
pass
|
|
|
|
return {"ok": True}
|
|
|
|
|
|
# ── 모델 목록 API ────────────────────────────────────────────────────────────
|
|
|
|
@app.get("/api/music/models")
|
|
def get_models():
|
|
"""사용 가능한 Suno AI 모델 목록."""
|
|
return {"models": SUNO_MODELS}
|
|
|
|
|
|
# ── 크레딧 조회 API ──────────────────────────────────────────────────────────
|
|
|
|
@app.get("/api/music/credits")
|
|
def check_credits():
|
|
"""Suno 잔여 크레딧 조회."""
|
|
if not SUNO_API_KEY:
|
|
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
|
|
result = get_credits()
|
|
if result is None:
|
|
raise HTTPException(status_code=502, detail="크레딧 조회 실패")
|
|
return result
|
|
|
|
|
|
# ── 곡 연장 API ──────────────────────────────────────────────────────────────
|
|
|
|
class ExtendRequest(BaseModel):
|
|
suno_id: str # 원본 Suno 곡 ID
|
|
continue_at: int = 0 # 연장 시작 지점 (초)
|
|
prompt: str = "" # 추가 가사/프롬프트
|
|
style: str = "" # 스타일 오버라이드
|
|
title: str = ""
|
|
model: str = "V4"
|
|
|
|
|
|
@app.post("/api/music/extend")
|
|
def extend_music(req: ExtendRequest, background_tasks: BackgroundTasks):
|
|
"""기존 곡을 특정 지점부터 연장 (Suno Extend API)."""
|
|
if not SUNO_API_KEY:
|
|
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
|
|
|
|
task_id = str(uuid.uuid4())
|
|
params = req.model_dump()
|
|
create_task(task_id, params, provider="suno")
|
|
background_tasks.add_task(run_suno_extend, task_id, params)
|
|
return {"task_id": task_id, "provider": "suno"}
|
|
|
|
|
|
# ── 보컬 분리 API ────────────────────────────────────────────────────────────
|
|
|
|
class VocalRemovalRequest(BaseModel):
|
|
suno_id: str # Suno 곡 ID
|
|
title: str = "" # 원본 트랙 제목
|
|
|
|
|
|
@app.post("/api/music/vocal-removal")
|
|
def vocal_removal(req: VocalRemovalRequest, background_tasks: BackgroundTasks):
|
|
"""트랙에서 보컬과 인스트루멘탈을 분리 (Suno Vocal Removal API)."""
|
|
if not SUNO_API_KEY:
|
|
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
|
|
|
|
task_id = str(uuid.uuid4())
|
|
params = req.model_dump()
|
|
create_task(task_id, params, provider="suno")
|
|
background_tasks.add_task(run_vocal_removal, task_id, params)
|
|
return {"task_id": task_id, "provider": "suno"}
|
|
|
|
|
|
# ── 커버 이미지 생성 API ────────────────────────────────────────────────────
|
|
|
|
class CoverImageRequest(BaseModel):
|
|
suno_task_id: str # Suno 생성 task ID
|
|
track_id: Optional[int] = None # 라이브러리 트랙 ID (결과 저장용)
|
|
|
|
|
|
@app.post("/api/music/cover-image")
|
|
def cover_image(req: CoverImageRequest, background_tasks: BackgroundTasks):
|
|
"""Suno 곡의 커버 이미지 2장 생성."""
|
|
if not SUNO_API_KEY:
|
|
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
|
|
|
|
task_id = str(uuid.uuid4())
|
|
params = req.model_dump()
|
|
create_task(task_id, params, provider="suno")
|
|
background_tasks.add_task(run_cover_image, task_id, params)
|
|
return {"task_id": task_id, "provider": "suno"}
|
|
|
|
|
|
# ── WAV 변환 API ────────────────────────────────────────────────────────────
|
|
|
|
class WavRequest(BaseModel):
|
|
suno_task_id: str
|
|
suno_id: str
|
|
track_id: Optional[int] = None
|
|
|
|
|
|
@app.post("/api/music/wav")
|
|
def wav_convert(req: WavRequest, background_tasks: BackgroundTasks):
|
|
"""곡을 WAV 포맷으로 변환."""
|
|
if not SUNO_API_KEY:
|
|
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
|
|
task_id = str(uuid.uuid4())
|
|
params = req.model_dump()
|
|
create_task(task_id, params, provider="suno")
|
|
background_tasks.add_task(run_wav_convert, task_id, params)
|
|
return {"task_id": task_id, "provider": "suno"}
|
|
|
|
|
|
# ── 12스템 분리 API ─────────────────────────────────────────────────────────
|
|
|
|
class StemSplitRequest(BaseModel):
|
|
suno_task_id: str
|
|
suno_id: str
|
|
track_id: Optional[int] = None
|
|
|
|
|
|
@app.post("/api/music/stem-split")
|
|
def stem_split(req: StemSplitRequest, background_tasks: BackgroundTasks):
|
|
"""곡을 12개 스템으로 분리 (50 크레딧). 보컬, 드럼, 베이스, 기타 등."""
|
|
if not SUNO_API_KEY:
|
|
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
|
|
task_id = str(uuid.uuid4())
|
|
params = req.model_dump()
|
|
create_task(task_id, params, provider="suno")
|
|
background_tasks.add_task(run_stem_split, task_id, params)
|
|
return {"task_id": task_id, "provider": "suno"}
|
|
|
|
|
|
# ── 타임스탬프 가사 API ─────────────────────────────────────────────────────
|
|
|
|
@app.get("/api/music/timestamped-lyrics")
|
|
def timestamped_lyrics(task_id: str, suno_id: str):
|
|
"""타임스탬프 가사 조회 (가라오케 스타일 싱크용)."""
|
|
if not SUNO_API_KEY:
|
|
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
|
|
result = get_timestamped_lyrics(task_id, suno_id)
|
|
if not result:
|
|
raise HTTPException(status_code=502, detail="타임스탬프 가사 조회 실패")
|
|
return result
|
|
|
|
|
|
# ── 스타일 부스트 API ───────────────────────────────────────────────────────
|
|
|
|
class StyleBoostRequest(BaseModel):
|
|
content: str
|
|
|
|
|
|
@app.post("/api/music/style-boost")
|
|
def style_boost(req: StyleBoostRequest):
|
|
"""AI로 최적 스타일 프롬프트 생성."""
|
|
if not SUNO_API_KEY:
|
|
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
|
|
result = generate_style_boost(req.content)
|
|
if not result:
|
|
raise HTTPException(status_code=502, detail="스타일 부스트 생성 실패")
|
|
return result
|
|
|
|
|
|
# ── Phase 3: 업로드 + 커버 ──────────────────────────────────────────────────
|
|
|
|
class UploadCoverRequest(BaseModel):
|
|
upload_url: str
|
|
model: str = "V4"
|
|
custom_mode: bool = True
|
|
instrumental: bool = False
|
|
prompt: str = ""
|
|
style: str = ""
|
|
title: str = ""
|
|
vocal_gender: Optional[str] = None
|
|
negative_tags: Optional[str] = None
|
|
style_weight: Optional[float] = None
|
|
audio_weight: Optional[float] = None
|
|
|
|
|
|
@app.post("/api/music/upload-cover")
|
|
def upload_cover(req: UploadCoverRequest, background_tasks: BackgroundTasks):
|
|
"""외부 오디오를 Suno 스타일로 리메이크."""
|
|
if not SUNO_API_KEY:
|
|
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
|
|
task_id = str(uuid.uuid4())
|
|
params = req.model_dump()
|
|
create_task(task_id, params, provider="suno")
|
|
background_tasks.add_task(run_upload_cover, task_id, params)
|
|
return {"task_id": task_id, "provider": "suno"}
|
|
|
|
|
|
# ── Phase 3: 업로드 + 확장 ──────────────────────────────────────────────────
|
|
|
|
class UploadExtendRequest(BaseModel):
|
|
upload_url: str
|
|
model: str = "V4"
|
|
default_param_flag: bool = True
|
|
continue_at: Optional[float] = None
|
|
prompt: str = ""
|
|
style: str = ""
|
|
title: str = ""
|
|
instrumental: bool = False
|
|
vocal_gender: Optional[str] = None
|
|
negative_tags: Optional[str] = None
|
|
|
|
|
|
@app.post("/api/music/upload-extend")
|
|
def upload_extend(req: UploadExtendRequest, background_tasks: BackgroundTasks):
|
|
"""외부 오디오를 이어서 확장."""
|
|
if not SUNO_API_KEY:
|
|
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
|
|
task_id = str(uuid.uuid4())
|
|
params = req.model_dump()
|
|
create_task(task_id, params, provider="suno")
|
|
background_tasks.add_task(run_upload_extend, task_id, params)
|
|
return {"task_id": task_id, "provider": "suno"}
|
|
|
|
|
|
# ── Phase 3: 보컬 추가 ──────────────────────────────────────────────────────
|
|
|
|
class AddVocalsRequest(BaseModel):
|
|
upload_url: str
|
|
prompt: str
|
|
title: str
|
|
style: str
|
|
negative_tags: str = ""
|
|
vocal_gender: Optional[str] = None
|
|
model: str = "V4_5PLUS"
|
|
style_weight: Optional[float] = None
|
|
audio_weight: Optional[float] = None
|
|
|
|
|
|
@app.post("/api/music/add-vocals")
|
|
def add_vocals(req: AddVocalsRequest, background_tasks: BackgroundTasks):
|
|
"""인스트루멘탈에 AI 보컬 추가."""
|
|
if not SUNO_API_KEY:
|
|
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
|
|
task_id = str(uuid.uuid4())
|
|
params = req.model_dump()
|
|
create_task(task_id, params, provider="suno")
|
|
background_tasks.add_task(run_add_vocals, task_id, params)
|
|
return {"task_id": task_id, "provider": "suno"}
|
|
|
|
|
|
# ── Phase 3: 인스트루멘탈 추가 ──────────────────────────────────────────────
|
|
|
|
class AddInstrumentalRequest(BaseModel):
|
|
upload_url: str
|
|
title: str
|
|
tags: str
|
|
negative_tags: str = ""
|
|
vocal_gender: Optional[str] = None
|
|
model: str = "V4_5PLUS"
|
|
style_weight: Optional[float] = None
|
|
audio_weight: Optional[float] = None
|
|
|
|
|
|
@app.post("/api/music/add-instrumental")
|
|
def add_instrumental(req: AddInstrumentalRequest, background_tasks: BackgroundTasks):
|
|
"""보컬에 AI 반주 추가."""
|
|
if not SUNO_API_KEY:
|
|
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
|
|
task_id = str(uuid.uuid4())
|
|
params = req.model_dump()
|
|
create_task(task_id, params, provider="suno")
|
|
background_tasks.add_task(run_add_instrumental, task_id, params)
|
|
return {"task_id": task_id, "provider": "suno"}
|
|
|
|
|
|
# ── Phase 3: 뮤직비디오 생성 ────────────────────────────────────────────────
|
|
|
|
class VideoRequest(BaseModel):
|
|
suno_task_id: str
|
|
suno_id: str
|
|
author: str = ""
|
|
domain_name: str = ""
|
|
track_id: Optional[int] = None
|
|
|
|
|
|
@app.post("/api/music/video")
|
|
def video_generate(req: VideoRequest, background_tasks: BackgroundTasks):
|
|
"""뮤직비디오(MP4) 생성."""
|
|
if not SUNO_API_KEY:
|
|
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
|
|
task_id = str(uuid.uuid4())
|
|
params = req.model_dump()
|
|
create_task(task_id, params, provider="suno")
|
|
background_tasks.add_task(run_video_generate, task_id, params)
|
|
return {"task_id": task_id, "provider": "suno"}
|
|
|
|
|
|
# ── 저장된 가사 CRUD API ────────────────────────────────────────────────────
|
|
|
|
class LyricsSave(BaseModel):
|
|
title: str = ""
|
|
text: str = ""
|
|
prompt: str = ""
|
|
|
|
|
|
class LyricsUpdate(BaseModel):
|
|
title: Optional[str] = None
|
|
text: Optional[str] = None
|
|
prompt: Optional[str] = None
|
|
|
|
|
|
@app.get("/api/music/lyrics/library")
|
|
def list_saved_lyrics():
|
|
"""저장된 가사 목록 전체 조회 (생성일 내림차순)."""
|
|
return {"lyrics": get_all_lyrics()}
|
|
|
|
|
|
@app.post("/api/music/lyrics/library", status_code=201)
|
|
def save_lyrics(req: LyricsSave):
|
|
"""가사 저장."""
|
|
return add_lyrics(req.model_dump())
|
|
|
|
|
|
@app.put("/api/music/lyrics/library/{lyrics_id}")
|
|
def edit_lyrics(lyrics_id: int, req: LyricsUpdate):
|
|
"""가사 수정."""
|
|
data = {k: v for k, v in req.model_dump().items() if v is not None}
|
|
result = update_lyrics(lyrics_id, data)
|
|
if not result:
|
|
raise HTTPException(status_code=404, detail="Lyrics not found")
|
|
return result
|
|
|
|
|
|
@app.delete("/api/music/lyrics/library/{lyrics_id}")
|
|
def remove_lyrics(lyrics_id: int):
|
|
"""가사 삭제."""
|
|
if not delete_lyrics(lyrics_id):
|
|
raise HTTPException(status_code=404, detail="Lyrics not found")
|
|
return {"ok": True}
|
|
|
|
|
|
# ── 영상 프로젝트 모델 ────────────────────────────────────────────────────────
|
|
|
|
class VideoProjectCreate(BaseModel):
|
|
track_id: int
|
|
format: str = "visualizer"
|
|
target_countries: List[str] = []
|
|
render_params: dict = {}
|
|
|
|
|
|
class RevenueCreate(BaseModel):
|
|
video_project_id: Optional[int] = None
|
|
yt_video_id: str = ""
|
|
record_month: str
|
|
views: int = 0
|
|
watch_hours: float = 0.0
|
|
revenue_usd: float = 0.0
|
|
country: str = ""
|
|
source: str = "manual"
|
|
|
|
|
|
class RevenueUpdate(BaseModel):
|
|
yt_video_id: Optional[str] = None
|
|
record_month: Optional[str] = None
|
|
views: Optional[int] = None
|
|
watch_hours: Optional[float] = None
|
|
revenue_usd: Optional[float] = None
|
|
country: Optional[str] = None
|
|
source: Optional[str] = None
|
|
|
|
|
|
# ── 영상 프로젝트 API ─────────────────────────────────────────────────────────
|
|
|
|
@app.post("/api/music/video-project", status_code=201)
|
|
def create_project(req: VideoProjectCreate, background_tasks: BackgroundTasks):
|
|
if not get_track_by_id(req.track_id):
|
|
raise HTTPException(status_code=404, detail="Track not found")
|
|
if req.format not in ("visualizer", "slideshow"):
|
|
raise HTTPException(status_code=400, detail="format은 'visualizer' 또는 'slideshow'")
|
|
proj = create_video_project(req.model_dump())
|
|
return proj
|
|
|
|
|
|
@app.get("/api/music/video-projects")
|
|
def list_projects():
|
|
return {"projects": get_all_video_projects()}
|
|
|
|
|
|
@app.get("/api/music/video-project/{project_id}")
|
|
def get_project(project_id: int):
|
|
proj = get_video_project(project_id)
|
|
if not proj:
|
|
raise HTTPException(status_code=404, detail="Project not found")
|
|
return proj
|
|
|
|
|
|
@app.post("/api/music/video-project/{project_id}/render")
|
|
def render_project(project_id: int, background_tasks: BackgroundTasks):
|
|
proj = get_video_project(project_id)
|
|
if not proj:
|
|
raise HTTPException(status_code=404, detail="Project not found")
|
|
if proj["status"] == "rendering":
|
|
raise HTTPException(status_code=409, detail="이미 렌더링 중입니다")
|
|
from .video_producer import produce_video
|
|
background_tasks.add_task(produce_video, project_id)
|
|
return {"ok": True, "project_id": project_id, "status": "rendering"}
|
|
|
|
|
|
@app.get("/api/music/video-project/{project_id}/export")
|
|
def export_project(project_id: int):
|
|
proj = get_video_project(project_id)
|
|
if not proj:
|
|
raise HTTPException(status_code=404, detail="Project not found")
|
|
if proj["status"] != "done":
|
|
raise HTTPException(status_code=400, detail=f"렌더링 미완료 (status: {proj['status']})")
|
|
meta_path = os.path.join(VIDEO_DATA_DIR, str(project_id), "metadata.json")
|
|
metadata = {}
|
|
if os.path.exists(meta_path):
|
|
with open(meta_path, encoding="utf-8") as f:
|
|
metadata = json.load(f)
|
|
thumb_url = proj["output_url"].replace("output.mp4", "thumbnail.jpg") if proj["output_url"] else ""
|
|
return {
|
|
"project_id": project_id,
|
|
"output_url": proj["output_url"],
|
|
"thumbnail_url": thumb_url,
|
|
"yt_title": proj["yt_title"],
|
|
"yt_description": proj["yt_description"],
|
|
"yt_tags": proj["yt_tags"],
|
|
"metadata": metadata,
|
|
}
|
|
|
|
|
|
@app.delete("/api/music/video-project/{project_id}")
|
|
def delete_project(project_id: int):
|
|
if not get_video_project(project_id):
|
|
raise HTTPException(status_code=404, detail="Project not found")
|
|
out_dir = os.path.join(VIDEO_DATA_DIR, str(project_id))
|
|
if os.path.isdir(out_dir):
|
|
shutil.rmtree(out_dir, ignore_errors=True)
|
|
delete_video_project(project_id)
|
|
return {"ok": True}
|
|
|
|
|
|
# ── 수익화 추적 API ───────────────────────────────────────────────────────────
|
|
|
|
@app.get("/api/music/revenue/dashboard")
|
|
def revenue_dashboard():
|
|
return get_revenue_dashboard()
|
|
|
|
|
|
@app.get("/api/music/revenue")
|
|
def list_revenue(yt_video_id: Optional[str] = None, year_month: Optional[str] = None):
|
|
return {"records": get_all_revenue_records(yt_video_id, year_month)}
|
|
|
|
|
|
@app.post("/api/music/revenue", status_code=201)
|
|
def add_revenue(req: RevenueCreate):
|
|
return create_revenue_record(req.model_dump())
|
|
|
|
|
|
@app.put("/api/music/revenue/{record_id}")
|
|
def edit_revenue(record_id: int, req: RevenueUpdate):
|
|
data = {k: v for k, v in req.model_dump().items() if v is not None}
|
|
result = update_revenue_record(record_id, data)
|
|
if not result:
|
|
raise HTTPException(status_code=404, detail="Record not found")
|
|
return result
|
|
|
|
|
|
@app.delete("/api/music/revenue/{record_id}")
|
|
def remove_revenue(record_id: int):
|
|
if not delete_revenue_record(record_id):
|
|
raise HTTPException(status_code=404, detail="Record not found")
|
|
return {"ok": True}
|