Files
web-page-backend/music-lab/app/main.py

861 lines
32 KiB
Python

import json
import os
import shutil
import uuid
from typing import List, Optional
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from .db import (
init_db,
create_task, get_task,
get_all_tracks, add_track, delete_track, get_track_file_path, get_track_by_task_id, get_track_by_id,
update_track_duration, update_track_file_info, update_track_hash,
get_all_lyrics, add_lyrics, update_lyrics, delete_lyrics,
create_video_project, get_video_project, get_all_video_projects,
update_video_project_status, delete_video_project,
create_revenue_record, get_all_revenue_records,
update_revenue_record, delete_revenue_record, get_revenue_dashboard,
get_market_trends as _get_market_trends,
get_latest_trend_report, get_trend_reports as _get_trend_reports,
)
from .market import ingest_trends, get_suggestions
from .local_provider import run_local_generation
from .suno_provider import (
run_suno_generation, run_suno_extend, run_vocal_removal,
run_cover_image, run_wav_convert, run_stem_split,
run_upload_cover, run_upload_extend, run_add_vocals, run_add_instrumental, run_video_generate,
generate_lyrics, get_credits, get_timestamped_lyrics, generate_style_boost,
SUNO_API_KEY, SUNO_MODELS,
)
# FastAPI application instance for the music-lab backend.
app = FastAPI()

# Comma-separated list of allowed CORS origins, read from CORS_ALLOW_ORIGINS;
# defaults to the two local development hosts.
_cors_origins = os.getenv("CORS_ALLOW_ORIGINS", "http://localhost:3007,http://localhost:8080").split(",")
app.add_middleware(
    CORSMiddleware,
    # Strip whitespace and drop blank entries so that a trailing comma or an
    # empty CORS_ALLOW_ORIGINS value does not register "" as an allowed origin.
    allow_origins=[o.strip() for o in _cors_origins if o.strip()],
    allow_credentials=False,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["Content-Type"],
)

# Directory that is created at startup and scanned for .mp3 files.
MUSIC_DATA_DIR = "/app/data"
# Root directory for video project output (one sub-directory per project id).
VIDEO_DATA_DIR = os.getenv("VIDEO_DATA_DIR", "/app/data/videos")
def _get_mp3_duration(file_path: str) -> Optional[int]:
    """Return the playback length of an MP3 file in whole seconds.

    The length reported by mutagen is truncated to an int. Any failure
    (mutagen missing, file missing, unreadable or not an MP3) yields ``None``.
    """
    try:
        from mutagen.mp3 import MP3

        length_seconds = MP3(file_path).info.length
        return int(length_seconds)
    except Exception:
        # Best-effort lookup: callers treat None as "duration unknown".
        return None
def _backfill_durations():
    """Fill in ``duration_sec`` for stored tracks that do not have one yet.

    For each track with no duration but a recorded ``file_path``, the length is
    read from the MP3 file and written back when a non-zero value is obtained.
    """
    for track in get_all_tracks():
        if track["duration_sec"] is not None:
            continue
        path = track.get("file_path")
        if not path:
            continue
        seconds = _get_mp3_duration(path)
        if seconds:
            update_track_duration(track["id"], seconds)
@app.on_event("startup")
def on_startup():
    """Startup hook: initialise the DB, ensure the data directory, backfill durations."""
    # Order is kept as in the original: database first, then the directory,
    # then the duration backfill that reads tracks from the database.
    init_db()
    os.makedirs(MUSIC_DATA_DIR, exist_ok=True)
    _backfill_durations()
@app.get("/health")
def health():
    """Liveness probe; always answers ``{"ok": True}``."""
    status = {"ok": True}
    return status
@app.get("/api/music/providers")
def get_providers():
    """Return the list of music generation providers that are configured.

    The local MusicGen provider is listed when ``MUSIC_AI_SERVER_URL`` is set,
    the Suno provider when ``SUNO_API_KEY`` is truthy.
    """
    local_entry = {
        "id": "local",
        "name": "MusicGen",
        "description": "로컬 AI 서버 (인스트루멘탈 전용)",
        "features": ["instrumental"],
    }
    suno_entry = {
        "id": "suno",
        "name": "Suno",
        "description": "Suno AI (보컬·가사·인스트루멘탈)",
        "features": ["vocals", "lyrics", "instrumental"],
    }
    candidates = (
        (os.getenv("MUSIC_AI_SERVER_URL"), local_entry),
        (SUNO_API_KEY, suno_entry),
    )
    return {"providers": [entry for enabled, entry in candidates if enabled]}
