"""Render track videos (visualizer or slideshow) with ffmpeg and build YouTube metadata."""

import json
import logging
import os
import subprocess
from typing import Optional

import requests

from .db import get_video_project, get_track_by_id, update_video_project_status

logger = logging.getLogger(__name__)

VIDEO_DATA_DIR = os.getenv("VIDEO_DATA_DIR", "/app/data/videos")
VIDEO_MEDIA_BASE = os.getenv("VIDEO_MEDIA_BASE", "/media/videos")
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY", "")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")

# Genre -> (top RGB, bottom RGB) used for the generated gradient background.
GENRE_COLORS = {
    "lo-fi": ((26, 26, 46), (22, 33, 62)),
    "phonk": ((26, 10, 10), (45, 0, 0)),
    "ambient": ((13, 33, 55), (10, 22, 40)),
    "pop": ((26, 10, 46), (45, 27, 78)),
    "default": ((17, 24, 39), (31, 41, 55)),
}

# Filter chain that letterboxes any input frame onto a 1920x1080 canvas.
_FRAME_FILTER = (
    "scale=1920:1080:force_original_aspect_ratio=decrease,"
    "pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1"
)

# Length in seconds of each crossfade between consecutive slideshow images.
_XFADE_SEC = 1.0


def _make_gradient_bg(width: int, height: int, genre: str, output_path: str) -> None:
    """Write a vertical two-colour gradient JPEG for ``genre`` to ``output_path``.

    Unknown or empty genres fall back to the ``"default"`` palette.
    """
    from PIL import Image

    top_rgb, bot_rgb = GENRE_COLORS.get(
        (genre or "default").lower(), GENRE_COLORS["default"]
    )

    # Every pixel in a row has the same colour, so paint a 1-pixel-wide column
    # and stretch it horizontally instead of setting width*height pixels.
    column = Image.new("RGB", (1, height))
    for y in range(height):
        t = y / height
        color = tuple(
            int(top + (bot - top) * t) for top, bot in zip(top_rgb, bot_rgb)
        )
        column.putpixel((0, y), color)

    img = column.resize((width, height), Image.NEAREST)
    img.save(output_path, "JPEG", quality=95)


def _build_visualizer_cmd(audio_path: str, bg_path: str, output_path: str) -> list:
    """Return the ffmpeg argv for a looping background with a waveform overlay.

    The background is normalised to 1920x1080 first, because the waveform
    strip (1920x200) is placed at a fixed y=880 offset that only makes sense
    on a 1080-pixel-high canvas.
    """
    return [
        "ffmpeg", "-y",
        "-loop", "1", "-i", bg_path,
        "-i", audio_path,
        "-filter_complex",
        f"[0:v]{_FRAME_FILTER}[bg];"
        "[1:a]showwaves=s=1920x200:mode=cline:colors=0xFF4444@0.8[wave];"
        "[bg][wave]overlay=0:880[out]",
        "-map", "[out]", "-map", "1:a",
        "-c:v", "libx264", "-preset", "fast", "-crf", "23",
        "-c:a", "aac", "-b:a", "192k",
        "-shortest", output_path,
    ]


def _build_thumbnail_cmd(video_path: str, thumb_path: str) -> list:
    """Return the ffmpeg argv that grabs one frame at 00:00:05 as the thumbnail."""
    return [
        "ffmpeg", "-y",
        "-i", video_path,
        "-ss", "00:00:05",
        "-vframes", "1",
        "-q:v", "2",
        thumb_path,
    ]


def _build_slideshow_cmd(
    image_paths: list, audio_path: str, output_path: str, duration_per_image: float
) -> list:
    """Return the ffmpeg argv for a crossfaded slideshow over ``audio_path``.

    Each image is shown for ``duration_per_image`` seconds and consecutive
    images are blended with a ``_XFADE_SEC`` second fade.

    Raises:
        ValueError: if ``image_paths`` is empty.
    """
    n = len(image_paths)
    if n == 0:
        raise ValueError("image_paths must contain at least one image")

    # A still image is a single frame unless it is looped; the xfade offsets
    # below assume every clip really lasts duration_per_image seconds.
    clip_len = f"{duration_per_image:.3f}"
    inputs = []
    for p in image_paths:
        inputs += ["-loop", "1", "-t", clip_len, "-i", p]
    inputs += ["-i", audio_path]

    xd = _XFADE_SEC
    filter_parts = [f"[{i}:v]{_FRAME_FILTER}[v{i}]" for i in range(n)]
    if n == 1:
        filter_parts.append("[v0]copy[out]")
    else:
        prev = "v0"
        for i in range(1, n):
            # xfade offset is relative to the start of the accumulated first
            # input, which has already lost i-1 overlaps by this point.
            offset = max(0.0, duration_per_image * i - xd * i)
            nxt = "out" if i == n - 1 else f"xf{i}"
            filter_parts.append(
                f"[{prev}][v{i}]xfade=transition=fade:"
                f"duration={xd}:offset={offset:.2f}[{nxt}]"
            )
            prev = nxt
    filter_str = ";".join(filter_parts)

    return [
        "ffmpeg", "-y",
        *inputs,
        "-filter_complex", filter_str,
        "-map", "[out]", "-map", f"{n}:a",
        "-c:v", "libx264", "-preset", "fast", "-crf", "23",
        "-c:a", "aac", "-b:a", "192k",
        "-shortest", output_path,
    ]


