Files

1218 lines
44 KiB
Python

import json
import os
import shutil
import uuid
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, HTTPException, BackgroundTasks, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from .db import (
init_db,
create_task, get_task,
get_all_tracks, add_track, delete_track, get_track_file_path, get_track_by_task_id, get_track_by_id,
update_track_duration, update_track_file_info, update_track_hash,
get_all_lyrics, add_lyrics, update_lyrics, delete_lyrics,
create_video_project, get_video_project, get_all_video_projects,
update_video_project_status, delete_video_project,
create_revenue_record, get_all_revenue_records,
update_revenue_record, delete_revenue_record, get_revenue_dashboard,
get_market_trends as _get_market_trends,
get_latest_trend_report, get_trend_reports as _get_trend_reports,
create_compile_job, get_compile_jobs, get_compile_job,
update_compile_job, delete_compile_job,
)
from . import db as _db_module
from .compiler import run_compile
from .market import ingest_trends, get_suggestions
from .local_provider import run_local_generation
from .pipeline import orchestrator
from .pipeline import youtube as yt_module
from .suno_provider import (
run_suno_generation, run_suno_extend, run_vocal_removal,
run_cover_image, run_wav_convert, run_stem_split,
run_upload_cover, run_upload_extend, run_add_vocals, run_add_instrumental, run_video_generate,
generate_lyrics, get_credits, get_timestamped_lyrics, generate_style_boost,
SUNO_API_KEY, SUNO_MODELS,
)
from .batch_generator import run_batch as _run_batch
app = FastAPI()
_cors_origins = os.getenv("CORS_ALLOW_ORIGINS", "http://localhost:3007,http://localhost:8080").split(",")
app.add_middleware(
CORSMiddleware,
allow_origins=[o.strip() for o in _cors_origins],
allow_credentials=False,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["Content-Type"],
)
MUSIC_DATA_DIR = "/app/data"
VIDEO_DATA_DIR = os.getenv("VIDEO_DATA_DIR", "/app/data/videos")
def _get_mp3_duration(file_path: str) -> Optional[int]:
"""MP3 파일에서 실제 재생 시간(초) 추출."""
try:
from mutagen.mp3 import MP3
audio = MP3(file_path)
return int(audio.info.length)
except Exception:
return None
def _backfill_durations():
"""duration_sec이 없는 기존 트랙에 MP3 메타데이터에서 길이 채우기."""
for t in get_all_tracks():
if t["duration_sec"] is None and t.get("file_path"):
dur = _get_mp3_duration(t["file_path"])
if dur:
update_track_duration(t["id"], dur)
@app.on_event("startup")
def on_startup():
init_db()
os.makedirs(MUSIC_DATA_DIR, exist_ok=True)
_backfill_durations()
@app.get("/health")
def health():
return {"ok": True}
@app.get("/api/music/providers")
def get_providers():
"""사용 가능한 음악 생성 프로바이더 목록 반환."""
providers = []
if os.getenv("MUSIC_AI_SERVER_URL"):
providers.append({
"id": "local",
"name": "MusicGen",
"description": "로컬 AI 서버 (인스트루멘탈 전용)",
"features": ["instrumental"],
})
if SUNO_API_KEY:
providers.append({
"id": "suno",
"name": "Suno",
"description": "Suno AI (보컬·가사·인스트루멘탈)",
"features": ["vocals", "lyrics", "instrumental"],
})
return {"providers": providers}
# ── 음악 생성 API ─────────────────────────────────────────────────────────────
class GenerateRequest(BaseModel):
provider: str = "suno" # "suno" | "local"
model: str = "V4" # Suno 모델 (V4, V4_5, V5 등)
title: str = ""
genre: str = ""
moods: List[str] = []
instruments: List[str] = []
duration_sec: Optional[int] = None
bpm: Optional[int] = None
key: str = ""
scale: str = ""
prompt: str = ""
# Suno 전용
lyrics: str = "" # 커스텀 가사 ([Verse], [Chorus] 등)
instrumental: bool = False # True면 보컬 없이 인스트루멘탈만
# Phase 1 신규
vocal_gender: Optional[str] = None # "m" | "f"
negative_tags: Optional[str] = None # 제외 스타일
style_weight: Optional[float] = None # 0.0~1.0
audio_weight: Optional[float] = None # 0.0~1.0
@app.post("/api/music/generate")
def generate_music(req: GenerateRequest, background_tasks: BackgroundTasks):
"""
음악 생성 작업 시작. task_id 즉시 반환 후 백그라운드에서 AI 서버 호출.
provider: "suno" (Suno API) 또는 "local" (MusicGen)
"""
provider = req.provider
if provider == "suno" and not SUNO_API_KEY:
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
if provider == "local" and not os.getenv("MUSIC_AI_SERVER_URL"):
raise HTTPException(status_code=400, detail="로컬 AI 서버 URL이 설정되지 않았습니다")
if provider not in ("suno", "local"):
raise HTTPException(status_code=400, detail=f"지원하지 않는 provider: {provider}")
task_id = str(uuid.uuid4())
params = req.model_dump()
create_task(task_id, params, provider=provider)
if provider == "suno":
background_tasks.add_task(run_suno_generation, task_id, params)
else:
background_tasks.add_task(run_local_generation, task_id, params)
return {"task_id": task_id, "provider": provider}
@app.get("/api/music/status/{task_id}")
def get_status(task_id: str):
"""
생성 작업 상태 조회. 프론트는 succeeded 또는 failed가 될 때까지 폴링.
status: queued | processing | succeeded | failed
succeeded 시 track 메타데이터 포함.
"""
task = get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
resp = {
"status": task["status"],
"progress": task["progress"],
"message": task["message"],
"audio_url": task["audio_url"],
"error": task["error"],
"provider": task["provider"],
}
if task["status"] == "succeeded":
track = get_track_by_task_id(task_id)
resp["track"] = track
return resp
# ── 가사 생성 API (Suno 전용) ────────────────────────────────────────────────
class LyricsRequest(BaseModel):
prompt: str
@app.post("/api/music/lyrics")
def gen_lyrics(req: LyricsRequest):
"""Suno AI로 가사를 생성합니다. 곡 생성 전 가사 미리보기용."""
if not SUNO_API_KEY:
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
result = generate_lyrics(req.prompt)
if not result:
raise HTTPException(status_code=502, detail="가사 생성에 실패했습니다")
return result
# ── 라이브러리 API ────────────────────────────────────────────────────────────
class TrackCreate(BaseModel):
title: str = ""
genre: str = ""
moods: List[str] = []
instruments: List[str] = []
duration_sec: Optional[int] = None
bpm: Optional[int] = None
key: str = ""
scale: str = ""
prompt: str = ""
audio_url: str = ""
file_path: str = ""
task_id: Optional[str] = None
tags: List[str] = []
provider: str = "local"
lyrics: str = ""
image_url: str = ""
suno_id: str = ""
@app.get("/api/music/library")
def list_library():
"""저장된 트랙 목록 전체 조회 (생성일 내림차순). 파일시스템과 자동 동기화."""
_sync_library_with_disk()
return {"tracks": get_all_tracks()}
def _calc_file_hash(file_path: str) -> str:
"""MD5 해시 계산 (파일 동일성 체크용)."""
import hashlib
h = hashlib.md5()
try:
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
h.update(chunk)
return h.hexdigest()
except OSError:
return ""
def _sync_library_with_disk():
"""파일시스템의 .mp3 파일과 DB를 동기화 (해시 기반 rename 감지).
1단계: 파일명 매칭 (빠른 경로)
2단계: 미매칭 파일/레코드를 해시로 비교 → rename 감지 → 메타데이터 보존 업데이트
3단계: 나머지 → 삭제/추가
"""
tracks = get_all_tracks()
media_base = os.getenv("MUSIC_MEDIA_BASE", "/media/music")
# 디스크의 .mp3 파일 목록
disk_files = set()
try:
for f in os.listdir(MUSIC_DATA_DIR):
if f.lower().endswith(".mp3"):
disk_files.add(f)
except OSError:
return
# ── 1단계: 파일명 매칭 ──────────────────────────────────────
db_by_filename = {} # filename → track
for t in tracks:
if t.get("audio_url"):
fname = t["audio_url"].split("/")[-1]
db_by_filename[fname] = t
matched_disk = set()
matched_db_ids = set()
for f in disk_files:
if f in db_by_filename:
matched_disk.add(f)
track = db_by_filename[f]
matched_db_ids.add(track["id"])
# 기존 트랙에 file_hash 없으면 채우기
if not track.get("file_hash"):
file_hash = _calc_file_hash(os.path.join(MUSIC_DATA_DIR, f))
if file_hash:
update_track_hash(track["id"], file_hash)
unmatched_disk = disk_files - matched_disk
unmatched_db = [t for t in tracks if t["id"] not in matched_db_ids]
# ── 2단계: 해시 기반 rename 감지 ────────────────────────────
if unmatched_disk and unmatched_db:
# DB 미매칭 레코드의 해시 맵
db_hash_map = {} # hash → track
for t in unmatched_db:
h = t.get("file_hash", "")
if h:
db_hash_map[h] = t
resolved_disk = set()
resolved_db_ids = set()
for f in unmatched_disk:
file_path = os.path.join(MUSIC_DATA_DIR, f)
file_hash = _calc_file_hash(file_path)
if not file_hash:
continue
if file_hash in db_hash_map:
# rename 감지 — 기존 레코드 업데이트 (태그·메타데이터 보존)
track = db_hash_map[file_hash]
new_title = os.path.splitext(f)[0].replace("-", " ").replace("_", " ")
update_track_file_info(
track["id"],
title=new_title,
audio_url=f"{media_base}/{f}",
file_path=file_path,
)
resolved_disk.add(f)
resolved_db_ids.add(track["id"])
unmatched_disk -= resolved_disk
unmatched_db = [t for t in unmatched_db if t["id"] not in resolved_db_ids]
# ── 3단계: 나머지 처리 ──────────────────────────────────────
# DB에만 남은 레코드 → 파일 삭제됨 → DB 삭제
for t in unmatched_db:
delete_track(t["id"])
# 디스크에만 남은 파일 → 신규 → DB 추가 (해시 포함)
for f in unmatched_disk:
file_path = os.path.join(MUSIC_DATA_DIR, f)
title = os.path.splitext(f)[0].replace("-", " ").replace("_", " ")
file_hash = _calc_file_hash(file_path)
add_track({
"title": title,
"audio_url": f"{media_base}/{f}",
"file_path": file_path,
"provider": "suno",
"duration_sec": _get_mp3_duration(file_path),
"file_hash": file_hash,
})
@app.post("/api/music/library", status_code=201)
def save_to_library(req: TrackCreate):
"""트랙 수동 추가 (외부 파일 등록 또는 프론트 직접 저장용)"""
track = add_track(req.model_dump())
return track
@app.delete("/api/music/library/{track_id}")
def remove_from_library(track_id: int):
"""라이브러리에서 트랙 삭제. 로컬 파일도 함께 삭제."""
file_path = get_track_file_path(track_id)
ok = delete_track(track_id)
if not ok:
raise HTTPException(status_code=404, detail="Track not found")
if file_path and os.path.isfile(file_path):
try:
os.remove(file_path)
except OSError:
pass
return {"ok": True}
# ── 모델 목록 API ────────────────────────────────────────────────────────────
@app.get("/api/music/models")
def get_models():
"""사용 가능한 Suno AI 모델 목록."""
return {"models": SUNO_MODELS}
# ── 크레딧 조회 API ──────────────────────────────────────────────────────────
@app.get("/api/music/credits")
def check_credits():
"""Suno 잔여 크레딧 조회."""
if not SUNO_API_KEY:
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
result = get_credits()
if result is None:
raise HTTPException(status_code=502, detail="크레딧 조회 실패")
return result
# ── 곡 연장 API ──────────────────────────────────────────────────────────────
class ExtendRequest(BaseModel):
suno_id: str # 원본 Suno 곡 ID
continue_at: int = 0 # 연장 시작 지점 (초)
prompt: str = "" # 추가 가사/프롬프트
style: str = "" # 스타일 오버라이드
title: str = ""
model: str = "V4"
@app.post("/api/music/extend")
def extend_music(req: ExtendRequest, background_tasks: BackgroundTasks):
"""기존 곡을 특정 지점부터 연장 (Suno Extend API)."""
if not SUNO_API_KEY:
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
task_id = str(uuid.uuid4())
params = req.model_dump()
create_task(task_id, params, provider="suno")
background_tasks.add_task(run_suno_extend, task_id, params)
return {"task_id": task_id, "provider": "suno"}
# ── 보컬 분리 API ────────────────────────────────────────────────────────────
class VocalRemovalRequest(BaseModel):
suno_id: str # Suno 곡 ID
title: str = "" # 원본 트랙 제목
@app.post("/api/music/vocal-removal")
def vocal_removal(req: VocalRemovalRequest, background_tasks: BackgroundTasks):
"""트랙에서 보컬과 인스트루멘탈을 분리 (Suno Vocal Removal API)."""
if not SUNO_API_KEY:
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
task_id = str(uuid.uuid4())
params = req.model_dump()
create_task(task_id, params, provider="suno")
background_tasks.add_task(run_vocal_removal, task_id, params)
return {"task_id": task_id, "provider": "suno"}
# ── 커버 이미지 생성 API ────────────────────────────────────────────────────
class CoverImageRequest(BaseModel):
suno_task_id: str # Suno 생성 task ID
track_id: Optional[int] = None # 라이브러리 트랙 ID (결과 저장용)
@app.post("/api/music/cover-image")
def cover_image(req: CoverImageRequest, background_tasks: BackgroundTasks):
"""Suno 곡의 커버 이미지 2장 생성."""
if not SUNO_API_KEY:
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
task_id = str(uuid.uuid4())
params = req.model_dump()
create_task(task_id, params, provider="suno")
background_tasks.add_task(run_cover_image, task_id, params)
return {"task_id": task_id, "provider": "suno"}
# ── WAV 변환 API ────────────────────────────────────────────────────────────
class WavRequest(BaseModel):
suno_task_id: str
suno_id: str
track_id: Optional[int] = None
@app.post("/api/music/wav")
def wav_convert(req: WavRequest, background_tasks: BackgroundTasks):
"""곡을 WAV 포맷으로 변환."""
if not SUNO_API_KEY:
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
task_id = str(uuid.uuid4())
params = req.model_dump()
create_task(task_id, params, provider="suno")
background_tasks.add_task(run_wav_convert, task_id, params)
return {"task_id": task_id, "provider": "suno"}
# ── 12스템 분리 API ─────────────────────────────────────────────────────────
class StemSplitRequest(BaseModel):
suno_task_id: str
suno_id: str
track_id: Optional[int] = None
@app.post("/api/music/stem-split")
def stem_split(req: StemSplitRequest, background_tasks: BackgroundTasks):
"""곡을 12개 스템으로 분리 (50 크레딧). 보컬, 드럼, 베이스, 기타 등."""
if not SUNO_API_KEY:
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
task_id = str(uuid.uuid4())
params = req.model_dump()
create_task(task_id, params, provider="suno")
background_tasks.add_task(run_stem_split, task_id, params)
return {"task_id": task_id, "provider": "suno"}
# ── 타임스탬프 가사 API ─────────────────────────────────────────────────────
@app.get("/api/music/timestamped-lyrics")
def timestamped_lyrics(task_id: str, suno_id: str):
"""타임스탬프 가사 조회 (가라오케 스타일 싱크용)."""
if not SUNO_API_KEY:
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
result = get_timestamped_lyrics(task_id, suno_id)
if not result:
raise HTTPException(status_code=502, detail="타임스탬프 가사 조회 실패")
return result
# ── 스타일 부스트 API ───────────────────────────────────────────────────────
class StyleBoostRequest(BaseModel):
content: str
@app.post("/api/music/style-boost")
def style_boost(req: StyleBoostRequest):
"""AI로 최적 스타일 프롬프트 생성."""
if not SUNO_API_KEY:
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
result = generate_style_boost(req.content)
if not result:
raise HTTPException(status_code=502, detail="스타일 부스트 생성 실패")
return result
# ── Phase 3: 업로드 + 커버 ──────────────────────────────────────────────────
class UploadCoverRequest(BaseModel):
upload_url: str
model: str = "V4"
custom_mode: bool = True
instrumental: bool = False
prompt: str = ""
style: str = ""
title: str = ""
vocal_gender: Optional[str] = None
negative_tags: Optional[str] = None
style_weight: Optional[float] = None
audio_weight: Optional[float] = None
@app.post("/api/music/upload-cover")
def upload_cover(req: UploadCoverRequest, background_tasks: BackgroundTasks):
"""외부 오디오를 Suno 스타일로 리메이크."""
if not SUNO_API_KEY:
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
task_id = str(uuid.uuid4())
params = req.model_dump()
create_task(task_id, params, provider="suno")
background_tasks.add_task(run_upload_cover, task_id, params)
return {"task_id": task_id, "provider": "suno"}
# ── Phase 3: 업로드 + 확장 ──────────────────────────────────────────────────
class UploadExtendRequest(BaseModel):
upload_url: str
model: str = "V4"
default_param_flag: bool = True
continue_at: Optional[float] = None
prompt: str = ""
style: str = ""
title: str = ""
instrumental: bool = False
vocal_gender: Optional[str] = None
negative_tags: Optional[str] = None
@app.post("/api/music/upload-extend")
def upload_extend(req: UploadExtendRequest, background_tasks: BackgroundTasks):
"""외부 오디오를 이어서 확장."""
if not SUNO_API_KEY:
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
task_id = str(uuid.uuid4())
params = req.model_dump()
create_task(task_id, params, provider="suno")
background_tasks.add_task(run_upload_extend, task_id, params)
return {"task_id": task_id, "provider": "suno"}
# ── Phase 3: 보컬 추가 ──────────────────────────────────────────────────────
class AddVocalsRequest(BaseModel):
upload_url: str
prompt: str
title: str
style: str
negative_tags: str = ""
vocal_gender: Optional[str] = None
model: str = "V4_5PLUS"
style_weight: Optional[float] = None
audio_weight: Optional[float] = None
@app.post("/api/music/add-vocals")
def add_vocals(req: AddVocalsRequest, background_tasks: BackgroundTasks):
"""인스트루멘탈에 AI 보컬 추가."""
if not SUNO_API_KEY:
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
task_id = str(uuid.uuid4())
params = req.model_dump()
create_task(task_id, params, provider="suno")
background_tasks.add_task(run_add_vocals, task_id, params)
return {"task_id": task_id, "provider": "suno"}
# ── Phase 3: 인스트루멘탈 추가 ──────────────────────────────────────────────
class AddInstrumentalRequest(BaseModel):
upload_url: str
title: str
tags: str
negative_tags: str = ""
vocal_gender: Optional[str] = None
model: str = "V4_5PLUS"
style_weight: Optional[float] = None
audio_weight: Optional[float] = None
@app.post("/api/music/add-instrumental")
def add_instrumental(req: AddInstrumentalRequest, background_tasks: BackgroundTasks):
"""보컬에 AI 반주 추가."""
if not SUNO_API_KEY:
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
task_id = str(uuid.uuid4())
params = req.model_dump()
create_task(task_id, params, provider="suno")
background_tasks.add_task(run_add_instrumental, task_id, params)
return {"task_id": task_id, "provider": "suno"}
# ── Phase 3: 뮤직비디오 생성 ────────────────────────────────────────────────
class VideoRequest(BaseModel):
suno_task_id: str
suno_id: str
author: str = ""
domain_name: str = ""
track_id: Optional[int] = None
@app.post("/api/music/video")
def video_generate(req: VideoRequest, background_tasks: BackgroundTasks):
"""뮤직비디오(MP4) 생성."""
if not SUNO_API_KEY:
raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
task_id = str(uuid.uuid4())
params = req.model_dump()
create_task(task_id, params, provider="suno")
background_tasks.add_task(run_video_generate, task_id, params)
return {"task_id": task_id, "provider": "suno"}
# ── 저장된 가사 CRUD API ────────────────────────────────────────────────────
class LyricsSave(BaseModel):
title: str = ""
text: str = ""
prompt: str = ""
class LyricsUpdate(BaseModel):
title: Optional[str] = None
text: Optional[str] = None
prompt: Optional[str] = None
@app.get("/api/music/lyrics/library")
def list_saved_lyrics():
"""저장된 가사 목록 전체 조회 (생성일 내림차순)."""
return {"lyrics": get_all_lyrics()}
@app.post("/api/music/lyrics/library", status_code=201)
def save_lyrics(req: LyricsSave):
"""가사 저장."""
return add_lyrics(req.model_dump())
@app.put("/api/music/lyrics/library/{lyrics_id}")
def edit_lyrics(lyrics_id: int, req: LyricsUpdate):
"""가사 수정."""
data = {k: v for k, v in req.model_dump().items() if v is not None}
result = update_lyrics(lyrics_id, data)
if not result:
raise HTTPException(status_code=404, detail="Lyrics not found")
return result
@app.delete("/api/music/lyrics/library/{lyrics_id}")
def remove_lyrics(lyrics_id: int):
"""가사 삭제."""
if not delete_lyrics(lyrics_id):
raise HTTPException(status_code=404, detail="Lyrics not found")
return {"ok": True}
# ── 영상 프로젝트 모델 ────────────────────────────────────────────────────────
class VideoProjectCreate(BaseModel):
track_id: int
format: str = "visualizer"
target_countries: List[str] = []
render_params: dict = {}
class RevenueCreate(BaseModel):
video_project_id: Optional[int] = None
yt_video_id: str = ""
record_month: str
views: int = 0
watch_hours: float = 0.0
revenue_usd: float = 0.0
country: str = ""
source: str = "manual"
class RevenueUpdate(BaseModel):
yt_video_id: Optional[str] = None
record_month: Optional[str] = None
views: Optional[int] = None
watch_hours: Optional[float] = None
revenue_usd: Optional[float] = None
country: Optional[str] = None
source: Optional[str] = None
# ── 영상 프로젝트 API ─────────────────────────────────────────────────────────
@app.post("/api/music/video-project", status_code=201)
def create_project(req: VideoProjectCreate, background_tasks: BackgroundTasks):
if not get_track_by_id(req.track_id):
raise HTTPException(status_code=404, detail="Track not found")
if req.format not in ("visualizer", "slideshow"):
raise HTTPException(status_code=400, detail="format은 'visualizer' 또는 'slideshow'")
proj = create_video_project(req.model_dump())
return proj
@app.get("/api/music/video-projects")
def list_projects():
return {"projects": get_all_video_projects()}
@app.get("/api/music/video-project/{project_id}")
def get_project(project_id: int):
proj = get_video_project(project_id)
if not proj:
raise HTTPException(status_code=404, detail="Project not found")
return proj
@app.post("/api/music/video-project/{project_id}/render")
def render_project(project_id: int, background_tasks: BackgroundTasks):
proj = get_video_project(project_id)
if not proj:
raise HTTPException(status_code=404, detail="Project not found")
if proj["status"] == "rendering":
raise HTTPException(status_code=409, detail="이미 렌더링 중입니다")
from .video_producer import produce_video
background_tasks.add_task(produce_video, project_id)
return {"ok": True, "project_id": project_id, "status": "rendering"}
@app.get("/api/music/video-project/{project_id}/export")
def export_project(project_id: int):
proj = get_video_project(project_id)
if not proj:
raise HTTPException(status_code=404, detail="Project not found")
if proj["status"] != "done":
raise HTTPException(status_code=400, detail=f"렌더링 미완료 (status: {proj['status']})")
meta_path = os.path.join(VIDEO_DATA_DIR, str(project_id), "metadata.json")
metadata = {}
if os.path.exists(meta_path):
with open(meta_path, encoding="utf-8") as f:
metadata = json.load(f)
thumb_url = proj["output_url"].replace("output.mp4", "thumbnail.jpg") if proj["output_url"] else ""
return {
"project_id": project_id,
"output_url": proj["output_url"],
"thumbnail_url": thumb_url,
"yt_title": proj["yt_title"],
"yt_description": proj["yt_description"],
"yt_tags": proj["yt_tags"],
"metadata": metadata,
}
@app.delete("/api/music/video-project/{project_id}")
def delete_project(project_id: int):
if not get_video_project(project_id):
raise HTTPException(status_code=404, detail="Project not found")
out_dir = os.path.join(VIDEO_DATA_DIR, str(project_id))
if os.path.isdir(out_dir):
shutil.rmtree(out_dir, ignore_errors=True)
delete_video_project(project_id)
return {"ok": True}
# ── Compile Jobs ──────────────────────────────────────────────────────────────
class CompileRequest(BaseModel):
title: str = ""
track_ids: list[int]
crossfade_sec: float = 3.0
@app.post("/api/music/compile")
def create_compile(req: CompileRequest, background_tasks: BackgroundTasks):
if not req.track_ids:
raise HTTPException(status_code=400, detail="track_ids 필수")
if not (0.5 <= req.crossfade_sec <= 15):
raise HTTPException(status_code=400, detail="crossfade_sec: 0.5~15")
job_id = create_compile_job(req.title, req.track_ids, req.crossfade_sec)
background_tasks.add_task(run_compile, job_id)
return {"id": job_id, "status": "rendering"}
@app.get("/api/music/compiles")
def list_compiles():
return {"jobs": get_compile_jobs()}
@app.get("/api/music/compile/{job_id}")
def get_compile(job_id: int):
job = get_compile_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Not found")
return job
@app.delete("/api/music/compile/{job_id}")
def delete_compile(job_id: int):
job = get_compile_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Not found")
if job.get("output_path"):
out_dir = os.path.dirname(job["output_path"])
if os.path.exists(out_dir):
shutil.rmtree(out_dir, ignore_errors=True)
delete_compile_job(job_id)
return {"ok": True}
@app.get("/api/music/compile/{job_id}/export")
def export_compile(job_id: int):
job = get_compile_job(job_id)
if not job or job["status"] != "done":
raise HTTPException(status_code=404, detail="Not ready")
out_dir = os.path.dirname(job["output_path"])
rel = os.path.relpath(job["output_path"], os.getenv("VIDEO_DATA_DIR", "/app/data/videos"))
mp4_url = f"/media/videos/{rel}"
return {
"mp4_url": mp4_url,
"duration_sec": job["duration_sec"],
"title": job["title"],
}
# ── 배치 음악 생성 API ────────────────────────────────────────────────────────
class BatchGenerateRequest(BaseModel):
genre: str
count: int = 10
target_duration_sec: int = 180
auto_pipeline: bool = True
@app.post("/api/music/generate-batch", status_code=201)
async def generate_batch(req: BatchGenerateRequest, bg: BackgroundTasks):
if not (1 <= req.count <= 10):
raise HTTPException(status_code=400, detail="count는 1-10 사이")
if not (60 <= req.target_duration_sec <= 300):
raise HTTPException(status_code=400, detail="target_duration_sec는 60-300 사이")
if not req.genre:
raise HTTPException(status_code=400, detail="genre 필수")
if not SUNO_API_KEY:
raise HTTPException(status_code=400, detail="SUNO_API_KEY 미설정")
batch_id = _db_module.create_batch_job(
genre=req.genre, count=req.count,
target_duration_sec=req.target_duration_sec,
auto_pipeline=req.auto_pipeline,
)
bg.add_task(_run_batch, batch_id)
return _db_module.get_batch_job(batch_id)
@app.get("/api/music/generate-batch/{batch_id}")
def get_batch(batch_id: int):
j = _db_module.get_batch_job(batch_id)
if not j:
raise HTTPException(status_code=404, detail="Not found")
if j["track_ids"]:
ids_csv = ",".join(str(i) for i in j["track_ids"])
import sqlite3
conn = sqlite3.connect(_db_module.DB_PATH)
conn.row_factory = sqlite3.Row
rows = conn.execute(
f"SELECT id, title, audio_url, duration_sec FROM music_library WHERE id IN ({ids_csv})"
).fetchall()
conn.close()
# 트랙을 batch.track_ids 순서대로 정렬
by_id = {r["id"]: dict(r) for r in rows}
j["tracks"] = [by_id.get(tid) for tid in j["track_ids"] if tid in by_id]
else:
j["tracks"] = []
return j
@app.get("/api/music/generate-batch")
def list_batches(status: str = "all"):
return {"batches": _db_module.list_batch_jobs(active_only=(status == "active"))}
@app.get("/api/music/genres")
def list_supported_genres():
"""배치 생성에서 사용 가능한 장르 목록 — random_pools의 키."""
from .random_pools import list_genres
return {"genres": list_genres()}
# ── 수익화 추적 API ───────────────────────────────────────────────────────────
@app.get("/api/music/revenue/dashboard")
def revenue_dashboard():
return get_revenue_dashboard()
@app.get("/api/music/revenue")
def list_revenue(yt_video_id: Optional[str] = None, year_month: Optional[str] = None):
return {"records": get_all_revenue_records(yt_video_id, year_month)}
@app.post("/api/music/revenue", status_code=201)
def add_revenue(req: RevenueCreate):
return create_revenue_record(req.model_dump())
@app.put("/api/music/revenue/{record_id}")
def edit_revenue(record_id: int, req: RevenueUpdate):
data = {k: v for k, v in req.model_dump().items() if v is not None}
result = update_revenue_record(record_id, data)
if not result:
raise HTTPException(status_code=404, detail="Record not found")
return result
@app.delete("/api/music/revenue/{record_id}")
def remove_revenue(record_id: int):
if not delete_revenue_record(record_id):
raise HTTPException(status_code=404, detail="Record not found")
return {"ok": True}
# ── 시장 조사 API ─────────────────────────────────────────────────────────────
class MarketIngestRequest(BaseModel):
trends: List[Dict[str, Any]]
report_date: str = ""
@app.post("/api/music/market/ingest")
def market_ingest(req: MarketIngestRequest):
"""agent-office → 트렌드 데이터 수신 + 리포트 생성."""
from datetime import date
report_date = req.report_date or date.today().isoformat()
report = ingest_trends(req.trends, report_date)
return {"ok": True, "trends_saved": len(req.trends), "report_date": report_date}
@app.get("/api/music/market/trends")
def list_market_trends(
country: Optional[str] = None,
genre: Optional[str] = None,
source: Optional[str] = None,
days: int = Query(7, ge=1),
):
return {"trends": _get_market_trends(country, genre, source, days)}
@app.get("/api/music/market/report/latest")
def get_market_report_latest():
report = get_latest_trend_report()
if not report:
raise HTTPException(status_code=404, detail="리포트 없음 — 아직 수집 전")
return report
@app.get("/api/music/market/report")
def list_market_reports(limit: int = 10):
return {"reports": _get_trend_reports(limit)}
@app.get("/api/music/market/suggest")
def market_suggest(limit: int = 5):
return {"suggestions": get_suggestions(limit)}
# ── Pipeline endpoints ────────────────────────────────────────────────────────
class PipelineCreate(BaseModel):
track_id: int | None = None
compile_job_id: int | None = None
visual_style: str | None = None # single | essential
background_mode: str | None = None # static | video_loop
background_keyword: str | None = None
class FeedbackRequest(BaseModel):
step: str
intent: str # approve | reject
feedback_text: Optional[str] = None
@app.post("/api/music/pipeline", status_code=201)
def create_pipeline(req: PipelineCreate):
# XOR 검증
if (req.track_id is None) == (req.compile_job_id is None):
raise HTTPException(400, "track_id 또는 compile_job_id 중 정확히 하나를 지정")
# compile_job 상태 확인
if req.compile_job_id is not None:
job = _db_module.get_compile_job(req.compile_job_id)
if not job:
raise HTTPException(404, f"compile job {req.compile_job_id} 없음")
if job.get("status") not in ("done", "succeeded"):
raise HTTPException(400, f"compile job {req.compile_job_id} not ready (status={job.get('status')})")
# 동일 입력으로 이미 active 파이프라인 있으면 409
actives = _db_module.list_pipelines(active_only=True)
for p in actives:
if (req.track_id and p.get("track_id") == req.track_id) or \
(req.compile_job_id and p.get("compile_job_id") == req.compile_job_id):
raise HTTPException(409, "이미 진행 중인 파이프라인이 있습니다")
setup = _db_module.get_youtube_setup()
vd = setup["visual_defaults"]
pid = _db_module.create_pipeline(
track_id=req.track_id,
compile_job_id=req.compile_job_id,
visual_style=req.visual_style or vd.get("default_visual_style", "essential"),
background_mode=req.background_mode or vd.get("default_background_mode", "static"),
background_keyword=req.background_keyword or vd.get("default_background_keyword") or None,
)
return _db_module.get_pipeline(pid)
@app.get("/api/music/pipeline")
def list_pipelines_endpoint(status: str = "all"):
pipelines = _db_module.list_pipelines(active_only=(status == "active"))
return {"pipelines": pipelines}
@app.get("/api/music/pipeline/lookup-by-msg/{msg_id}")
def lookup_by_msg(msg_id: int):
for p in _db_module.list_pipelines(active_only=True):
for step, mid in p["last_telegram_msg_ids"].items():
if mid == msg_id:
return {"pipeline_id": p["id"], "step": step}
raise HTTPException(404)
@app.get("/api/music/pipeline/{pid}")
def get_pipeline_endpoint(pid: int):
p = _db_module.get_pipeline(pid)
if not p:
raise HTTPException(404)
p["jobs"] = _db_module.list_pipeline_jobs(pid)
p["feedback"] = _db_module.get_feedback_history(pid)
return p
@app.post("/api/music/pipeline/{pid}/start", status_code=202)
async def start_pipeline(pid: int, bg: BackgroundTasks):
p = _db_module.get_pipeline(pid)
if not p:
raise HTTPException(404)
if p["state"] != "created":
raise HTTPException(409, f"이미 시작됨 ({p['state']})")
bg.add_task(orchestrator.run_step, pid, "cover")
return {"ok": True}
def _state_to_step(state: str) -> Optional[str]:
return {
"video_pending": "video",
"thumb_pending": "thumb",
"meta_pending": "meta",
"ai_review": "review",
"publish_pending": None, # 사용자 명시 발행 호출 필요
"publishing": "publish",
}.get(state)
@app.post("/api/music/pipeline/{pid}/feedback", status_code=202)
async def feedback(pid: int, req: FeedbackRequest, bg: BackgroundTasks):
p = _db_module.get_pipeline(pid)
if not p:
raise HTTPException(404)
if p["state"] == "awaiting_manual":
raise HTTPException(409, "수동 개입 대기 중")
state = p["state"]
expected = f"{req.step}_pending"
if state != expected:
# 멱등 처리 — 이미 다음 단계로 넘어갔으면 무시
return {"ok": True, "skipped": True}
if req.intent == "approve":
from .pipeline.state_machine import next_state_on_approve
# Validate transition is legal
try:
next_st = next_state_on_approve(state)
except ValueError as e:
raise HTTPException(400, str(e))
next_step = _state_to_step(next_st)
if next_step:
# bg task will set state to the new *_pending when step completes
bg.add_task(orchestrator.run_step, pid, next_step)
else:
# No step to run — fall through to direct state update
# (defensive — current code paths don't hit this)
_db_module.update_pipeline_state(pid, next_st)
return {"ok": True}
elif req.intent == "reject":
count = _db_module.increment_feedback_count(pid, req.step)
if count > 5:
_db_module.update_pipeline_state(pid, "awaiting_manual")
raise HTTPException(409, "재생성 한도 초과")
if req.feedback_text:
_db_module.record_feedback(pid, req.step, req.feedback_text)
bg.add_task(orchestrator.run_step, pid, req.step, req.feedback_text or "")
return {"ok": True}
else:
raise HTTPException(400, f"unknown intent: {req.intent}")
@app.post("/api/music/pipeline/{pid}/cancel")
def cancel_pipeline(pid: int):
p = _db_module.get_pipeline(pid)
if not p:
raise HTTPException(404)
_db_module.update_pipeline_state(pid, "cancelled", cancelled_at=_db_module._now())
return {"ok": True}
@app.post("/api/music/pipeline/{pid}/publish", status_code=202)
async def publish_pipeline(pid: int, bg: BackgroundTasks):
p = _db_module.get_pipeline(pid)
if not p:
raise HTTPException(404)
if p["state"] != "publish_pending":
raise HTTPException(409, f"발행 단계 아님 ({p['state']})")
_db_module.update_pipeline_state(pid, "publishing")
bg.add_task(orchestrator.run_step, pid, "publish")
return {"ok": True}
# Telegram 메시지 매칭용 엔드포인트 (agent-office용)
class TelegramMsgPatch(BaseModel):
step: str
message_id: int
@app.patch("/api/music/pipeline/{pid}/telegram-msg")
def save_telegram_msg(pid: int, req: TelegramMsgPatch):
p = _db_module.get_pipeline(pid)
if not p:
raise HTTPException(404)
ids = p["last_telegram_msg_ids"]
ids[req.step] = req.message_id
_db_module.update_pipeline_state(
pid, p["state"], last_telegram_msg_ids=json.dumps(ids)
)
return {"ok": True}
# ── Setup endpoints ───────────────────────────────────────────────────────────
class SetupRequest(BaseModel):
metadata_template: Optional[Dict[str, Any]] = None
cover_prompts: Optional[Dict[str, Any]] = None
review_weights: Optional[Dict[str, Any]] = None
review_threshold: Optional[int] = None
visual_defaults: Optional[Dict[str, Any]] = None
publish_policy: Optional[Dict[str, Any]] = None
@app.get("/api/music/setup")
def get_setup():
return _db_module.get_youtube_setup()
@app.put("/api/music/setup")
def put_setup(req: SetupRequest):
payload = {k: v for k, v in req.dict().items() if v is not None}
_db_module.update_youtube_setup(**payload)
return _db_module.get_youtube_setup()
# ── YouTube OAuth endpoints ───────────────────────────────────────────────────
@app.get("/api/music/youtube/auth-url")
def youtube_auth_url():
return {"url": yt_module.get_auth_url()}
@app.get("/api/music/youtube/callback")
async def youtube_callback(code: str):
return await yt_module.exchange_code(code)
@app.post("/api/music/youtube/disconnect")
def youtube_disconnect():
yt_module.disconnect()
return {"ok": True}
@app.get("/api/music/youtube/status")
def youtube_status():
return yt_module.get_status() or {"connected": False}