# ── 음악 생성 API ─────────────────────────────────────────────────────────────
class GenerateRequest(BaseModel):
    """Request body for POST /api/music/generate."""

    # Which backend generates the track: "suno" or "local".
    provider: str = "suno"
    # Suno model name (V4, V4_5, V5, ...).
    model: str = "V4"
    title: str = ""
    genre: str = ""
    moods: List[str] = []
    instruments: List[str] = []
    duration_sec: Optional[int] = None
    bpm: Optional[int] = None
    key: str = ""
    scale: str = ""
    prompt: str = ""

    # --- Suno only ---
    # Custom lyrics ([Verse], [Chorus], ...).
    lyrics: str = ""
    # When True, produce an instrumental with no vocals.
    instrumental: bool = False

    # --- Added in Phase 1 ---
    # "m" or "f".
    vocal_gender: Optional[str] = None
    # Styles to exclude.
    negative_tags: Optional[str] = None
    # 0.0 to 1.0.
    style_weight: Optional[float] = None
    # 0.0 to 1.0.
    audio_weight: Optional[float] = None
@app.post("/api/music/generate")
def generate_music(req: GenerateRequest, background_tasks: BackgroundTasks):
    """Start a music generation job and return its ``task_id`` immediately.

    The actual generation runs as a background task. ``provider`` is either
    "suno" (Suno API) or "local" (MusicGen).

    Raises:
        HTTPException: 400 when the provider is unknown or not configured.
    """
    provider = req.provider
    if provider == "suno":
        if not SUNO_API_KEY:
            raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")
        runner = run_suno_generation
    elif provider == "local":
        if not os.getenv("MUSIC_AI_SERVER_URL"):
            raise HTTPException(status_code=400, detail="로컬 AI 서버 URL이 설정되지 않았습니다")
        runner = run_local_generation
    else:
        raise HTTPException(status_code=400, detail=f"지원하지 않는 provider: {provider}")

    task_id = str(uuid.uuid4())
    params = req.model_dump()
    create_task(task_id, params, provider=provider)
    background_tasks.add_task(runner, task_id, params)
    return {"task_id": task_id, "provider": provider}
