Files
web-page-backend/music-lab/app/main.py
gahusb 532b794c11 refactor(music-lab): sync helpers → Windows HTTP forward + cleanup (SP-6)
/api/music/{lyrics, credits, timestamped-lyrics, style-boost}
모두 sync_forward 모듈로 위임 → Windows :18711/api/music-render/sync/*.
SUNO_API_KEY가 NAS에 없으므로 직접 호출 불가.
run_*, generate_*, get_* import 제거 (Windows로 이전됨).
SUNO_MODELS만 잔존 (정적 데이터).

추가 cleanup (T11 reviewer 지적):
- _push_render_job의 datetime import를 모듈 상위로
- 11 endpoint의 unused BackgroundTasks 매개변수 제거

generate_batch: SUNO_API_KEY 체크를 os.getenv()로 전환 + 테스트 monkeypatch 갱신.

Plan-B-Music Phase 3 (cutover 2/4).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 05:16:15 +09:00

1200 lines
43 KiB
Python

import json
import os
import shutil
import uuid
from datetime import datetime, timezone, timedelta
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, HTTPException, BackgroundTasks, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from .db import (
init_db,
create_task, get_task,
get_all_tracks, add_track, delete_track, get_track_file_path, get_track_by_task_id, get_track_by_id,
update_track_duration, update_track_file_info, update_track_hash,
get_all_lyrics, add_lyrics, update_lyrics, delete_lyrics,
create_video_project, get_video_project, get_all_video_projects,
update_video_project_status, delete_video_project,
create_revenue_record, get_all_revenue_records,
update_revenue_record, delete_revenue_record, get_revenue_dashboard,
get_market_trends as _get_market_trends,
get_latest_trend_report, get_trend_reports as _get_trend_reports,
create_compile_job, get_compile_jobs, get_compile_job,
update_compile_job, delete_compile_job,
)
from . import db as _db_module
from .compiler import run_compile
from .market import ingest_trends, get_suggestions
from .pipeline import orchestrator
from .pipeline import youtube as yt_module
from .suno_provider import SUNO_MODELS
from .batch_generator import run_batch as _run_batch
import redis.asyncio as aioredis
from .internal_router import router as internal_router
# FastAPI application instance; every route in this module is registered on it.
app = FastAPI()
# Redis connection URL; defaults to the "redis" host on port 6379 when REDIS_URL is unset.
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379")
# Shared async Redis client; decode_responses=False leaves replies as raw bytes.
redis_client = aioredis.from_url(REDIS_URL, decode_responses=False)
# Mount the routes exported by .internal_router.
app.include_router(internal_router)
async def _push_render_job(task_id: str, job_type: str, params: dict) -> None:
    """Append a render job to the ``queue:music-render`` Redis list.

    The job is serialized as JSON and stamped with the current time in
    KST (UTC+9). Per the original note, a Windows worker consumes the
    queue with BLPOP.
    """
    submitted_at = datetime.now(timezone(timedelta(hours=9))).isoformat()
    job = dict(
        task_id=task_id,
        kind="music",
        job_type=job_type,
        params=params,
        submitted_at=submitted_at,
    )
    serialized = json.dumps(job)
    await redis_client.rpush("queue:music-render", serialized)
# Allowed CORS origins: comma-separated CORS_ALLOW_ORIGINS, defaulting to two localhost origins.
_cors_origins = os.getenv("CORS_ALLOW_ORIGINS", "http://localhost:3007,http://localhost:8080").split(",")
app.add_middleware(
    CORSMiddleware,
    # Whitespace around each configured origin is stripped.
    allow_origins=[o.strip() for o in _cors_origins],
    allow_credentials=False,
    # NOTE(review): PATCH is not listed although this module defines a PATCH route — confirm
    # that route is never called from a browser.
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["Content-Type"],
)
# Directory that holds the library's .mp3 files; created at startup if missing.
MUSIC_DATA_DIR = "/app/data"
# Root directory for per-project video output; overridable via the VIDEO_DATA_DIR env var.
VIDEO_DATA_DIR = os.getenv("VIDEO_DATA_DIR", "/app/data/videos")
def _get_mp3_duration(file_path: str) -> Optional[int]:
"""MP3 파일에서 실제 재생 시간(초) 추출."""
try:
from mutagen.mp3 import MP3
audio = MP3(file_path)
return int(audio.info.length)
except Exception:
return None
def _backfill_durations():
    """Populate duration_sec for stored tracks that lack it, reading the MP3 file."""
    for track in get_all_tracks():
        if track["duration_sec"] is not None:
            continue
        path = track.get("file_path")
        if not path:
            continue
        seconds = _get_mp3_duration(path)
        # A falsy result (None or 0) is not written back.
        if seconds:
            update_track_duration(track["id"], seconds)
@app.on_event("startup")
def on_startup():
    """Startup hook: set up the DB, ensure the data directory exists, backfill durations."""
    init_db()
    # exist_ok=True makes this safe to run on every boot.
    os.makedirs(MUSIC_DATA_DIR, exist_ok=True)
    _backfill_durations()
@app.get("/health")
def health():
    """Liveness probe; always answers ``{"ok": True}``."""
    return dict(ok=True)
@app.get("/api/music/providers")
def get_providers():
    """Return the list of available music-generation providers.

    "local" (MusicGen) is listed only when MUSIC_AI_SERVER_URL is set.
    "suno" is always listed; per the original note, Suno runs on the Windows
    music-render worker, which reports failure itself if its key is missing.
    """
    suno = {
        "id": "suno",
        "name": "Suno",
        "description": "Suno AI (보컬·가사·인스트루멘탈)",
        "features": ["vocals", "lyrics", "instrumental"],
    }
    if not os.getenv("MUSIC_AI_SERVER_URL"):
        return {"providers": [suno]}
    local = {
        "id": "local",
        "name": "MusicGen",
        "description": "로컬 AI 서버 (인스트루멘탈 전용)",
        "features": ["instrumental"],
    }
    return {"providers": [local, suno]}
# ── 음악 생성 API ─────────────────────────────────────────────────────────────
class GenerateRequest(BaseModel):
    """Request body for POST /api/music/generate."""

    provider: str = "suno"  # "suno" | "local"
    model: str = "V4"  # Suno model (V4, V4_5, V5, ...)
    title: str = ""
    genre: str = ""
    moods: list[str] = []
    instruments: list[str] = []
    duration_sec: int | None = None
    bpm: int | None = None
    key: str = ""
    scale: str = ""
    prompt: str = ""
    # --- Suno-only fields ---
    lyrics: str = ""  # custom lyrics ([Verse], [Chorus], ...)
    instrumental: bool = False  # True = instrumental only, no vocals
    # --- added in Phase 1 ---
    vocal_gender: str | None = None  # "m" | "f"
    negative_tags: str | None = None  # styles to exclude
    style_weight: float | None = None  # 0.0 to 1.0
    audio_weight: float | None = None  # 0.0 to 1.0
@app.post("/api/music/generate")
async def generate_music(req: GenerateRequest):
    """Create a generation task and hand it to the render worker via Redis.

    Returns ``{"task_id", "provider"}``.
    Raises HTTP 400 when ``provider`` is neither "suno" nor "local".
    """
    provider = req.provider
    if provider not in ("suno", "local"):
        raise HTTPException(status_code=400, detail=f"지원하지 않는 provider: {provider}")
    # No SUNO_API_KEY / MUSIC_AI_SERVER_URL check happens here: per the original
    # notes those settings live on the Windows side, and the worker reports a
    # failed task through its webhook. The former MUSIC_AI_SERVER_URL branch
    # only executed `pass`, so it has been removed.
    task_id = str(uuid.uuid4())
    params = req.model_dump()
    create_task(task_id, params, provider=provider)
    job_type = "suno_generation" if provider == "suno" else "local_generation"
    await _push_render_job(task_id, job_type, params)
    return {"task_id": task_id, "provider": provider}
@app.get("/api/music/status/{task_id}")
def get_status(task_id: str):
    """Report the state of a generation task.

    The original docstring describes the frontend polling until the status is
    ``succeeded`` or ``failed`` (values: queued | processing | succeeded | failed).
    When the status is ``succeeded`` the matching track is attached as ``track``.
    Raises HTTP 404 when the task does not exist.
    """
    task = get_task(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    exposed = ("status", "progress", "message", "audio_url", "error", "provider")
    resp = {name: task[name] for name in exposed}
    if resp["status"] == "succeeded":
        resp["track"] = get_track_by_task_id(task_id)
    return resp
# ── 가사 생성 API (Suno 전용) ────────────────────────────────────────────────
class LyricsRequest(BaseModel):
    """Request body for POST /api/music/lyrics."""

    # Text passed to forward_lyrics().
    prompt: str
@app.post("/api/music/lyrics")
def gen_lyrics(req: LyricsRequest):
    """Generate lyrics by forwarding the prompt through sync_forward.

    Raises HTTP 502 when the forward call returns a falsy result.
    """
    from .sync_forward import forward_lyrics

    lyrics = forward_lyrics(req.prompt)
    if lyrics:
        return lyrics
    raise HTTPException(status_code=502, detail="가사 생성 실패 (Windows worker 응답 없음)")
# ── 라이브러리 API ────────────────────────────────────────────────────────────
class TrackCreate(BaseModel):
    """Request body for manually adding a track via POST /api/music/library."""

    title: str = ""
    genre: str = ""
    moods: list[str] = []
    instruments: list[str] = []
    duration_sec: int | None = None
    bpm: int | None = None
    key: str = ""
    scale: str = ""
    prompt: str = ""
    audio_url: str = ""
    file_path: str = ""
    task_id: str | None = None
    tags: list[str] = []
    provider: str = "local"
    lyrics: str = ""
    image_url: str = ""
    suno_id: str = ""
@app.get("/api/music/library")
def list_library():
    """List every stored track after re-syncing the DB with the files on disk.

    Ordering is whatever get_all_tracks() returns (documented originally as
    newest first).
    """
    _sync_library_with_disk()
    tracks = get_all_tracks()
    return {"tracks": tracks}
def _calc_file_hash(file_path: str) -> str:
"""MD5 해시 계산 (파일 동일성 체크용)."""
import hashlib
h = hashlib.md5()
try:
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
h.update(chunk)
return h.hexdigest()
except OSError:
return ""
def _sync_library_with_disk():
    """Reconcile library DB rows with the .mp3 files found in MUSIC_DATA_DIR.

    Stage 1: match rows to files by filename (last path segment of
             ``audio_url``) and backfill a missing ``file_hash``.
    Stage 2: match the leftovers by content hash; a hit is treated as a
             rename, so the existing row is updated in place instead of being
             deleted and re-created.
    Stage 3: delete rows that still have no file, and add rows for files that
             still have no row.

    Returns None. If the data directory cannot be listed, nothing is changed.
    """
    tracks = get_all_tracks()
    media_base = os.getenv("MUSIC_MEDIA_BASE", "/media/music")

    try:
        disk_files = {
            name for name in os.listdir(MUSIC_DATA_DIR) if name.lower().endswith(".mp3")
        }
    except OSError:
        return

    # ── Stage 1: filename match ─────────────────────────────────
    db_by_filename = {
        t["audio_url"].split("/")[-1]: t for t in tracks if t.get("audio_url")
    }
    matched_db_ids = set()
    unmatched_disk = set()
    for name in disk_files:
        track = db_by_filename.get(name)
        if track is None:
            unmatched_disk.add(name)
            continue
        matched_db_ids.add(track["id"])
        # Backfill the hash for rows that were stored without one.
        if not track.get("file_hash"):
            file_hash = _calc_file_hash(os.path.join(MUSIC_DATA_DIR, name))
            if file_hash:
                update_track_hash(track["id"], file_hash)
    unmatched_db = [t for t in tracks if t["id"] not in matched_db_ids]

    # filename → hash computed in stage 2, reused in stage 3 so each file is
    # read from disk only once.
    hash_cache = {}

    # ── Stage 2: hash-based rename detection ────────────────────
    if unmatched_disk and unmatched_db:
        db_hash_map = {}  # hash → track
        for t in unmatched_db:
            h = t.get("file_hash", "")
            if h:
                db_hash_map[h] = t

        resolved_disk = set()
        resolved_db_ids = set()
        # Sorted so the outcome is deterministic when several files share a hash.
        for name in sorted(unmatched_disk):
            file_path = os.path.join(MUSIC_DATA_DIR, name)
            file_hash = _calc_file_hash(file_path)
            hash_cache[name] = file_hash
            if not file_hash:
                continue
            # pop(): a DB row can be claimed by at most one file. Without this,
            # two identical-content files both "resolved" to the same row and
            # the second file was never registered in the library.
            track = db_hash_map.pop(file_hash, None)
            if track is None:
                continue
            # Rename detected: update the existing row (keeps tags/metadata).
            new_title = os.path.splitext(name)[0].replace("-", " ").replace("_", " ")
            update_track_file_info(
                track["id"],
                title=new_title,
                audio_url=f"{media_base}/{name}",
                file_path=file_path,
            )
            resolved_disk.add(name)
            resolved_db_ids.add(track["id"])

        unmatched_disk -= resolved_disk
        unmatched_db = [t for t in unmatched_db if t["id"] not in resolved_db_ids]

    # ── Stage 3: leftovers ──────────────────────────────────────
    # Rows with no file left on disk are removed from the DB.
    for t in unmatched_db:
        delete_track(t["id"])

    # Files with no row are registered as new tracks (hash included).
    for name in unmatched_disk:
        file_path = os.path.join(MUSIC_DATA_DIR, name)
        title = os.path.splitext(name)[0].replace("-", " ").replace("_", " ")
        # An empty cached hash means the earlier read failed; try once more.
        file_hash = hash_cache.get(name) or _calc_file_hash(file_path)
        add_track({
            "title": title,
            "audio_url": f"{media_base}/{name}",
            "file_path": file_path,
            "provider": "suno",
            "duration_sec": _get_mp3_duration(file_path),
            "file_hash": file_hash,
        })
@app.post("/api/music/library", status_code=201)
def save_to_library(req: TrackCreate):
    """Add a track manually (registering an external file, or a direct save from the frontend)."""
    return add_track(req.model_dump())
@app.delete("/api/music/library/{track_id}")
def remove_from_library(track_id: int):
    """Delete a track row and, if present, its local file.

    Raises HTTP 404 when no row was deleted.
    """
    # Look the path up first; it is no longer available once the row is gone.
    local_path = get_track_file_path(track_id)
    if not delete_track(track_id):
        raise HTTPException(status_code=404, detail="Track not found")
    if local_path and os.path.isfile(local_path):
        try:
            os.remove(local_path)
        except OSError:
            # Best effort: a failed file delete does not fail the request.
            pass
    return {"ok": True}
# ── 모델 목록 API ────────────────────────────────────────────────────────────
@app.get("/api/music/models")
def get_models():
    """Return the Suno model list (SUNO_MODELS)."""
    return dict(models=SUNO_MODELS)
# ── 크레딧 조회 API ──────────────────────────────────────────────────────────
@app.get("/api/music/credits")
def check_credits():
    """Fetch remaining Suno credits through sync_forward.

    Raises HTTP 502 only when the forward call returns None; other falsy
    results are passed through unchanged.
    """
    from .sync_forward import forward_credits

    credits = forward_credits()
    if credits is not None:
        return credits
    raise HTTPException(status_code=502, detail="크레딧 조회 실패")
# ── 곡 연장 API ──────────────────────────────────────────────────────────────
class ExtendRequest(BaseModel):
    """Request body for POST /api/music/extend."""

    # ID of the original Suno song
    suno_id: str
    # Point at which the extension starts (seconds)
    continue_at: int = 0
    # Additional lyrics / prompt
    prompt: str = ""
    # Style override
    style: str = ""
    title: str = ""
    model: str = "V4"
@app.post("/api/music/extend")
async def extend_music(req: ExtendRequest):
    """Queue a ``suno_extend`` job that continues an existing song from a given point."""
    new_task_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(new_task_id, payload, provider="suno")
    await _push_render_job(new_task_id, "suno_extend", payload)
    return dict(task_id=new_task_id, provider="suno")
# ── 보컬 분리 API ────────────────────────────────────────────────────────────
class VocalRemovalRequest(BaseModel):
    """Request body for POST /api/music/vocal-removal."""

    # Suno song ID
    suno_id: str
    # Title of the original track
    title: str = ""
@app.post("/api/music/vocal-removal")
async def vocal_removal(req: VocalRemovalRequest):
    """Queue a ``vocal_removal`` job (separates vocals from the instrumental)."""
    new_task_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(new_task_id, payload, provider="suno")
    await _push_render_job(new_task_id, "vocal_removal", payload)
    return dict(task_id=new_task_id, provider="suno")
# ── 커버 이미지 생성 API ────────────────────────────────────────────────────
class CoverImageRequest(BaseModel):
    """Request body for POST /api/music/cover-image."""

    # Suno generation task ID
    suno_task_id: str
    # Library track ID (used for storing the result)
    track_id: int | None = None
@app.post("/api/music/cover-image")
async def cover_image(req: CoverImageRequest):
    """Queue a ``cover_image`` job (original docstring: generates two cover images for a Suno song)."""
    new_task_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(new_task_id, payload, provider="suno")
    await _push_render_job(new_task_id, "cover_image", payload)
    return dict(task_id=new_task_id, provider="suno")
# ── WAV 변환 API ────────────────────────────────────────────────────────────
class WavRequest(BaseModel):
    """Request body for POST /api/music/wav."""

    suno_task_id: str
    suno_id: str
    track_id: int | None = None
@app.post("/api/music/wav")
async def wav_convert(req: WavRequest):
    """Queue a ``wav_convert`` job (converts a song to WAV)."""
    new_task_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(new_task_id, payload, provider="suno")
    await _push_render_job(new_task_id, "wav_convert", payload)
    return dict(task_id=new_task_id, provider="suno")
# ── 12스템 분리 API ─────────────────────────────────────────────────────────
class StemSplitRequest(BaseModel):
    """Request body for POST /api/music/stem-split."""

    suno_task_id: str
    suno_id: str
    track_id: int | None = None
@app.post("/api/music/stem-split")
async def stem_split(req: StemSplitRequest):
    """Queue a ``stem_split`` job.

    Original docstring: splits a song into 12 stems (50 credits) — vocals,
    drums, bass, guitar, etc.
    """
    new_task_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(new_task_id, payload, provider="suno")
    await _push_render_job(new_task_id, "stem_split", payload)
    return dict(task_id=new_task_id, provider="suno")
# ── 타임스탬프 가사 API ─────────────────────────────────────────────────────
@app.get("/api/music/timestamped-lyrics")
def timestamped_lyrics(task_id: str, suno_id: str):
    """Fetch timestamped lyrics through sync_forward.

    Raises HTTP 502 when the forward call returns a falsy result.
    """
    from .sync_forward import forward_timestamped_lyrics

    lyrics = forward_timestamped_lyrics(task_id, suno_id)
    if lyrics:
        return lyrics
    raise HTTPException(status_code=502, detail="타임스탬프 가사 조회 실패")
# ── 스타일 부스트 API ───────────────────────────────────────────────────────
class StyleBoostRequest(BaseModel):
    """Request body for POST /api/music/style-boost."""

    # Text passed to forward_style_boost().
    content: str
@app.post("/api/music/style-boost")
def style_boost(req: StyleBoostRequest):
    """Run a style boost through sync_forward.

    Raises HTTP 502 when the forward call returns a falsy result.
    """
    from .sync_forward import forward_style_boost

    boosted = forward_style_boost(req.content)
    if boosted:
        return boosted
    raise HTTPException(status_code=502, detail="스타일 부스트 생성 실패")
# ── Phase 3: 업로드 + 커버 ──────────────────────────────────────────────────
class UploadCoverRequest(BaseModel):
    """Request body for POST /api/music/upload-cover."""

    upload_url: str
    model: str = "V4"
    custom_mode: bool = True
    instrumental: bool = False
    prompt: str = ""
    style: str = ""
    title: str = ""
    vocal_gender: str | None = None
    negative_tags: str | None = None
    style_weight: float | None = None
    audio_weight: float | None = None
@app.post("/api/music/upload-cover")
async def upload_cover(req: UploadCoverRequest):
    """Queue an ``upload_cover`` job (original docstring: remake external audio in Suno style)."""
    new_task_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(new_task_id, payload, provider="suno")
    await _push_render_job(new_task_id, "upload_cover", payload)
    return dict(task_id=new_task_id, provider="suno")
# ── Phase 3: 업로드 + 확장 ──────────────────────────────────────────────────
class UploadExtendRequest(BaseModel):
    """Request body for POST /api/music/upload-extend."""

    upload_url: str
    model: str = "V4"
    default_param_flag: bool = True
    continue_at: float | None = None
    prompt: str = ""
    style: str = ""
    title: str = ""
    instrumental: bool = False
    vocal_gender: str | None = None
    negative_tags: str | None = None
@app.post("/api/music/upload-extend")
async def upload_extend(req: UploadExtendRequest):
    """Queue an ``upload_extend`` job (original docstring: continue/extend external audio)."""
    new_task_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(new_task_id, payload, provider="suno")
    await _push_render_job(new_task_id, "upload_extend", payload)
    return dict(task_id=new_task_id, provider="suno")
# ── Phase 3: 보컬 추가 ──────────────────────────────────────────────────────
class AddVocalsRequest(BaseModel):
    """Request body for POST /api/music/add-vocals."""

    # Required fields
    upload_url: str
    prompt: str
    title: str
    style: str
    # Optional fields
    negative_tags: str = ""
    vocal_gender: str | None = None
    model: str = "V4_5PLUS"
    style_weight: float | None = None
    audio_weight: float | None = None
@app.post("/api/music/add-vocals")
async def add_vocals(req: AddVocalsRequest):
    """Queue an ``add_vocals`` job (original docstring: add AI vocals to an instrumental)."""
    new_task_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(new_task_id, payload, provider="suno")
    await _push_render_job(new_task_id, "add_vocals", payload)
    return dict(task_id=new_task_id, provider="suno")
# ── Phase 3: 인스트루멘탈 추가 ──────────────────────────────────────────────
class AddInstrumentalRequest(BaseModel):
    """Request body for POST /api/music/add-instrumental."""

    # Required fields
    upload_url: str
    title: str
    tags: str
    # Optional fields
    negative_tags: str = ""
    vocal_gender: str | None = None
    model: str = "V4_5PLUS"
    style_weight: float | None = None
    audio_weight: float | None = None
@app.post("/api/music/add-instrumental")
async def add_instrumental(req: AddInstrumentalRequest):
    """Queue an ``add_instrumental`` job (original docstring: add AI accompaniment to vocals)."""
    new_task_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(new_task_id, payload, provider="suno")
    await _push_render_job(new_task_id, "add_instrumental", payload)
    return dict(task_id=new_task_id, provider="suno")
# ── Phase 3: 뮤직비디오 생성 ────────────────────────────────────────────────
class VideoRequest(BaseModel):
    """Request body for POST /api/music/video."""

    suno_task_id: str
    suno_id: str
    author: str = ""
    domain_name: str = ""
    track_id: int | None = None
@app.post("/api/music/video")
async def video_generate(req: VideoRequest):
    """Queue a ``video_generate`` job (original docstring: generate a music video as MP4)."""
    new_task_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(new_task_id, payload, provider="suno")
    await _push_render_job(new_task_id, "video_generate", payload)
    return dict(task_id=new_task_id, provider="suno")
# ── 저장된 가사 CRUD API ────────────────────────────────────────────────────
class LyricsSave(BaseModel):
    """Request body for saving lyrics; every field defaults to an empty string."""

    title: str = ""
    text: str = ""
    prompt: str = ""
class LyricsUpdate(BaseModel):
    """Partial update for saved lyrics; fields left as None are not applied."""

    title: str | None = None
    text: str | None = None
    prompt: str | None = None
@app.get("/api/music/lyrics/library")
def list_saved_lyrics():
    """List all saved lyrics (documented originally as newest first)."""
    saved = get_all_lyrics()
    return {"lyrics": saved}
@app.post("/api/music/lyrics/library", status_code=201)
def save_lyrics(req: LyricsSave):
    """Store a lyrics entry and return what add_lyrics() returns."""
    fields = req.model_dump()
    return add_lyrics(fields)
@app.put("/api/music/lyrics/library/{lyrics_id}")
def edit_lyrics(lyrics_id: int, req: LyricsUpdate):
    """Apply the non-None fields of the request to a saved lyrics entry.

    Raises HTTP 404 when update_lyrics() returns a falsy result.
    """
    changes = {}
    for field, value in req.model_dump().items():
        if value is not None:
            changes[field] = value
    updated = update_lyrics(lyrics_id, changes)
    if updated:
        return updated
    raise HTTPException(status_code=404, detail="Lyrics not found")
@app.delete("/api/music/lyrics/library/{lyrics_id}")
def remove_lyrics(lyrics_id: int):
    """Delete a saved lyrics entry; HTTP 404 when nothing was deleted."""
    deleted = delete_lyrics(lyrics_id)
    if deleted:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Lyrics not found")
# ── 영상 프로젝트 모델 ────────────────────────────────────────────────────────
class VideoProjectCreate(BaseModel):
    """Request body for creating a video project."""

    track_id: int
    # The create endpoint accepts only "visualizer" or "slideshow".
    format: str = "visualizer"
    target_countries: list[str] = []
    render_params: dict = {}
class RevenueCreate(BaseModel):
    """Request body for creating a revenue record; only record_month is required."""

    video_project_id: int | None = None
    yt_video_id: str = ""
    record_month: str
    views: int = 0
    watch_hours: float = 0.0
    revenue_usd: float = 0.0
    country: str = ""
    source: str = "manual"
class RevenueUpdate(BaseModel):
    """Partial update for a revenue record; fields left as None are not applied."""

    yt_video_id: str | None = None
    record_month: str | None = None
    views: int | None = None
    watch_hours: float | None = None
    revenue_usd: float | None = None
    country: str | None = None
    source: str | None = None
# ── 영상 프로젝트 API ─────────────────────────────────────────────────────────
@app.post("/api/music/video-project", status_code=201)
def create_project(req: VideoProjectCreate, background_tasks: BackgroundTasks):
    """Create a video project for an existing track.

    Raises HTTP 404 when the track does not exist and HTTP 400 for an
    unsupported format. ``background_tasks`` is accepted but not used here.
    """
    track = get_track_by_id(req.track_id)
    if not track:
        raise HTTPException(status_code=404, detail="Track not found")
    allowed_formats = ("visualizer", "slideshow")
    if req.format not in allowed_formats:
        raise HTTPException(status_code=400, detail="format은 'visualizer' 또는 'slideshow'")
    return create_video_project(req.model_dump())
@app.get("/api/music/video-projects")
def list_projects():
    """List all video projects."""
    projects = get_all_video_projects()
    return {"projects": projects}
@app.get("/api/music/video-project/{project_id}")
def get_project(project_id: int):
    """Return one video project; HTTP 404 when it does not exist."""
    project = get_video_project(project_id)
    if project:
        return project
    raise HTTPException(status_code=404, detail="Project not found")
@app.post("/api/music/video-project/{project_id}/render")
def render_project(project_id: int, background_tasks: BackgroundTasks):
    """Schedule produce_video() for a project as a background task.

    Raises HTTP 404 for an unknown project and HTTP 409 when the project's
    status is already "rendering". The response reports "rendering"; this
    function itself does not write that status to the DB.
    """
    project = get_video_project(project_id)
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    already_rendering = project["status"] == "rendering"
    if already_rendering:
        raise HTTPException(status_code=409, detail="이미 렌더링 중입니다")
    from .video_producer import produce_video

    background_tasks.add_task(produce_video, project_id)
    return dict(ok=True, project_id=project_id, status="rendering")
@app.get("/api/music/video-project/{project_id}/export")
def export_project(project_id: int):
    """Return the publishable artefacts of a finished video project.

    Includes the output URL, a thumbnail URL derived from it, the YouTube
    title/description/tags and the contents of ``metadata.json`` if present.
    Raises HTTP 404 for an unknown project and HTTP 400 unless status is "done".
    """
    proj = get_video_project(project_id)
    if not proj:
        raise HTTPException(status_code=404, detail="Project not found")
    if proj["status"] != "done":
        raise HTTPException(status_code=400, detail=f"렌더링 미완료 (status: {proj['status']})")

    # metadata.json is optional; fall back to an empty dict.
    meta_path = os.path.join(VIDEO_DATA_DIR, str(project_id), "metadata.json")
    if os.path.exists(meta_path):
        with open(meta_path, encoding="utf-8") as fh:
            metadata = json.load(fh)
    else:
        metadata = {}

    # The thumbnail URL is the output URL with the file name swapped.
    output_url = proj["output_url"]
    if output_url:
        thumb_url = output_url.replace("output.mp4", "thumbnail.jpg")
    else:
        thumb_url = ""

    return {
        "project_id": project_id,
        "output_url": output_url,
        "thumbnail_url": thumb_url,
        "yt_title": proj["yt_title"],
        "yt_description": proj["yt_description"],
        "yt_tags": proj["yt_tags"],
        "metadata": metadata,
    }
@app.delete("/api/music/video-project/{project_id}")
def delete_project(project_id: int):
    """Delete a video project and its output directory; HTTP 404 when unknown."""
    project = get_video_project(project_id)
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    project_dir = os.path.join(VIDEO_DATA_DIR, str(project_id))
    if os.path.isdir(project_dir):
        # ignore_errors: leftover files do not block removal of the DB row.
        shutil.rmtree(project_dir, ignore_errors=True)
    delete_video_project(project_id)
    return {"ok": True}
# ── Compile Jobs ──────────────────────────────────────────────────────────────
class CompileRequest(BaseModel):
    """Request body for POST /api/music/compile."""

    title: str = ""
    # Must be non-empty (checked by the endpoint).
    track_ids: list[int]
    # The endpoint accepts values from 0.5 to 15.
    crossfade_sec: float = 3.0
@app.post("/api/music/compile")
def create_compile(req: CompileRequest, background_tasks: BackgroundTasks):
    """Create a compile job and schedule run_compile() in the background.

    Raises HTTP 400 when ``track_ids`` is empty or ``crossfade_sec`` is
    outside 0.5 to 15.
    """
    if not req.track_ids:
        raise HTTPException(status_code=400, detail="track_ids 필수")
    crossfade_ok = 0.5 <= req.crossfade_sec <= 15
    if not crossfade_ok:
        raise HTTPException(status_code=400, detail="crossfade_sec: 0.5~15")
    new_job_id = create_compile_job(req.title, req.track_ids, req.crossfade_sec)
    background_tasks.add_task(run_compile, new_job_id)
    return dict(id=new_job_id, status="rendering")
@app.get("/api/music/compiles")
def list_compiles():
    """List compile jobs."""
    jobs = get_compile_jobs()
    return {"jobs": jobs}
@app.get("/api/music/compile/{job_id}")
def get_compile(job_id: int):
    """Return one compile job; HTTP 404 when it does not exist."""
    job = get_compile_job(job_id)
    if job:
        return job
    raise HTTPException(status_code=404, detail="Not found")
@app.delete("/api/music/compile/{job_id}")
def delete_compile(job_id: int):
    """Delete a compile job and the directory that contains its output file.

    Raises HTTP 404 when the job does not exist.
    """
    job = get_compile_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Not found")
    output_path = job.get("output_path")
    if output_path:
        # NOTE(review): this removes the whole parent directory of the output
        # file — presumably a per-job directory; confirm against run_compile.
        parent_dir = os.path.dirname(output_path)
        if os.path.exists(parent_dir):
            shutil.rmtree(parent_dir, ignore_errors=True)
    delete_compile_job(job_id)
    return {"ok": True}
@app.get("/api/music/compile/{job_id}/export")
def export_compile(job_id: int):
    """Return the public MP4 URL, duration and title of a finished compile job.

    The URL is ``/media/videos/`` plus the output path relative to the
    VIDEO_DATA_DIR environment setting (default ``/app/data/videos``).

    Raises HTTP 404 ("Not ready") when the job is unknown, its status is not
    "done", or it has no output path recorded.
    """
    job = get_compile_job(job_id)
    # A "done" job without an output path cannot be exported; report it as not
    # ready instead of failing inside os.path.relpath with a TypeError.
    if not job or job["status"] != "done" or not job.get("output_path"):
        raise HTTPException(status_code=404, detail="Not ready")
    rel = os.path.relpath(job["output_path"], os.getenv("VIDEO_DATA_DIR", "/app/data/videos"))
    return {
        "mp4_url": f"/media/videos/{rel}",
        "duration_sec": job["duration_sec"],
        "title": job["title"],
    }
# ── 배치 음악 생성 API ────────────────────────────────────────────────────────
class BatchGenerateRequest(BaseModel):
    """Request body for POST /api/music/generate-batch."""

    # Must be non-empty (checked by the endpoint).
    genre: str
    # The endpoint accepts 1 to 10.
    count: int = 10
    # The endpoint accepts 60 to 300.
    target_duration_sec: int = 180
    auto_pipeline: bool = True
@app.post("/api/music/generate-batch", status_code=201)
async def generate_batch(req: BatchGenerateRequest, bg: BackgroundTasks):
    """Create a batch-generation job and run it in the background.

    Validation failures are all HTTP 400: ``count`` outside 1-10,
    ``target_duration_sec`` outside 60-300, empty ``genre``, or the
    SUNO_API_KEY environment variable being unset.
    Returns the stored batch job.
    """
    count_ok = 1 <= req.count <= 10
    if not count_ok:
        raise HTTPException(status_code=400, detail="count는 1-10 사이")
    duration_ok = 60 <= req.target_duration_sec <= 300
    if not duration_ok:
        raise HTTPException(status_code=400, detail="target_duration_sec는 60-300 사이")
    if not req.genre:
        raise HTTPException(status_code=400, detail="genre 필수")
    if not os.getenv("SUNO_API_KEY"):
        raise HTTPException(status_code=400, detail="SUNO_API_KEY 미설정")

    new_batch_id = _db_module.create_batch_job(
        genre=req.genre,
        count=req.count,
        target_duration_sec=req.target_duration_sec,
        auto_pipeline=req.auto_pipeline,
    )
    bg.add_task(_run_batch, new_batch_id)
    return _db_module.get_batch_job(new_batch_id)
@app.get("/api/music/generate-batch/{batch_id}")
def get_batch(batch_id: int):
    """Return a batch job with its tracks attached under ``tracks``.

    Tracks are listed in the order of the job's ``track_ids``; ids with no
    matching library row are skipped. Raises HTTP 404 for an unknown batch.
    """
    import sqlite3
    from contextlib import closing

    job = _db_module.get_batch_job(batch_id)
    if not job:
        raise HTTPException(status_code=404, detail="Not found")

    track_ids = job["track_ids"]
    if not track_ids:
        job["tracks"] = []
        return job

    # Bind the ids as parameters instead of interpolating them into the SQL
    # text; only the "?" placeholders are generated dynamically.
    placeholders = ",".join("?" * len(track_ids))
    # closing() guarantees the connection is released even if the query raises.
    with closing(sqlite3.connect(_db_module.DB_PATH)) as conn:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            "SELECT id, title, audio_url, duration_sec FROM music_library "
            f"WHERE id IN ({placeholders})",
            list(track_ids),
        ).fetchall()

    # Re-order the rows to follow batch.track_ids.
    by_id = {row["id"]: dict(row) for row in rows}
    job["tracks"] = [by_id[tid] for tid in track_ids if tid in by_id]
    return job
@app.get("/api/music/generate-batch")
def list_batches(status: str = "all"):
    """List batch jobs; ``status=active`` restricts the list to active ones."""
    only_active = status == "active"
    return {"batches": _db_module.list_batch_jobs(active_only=only_active)}
@app.get("/api/music/genres")
def list_supported_genres():
    """Return the genres usable for batch generation, as reported by random_pools.list_genres()."""
    from .random_pools import list_genres

    genres = list_genres()
    return {"genres": genres}
# ── 수익화 추적 API ───────────────────────────────────────────────────────────
@app.get("/api/music/revenue/dashboard")
def revenue_dashboard():
    """Return the revenue dashboard produced by get_revenue_dashboard()."""
    dashboard = get_revenue_dashboard()
    return dashboard
@app.get("/api/music/revenue")
def list_revenue(yt_video_id: Optional[str] = None, year_month: Optional[str] = None):
    """List revenue records, passing the optional video-id and month filters through."""
    records = get_all_revenue_records(yt_video_id, year_month)
    return {"records": records}
@app.post("/api/music/revenue", status_code=201)
def add_revenue(req: RevenueCreate):
    """Create a revenue record and return what create_revenue_record() returns."""
    fields = req.model_dump()
    return create_revenue_record(fields)
@app.put("/api/music/revenue/{record_id}")
def edit_revenue(record_id: int, req: RevenueUpdate):
    """Apply the non-None fields of the request to a revenue record.

    Raises HTTP 404 when update_revenue_record() returns a falsy result.
    """
    changes = {}
    for field, value in req.model_dump().items():
        if value is not None:
            changes[field] = value
    updated = update_revenue_record(record_id, changes)
    if updated:
        return updated
    raise HTTPException(status_code=404, detail="Record not found")
@app.delete("/api/music/revenue/{record_id}")
def remove_revenue(record_id: int):
    """Delete a revenue record; HTTP 404 when nothing was deleted."""
    deleted = delete_revenue_record(record_id)
    if deleted:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Record not found")
# ── 시장 조사 API ─────────────────────────────────────────────────────────────
class MarketIngestRequest(BaseModel):
    """Request body for POST /api/music/market/ingest."""

    trends: list[dict[str, Any]]
    # An empty value makes the endpoint fall back to today's date.
    report_date: str = ""
@app.post("/api/music/market/ingest")
def market_ingest(req: MarketIngestRequest):
    """Receive trend data (sent by agent-office, per the original docstring) and ingest it.

    ``report_date`` falls back to today's ISO date when the request leaves it
    empty. Returns the number of trends received and the date used.
    """
    from datetime import date

    report_date = req.report_date or date.today().isoformat()
    # The value returned by ingest_trends() is not part of the response, so it
    # is no longer bound to an unused local.
    ingest_trends(req.trends, report_date)
    return {"ok": True, "trends_saved": len(req.trends), "report_date": report_date}
@app.get("/api/music/market/trends")
def list_market_trends(
    country: Optional[str] = None,
    genre: Optional[str] = None,
    source: Optional[str] = None,
    days: int = Query(7, ge=1),
):
    """List market trends filtered by country/genre/source over the last ``days`` (minimum 1, default 7)."""
    trends = _get_market_trends(country, genre, source, days)
    return {"trends": trends}
@app.get("/api/music/market/report/latest")
def get_market_report_latest():
    """Return the latest trend report; HTTP 404 when none exists yet."""
    latest = get_latest_trend_report()
    if latest:
        return latest
    raise HTTPException(status_code=404, detail="리포트 없음 — 아직 수집 전")
@app.get("/api/music/market/report")
def list_market_reports(limit: int = 10):
    """List trend reports, passing ``limit`` through to the DB helper."""
    reports = _get_trend_reports(limit)
    return {"reports": reports}
@app.get("/api/music/market/suggest")
def market_suggest(limit: int = 5):
    """Return suggestions from get_suggestions(), passing ``limit`` through."""
    suggestions = get_suggestions(limit)
    return {"suggestions": suggestions}
# ── Pipeline endpoints ────────────────────────────────────────────────────────
class PipelineCreate(BaseModel):
    """Request body for creating a pipeline.

    The endpoint requires exactly one of ``track_id`` / ``compile_job_id``.
    """

    track_id: Optional[int] = None
    compile_job_id: Optional[int] = None
    visual_style: Optional[str] = None  # single | essential
    background_mode: Optional[str] = None  # static | video_loop
    background_keyword: Optional[str] = None
class FeedbackRequest(BaseModel):
    """Request body for POST /api/music/pipeline/{pid}/feedback."""

    # Step the feedback refers to; compared against "<step>_pending".
    step: str
    # approve | reject
    intent: str
    feedback_text: str | None = None
@app.post("/api/music/pipeline", status_code=201)
def create_pipeline(req: PipelineCreate):
    """Create a pipeline for either a track or a finished compile job.

    Raises:
        HTTP 400: not exactly one of ``track_id`` / ``compile_job_id`` given,
            or the compile job is not in a finished state.
        HTTP 404: the compile job does not exist.
        HTTP 409: an active pipeline already exists for the same input.

    Visual options missing from the request fall back to the
    ``visual_defaults`` of the stored YouTube setup.
    """
    # Exactly one of the two inputs must be provided (XOR).
    if (req.track_id is None) == (req.compile_job_id is None):
        raise HTTPException(400, "track_id 또는 compile_job_id 중 정확히 하나를 지정")

    # A compile job must exist and be finished before it can feed a pipeline.
    if req.compile_job_id is not None:
        job = _db_module.get_compile_job(req.compile_job_id)
        if not job:
            raise HTTPException(404, f"compile job {req.compile_job_id} 없음")
        if job.get("status") not in ("done", "succeeded"):
            raise HTTPException(400, f"compile job {req.compile_job_id} not ready (status={job.get('status')})")

    # Reject a second active pipeline for the same input. The checks use
    # `is not None` (matching the XOR test above) so that an id of 0 is not
    # mistaken for "absent".
    for active in _db_module.list_pipelines(active_only=True):
        same_track = req.track_id is not None and active.get("track_id") == req.track_id
        same_compile = (
            req.compile_job_id is not None
            and active.get("compile_job_id") == req.compile_job_id
        )
        if same_track or same_compile:
            raise HTTPException(409, "이미 진행 중인 파이프라인이 있습니다")

    setup = _db_module.get_youtube_setup()
    vd = setup["visual_defaults"]
    pid = _db_module.create_pipeline(
        track_id=req.track_id,
        compile_job_id=req.compile_job_id,
        visual_style=req.visual_style or vd.get("default_visual_style", "essential"),
        background_mode=req.background_mode or vd.get("default_background_mode", "static"),
        background_keyword=req.background_keyword or vd.get("default_background_keyword") or None,
    )
    return _db_module.get_pipeline(pid)
@app.get("/api/music/pipeline")
def list_pipelines_endpoint(status: str = "all"):
    """List pipelines; ``status=active`` restricts the list to active ones."""
    only_active = status == "active"
    return {"pipelines": _db_module.list_pipelines(active_only=only_active)}
@app.get("/api/music/pipeline/lookup-by-msg/{msg_id}")
def lookup_by_msg(msg_id: int):
    """Find the active pipeline and step whose stored Telegram message id equals ``msg_id``.

    Raises HTTP 404 when no active pipeline references that message.
    """
    for pipeline in _db_module.list_pipelines(active_only=True):
        msg_ids = pipeline["last_telegram_msg_ids"]
        for step_name, stored_id in msg_ids.items():
            if stored_id != msg_id:
                continue
            return {"pipeline_id": pipeline["id"], "step": step_name}
    raise HTTPException(404)
@app.get("/api/music/pipeline/{pid}")
def get_pipeline_endpoint(pid: int):
    """Return a pipeline with its jobs and feedback history attached; HTTP 404 when unknown."""
    pipeline = _db_module.get_pipeline(pid)
    if not pipeline:
        raise HTTPException(404)
    pipeline["jobs"] = _db_module.list_pipeline_jobs(pid)
    pipeline["feedback"] = _db_module.get_feedback_history(pid)
    return pipeline
@app.post("/api/music/pipeline/{pid}/start", status_code=202)
async def start_pipeline(pid: int, bg: BackgroundTasks):
    """Kick off a pipeline by scheduling its "cover" step in the background.

    Raises HTTP 404 for an unknown pipeline and HTTP 409 unless its state is
    "created".
    """
    pipeline = _db_module.get_pipeline(pid)
    if not pipeline:
        raise HTTPException(404)
    current = pipeline["state"]
    if current != "created":
        raise HTTPException(409, f"이미 시작됨 ({current})")
    bg.add_task(orchestrator.run_step, pid, "cover")
    return dict(ok=True)
def _state_to_step(state: str) -> Optional[str]:
return {
"video_pending": "video",
"thumb_pending": "thumb",
"meta_pending": "meta",
"ai_review": "review",
"publish_pending": None, # 사용자 명시 발행 호출 필요
"publishing": "publish",
}.get(state)
@app.post("/api/music/pipeline/{pid}/feedback", status_code=202)
async def feedback(pid: int, req: FeedbackRequest, bg: BackgroundTasks):
    """Handle an approve/reject decision for one pipeline step.

    Raises:
        HTTP 404: unknown pipeline.
        HTTP 409: pipeline is in "awaiting_manual", or a reject pushed the
            feedback count above 5 (the pipeline is then moved to
            "awaiting_manual").
        HTTP 400: illegal approve transition, or unknown ``intent``.

    When the pipeline is not in ``"<step>_pending"`` the call is treated as
    already handled and answered with ``{"ok": True, "skipped": True}``.
    """
    pipeline = _db_module.get_pipeline(pid)
    if not pipeline:
        raise HTTPException(404)
    current = pipeline["state"]
    if current == "awaiting_manual":
        raise HTTPException(409, "수동 개입 대기 중")
    # Idempotency: feedback for a step the pipeline has already left is ignored.
    if current != f"{req.step}_pending":
        return {"ok": True, "skipped": True}

    if req.intent == "approve":
        from .pipeline.state_machine import next_state_on_approve

        # The state machine decides whether the transition is legal.
        try:
            upcoming = next_state_on_approve(current)
        except ValueError as e:
            raise HTTPException(400, str(e))
        step_to_run = _state_to_step(upcoming)
        if step_to_run:
            # Original note: the background step sets the next *_pending
            # state itself once it completes.
            bg.add_task(orchestrator.run_step, pid, step_to_run)
        else:
            # No step is mapped to the next state, so record the state directly.
            _db_module.update_pipeline_state(pid, upcoming)
        return {"ok": True}

    if req.intent == "reject":
        attempts = _db_module.increment_feedback_count(pid, req.step)
        if attempts > 5:
            _db_module.update_pipeline_state(pid, "awaiting_manual")
            raise HTTPException(409, "재생성 한도 초과")
        if req.feedback_text:
            _db_module.record_feedback(pid, req.step, req.feedback_text)
        # Re-run the same step, passing the feedback text ("" when absent).
        bg.add_task(orchestrator.run_step, pid, req.step, req.feedback_text or "")
        return {"ok": True}

    raise HTTPException(400, f"unknown intent: {req.intent}")
@app.post("/api/music/pipeline/{pid}/cancel")
def cancel_pipeline(pid: int):
    """Mark a pipeline as "cancelled" with a cancellation timestamp; HTTP 404 when unknown.

    The current state is not checked before cancelling.
    """
    if not _db_module.get_pipeline(pid):
        raise HTTPException(404)
    cancelled_at = _db_module._now()
    _db_module.update_pipeline_state(pid, "cancelled", cancelled_at=cancelled_at)
    return dict(ok=True)
@app.post("/api/music/pipeline/{pid}/publish", status_code=202)
async def publish_pipeline(pid: int, bg: BackgroundTasks):
    """Move a pipeline from "publish_pending" to "publishing" and schedule the publish step.

    Raises HTTP 404 for an unknown pipeline and HTTP 409 when it is not in
    "publish_pending".
    """
    pipeline = _db_module.get_pipeline(pid)
    if not pipeline:
        raise HTTPException(404)
    current = pipeline["state"]
    if current != "publish_pending":
        raise HTTPException(409, f"발행 단계 아님 ({current})")
    _db_module.update_pipeline_state(pid, "publishing")
    bg.add_task(orchestrator.run_step, pid, "publish")
    return dict(ok=True)
# Telegram 메시지 매칭용 엔드포인트 (agent-office용)
class TelegramMsgPatch(BaseModel):
    """Request body that records the Telegram message id sent for a pipeline step."""

    step: str
    message_id: int
@app.patch("/api/music/pipeline/{pid}/telegram-msg")
def save_telegram_msg(pid: int, req: TelegramMsgPatch):
    """Store the Telegram message id for one step of a pipeline; HTTP 404 when unknown.

    The pipeline state is written back unchanged; only the message-id map
    (serialized as JSON) is updated.
    """
    pipeline = _db_module.get_pipeline(pid)
    if not pipeline:
        raise HTTPException(404)
    msg_ids = pipeline["last_telegram_msg_ids"]
    msg_ids[req.step] = req.message_id
    serialized = json.dumps(msg_ids)
    _db_module.update_pipeline_state(
        pid, pipeline["state"], last_telegram_msg_ids=serialized
    )
    return dict(ok=True)
# ── Setup endpoints ───────────────────────────────────────────────────────────
class SetupRequest(BaseModel):
    """Partial update of the YouTube setup; fields left as None are not applied."""

    metadata_template: dict[str, Any] | None = None
    cover_prompts: dict[str, Any] | None = None
    review_weights: dict[str, Any] | None = None
    review_threshold: int | None = None
    visual_defaults: dict[str, Any] | None = None
    publish_policy: dict[str, Any] | None = None
@app.get("/api/music/setup")
def get_setup():
    """Return the stored YouTube setup."""
    setup = _db_module.get_youtube_setup()
    return setup
@app.put("/api/music/setup")
def put_setup(req: SetupRequest):
    """Update the YouTube setup with the non-None fields of the request.

    Returns the setup as stored after the update.
    """
    # model_dump() replaces the deprecated .dict(), matching every other
    # endpoint in this module.
    payload = {k: v for k, v in req.model_dump().items() if v is not None}
    _db_module.update_youtube_setup(**payload)
    return _db_module.get_youtube_setup()
# ── YouTube OAuth endpoints ───────────────────────────────────────────────────
@app.get("/api/music/youtube/auth-url")
def youtube_auth_url():
    """Return the YouTube authorization URL provided by the youtube module."""
    url = yt_module.get_auth_url()
    return {"url": url}
@app.get("/api/music/youtube/callback")
async def youtube_callback(code: str):
    """OAuth callback: pass the authorization code to exchange_code() and return its result."""
    result = await yt_module.exchange_code(code)
    return result
@app.post("/api/music/youtube/disconnect")
def youtube_disconnect():
    """Call the youtube module's disconnect() and acknowledge."""
    yt_module.disconnect()
    return dict(ok=True)
@app.get("/api/music/youtube/status")
def youtube_status():
    """Return the YouTube connection status, or ``{"connected": False}`` when the module reports nothing."""
    status = yt_module.get_status()
    if status:
        return status
    return {"connected": False}