def _fetch_pexels_images(keywords: list, count: int = 5) -> list:
    """Return up to ``count`` landscape image URLs from the Pexels search API.

    Best-effort: returns ``[]`` when no API key or keywords are available,
    on a non-200 response, or on any request/parsing error.
    """
    if not PEXELS_API_KEY or not keywords:
        return []
    query = " ".join(k for k in keywords if k)[:60]
    try:
        resp = requests.get(
            "https://api.pexels.com/v1/search",
            headers={"Authorization": PEXELS_API_KEY},
            params={"query": query, "per_page": count, "orientation": "landscape"},
            timeout=10,
        )
        if resp.status_code != 200:
            return []
        return [p["src"]["large2x"] for p in resp.json().get("photos", [])]
    except Exception:
        logger.warning("Pexels search failed for query %r", query, exc_info=True)
        return []


def _download_url(url: str, dest_path: str) -> bool:
    """Stream ``url`` into ``dest_path``; return ``True`` on success.

    On any failure the partially written file is removed and ``False`` is
    returned, so callers never pick up a truncated download.
    """
    try:
        # The context manager releases the streamed connection even on error.
        with requests.get(url, timeout=30, stream=True) as resp:
            resp.raise_for_status()
            with open(dest_path, "wb") as f:
                for chunk in resp.iter_content(8192):
                    f.write(chunk)
        return True
    except Exception:
        logger.warning("Download failed: %s", url, exc_info=True)
        try:
            os.remove(dest_path)
        except OSError:
            pass
        return False


def _generate_metadata(genre: str, moods: list, lyrics: str, target_countries: list) -> dict:
    """Return a dict with ``yt_title``, ``yt_description`` and ``yt_tags``.

    Without ``ANTHROPIC_API_KEY`` a static title/tag set is returned. With a
    key, the model is asked for a JSON object; any failure (missing package,
    API error, unparsable reply) falls back to a static result instead of
    raising, so metadata problems never fail an already rendered video.
    """
    if not ANTHROPIC_API_KEY:
        tags = [genre] + moods[:3] if genre else moods[:3]
        return {"yt_title": f"{genre or 'Chill'} Music", "yt_description": "", "yt_tags": tags}

    try:
        import anthropic

        client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
        countries_str = ", ".join(target_countries) if target_countries else "global"
        prompt = (
            f"YouTube 음악 영상 메타데이터를 JSON으로 생성해주세요.\n"
            f"장르: {genre}\n분위기: {', '.join(moods)}\n"
            f"가사 일부: {lyrics[:200] if lyrics else '인스트루멘탈'}\n"
            f"타겟 국가: {countries_str}\n\n"
            '{"yt_title":"제목(최대100자,SEO최적화)","yt_description":"설명(500자이내,해시태그포함)",'
            '"yt_tags":["태그1",...]} 형식으로만 응답.'
        )
        msg = client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}],
        )
        text = msg.content[0].text
        # The reply may wrap the JSON in prose; keep only the outermost braces.
        start, end = text.find("{"), text.rfind("}") + 1
        meta = json.loads(text[start:end])
        if not isinstance(meta, dict):
            raise ValueError("metadata response is not a JSON object")
        return meta
    except Exception:
        logger.warning("Metadata generation failed; using fallback", exc_info=True)
        return {
            "yt_title": f"{genre or 'Music'} - Chill Beats",
            "yt_description": "",
            "yt_tags": [genre] if genre else [],
        }