@app.get("/api/music/status/{task_id}")
def get_status(task_id: str):
    """Return the state of a generation task; the frontend polls this endpoint.

    ``status`` is one of queued | processing | succeeded | failed. Once the
    task has succeeded the matching track record is included under ``track``.

    Raises:
        HTTPException: 404 when the task does not exist.
    """
    task = get_task(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    exposed = ("status", "progress", "message", "audio_url", "error", "provider")
    payload = {field: task[field] for field in exposed}
    if payload["status"] == "succeeded":
        payload["track"] = get_track_by_task_id(task_id)
    return payload
# ── 가사 생성 API (Suno 전용) ────────────────────────────────────────────────
class LyricsRequest(BaseModel):
    """Request body for POST /api/music/lyrics."""

    # Text passed to the lyrics generator.
    prompt: str
@app.post("/api/music/lyrics")
def gen_lyrics(req: LyricsRequest):
    """Generate lyrics with Suno so they can be previewed before creating a song.

    Raises:
        HTTPException: 400 when no Suno API key is configured, 502 when the
            generator returns a falsy result.
    """
    if not SUNO_API_KEY:
        raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")

    generated = generate_lyrics(req.prompt)
    if generated:
        return generated
    raise HTTPException(status_code=502, detail="가사 생성에 실패했습니다")
# ── 라이브러리 API ────────────────────────────────────────────────────────────
class TrackCreate(BaseModel):
    """Request body for manually adding a track via POST /api/music/library."""

    # Descriptive metadata.
    title: str = ""
    genre: str = ""
    moods: List[str] = []
    instruments: List[str] = []
    duration_sec: Optional[int] = None
    bpm: Optional[int] = None
    key: str = ""
    scale: str = ""
    prompt: str = ""

    # Where the audio is served from and where it is stored.
    audio_url: str = ""
    file_path: str = ""

    # Provenance and extras.
    task_id: Optional[str] = None
    tags: List[str] = []
    provider: str = "local"
    lyrics: str = ""
    image_url: str = ""
    suno_id: str = ""
@app.get("/api/music/library")
def list_library():
    """Return every stored track after synchronising the DB with the files on disk."""
    # The sync runs first so the listing reflects renamed, added and removed files.
    _sync_library_with_disk()
    all_tracks = get_all_tracks()
    return {"tracks": all_tracks}
def _calc_file_hash(file_path: str) -> str:
    """Return the MD5 hex digest of a file, used to recognise identical files.

    The file is read in 8 KiB chunks. An empty string is returned when the
    file cannot be opened or read.
    """
    import hashlib

    digest = hashlib.md5()
    try:
        with open(file_path, "rb") as stream:
            while True:
                block = stream.read(8192)
                if not block:
                    break
                digest.update(block)
    except OSError:
        return ""
    return digest.hexdigest()
def _sync_library_with_disk():
    """Reconcile the track table with the ``.mp3`` files in ``MUSIC_DATA_DIR``.

    Stage 1: match disk files to tracks by the file name at the end of each
        track's ``audio_url`` (fast path); backfill ``file_hash`` on matched
        tracks that do not have one yet.
    Stage 2: hash the unmatched files and compare them with the stored hashes
        of unmatched tracks. A hit is treated as a rename: the existing record
        gets a new title, ``audio_url`` and ``file_path`` and keeps the rest.
        Each record is claimed by at most one file.
    Stage 3: delete tracks that are still unmatched and add still-unmatched
        files as new tracks.

    Returns without touching the DB when the data directory cannot be listed.
    """

    def _title_from_filename(filename):
        # "my-new_song.mp3" -> "my new song"
        return os.path.splitext(filename)[0].replace("-", " ").replace("_", " ")

    tracks = get_all_tracks()
    media_base = os.getenv("MUSIC_MEDIA_BASE", "/media/music")

    # .mp3 files currently on disk.
    try:
        disk_files = {f for f in os.listdir(MUSIC_DATA_DIR) if f.lower().endswith(".mp3")}
    except OSError:
        return

    # ── Stage 1: match by file name ──────────────────────────────
    db_by_filename = {}  # filename -> track
    for t in tracks:
        if t.get("audio_url"):
            db_by_filename[t["audio_url"].split("/")[-1]] = t

    matched_disk = set()
    matched_db_ids = set()
    for f in sorted(disk_files):
        track = db_by_filename.get(f)
        if track is None:
            continue
        matched_disk.add(f)
        matched_db_ids.add(track["id"])
        # Backfill the hash on existing tracks that do not have one.
        if not track.get("file_hash"):
            file_hash = _calc_file_hash(os.path.join(MUSIC_DATA_DIR, f))
            if file_hash:
                update_track_hash(track["id"], file_hash)

    unmatched_disk = disk_files - matched_disk
    unmatched_db = [t for t in tracks if t["id"] not in matched_db_ids]

    # Hashes computed in stage 2 are reused in stage 3 so that no file is
    # read twice during one sync.
    disk_hashes = {}  # filename -> hash ("" when the file could not be read)

    # ── Stage 2: hash-based rename detection ─────────────────────
    if unmatched_disk and unmatched_db:
        db_hash_map = {}  # hash -> track
        for t in unmatched_db:
            h = t.get("file_hash", "")
            if h:
                db_hash_map[h] = t

        resolved_disk = set()
        resolved_db_ids = set()
        for f in sorted(unmatched_disk):
            file_path = os.path.join(MUSIC_DATA_DIR, f)
            file_hash = _calc_file_hash(file_path)
            disk_hashes[f] = file_hash
            if not file_hash:
                continue
            # pop() so a second file with identical content cannot claim the
            # same record again; that file is added as a new track in stage 3.
            track = db_hash_map.pop(file_hash, None)
            if track is None:
                continue
            # Rename detected: update the existing record in place.
            update_track_file_info(
                track["id"],
                title=_title_from_filename(f),
                audio_url=f"{media_base}/{f}",
                file_path=file_path,
            )
            resolved_disk.add(f)
            resolved_db_ids.add(track["id"])

        unmatched_disk -= resolved_disk
        unmatched_db = [t for t in unmatched_db if t["id"] not in resolved_db_ids]

    # ── Stage 3: everything that is left ─────────────────────────
    # Records with no file on disk are removed.
    # NOTE(review): this includes tracks whose audio_url is empty or whose
    # file name is not present in MUSIC_DATA_DIR — confirm that is intended
    # for tracks registered through POST /api/music/library.
    for t in unmatched_db:
        delete_track(t["id"])

    # Files with no record are added as new tracks (hash included).
    for f in sorted(unmatched_disk):
        file_path = os.path.join(MUSIC_DATA_DIR, f)
        if f in disk_hashes:
            file_hash = disk_hashes[f]
        else:
            file_hash = _calc_file_hash(file_path)
        add_track({
            "title": _title_from_filename(f),
            "audio_url": f"{media_base}/{f}",
            "file_path": file_path,
            "provider": "suno",
            "duration_sec": _get_mp3_duration(file_path),
            "file_hash": file_hash,
        })
@app.post("/api/music/library", status_code=201)
def save_to_library(req: TrackCreate):
    """Add a track manually (registering an external file or saving from the frontend)."""
    payload = req.model_dump()
    return add_track(payload)
@app.delete("/api/music/library/{track_id}")
def remove_from_library(track_id: int):
    """Delete a track from the library together with its local audio file.

    The file is only removed when its stored path resolves to a location
    inside ``MUSIC_DATA_DIR``.

    Raises:
        HTTPException: 404 when no track with ``track_id`` exists.
    """
    # Read the path before the record disappears.
    file_path = get_track_file_path(track_id)
    if not delete_track(track_id):
        raise HTTPException(status_code=404, detail="Track not found")

    if file_path:
        # ``file_path`` can originate from a client (TrackCreate.file_path via
        # POST /api/music/library), so refuse to delete anything that does not
        # resolve inside the music data directory.
        data_root = os.path.realpath(MUSIC_DATA_DIR)
        resolved = os.path.realpath(file_path)
        try:
            inside_data_dir = os.path.commonpath([data_root, resolved]) == data_root
        except ValueError:
            # Raised when the two paths cannot be compared (e.g. different drives).
            inside_data_dir = False

        if inside_data_dir and os.path.isfile(file_path):
            try:
                os.remove(file_path)
            except OSError:
                # Best effort: the DB record is already gone, so a leftover
                # file must not turn the request into an error.
                pass
    return {"ok": True}
# ── 모델 목록 API ────────────────────────────────────────────────────────────
@app.get("/api/music/models")
def get_models():
    """Return the list of available Suno AI models."""
    available_models = SUNO_MODELS
    return {"models": available_models}
# ── 크레딧 조회 API ──────────────────────────────────────────────────────────
@app.get("/api/music/credits")
def check_credits():
    """Return the remaining Suno credits.

    Raises:
        HTTPException: 400 when no Suno API key is configured, 502 when the
            credit lookup returns ``None``.
    """
    if not SUNO_API_KEY:
        raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")

    credits = get_credits()
    if credits is not None:
        return credits
    raise HTTPException(status_code=502, detail="크레딧 조회 실패")
# ── 곡 연장 API ──────────────────────────────────────────────────────────────
class ExtendRequest(BaseModel):
    """Request body for POST /api/music/extend."""

    # ID of the original Suno song.
    suno_id: str
    # Point (in seconds) from which the extension starts.
    continue_at: int = 0
    # Additional lyrics / prompt.
    prompt: str = ""
    # Style override.
    style: str = ""
    title: str = ""
    model: str = "V4"
@app.post("/api/music/extend")
def extend_music(req: ExtendRequest, background_tasks: BackgroundTasks):
    """Extend an existing song from a given point (Suno Extend API).

    Registers a task, schedules ``run_suno_extend`` in the background and
    returns the new ``task_id``.

    Raises:
        HTTPException: 400 when no Suno API key is configured.
    """
    if not SUNO_API_KEY:
        raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")

    job_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(job_id, payload, provider="suno")
    background_tasks.add_task(run_suno_extend, job_id, payload)
    return {"task_id": job_id, "provider": "suno"}
# ── 보컬 분리 API ────────────────────────────────────────────────────────────
class VocalRemovalRequest(BaseModel):
    """Request body for POST /api/music/vocal-removal."""

    # Suno song ID.
    suno_id: str
    # Title of the original track.
    title: str = ""
@app.post("/api/music/vocal-removal")
def vocal_removal(req: VocalRemovalRequest, background_tasks: BackgroundTasks):
    """Separate a track into vocals and instrumental (Suno Vocal Removal API).

    Registers a task, schedules ``run_vocal_removal`` in the background and
    returns the new ``task_id``.

    Raises:
        HTTPException: 400 when no Suno API key is configured.
    """
    if not SUNO_API_KEY:
        raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")

    job_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(job_id, payload, provider="suno")
    background_tasks.add_task(run_vocal_removal, job_id, payload)
    return {"task_id": job_id, "provider": "suno"}
# ── 커버 이미지 생성 API ────────────────────────────────────────────────────
class CoverImageRequest(BaseModel):
    """Request body for POST /api/music/cover-image."""

    # Task ID of the Suno generation.
    suno_task_id: str
    # Library track ID (used for storing the result).
    track_id: Optional[int] = None
@app.post("/api/music/cover-image")
def cover_image(req: CoverImageRequest, background_tasks: BackgroundTasks):
    """Generate two cover images for a Suno song.

    Registers a task, schedules ``run_cover_image`` in the background and
    returns the new ``task_id``.

    Raises:
        HTTPException: 400 when no Suno API key is configured.
    """
    if not SUNO_API_KEY:
        raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")

    job_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(job_id, payload, provider="suno")
    background_tasks.add_task(run_cover_image, job_id, payload)
    return {"task_id": job_id, "provider": "suno"}
# ── WAV 변환 API ────────────────────────────────────────────────────────────
class WavRequest(BaseModel):
    """Request body for POST /api/music/wav."""

    # Identifiers of the Suno generation task and of the song itself.
    suno_task_id: str
    suno_id: str
    # Optional library track ID.
    track_id: Optional[int] = None
@app.post("/api/music/wav")
def wav_convert(req: WavRequest, background_tasks: BackgroundTasks):
    """Convert a song to WAV format.

    Registers a task, schedules ``run_wav_convert`` in the background and
    returns the new ``task_id``.

    Raises:
        HTTPException: 400 when no Suno API key is configured.
    """
    if not SUNO_API_KEY:
        raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")

    job_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(job_id, payload, provider="suno")
    background_tasks.add_task(run_wav_convert, job_id, payload)
    return {"task_id": job_id, "provider": "suno"}
# ── 12스템 분리 API ─────────────────────────────────────────────────────────
class StemSplitRequest(BaseModel):
    """Request body for POST /api/music/stem-split."""

    # Identifiers of the Suno generation task and of the song itself.
    suno_task_id: str
    suno_id: str
    # Optional library track ID.
    track_id: Optional[int] = None
@app.post("/api/music/stem-split")
def stem_split(req: StemSplitRequest, background_tasks: BackgroundTasks):
    """Split a song into 12 stems (50 credits): vocals, drums, bass, guitar, etc.

    Registers a task, schedules ``run_stem_split`` in the background and
    returns the new ``task_id``.

    Raises:
        HTTPException: 400 when no Suno API key is configured.
    """
    if not SUNO_API_KEY:
        raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")

    job_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(job_id, payload, provider="suno")
    background_tasks.add_task(run_stem_split, job_id, payload)
    return {"task_id": job_id, "provider": "suno"}
# ── 타임스탬프 가사 API ─────────────────────────────────────────────────────
@app.get("/api/music/timestamped-lyrics")
def timestamped_lyrics(task_id: str, suno_id: str):
    """Fetch timestamped lyrics (for karaoke-style synchronisation).

    Raises:
        HTTPException: 400 when no Suno API key is configured, 502 when the
            lookup returns a falsy result.
    """
    if not SUNO_API_KEY:
        raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")

    synced = get_timestamped_lyrics(task_id, suno_id)
    if synced:
        return synced
    raise HTTPException(status_code=502, detail="타임스탬프 가사 조회 실패")
# ── 스타일 부스트 API ───────────────────────────────────────────────────────
class StyleBoostRequest(BaseModel):
    """Request body for POST /api/music/style-boost."""

    # Text handed to the style-boost generator.
    content: str
@app.post("/api/music/style-boost")
def style_boost(req: StyleBoostRequest):
    """Generate an optimised style prompt with AI.

    Raises:
        HTTPException: 400 when no Suno API key is configured, 502 when the
            generator returns a falsy result.
    """
    if not SUNO_API_KEY:
        raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")

    boosted = generate_style_boost(req.content)
    if boosted:
        return boosted
    raise HTTPException(status_code=502, detail="스타일 부스트 생성 실패")
# ── Phase 3: 업로드 + 커버 ──────────────────────────────────────────────────
class UploadCoverRequest(BaseModel):
    """Request body for POST /api/music/upload-cover."""

    # URL of the uploaded source audio.
    upload_url: str
    model: str = "V4"
    custom_mode: bool = True
    instrumental: bool = False

    # Text inputs.
    prompt: str = ""
    style: str = ""
    title: str = ""

    # Optional tuning parameters.
    vocal_gender: Optional[str] = None
    negative_tags: Optional[str] = None
    style_weight: Optional[float] = None
    audio_weight: Optional[float] = None
@app.post("/api/music/upload-cover")
def upload_cover(req: UploadCoverRequest, background_tasks: BackgroundTasks):
    """Remake external audio in a Suno style.

    Registers a task, schedules ``run_upload_cover`` in the background and
    returns the new ``task_id``.

    Raises:
        HTTPException: 400 when no Suno API key is configured.
    """
    if not SUNO_API_KEY:
        raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")

    job_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(job_id, payload, provider="suno")
    background_tasks.add_task(run_upload_cover, job_id, payload)
    return {"task_id": job_id, "provider": "suno"}
# ── Phase 3: 업로드 + 확장 ──────────────────────────────────────────────────
class UploadExtendRequest(BaseModel):
    """Request body for POST /api/music/upload-extend."""

    # URL of the uploaded source audio.
    upload_url: str
    model: str = "V4"
    default_param_flag: bool = True
    continue_at: Optional[float] = None

    # Text inputs.
    prompt: str = ""
    style: str = ""
    title: str = ""

    instrumental: bool = False
    vocal_gender: Optional[str] = None
    negative_tags: Optional[str] = None
@app.post("/api/music/upload-extend")
def upload_extend(req: UploadExtendRequest, background_tasks: BackgroundTasks):
    """Continue (extend) external audio.

    Registers a task, schedules ``run_upload_extend`` in the background and
    returns the new ``task_id``.

    Raises:
        HTTPException: 400 when no Suno API key is configured.
    """
    if not SUNO_API_KEY:
        raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")

    job_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(job_id, payload, provider="suno")
    background_tasks.add_task(run_upload_extend, job_id, payload)
    return {"task_id": job_id, "provider": "suno"}
# ── Phase 3: 보컬 추가 ──────────────────────────────────────────────────────
class AddVocalsRequest(BaseModel):
    """Request body for POST /api/music/add-vocals."""

    # Required inputs.
    upload_url: str
    prompt: str
    title: str
    style: str

    # Optional inputs.
    negative_tags: str = ""
    vocal_gender: Optional[str] = None
    model: str = "V4_5PLUS"
    style_weight: Optional[float] = None
    audio_weight: Optional[float] = None
@app.post("/api/music/add-vocals")
def add_vocals(req: AddVocalsRequest, background_tasks: BackgroundTasks):
    """Add AI vocals to an instrumental.

    Registers a task, schedules ``run_add_vocals`` in the background and
    returns the new ``task_id``.

    Raises:
        HTTPException: 400 when no Suno API key is configured.
    """
    if not SUNO_API_KEY:
        raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")

    job_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(job_id, payload, provider="suno")
    background_tasks.add_task(run_add_vocals, job_id, payload)
    return {"task_id": job_id, "provider": "suno"}
# ── Phase 3: 인스트루멘탈 추가 ──────────────────────────────────────────────
class AddInstrumentalRequest(BaseModel):
    """Request body for POST /api/music/add-instrumental."""

    # Required inputs.
    upload_url: str
    title: str
    tags: str

    # Optional inputs.
    negative_tags: str = ""
    vocal_gender: Optional[str] = None
    model: str = "V4_5PLUS"
    style_weight: Optional[float] = None
    audio_weight: Optional[float] = None
@app.post("/api/music/add-instrumental")
def add_instrumental(req: AddInstrumentalRequest, background_tasks: BackgroundTasks):
    """Add AI accompaniment to a vocal track.

    Registers a task, schedules ``run_add_instrumental`` in the background and
    returns the new ``task_id``.

    Raises:
        HTTPException: 400 when no Suno API key is configured.
    """
    if not SUNO_API_KEY:
        raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")

    job_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(job_id, payload, provider="suno")
    background_tasks.add_task(run_add_instrumental, job_id, payload)
    return {"task_id": job_id, "provider": "suno"}
# ── Phase 3: 뮤직비디오 생성 ────────────────────────────────────────────────
class VideoRequest(BaseModel):
    """Request body for POST /api/music/video."""

    # Identifiers of the Suno generation task and of the song itself.
    suno_task_id: str
    suno_id: str

    # Optional fields.
    author: str = ""
    domain_name: str = ""
    track_id: Optional[int] = None
@app.post("/api/music/video")
def video_generate(req: VideoRequest, background_tasks: BackgroundTasks):
    """Generate a music video (MP4).

    Registers a task, schedules ``run_video_generate`` in the background and
    returns the new ``task_id``.

    Raises:
        HTTPException: 400 when no Suno API key is configured.
    """
    if not SUNO_API_KEY:
        raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다")

    job_id = str(uuid.uuid4())
    payload = req.model_dump()
    create_task(job_id, payload, provider="suno")
    background_tasks.add_task(run_video_generate, job_id, payload)
    return {"task_id": job_id, "provider": "suno"}
# ── 저장된 가사 CRUD API ────────────────────────────────────────────────────
class LyricsSave(BaseModel):
    """Request body for saving lyrics; every field defaults to an empty string."""

    title: str = ""
    text: str = ""
    prompt: str = ""
class LyricsUpdate(BaseModel):
    """Request body for editing saved lyrics.

    Every field is optional; ``edit_lyrics`` drops the ones left as ``None``.
    """

    title: Optional[str] = None
    text: Optional[str] = None
    prompt: Optional[str] = None
@app.get("/api/music/lyrics/library")
def list_saved_lyrics():
    """Return all saved lyrics (newest first)."""
    saved = get_all_lyrics()
    return {"lyrics": saved}
@app.post("/api/music/lyrics/library", status_code=201)
def save_lyrics(req: LyricsSave):
    """Store lyrics and return the created record."""
    payload = req.model_dump()
    return add_lyrics(payload)
@app.put("/api/music/lyrics/library/{lyrics_id}")
def edit_lyrics(lyrics_id: int, req: LyricsUpdate):
    """Update saved lyrics with the fields that were provided.

    Raises:
        HTTPException: 404 when the update returns a falsy result.
    """
    # Only fields that are not None are forwarded to the update.
    changes = req.model_dump(exclude_none=True)
    updated = update_lyrics(lyrics_id, changes)
    if updated:
        return updated
    raise HTTPException(status_code=404, detail="Lyrics not found")
@app.delete("/api/music/lyrics/library/{lyrics_id}")
def remove_lyrics(lyrics_id: int):
    """Delete saved lyrics.

    Raises:
        HTTPException: 404 when the delete reports failure.
    """
    deleted = delete_lyrics(lyrics_id)
    if deleted:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Lyrics not found")
# ── 영상 프로젝트 모델 ────────────────────────────────────────────────────────
class VideoProjectCreate(BaseModel):
    """Request body for POST /api/music/video-project."""

    # ID of the library track the video is built from.
    track_id: int
    # The create endpoint accepts "visualizer" or "slideshow".
    format: str = "visualizer"
    target_countries: List[str] = []
    render_params: dict = {}
class RevenueCreate(BaseModel):
    """Request body for POST /api/music/revenue."""

    # What the record refers to.
    video_project_id: Optional[int] = None
    yt_video_id: str = ""
    # The only required field.
    record_month: str

    # Figures.
    views: int = 0
    watch_hours: float = 0.0
    revenue_usd: float = 0.0

    country: str = ""
    source: str = "manual"
class RevenueUpdate(BaseModel):
    """Request body for editing a revenue record.

    Every field is optional; ``edit_revenue`` drops the ones left as ``None``.
    """

    yt_video_id: Optional[str] = None
    record_month: Optional[str] = None
    views: Optional[int] = None
    watch_hours: Optional[float] = None
    revenue_usd: Optional[float] = None
    country: Optional[str] = None
    source: Optional[str] = None
# ── 영상 프로젝트 API ─────────────────────────────────────────────────────────
@app.post("/api/music/video-project", status_code=201)
def create_project(req: VideoProjectCreate, background_tasks: BackgroundTasks):
    """Create a video project for an existing track.

    ``background_tasks`` is accepted but not used by this handler.

    Raises:
        HTTPException: 404 when the track does not exist, 400 when ``format``
            is neither "visualizer" nor "slideshow".
    """
    track = get_track_by_id(req.track_id)
    if not track:
        raise HTTPException(status_code=404, detail="Track not found")

    supported_formats = ("visualizer", "slideshow")
    if req.format not in supported_formats:
        raise HTTPException(status_code=400, detail="format은 'visualizer' 또는 'slideshow'")

    return create_video_project(req.model_dump())
@app.get("/api/music/video-projects")
def list_projects():
    """Return all video projects."""
    projects = get_all_video_projects()
    return {"projects": projects}
@app.get("/api/music/video-project/{project_id}")
def get_project(project_id: int):
    """Return a single video project.

    Raises:
        HTTPException: 404 when the project does not exist.
    """
    project = get_video_project(project_id)
    if project:
        return project
    raise HTTPException(status_code=404, detail="Project not found")
@app.post("/api/music/video-project/{project_id}/render")
def render_project(project_id: int, background_tasks: BackgroundTasks):
    """Schedule rendering of a video project as a background task.

    Raises:
        HTTPException: 404 when the project does not exist, 409 when its
            status is already "rendering".
    """
    project = get_video_project(project_id)
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    if project["status"] == "rendering":
        raise HTTPException(status_code=409, detail="이미 렌더링 중입니다")

    # Imported here rather than at module level, as in the original.
    from .video_producer import produce_video

    # NOTE(review): this handler does not itself store the "rendering" status;
    # presumably produce_video does — confirm. Until it does, a second request
    # can pass the 409 check above.
    background_tasks.add_task(produce_video, project_id)
    return {"ok": True, "project_id": project_id, "status": "rendering"}
@app.get("/api/music/video-project/{project_id}/export")
def export_project(project_id: int):
    """Return the export bundle of a finished video project.

    The bundle contains the output URL, a thumbnail URL derived from it, the
    stored YouTube title/description/tags and the contents of the project's
    ``metadata.json``. ``metadata`` is an empty dict when that file is
    missing, unreadable or not valid JSON.

    Raises:
        HTTPException: 404 when the project does not exist, 400 when its
            status is not "done".
    """
    proj = get_video_project(project_id)
    if not proj:
        raise HTTPException(status_code=404, detail="Project not found")
    if proj["status"] != "done":
        raise HTTPException(status_code=400, detail=f"렌더링 미완료 (status: {proj['status']})")

    meta_path = os.path.join(VIDEO_DATA_DIR, str(project_id), "metadata.json")
    try:
        with open(meta_path, encoding="utf-8") as f:
            metadata = json.load(f)
    except (OSError, ValueError):
        # OSError: file missing or unreadable. ValueError covers invalid JSON
        # (json.JSONDecodeError) and undecodable bytes (UnicodeDecodeError).
        # A broken sidecar file must not block the export of a finished video.
        metadata = {}

    # The thumbnail URL is the output URL with the file name swapped.
    thumb_url = proj["output_url"].replace("output.mp4", "thumbnail.jpg") if proj["output_url"] else ""
    return {
        "project_id": project_id,
        "output_url": proj["output_url"],
        "thumbnail_url": thumb_url,
        "yt_title": proj["yt_title"],
        "yt_description": proj["yt_description"],
        "yt_tags": proj["yt_tags"],
        "metadata": metadata,
    }
@app.delete("/api/music/video-project/{project_id}")
def delete_project(project_id: int):
    """Delete a video project and its output directory.

    Raises:
        HTTPException: 404 when the project does not exist.
    """
    if not get_video_project(project_id):
        raise HTTPException(status_code=404, detail="Project not found")

    # Files are removed before the DB record; rmtree errors are ignored.
    project_dir = os.path.join(VIDEO_DATA_DIR, str(project_id))
    if os.path.isdir(project_dir):
        shutil.rmtree(project_dir, ignore_errors=True)

    delete_video_project(project_id)
    return {"ok": True}
# ── 수익화 추적 API ───────────────────────────────────────────────────────────
@app.get("/api/music/revenue/dashboard")
def revenue_dashboard():
    """Return the revenue dashboard data as produced by the DB layer."""
    dashboard = get_revenue_dashboard()
    return dashboard
@app.get("/api/music/revenue")
def list_revenue(yt_video_id: Optional[str] = None, year_month: Optional[str] = None):
    """Return revenue records; both query parameters are passed on to the DB lookup."""
    records = get_all_revenue_records(yt_video_id, year_month)
    return {"records": records}
@app.post("/api/music/revenue", status_code=201)
def add_revenue(req: RevenueCreate):
    """Create a revenue record and return it."""
    payload = req.model_dump()
    return create_revenue_record(payload)
@app.put("/api/music/revenue/{record_id}")
def edit_revenue(record_id: int, req: RevenueUpdate):
    """Update a revenue record with the fields that were provided.

    Raises:
        HTTPException: 404 when the update returns a falsy result.
    """
    # Forward only the fields that are not None.
    changes = {}
    for field, value in req.model_dump().items():
        if value is not None:
            changes[field] = value

    updated = update_revenue_record(record_id, changes)
    if updated:
        return updated
    raise HTTPException(status_code=404, detail="Record not found")
@app.delete("/api/music/revenue/{record_id}")
def remove_revenue(record_id: int):
    """Delete a revenue record.

    Raises:
        HTTPException: 404 when the delete reports failure.
    """
    deleted = delete_revenue_record(record_id)
    if deleted:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Record not found")
# ── 시장 조사 API ─────────────────────────────────────────────────────────────
class MarketIngestRequest(BaseModel):
    """Request body for POST /api/music/market/ingest."""

    # Trend entries to ingest.
    trends: list
    # When empty, the ingest endpoint substitutes today's date.
    report_date: str = ""
@app.post("/api/music/market/ingest")
def market_ingest(req: MarketIngestRequest):
    """Receive trend data from agent-office and generate a report.

    ``report_date`` falls back to today's date in ISO format when the request
    leaves it empty. ``trends_saved`` in the response is the number of entries
    received in the request.
    """
    from datetime import date

    report_date = req.report_date or date.today().isoformat()
    # The return value of ingest_trends is not used by this endpoint.
    ingest_trends(req.trends, report_date)
    return {"ok": True, "trends_saved": len(req.trends), "report_date": report_date}
@app.get("/api/music/market/trends")
def list_market_trends(
    country: Optional[str] = None,
    genre: Optional[str] = None,
    source: Optional[str] = None,
    days: int = 7,
):
    """Return market trends; all four query parameters are passed on to the DB lookup."""
    trends = _get_market_trends(country, genre, source, days)
    return {"trends": trends}
@app.get("/api/music/market/report/latest")
def get_market_report_latest():
    """Return the most recent trend report.

    Raises:
        HTTPException: 404 when no report is available.
    """
    latest = get_latest_trend_report()
    if latest:
        return latest
    raise HTTPException(status_code=404, detail="리포트 없음 — 아직 수집 전")
@app.get("/api/music/market/report")
def list_market_reports(limit: int = 10):
    """Return trend reports; ``limit`` is passed on to the DB lookup."""
    reports = _get_trend_reports(limit)
    return {"reports": reports}
@app.get("/api/music/market/suggest")
def market_suggest(limit: int = 5):
    """Return suggestions from the market module; ``limit`` is passed through."""
    suggestions = get_suggestions(limit)
    return {"suggestions": suggestions}