import json import os import shutil import uuid from typing import Any, Dict, List, Optional from fastapi import FastAPI, HTTPException, BackgroundTasks, Query from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from .db import ( init_db, create_task, get_task, get_all_tracks, add_track, delete_track, get_track_file_path, get_track_by_task_id, get_track_by_id, update_track_duration, update_track_file_info, update_track_hash, get_all_lyrics, add_lyrics, update_lyrics, delete_lyrics, create_video_project, get_video_project, get_all_video_projects, update_video_project_status, delete_video_project, create_revenue_record, get_all_revenue_records, update_revenue_record, delete_revenue_record, get_revenue_dashboard, get_market_trends as _get_market_trends, get_latest_trend_report, get_trend_reports as _get_trend_reports, create_compile_job, get_compile_jobs, get_compile_job, update_compile_job, delete_compile_job, ) from . import db as _db_module from .compiler import run_compile from .market import ingest_trends, get_suggestions from .local_provider import run_local_generation from .pipeline import orchestrator from .pipeline import youtube as yt_module from .suno_provider import ( run_suno_generation, run_suno_extend, run_vocal_removal, run_cover_image, run_wav_convert, run_stem_split, run_upload_cover, run_upload_extend, run_add_vocals, run_add_instrumental, run_video_generate, generate_lyrics, get_credits, get_timestamped_lyrics, generate_style_boost, SUNO_API_KEY, SUNO_MODELS, ) app = FastAPI() _cors_origins = os.getenv("CORS_ALLOW_ORIGINS", "http://localhost:3007,http://localhost:8080").split(",") app.add_middleware( CORSMiddleware, allow_origins=[o.strip() for o in _cors_origins], allow_credentials=False, allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"], allow_headers=["Content-Type"], ) MUSIC_DATA_DIR = "/app/data" VIDEO_DATA_DIR = os.getenv("VIDEO_DATA_DIR", "/app/data/videos") def _get_mp3_duration(file_path: str) -> Optional[int]: """MP3 파일에서 실제 재생 시간(초) 추출.""" try: from mutagen.mp3 import MP3 audio = MP3(file_path) return int(audio.info.length) except Exception: return None def _backfill_durations(): """duration_sec이 없는 기존 트랙에 MP3 메타데이터에서 길이 채우기.""" for t in get_all_tracks(): if t["duration_sec"] is None and t.get("file_path"): dur = _get_mp3_duration(t["file_path"]) if dur: update_track_duration(t["id"], dur) @app.on_event("startup") def on_startup(): init_db() os.makedirs(MUSIC_DATA_DIR, exist_ok=True) _backfill_durations() @app.get("/health") def health(): return {"ok": True} @app.get("/api/music/providers") def get_providers(): """사용 가능한 음악 생성 프로바이더 목록 반환.""" providers = [] if os.getenv("MUSIC_AI_SERVER_URL"): providers.append({ "id": "local", "name": "MusicGen", "description": "로컬 AI 서버 (인스트루멘탈 전용)", "features": ["instrumental"], }) if SUNO_API_KEY: providers.append({ "id": "suno", "name": "Suno", "description": "Suno AI (보컬·가사·인스트루멘탈)", "features": ["vocals", "lyrics", "instrumental"], }) return {"providers": providers} # ── 음악 생성 API ───────────────────────────────────────────────────────────── class GenerateRequest(BaseModel): provider: str = "suno" # "suno" | "local" model: str = "V4" # Suno 모델 (V4, V4_5, V5 등) title: str = "" genre: str = "" moods: List[str] = [] instruments: List[str] = [] duration_sec: Optional[int] = None bpm: Optional[int] = None key: str = "" scale: str = "" prompt: str = "" # Suno 전용 lyrics: str = "" # 커스텀 가사 ([Verse], [Chorus] 등) instrumental: bool = False # True면 보컬 없이 인스트루멘탈만 # Phase 1 신규 vocal_gender: Optional[str] = None # "m" | "f" negative_tags: Optional[str] = None # 제외 스타일 style_weight: Optional[float] = None # 0.0~1.0 audio_weight: Optional[float] = None # 0.0~1.0 @app.post("/api/music/generate") def generate_music(req: GenerateRequest, background_tasks: BackgroundTasks): """ 음악 생성 작업 시작. task_id 즉시 반환 후 백그라운드에서 AI 서버 호출. provider: "suno" (Suno API) 또는 "local" (MusicGen) """ provider = req.provider if provider == "suno" and not SUNO_API_KEY: raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다") if provider == "local" and not os.getenv("MUSIC_AI_SERVER_URL"): raise HTTPException(status_code=400, detail="로컬 AI 서버 URL이 설정되지 않았습니다") if provider not in ("suno", "local"): raise HTTPException(status_code=400, detail=f"지원하지 않는 provider: {provider}") task_id = str(uuid.uuid4()) params = req.model_dump() create_task(task_id, params, provider=provider) if provider == "suno": background_tasks.add_task(run_suno_generation, task_id, params) else: background_tasks.add_task(run_local_generation, task_id, params) return {"task_id": task_id, "provider": provider} @app.get("/api/music/status/{task_id}") def get_status(task_id: str): """ 생성 작업 상태 조회. 프론트는 succeeded 또는 failed가 될 때까지 폴링. status: queued | processing | succeeded | failed succeeded 시 track 메타데이터 포함. """ task = get_task(task_id) if not task: raise HTTPException(status_code=404, detail="Task not found") resp = { "status": task["status"], "progress": task["progress"], "message": task["message"], "audio_url": task["audio_url"], "error": task["error"], "provider": task["provider"], } if task["status"] == "succeeded": track = get_track_by_task_id(task_id) resp["track"] = track return resp # ── 가사 생성 API (Suno 전용) ──────────────────────────────────────────────── class LyricsRequest(BaseModel): prompt: str @app.post("/api/music/lyrics") def gen_lyrics(req: LyricsRequest): """Suno AI로 가사를 생성합니다. 곡 생성 전 가사 미리보기용.""" if not SUNO_API_KEY: raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다") result = generate_lyrics(req.prompt) if not result: raise HTTPException(status_code=502, detail="가사 생성에 실패했습니다") return result # ── 라이브러리 API ──────────────────────────────────────────────────────────── class TrackCreate(BaseModel): title: str = "" genre: str = "" moods: List[str] = [] instruments: List[str] = [] duration_sec: Optional[int] = None bpm: Optional[int] = None key: str = "" scale: str = "" prompt: str = "" audio_url: str = "" file_path: str = "" task_id: Optional[str] = None tags: List[str] = [] provider: str = "local" lyrics: str = "" image_url: str = "" suno_id: str = "" @app.get("/api/music/library") def list_library(): """저장된 트랙 목록 전체 조회 (생성일 내림차순). 파일시스템과 자동 동기화.""" _sync_library_with_disk() return {"tracks": get_all_tracks()} def _calc_file_hash(file_path: str) -> str: """MD5 해시 계산 (파일 동일성 체크용).""" import hashlib h = hashlib.md5() try: with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(8192), b""): h.update(chunk) return h.hexdigest() except OSError: return "" def _sync_library_with_disk(): """파일시스템의 .mp3 파일과 DB를 동기화 (해시 기반 rename 감지). 1단계: 파일명 매칭 (빠른 경로) 2단계: 미매칭 파일/레코드를 해시로 비교 → rename 감지 → 메타데이터 보존 업데이트 3단계: 나머지 → 삭제/추가 """ tracks = get_all_tracks() media_base = os.getenv("MUSIC_MEDIA_BASE", "/media/music") # 디스크의 .mp3 파일 목록 disk_files = set() try: for f in os.listdir(MUSIC_DATA_DIR): if f.lower().endswith(".mp3"): disk_files.add(f) except OSError: return # ── 1단계: 파일명 매칭 ────────────────────────────────────── db_by_filename = {} # filename → track for t in tracks: if t.get("audio_url"): fname = t["audio_url"].split("/")[-1] db_by_filename[fname] = t matched_disk = set() matched_db_ids = set() for f in disk_files: if f in db_by_filename: matched_disk.add(f) track = db_by_filename[f] matched_db_ids.add(track["id"]) # 기존 트랙에 file_hash 없으면 채우기 if not track.get("file_hash"): file_hash = _calc_file_hash(os.path.join(MUSIC_DATA_DIR, f)) if file_hash: update_track_hash(track["id"], file_hash) unmatched_disk = disk_files - matched_disk unmatched_db = [t for t in tracks if t["id"] not in matched_db_ids] # ── 2단계: 해시 기반 rename 감지 ──────────────────────────── if unmatched_disk and unmatched_db: # DB 미매칭 레코드의 해시 맵 db_hash_map = {} # hash → track for t in unmatched_db: h = t.get("file_hash", "") if h: db_hash_map[h] = t resolved_disk = set() resolved_db_ids = set() for f in unmatched_disk: file_path = os.path.join(MUSIC_DATA_DIR, f) file_hash = _calc_file_hash(file_path) if not file_hash: continue if file_hash in db_hash_map: # rename 감지 — 기존 레코드 업데이트 (태그·메타데이터 보존) track = db_hash_map[file_hash] new_title = os.path.splitext(f)[0].replace("-", " ").replace("_", " ") update_track_file_info( track["id"], title=new_title, audio_url=f"{media_base}/{f}", file_path=file_path, ) resolved_disk.add(f) resolved_db_ids.add(track["id"]) unmatched_disk -= resolved_disk unmatched_db = [t for t in unmatched_db if t["id"] not in resolved_db_ids] # ── 3단계: 나머지 처리 ────────────────────────────────────── # DB에만 남은 레코드 → 파일 삭제됨 → DB 삭제 for t in unmatched_db: delete_track(t["id"]) # 디스크에만 남은 파일 → 신규 → DB 추가 (해시 포함) for f in unmatched_disk: file_path = os.path.join(MUSIC_DATA_DIR, f) title = os.path.splitext(f)[0].replace("-", " ").replace("_", " ") file_hash = _calc_file_hash(file_path) add_track({ "title": title, "audio_url": f"{media_base}/{f}", "file_path": file_path, "provider": "suno", "duration_sec": _get_mp3_duration(file_path), "file_hash": file_hash, }) @app.post("/api/music/library", status_code=201) def save_to_library(req: TrackCreate): """트랙 수동 추가 (외부 파일 등록 또는 프론트 직접 저장용)""" track = add_track(req.model_dump()) return track @app.delete("/api/music/library/{track_id}") def remove_from_library(track_id: int): """라이브러리에서 트랙 삭제. 로컬 파일도 함께 삭제.""" file_path = get_track_file_path(track_id) ok = delete_track(track_id) if not ok: raise HTTPException(status_code=404, detail="Track not found") if file_path and os.path.isfile(file_path): try: os.remove(file_path) except OSError: pass return {"ok": True} # ── 모델 목록 API ──────────────────────────────────────────────────────────── @app.get("/api/music/models") def get_models(): """사용 가능한 Suno AI 모델 목록.""" return {"models": SUNO_MODELS} # ── 크레딧 조회 API ────────────────────────────────────────────────────────── @app.get("/api/music/credits") def check_credits(): """Suno 잔여 크레딧 조회.""" if not SUNO_API_KEY: raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다") result = get_credits() if result is None: raise HTTPException(status_code=502, detail="크레딧 조회 실패") return result # ── 곡 연장 API ────────────────────────────────────────────────────────────── class ExtendRequest(BaseModel): suno_id: str # 원본 Suno 곡 ID continue_at: int = 0 # 연장 시작 지점 (초) prompt: str = "" # 추가 가사/프롬프트 style: str = "" # 스타일 오버라이드 title: str = "" model: str = "V4" @app.post("/api/music/extend") def extend_music(req: ExtendRequest, background_tasks: BackgroundTasks): """기존 곡을 특정 지점부터 연장 (Suno Extend API).""" if not SUNO_API_KEY: raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다") task_id = str(uuid.uuid4()) params = req.model_dump() create_task(task_id, params, provider="suno") background_tasks.add_task(run_suno_extend, task_id, params) return {"task_id": task_id, "provider": "suno"} # ── 보컬 분리 API ──────────────────────────────────────────────────────────── class VocalRemovalRequest(BaseModel): suno_id: str # Suno 곡 ID title: str = "" # 원본 트랙 제목 @app.post("/api/music/vocal-removal") def vocal_removal(req: VocalRemovalRequest, background_tasks: BackgroundTasks): """트랙에서 보컬과 인스트루멘탈을 분리 (Suno Vocal Removal API).""" if not SUNO_API_KEY: raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다") task_id = str(uuid.uuid4()) params = req.model_dump() create_task(task_id, params, provider="suno") background_tasks.add_task(run_vocal_removal, task_id, params) return {"task_id": task_id, "provider": "suno"} # ── 커버 이미지 생성 API ──────────────────────────────────────────────────── class CoverImageRequest(BaseModel): suno_task_id: str # Suno 생성 task ID track_id: Optional[int] = None # 라이브러리 트랙 ID (결과 저장용) @app.post("/api/music/cover-image") def cover_image(req: CoverImageRequest, background_tasks: BackgroundTasks): """Suno 곡의 커버 이미지 2장 생성.""" if not SUNO_API_KEY: raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다") task_id = str(uuid.uuid4()) params = req.model_dump() create_task(task_id, params, provider="suno") background_tasks.add_task(run_cover_image, task_id, params) return {"task_id": task_id, "provider": "suno"} # ── WAV 변환 API ──────────────────────────────────────────────────────────── class WavRequest(BaseModel): suno_task_id: str suno_id: str track_id: Optional[int] = None @app.post("/api/music/wav") def wav_convert(req: WavRequest, background_tasks: BackgroundTasks): """곡을 WAV 포맷으로 변환.""" if not SUNO_API_KEY: raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다") task_id = str(uuid.uuid4()) params = req.model_dump() create_task(task_id, params, provider="suno") background_tasks.add_task(run_wav_convert, task_id, params) return {"task_id": task_id, "provider": "suno"} # ── 12스템 분리 API ───────────────────────────────────────────────────────── class StemSplitRequest(BaseModel): suno_task_id: str suno_id: str track_id: Optional[int] = None @app.post("/api/music/stem-split") def stem_split(req: StemSplitRequest, background_tasks: BackgroundTasks): """곡을 12개 스템으로 분리 (50 크레딧). 보컬, 드럼, 베이스, 기타 등.""" if not SUNO_API_KEY: raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다") task_id = str(uuid.uuid4()) params = req.model_dump() create_task(task_id, params, provider="suno") background_tasks.add_task(run_stem_split, task_id, params) return {"task_id": task_id, "provider": "suno"} # ── 타임스탬프 가사 API ───────────────────────────────────────────────────── @app.get("/api/music/timestamped-lyrics") def timestamped_lyrics(task_id: str, suno_id: str): """타임스탬프 가사 조회 (가라오케 스타일 싱크용).""" if not SUNO_API_KEY: raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다") result = get_timestamped_lyrics(task_id, suno_id) if not result: raise HTTPException(status_code=502, detail="타임스탬프 가사 조회 실패") return result # ── 스타일 부스트 API ─────────────────────────────────────────────────────── class StyleBoostRequest(BaseModel): content: str @app.post("/api/music/style-boost") def style_boost(req: StyleBoostRequest): """AI로 최적 스타일 프롬프트 생성.""" if not SUNO_API_KEY: raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다") result = generate_style_boost(req.content) if not result: raise HTTPException(status_code=502, detail="스타일 부스트 생성 실패") return result # ── Phase 3: 업로드 + 커버 ────────────────────────────────────────────────── class UploadCoverRequest(BaseModel): upload_url: str model: str = "V4" custom_mode: bool = True instrumental: bool = False prompt: str = "" style: str = "" title: str = "" vocal_gender: Optional[str] = None negative_tags: Optional[str] = None style_weight: Optional[float] = None audio_weight: Optional[float] = None @app.post("/api/music/upload-cover") def upload_cover(req: UploadCoverRequest, background_tasks: BackgroundTasks): """외부 오디오를 Suno 스타일로 리메이크.""" if not SUNO_API_KEY: raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다") task_id = str(uuid.uuid4()) params = req.model_dump() create_task(task_id, params, provider="suno") background_tasks.add_task(run_upload_cover, task_id, params) return {"task_id": task_id, "provider": "suno"} # ── Phase 3: 업로드 + 확장 ────────────────────────────────────────────────── class UploadExtendRequest(BaseModel): upload_url: str model: str = "V4" default_param_flag: bool = True continue_at: Optional[float] = None prompt: str = "" style: str = "" title: str = "" instrumental: bool = False vocal_gender: Optional[str] = None negative_tags: Optional[str] = None @app.post("/api/music/upload-extend") def upload_extend(req: UploadExtendRequest, background_tasks: BackgroundTasks): """외부 오디오를 이어서 확장.""" if not SUNO_API_KEY: raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다") task_id = str(uuid.uuid4()) params = req.model_dump() create_task(task_id, params, provider="suno") background_tasks.add_task(run_upload_extend, task_id, params) return {"task_id": task_id, "provider": "suno"} # ── Phase 3: 보컬 추가 ────────────────────────────────────────────────────── class AddVocalsRequest(BaseModel): upload_url: str prompt: str title: str style: str negative_tags: str = "" vocal_gender: Optional[str] = None model: str = "V4_5PLUS" style_weight: Optional[float] = None audio_weight: Optional[float] = None @app.post("/api/music/add-vocals") def add_vocals(req: AddVocalsRequest, background_tasks: BackgroundTasks): """인스트루멘탈에 AI 보컬 추가.""" if not SUNO_API_KEY: raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다") task_id = str(uuid.uuid4()) params = req.model_dump() create_task(task_id, params, provider="suno") background_tasks.add_task(run_add_vocals, task_id, params) return {"task_id": task_id, "provider": "suno"} # ── Phase 3: 인스트루멘탈 추가 ────────────────────────────────────────────── class AddInstrumentalRequest(BaseModel): upload_url: str title: str tags: str negative_tags: str = "" vocal_gender: Optional[str] = None model: str = "V4_5PLUS" style_weight: Optional[float] = None audio_weight: Optional[float] = None @app.post("/api/music/add-instrumental") def add_instrumental(req: AddInstrumentalRequest, background_tasks: BackgroundTasks): """보컬에 AI 반주 추가.""" if not SUNO_API_KEY: raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다") task_id = str(uuid.uuid4()) params = req.model_dump() create_task(task_id, params, provider="suno") background_tasks.add_task(run_add_instrumental, task_id, params) return {"task_id": task_id, "provider": "suno"} # ── Phase 3: 뮤직비디오 생성 ──────────────────────────────────────────────── class VideoRequest(BaseModel): suno_task_id: str suno_id: str author: str = "" domain_name: str = "" track_id: Optional[int] = None @app.post("/api/music/video") def video_generate(req: VideoRequest, background_tasks: BackgroundTasks): """뮤직비디오(MP4) 생성.""" if not SUNO_API_KEY: raise HTTPException(status_code=400, detail="Suno API 키가 설정되지 않았습니다") task_id = str(uuid.uuid4()) params = req.model_dump() create_task(task_id, params, provider="suno") background_tasks.add_task(run_video_generate, task_id, params) return {"task_id": task_id, "provider": "suno"} # ── 저장된 가사 CRUD API ──────────────────────────────────────────────────── class LyricsSave(BaseModel): title: str = "" text: str = "" prompt: str = "" class LyricsUpdate(BaseModel): title: Optional[str] = None text: Optional[str] = None prompt: Optional[str] = None @app.get("/api/music/lyrics/library") def list_saved_lyrics(): """저장된 가사 목록 전체 조회 (생성일 내림차순).""" return {"lyrics": get_all_lyrics()} @app.post("/api/music/lyrics/library", status_code=201) def save_lyrics(req: LyricsSave): """가사 저장.""" return add_lyrics(req.model_dump()) @app.put("/api/music/lyrics/library/{lyrics_id}") def edit_lyrics(lyrics_id: int, req: LyricsUpdate): """가사 수정.""" data = {k: v for k, v in req.model_dump().items() if v is not None} result = update_lyrics(lyrics_id, data) if not result: raise HTTPException(status_code=404, detail="Lyrics not found") return result @app.delete("/api/music/lyrics/library/{lyrics_id}") def remove_lyrics(lyrics_id: int): """가사 삭제.""" if not delete_lyrics(lyrics_id): raise HTTPException(status_code=404, detail="Lyrics not found") return {"ok": True} # ── 영상 프로젝트 모델 ──────────────────────────────────────────────────────── class VideoProjectCreate(BaseModel): track_id: int format: str = "visualizer" target_countries: List[str] = [] render_params: dict = {} class RevenueCreate(BaseModel): video_project_id: Optional[int] = None yt_video_id: str = "" record_month: str views: int = 0 watch_hours: float = 0.0 revenue_usd: float = 0.0 country: str = "" source: str = "manual" class RevenueUpdate(BaseModel): yt_video_id: Optional[str] = None record_month: Optional[str] = None views: Optional[int] = None watch_hours: Optional[float] = None revenue_usd: Optional[float] = None country: Optional[str] = None source: Optional[str] = None # ── 영상 프로젝트 API ───────────────────────────────────────────────────────── @app.post("/api/music/video-project", status_code=201) def create_project(req: VideoProjectCreate, background_tasks: BackgroundTasks): if not get_track_by_id(req.track_id): raise HTTPException(status_code=404, detail="Track not found") if req.format not in ("visualizer", "slideshow"): raise HTTPException(status_code=400, detail="format은 'visualizer' 또는 'slideshow'") proj = create_video_project(req.model_dump()) return proj @app.get("/api/music/video-projects") def list_projects(): return {"projects": get_all_video_projects()} @app.get("/api/music/video-project/{project_id}") def get_project(project_id: int): proj = get_video_project(project_id) if not proj: raise HTTPException(status_code=404, detail="Project not found") return proj @app.post("/api/music/video-project/{project_id}/render") def render_project(project_id: int, background_tasks: BackgroundTasks): proj = get_video_project(project_id) if not proj: raise HTTPException(status_code=404, detail="Project not found") if proj["status"] == "rendering": raise HTTPException(status_code=409, detail="이미 렌더링 중입니다") from .video_producer import produce_video background_tasks.add_task(produce_video, project_id) return {"ok": True, "project_id": project_id, "status": "rendering"} @app.get("/api/music/video-project/{project_id}/export") def export_project(project_id: int): proj = get_video_project(project_id) if not proj: raise HTTPException(status_code=404, detail="Project not found") if proj["status"] != "done": raise HTTPException(status_code=400, detail=f"렌더링 미완료 (status: {proj['status']})") meta_path = os.path.join(VIDEO_DATA_DIR, str(project_id), "metadata.json") metadata = {} if os.path.exists(meta_path): with open(meta_path, encoding="utf-8") as f: metadata = json.load(f) thumb_url = proj["output_url"].replace("output.mp4", "thumbnail.jpg") if proj["output_url"] else "" return { "project_id": project_id, "output_url": proj["output_url"], "thumbnail_url": thumb_url, "yt_title": proj["yt_title"], "yt_description": proj["yt_description"], "yt_tags": proj["yt_tags"], "metadata": metadata, } @app.delete("/api/music/video-project/{project_id}") def delete_project(project_id: int): if not get_video_project(project_id): raise HTTPException(status_code=404, detail="Project not found") out_dir = os.path.join(VIDEO_DATA_DIR, str(project_id)) if os.path.isdir(out_dir): shutil.rmtree(out_dir, ignore_errors=True) delete_video_project(project_id) return {"ok": True} # ── Compile Jobs ────────────────────────────────────────────────────────────── class CompileRequest(BaseModel): title: str = "" track_ids: list[int] crossfade_sec: float = 3.0 @app.post("/api/music/compile") def create_compile(req: CompileRequest, background_tasks: BackgroundTasks): if not req.track_ids: raise HTTPException(status_code=400, detail="track_ids 필수") if not (0.5 <= req.crossfade_sec <= 15): raise HTTPException(status_code=400, detail="crossfade_sec: 0.5~15") job_id = create_compile_job(req.title, req.track_ids, req.crossfade_sec) background_tasks.add_task(run_compile, job_id) return {"id": job_id, "status": "rendering"} @app.get("/api/music/compiles") def list_compiles(): return {"jobs": get_compile_jobs()} @app.get("/api/music/compile/{job_id}") def get_compile(job_id: int): job = get_compile_job(job_id) if not job: raise HTTPException(status_code=404, detail="Not found") return job @app.delete("/api/music/compile/{job_id}") def delete_compile(job_id: int): job = get_compile_job(job_id) if not job: raise HTTPException(status_code=404, detail="Not found") if job.get("output_path"): out_dir = os.path.dirname(job["output_path"]) if os.path.exists(out_dir): shutil.rmtree(out_dir, ignore_errors=True) delete_compile_job(job_id) return {"ok": True} @app.get("/api/music/compile/{job_id}/export") def export_compile(job_id: int): job = get_compile_job(job_id) if not job or job["status"] != "done": raise HTTPException(status_code=404, detail="Not ready") out_dir = os.path.dirname(job["output_path"]) rel = os.path.relpath(job["output_path"], os.getenv("VIDEO_DATA_DIR", "/app/data/videos")) mp4_url = f"/media/videos/{rel}" return { "mp4_url": mp4_url, "duration_sec": job["duration_sec"], "title": job["title"], } # ── 수익화 추적 API ─────────────────────────────────────────────────────────── @app.get("/api/music/revenue/dashboard") def revenue_dashboard(): return get_revenue_dashboard() @app.get("/api/music/revenue") def list_revenue(yt_video_id: Optional[str] = None, year_month: Optional[str] = None): return {"records": get_all_revenue_records(yt_video_id, year_month)} @app.post("/api/music/revenue", status_code=201) def add_revenue(req: RevenueCreate): return create_revenue_record(req.model_dump()) @app.put("/api/music/revenue/{record_id}") def edit_revenue(record_id: int, req: RevenueUpdate): data = {k: v for k, v in req.model_dump().items() if v is not None} result = update_revenue_record(record_id, data) if not result: raise HTTPException(status_code=404, detail="Record not found") return result @app.delete("/api/music/revenue/{record_id}") def remove_revenue(record_id: int): if not delete_revenue_record(record_id): raise HTTPException(status_code=404, detail="Record not found") return {"ok": True} # ── 시장 조사 API ───────────────────────────────────────────────────────────── class MarketIngestRequest(BaseModel): trends: List[Dict[str, Any]] report_date: str = "" @app.post("/api/music/market/ingest") def market_ingest(req: MarketIngestRequest): """agent-office → 트렌드 데이터 수신 + 리포트 생성.""" from datetime import date report_date = req.report_date or date.today().isoformat() report = ingest_trends(req.trends, report_date) return {"ok": True, "trends_saved": len(req.trends), "report_date": report_date} @app.get("/api/music/market/trends") def list_market_trends( country: Optional[str] = None, genre: Optional[str] = None, source: Optional[str] = None, days: int = Query(7, ge=1), ): return {"trends": _get_market_trends(country, genre, source, days)} @app.get("/api/music/market/report/latest") def get_market_report_latest(): report = get_latest_trend_report() if not report: raise HTTPException(status_code=404, detail="리포트 없음 — 아직 수집 전") return report @app.get("/api/music/market/report") def list_market_reports(limit: int = 10): return {"reports": _get_trend_reports(limit)} @app.get("/api/music/market/suggest") def market_suggest(limit: int = 5): return {"suggestions": get_suggestions(limit)} # ── Pipeline endpoints ──────────────────────────────────────────────────────── class PipelineCreate(BaseModel): track_id: int | None = None compile_job_id: int | None = None visual_style: str | None = None # single | essential background_mode: str | None = None # static | video_loop background_keyword: str | None = None class FeedbackRequest(BaseModel): step: str intent: str # approve | reject feedback_text: Optional[str] = None @app.post("/api/music/pipeline", status_code=201) def create_pipeline(req: PipelineCreate): # XOR 검증 if (req.track_id is None) == (req.compile_job_id is None): raise HTTPException(400, "track_id 또는 compile_job_id 중 정확히 하나를 지정") # compile_job 상태 확인 if req.compile_job_id is not None: job = _db_module.get_compile_job(req.compile_job_id) if not job: raise HTTPException(404, f"compile job {req.compile_job_id} 없음") if job.get("status") not in ("done", "succeeded"): raise HTTPException(400, f"compile job {req.compile_job_id} not ready (status={job.get('status')})") # 동일 입력으로 이미 active 파이프라인 있으면 409 actives = _db_module.list_pipelines(active_only=True) for p in actives: if (req.track_id and p.get("track_id") == req.track_id) or \ (req.compile_job_id and p.get("compile_job_id") == req.compile_job_id): raise HTTPException(409, "이미 진행 중인 파이프라인이 있습니다") setup = _db_module.get_youtube_setup() vd = setup["visual_defaults"] pid = _db_module.create_pipeline( track_id=req.track_id, compile_job_id=req.compile_job_id, visual_style=req.visual_style or vd.get("default_visual_style", "essential"), background_mode=req.background_mode or vd.get("default_background_mode", "static"), background_keyword=req.background_keyword or vd.get("default_background_keyword") or None, ) return _db_module.get_pipeline(pid) @app.get("/api/music/pipeline") def list_pipelines_endpoint(status: str = "all"): pipelines = _db_module.list_pipelines(active_only=(status == "active")) return {"pipelines": pipelines} @app.get("/api/music/pipeline/lookup-by-msg/{msg_id}") def lookup_by_msg(msg_id: int): for p in _db_module.list_pipelines(active_only=True): for step, mid in p["last_telegram_msg_ids"].items(): if mid == msg_id: return {"pipeline_id": p["id"], "step": step} raise HTTPException(404) @app.get("/api/music/pipeline/{pid}") def get_pipeline_endpoint(pid: int): p = _db_module.get_pipeline(pid) if not p: raise HTTPException(404) p["jobs"] = _db_module.list_pipeline_jobs(pid) p["feedback"] = _db_module.get_feedback_history(pid) return p @app.post("/api/music/pipeline/{pid}/start", status_code=202) async def start_pipeline(pid: int, bg: BackgroundTasks): p = _db_module.get_pipeline(pid) if not p: raise HTTPException(404) if p["state"] != "created": raise HTTPException(409, f"이미 시작됨 ({p['state']})") bg.add_task(orchestrator.run_step, pid, "cover") return {"ok": True} def _state_to_step(state: str) -> Optional[str]: return { "video_pending": "video", "thumb_pending": "thumb", "meta_pending": "meta", "ai_review": "review", "publish_pending": None, # 사용자 명시 발행 호출 필요 "publishing": "publish", }.get(state) @app.post("/api/music/pipeline/{pid}/feedback", status_code=202) async def feedback(pid: int, req: FeedbackRequest, bg: BackgroundTasks): p = _db_module.get_pipeline(pid) if not p: raise HTTPException(404) if p["state"] == "awaiting_manual": raise HTTPException(409, "수동 개입 대기 중") state = p["state"] expected = f"{req.step}_pending" if state != expected: # 멱등 처리 — 이미 다음 단계로 넘어갔으면 무시 return {"ok": True, "skipped": True} if req.intent == "approve": from .pipeline.state_machine import next_state_on_approve # Validate transition is legal try: next_st = next_state_on_approve(state) except ValueError as e: raise HTTPException(400, str(e)) next_step = _state_to_step(next_st) if next_step: # bg task will set state to the new *_pending when step completes bg.add_task(orchestrator.run_step, pid, next_step) else: # No step to run — fall through to direct state update # (defensive — current code paths don't hit this) _db_module.update_pipeline_state(pid, next_st) return {"ok": True} elif req.intent == "reject": count = _db_module.increment_feedback_count(pid, req.step) if count > 5: _db_module.update_pipeline_state(pid, "awaiting_manual") raise HTTPException(409, "재생성 한도 초과") if req.feedback_text: _db_module.record_feedback(pid, req.step, req.feedback_text) bg.add_task(orchestrator.run_step, pid, req.step, req.feedback_text or "") return {"ok": True} else: raise HTTPException(400, f"unknown intent: {req.intent}") @app.post("/api/music/pipeline/{pid}/cancel") def cancel_pipeline(pid: int): p = _db_module.get_pipeline(pid) if not p: raise HTTPException(404) _db_module.update_pipeline_state(pid, "cancelled", cancelled_at=_db_module._now()) return {"ok": True} @app.post("/api/music/pipeline/{pid}/publish", status_code=202) async def publish_pipeline(pid: int, bg: BackgroundTasks): p = _db_module.get_pipeline(pid) if not p: raise HTTPException(404) if p["state"] != "publish_pending": raise HTTPException(409, f"발행 단계 아님 ({p['state']})") _db_module.update_pipeline_state(pid, "publishing") bg.add_task(orchestrator.run_step, pid, "publish") return {"ok": True} # Telegram 메시지 매칭용 엔드포인트 (agent-office용) class TelegramMsgPatch(BaseModel): step: str message_id: int @app.patch("/api/music/pipeline/{pid}/telegram-msg") def save_telegram_msg(pid: int, req: TelegramMsgPatch): p = _db_module.get_pipeline(pid) if not p: raise HTTPException(404) ids = p["last_telegram_msg_ids"] ids[req.step] = req.message_id _db_module.update_pipeline_state( pid, p["state"], last_telegram_msg_ids=json.dumps(ids) ) return {"ok": True} # ── Setup endpoints ─────────────────────────────────────────────────────────── class SetupRequest(BaseModel): metadata_template: Optional[Dict[str, Any]] = None cover_prompts: Optional[Dict[str, Any]] = None review_weights: Optional[Dict[str, Any]] = None review_threshold: Optional[int] = None visual_defaults: Optional[Dict[str, Any]] = None publish_policy: Optional[Dict[str, Any]] = None @app.get("/api/music/setup") def get_setup(): return _db_module.get_youtube_setup() @app.put("/api/music/setup") def put_setup(req: SetupRequest): payload = {k: v for k, v in req.dict().items() if v is not None} _db_module.update_youtube_setup(**payload) return _db_module.get_youtube_setup() # ── YouTube OAuth endpoints ─────────────────────────────────────────────────── @app.get("/api/music/youtube/auth-url") def youtube_auth_url(): return {"url": yt_module.get_auth_url()} @app.get("/api/music/youtube/callback") async def youtube_callback(code: str): return await yt_module.exchange_code(code) @app.post("/api/music/youtube/disconnect") def youtube_disconnect(): yt_module.disconnect() return {"ok": True} @app.get("/api/music/youtube/status") def youtube_status(): return yt_module.get_status() or {"connected": False}