def _render_visualizer(track: dict, proj: dict, output_path: str) -> None:
    """Render the waveform-visualizer video for ``track`` to ``output_path``.

    Uses the first cover image as background when it can be downloaded,
    otherwise a generated genre gradient.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    out_dir = os.path.dirname(output_path)
    bg_path = os.path.join(out_dir, "bg.jpg")

    cover_images = track.get("cover_images") or []
    have_cover = bool(cover_images) and _download_url(cover_images[0], bg_path)
    if not have_cover:
        # "genre" may be present but null, so do not rely on the .get default.
        _make_gradient_bg(1920, 1080, track.get("genre") or "default", bg_path)

    cmd = _build_visualizer_cmd(track["file_path"], bg_path, output_path)
    subprocess.run(cmd, check=True, capture_output=True)


def _render_slideshow(track: dict, proj: dict, output_path: str) -> None:
    """Render a crossfaded image slideshow for ``track`` to ``output_path``.

    Images come from a Pexels search on genre/mood keywords plus up to two
    of the track's cover images; if none can be downloaded a single
    generated gradient is used.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    out_dir = os.path.dirname(output_path)
    img_dir = os.path.join(out_dir, "imgs")
    os.makedirs(img_dir, exist_ok=True)

    moods = track.get("moods") or []
    genre = track.get("genre", "")
    keywords = [genre] + moods[:2] if genre else moods[:3]

    pexels_urls = _fetch_pexels_images(keywords, count=5)
    suno_cover_urls = track.get("cover_images") or []
    all_urls = pexels_urls + suno_cover_urls[:2]

    img_paths = []
    for i, url in enumerate(all_urls):
        dest = os.path.join(img_dir, f"img_{i:02d}.jpg")
        if url and _download_url(url, dest):
            img_paths.append(dest)

    if not img_paths:
        bg = os.path.join(img_dir, "bg_fallback.jpg")
        _make_gradient_bg(1920, 1080, genre or "default", bg)
        img_paths = [bg]

    duration = track.get("duration_sec") or 180
    n = len(img_paths)
    # Each crossfade overlaps two clips, shortening the video by _XFADE_SEC;
    # spread that loss over the clips so the video still covers the audio.
    dur_per_img = max(3.0, (duration + _XFADE_SEC * (n - 1)) / n)

    cmd = _build_slideshow_cmd(img_paths, track["file_path"], output_path, dur_per_img)
    subprocess.run(cmd, check=True, capture_output=True)


def produce_video(project_id: int) -> None:
    """Render the video, thumbnail and metadata for a video project.

    Marks the project ``"rendering"``, then ``"done"`` with the output
    paths and YouTube metadata, or ``"failed"`` with the error text. Returns
    silently when the project does not exist. Never raises for render errors.
    """
    proj = get_video_project(project_id)
    if not proj:
        return

    update_video_project_status(project_id, "rendering")
    try:
        track = get_track_by_id(proj["track_id"])
        if not track or not track.get("file_path"):
            raise ValueError(f"트랙 파일 없음 (track_id={proj['track_id']})")

        out_dir = os.path.join(VIDEO_DATA_DIR, str(project_id))
        os.makedirs(out_dir, exist_ok=True)
        output_path = os.path.join(out_dir, "output.mp4")

        if proj["format"] == "visualizer":
            _render_visualizer(track, proj, output_path)
        elif proj["format"] == "slideshow":
            _render_slideshow(track, proj, output_path)
        else:
            raise ValueError(f"Unknown format: {proj['format']}")

        thumb_path = os.path.join(out_dir, "thumbnail.jpg")
        subprocess.run(
            _build_thumbnail_cmd(output_path, thumb_path),
            check=True,
            capture_output=True,
        )

        meta = _generate_metadata(
            genre=track.get("genre", ""),
            moods=track.get("moods") or [],
            lyrics=track.get("lyrics", ""),
            target_countries=proj.get("target_countries", []),
        )

        with open(os.path.join(out_dir, "metadata.json"), "w", encoding="utf-8") as f:
            json.dump(meta, f, ensure_ascii=False, indent=2)

        update_video_project_status(
            project_id,
            "done",
            output_path=output_path,
            output_url=f"{VIDEO_MEDIA_BASE}/{project_id}/output.mp4",
            thumbnail_path=thumb_path,
            yt_title=meta.get("yt_title", ""),
            yt_description=meta.get("yt_description", ""),
            yt_tags=meta.get("yt_tags", []),
        )
    except subprocess.CalledProcessError as e:
        # str(e) only carries the exit status; keep ffmpeg's own diagnostics
        # in the log so the failure can actually be debugged.
        stderr = e.stderr.decode("utf-8", "replace") if e.stderr else ""
        logger.error(
            "ffmpeg failed for project %s: %s", project_id, stderr[-2000:]
        )
        update_video_project_status(project_id, "failed", error=str(e))
    except Exception as e:
        logger.exception("Video production failed for project %s", project_id)
        update_video_project_status(project_id, "failed", error=str(e))