From 7336fd090e90ff63ce2d214fc13635a35c956ced Mon Sep 17 00:00:00 2001
From: gahusb
Date: Fri, 1 May 2026 11:49:42 +0900
Subject: [PATCH] =?UTF-8?q?feat(music-lab):=20video=5Fproducer=20=E2=80=94?=
 =?UTF-8?q?=20FFmpeg=20=EB=B9=84=EC=A3=BC=EC=96=BC=EB=9D=BC=EC=9D=B4?=
 =?UTF-8?q?=EC=A0=80=C2=B7=EC=8A=AC=EB=9D=BC=EC=9D=B4=EB=93=9C=EC=87=BC=20?=
 =?UTF-8?q?+=20Claude=20=EB=A9=94=ED=83=80=EB=8D=B0=EC=9D=B4=ED=84=B0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-Authored-By: Claude Sonnet 4.6
---
 music-lab/app/video_producer.py        | 256 +++++++++++++++++++++++++
 music-lab/tests/test_video_producer.py |  98 ++++++++++
 2 files changed, 354 insertions(+)
 create mode 100644 music-lab/app/video_producer.py
 create mode 100644 music-lab/tests/test_video_producer.py

diff --git a/music-lab/app/video_producer.py b/music-lab/app/video_producer.py
new file mode 100644
index 0000000..af2d033
--- /dev/null
+++ b/music-lab/app/video_producer.py
@@ -0,0 +1,256 @@
+import json
+import os
+import subprocess
+from typing import Optional
+
+import requests
+
+from .db import get_video_project, get_track_by_id, update_video_project_status
+
+VIDEO_DATA_DIR = os.getenv("VIDEO_DATA_DIR", "/app/data/videos")
+VIDEO_MEDIA_BASE = os.getenv("VIDEO_MEDIA_BASE", "/media/videos")
+PEXELS_API_KEY = os.getenv("PEXELS_API_KEY", "")
+ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
+
+GENRE_COLORS = {
+    "lo-fi": ((26, 26, 46), (22, 33, 62)),
+    "phonk": ((26, 10, 10), (45, 0, 0)),
+    "ambient": ((13, 33, 55), (10, 22, 40)),
+    "pop": ((26, 10, 46), (45, 27, 78)),
+    "default": ((17, 24, 39), (31, 41, 55)),
+}
+
+
+def _make_gradient_bg(width: int, height: int, genre: str, output_path: str) -> None:
+    from PIL import Image
+    top_rgb, bot_rgb = GENRE_COLORS.get(genre.lower(), GENRE_COLORS["default"])
+    img = Image.new("RGB", (width, height))
+    pixels = img.load()
+    for y in range(height):
+        t = y / height
+        r = int(top_rgb[0] + (bot_rgb[0] - top_rgb[0]) * t)
+        g = int(top_rgb[1] + (bot_rgb[1] - top_rgb[1]) * t)
+        b = int(top_rgb[2] + (bot_rgb[2] - top_rgb[2]) * t)
+        for x in range(width):
+            pixels[x, y] = (r, g, b)
+    img.save(output_path, "JPEG", quality=95)
+
+
+def _build_visualizer_cmd(audio_path: str, bg_path: str, output_path: str) -> list:
+    return [
+        "ffmpeg", "-y",
+        "-loop", "1", "-i", bg_path,
+        "-i", audio_path,
+        "-filter_complex",
+        "[1:a]showwaves=s=1920x200:mode=cline:colors=0xFF4444@0.8[wave];"
+        "[0:v][wave]overlay=0:880[out]",
+        "-map", "[out]", "-map", "1:a",
+        "-c:v", "libx264", "-preset", "fast", "-crf", "23",
+        "-c:a", "aac", "-b:a", "192k",
+        "-shortest", output_path,
+    ]
+
+
+def _build_thumbnail_cmd(video_path: str, thumb_path: str) -> list:
+    return [
+        "ffmpeg", "-y",
+        "-i", video_path,
+        "-ss", "00:00:05",
+        "-vframes", "1",
+        "-q:v", "2",
+        thumb_path,
+    ]
+
+
+def _build_slideshow_cmd(
+    image_paths: list, audio_path: str, output_path: str, duration_per_image: float
+) -> list:
+    n = len(image_paths)
+    inputs = []
+    for p in image_paths:
+        inputs += ["-i", p]
+    inputs += ["-i", audio_path]
+
+    scale = (
+        "scale=1920:1080:force_original_aspect_ratio=decrease,"
+        "pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1"
+    )
+    filter_parts = [f"[{i}:v]{scale}[v{i}]" for i in range(n)]
+
+    xd = 1.0
+    if n == 1:
+        filter_str = ";".join(filter_parts) + ";[v0]copy[out]"
+    else:
+        filter_str = ";".join(filter_parts)
+        prev = "v0"
+        for i in range(1, n):
+            offset = max(0.0, duration_per_image * i - xd)
+            nxt = "out" if i == n - 1 else f"xf{i}"
+            filter_str += (
+                f";[{prev}][v{i}]xfade=transition=fade:"
+                f"duration={xd}:offset={offset:.2f}[{nxt}]"
+            )
+            prev = nxt
+
+    return [
+        "ffmpeg", "-y",
+        *inputs,
+        "-filter_complex", filter_str,
+        "-map", "[out]", "-map", f"{n}:a",
+        "-c:v", "libx264", "-preset", "fast", "-crf", "23",
+        "-c:a", "aac", "-b:a", "192k",
+        "-shortest", output_path,
+    ]
+
+
+def _fetch_pexels_images(keywords: list, count: int = 5) -> list:
+    if not PEXELS_API_KEY or not keywords:
+        return []
+    query = " ".join(k for k in keywords if k)[:60]
+    try:
+        resp = requests.get(
+            "https://api.pexels.com/v1/search",
+            headers={"Authorization": PEXELS_API_KEY},
+            params={"query": query, "per_page": count, "orientation": "landscape"},
+            timeout=10,
+        )
+        if resp.status_code != 200:
+            return []
+        return [p["src"]["large2x"] for p in resp.json().get("photos", [])]
+    except Exception:
+        return []
+
+
+def _download_url(url: str, dest_path: str) -> bool:
+    try:
+        resp = requests.get(url, timeout=30, stream=True)
+        resp.raise_for_status()
+        with open(dest_path, "wb") as f:
+            for chunk in resp.iter_content(8192):
+                f.write(chunk)
+        return True
+    except Exception:
+        return False
+
+
+def _generate_metadata(genre: str, moods: list, lyrics: str, target_countries: list) -> dict:
+    if not ANTHROPIC_API_KEY:
+        tags = [genre] + moods[:3] if genre else moods[:3]
+        return {"yt_title": f"{genre or 'Chill'} Music", "yt_description": "", "yt_tags": tags}
+
+    import anthropic
+    client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
+    countries_str = ", ".join(target_countries) if target_countries else "global"
+    prompt = (
+        f"YouTube 음악 영상 메타데이터를 JSON으로 생성해주세요.\n"
+        f"장르: {genre}\n분위기: {', '.join(moods)}\n"
+        f"가사 일부: {lyrics[:200] if lyrics else '인스트루멘탈'}\n"
+        f"타겟 국가: {countries_str}\n\n"
+        '{"yt_title":"제목(최대100자,SEO최적화)","yt_description":"설명(500자이내,해시태그포함)",'
+        '"yt_tags":["태그1",...]} 형식으로만 응답.'
+    )
+    try:
+        msg = client.messages.create(
+            model="claude-haiku-4-5-20251001",
+            max_tokens=1024,
+            messages=[{"role": "user", "content": prompt}],
+        )
+        text = msg.content[0].text
+        start, end = text.find("{"), text.rfind("}") + 1
+        return json.loads(text[start:end])
+    except Exception:
+        return {"yt_title": f"{genre or 'Music'} - Chill Beats", "yt_description": "", "yt_tags": [genre]}
+
+
+def _render_visualizer(track: dict, proj: dict, output_path: str) -> None:
+    out_dir = os.path.dirname(output_path)
+    bg_path = os.path.join(out_dir, "bg.jpg")
+
+    cover_images = track.get("cover_images") or []
+    if cover_images:
+        ok = _download_url(cover_images[0], bg_path)
+        if not ok:
+            cover_images = []
+    if not cover_images:
+        _make_gradient_bg(1920, 1080, track.get("genre", "default"), bg_path)
+
+    cmd = _build_visualizer_cmd(track["file_path"], bg_path, output_path)
+    subprocess.run(cmd, check=True, capture_output=True)
+
+
+def _render_slideshow(track: dict, proj: dict, output_path: str) -> None:
+    out_dir = os.path.dirname(output_path)
+    img_dir = os.path.join(out_dir, "imgs")
+    os.makedirs(img_dir, exist_ok=True)
+
+    moods = track.get("moods") or []
+    genre = track.get("genre", "")
+    keywords = [genre] + moods[:2] if genre else moods[:3]
+
+    pexels_urls = _fetch_pexels_images(keywords, count=5)
+    suno_cover_urls = track.get("cover_images") or []
+    all_urls = pexels_urls + suno_cover_urls[:2]
+
+    img_paths = []
+    for i, url in enumerate(all_urls):
+        dest = os.path.join(img_dir, f"img_{i:02d}.jpg")
+        if url and _download_url(url, dest):
+            img_paths.append(dest)
+
+    if not img_paths:
+        bg = os.path.join(img_dir, "bg_fallback.jpg")
+        _make_gradient_bg(1920, 1080, genre or "default", bg)
+        img_paths = [bg]
+
+    duration = track.get("duration_sec") or 180
+    dur_per_img = max(3.0, duration / len(img_paths))
+    cmd = _build_slideshow_cmd(img_paths, track["file_path"], output_path, dur_per_img)
+    subprocess.run(cmd, check=True, capture_output=True)
+
+
+def produce_video(project_id: int) -> None:
+    proj = get_video_project(project_id)
+    if not proj:
+        return
+
+    update_video_project_status(project_id, "rendering")
+
+    try:
+        track = get_track_by_id(proj["track_id"])
+        if not track or not track.get("file_path"):
+            raise ValueError(f"트랙 파일 없음 (track_id={proj['track_id']})")
+
+        out_dir = os.path.join(VIDEO_DATA_DIR, str(project_id))
+        os.makedirs(out_dir, exist_ok=True)
+        output_path = os.path.join(out_dir, "output.mp4")
+
+        if proj["format"] == "visualizer":
+            _render_visualizer(track, proj, output_path)
+        elif proj["format"] == "slideshow":
+            _render_slideshow(track, proj, output_path)
+        else:
+            raise ValueError(f"Unknown format: {proj['format']}")
+
+        thumb_path = os.path.join(out_dir, "thumbnail.jpg")
+        subprocess.run(_build_thumbnail_cmd(output_path, thumb_path), check=True, capture_output=True)
+
+        meta = _generate_metadata(
+            genre=track.get("genre", ""),
+            moods=track.get("moods") or [],
+            lyrics=track.get("lyrics", ""),
+            target_countries=proj.get("target_countries", []),
+        )
+        with open(os.path.join(out_dir, "metadata.json"), "w", encoding="utf-8") as f:
+            json.dump(meta, f, ensure_ascii=False, indent=2)
+
+        update_video_project_status(
+            project_id, "done",
+            output_path=output_path,
+            output_url=f"{VIDEO_MEDIA_BASE}/{project_id}/output.mp4",
+            thumbnail_path=thumb_path,
+            yt_title=meta.get("yt_title", ""),
+            yt_description=meta.get("yt_description", ""),
+            yt_tags=meta.get("yt_tags", []),
+        )
+    except Exception as e:
+        update_video_project_status(project_id, "failed", error=str(e))
diff --git a/music-lab/tests/test_video_producer.py b/music-lab/tests/test_video_producer.py
new file mode 100644
index 0000000..30807b2
--- /dev/null
+++ b/music-lab/tests/test_video_producer.py
@@ -0,0 +1,98 @@
+# music-lab/tests/test_video_producer.py
+import os
+from unittest.mock import patch, MagicMock
+import pytest
+
+
+def test_build_visualizer_cmd():
+    from app.video_producer import _build_visualizer_cmd
+    cmd = _build_visualizer_cmd(
+        audio_path="/data/music/test.mp3",
+        bg_path="/tmp/bg.jpg",
+        output_path="/data/videos/1/output.mp4",
+    )
+    assert cmd[0] == "ffmpeg"
+    assert "/data/music/test.mp3" in cmd
+    assert "/data/videos/1/output.mp4" in cmd
+    assert any("showwaves" in str(c) for c in cmd)
+
+
+def test_make_gradient_bg_uses_pillow(tmp_path):
+    from app.video_producer import _make_gradient_bg
+    out = str(tmp_path / "bg.jpg")
+    _make_gradient_bg(1920, 1080, "lo-fi", out)
+    assert os.path.exists(out)
+    assert os.path.getsize(out) > 0
+
+
+def test_extract_thumbnail_cmd():
+    from app.video_producer import _build_thumbnail_cmd
+    cmd = _build_thumbnail_cmd("/data/videos/1/output.mp4", "/data/videos/1/thumbnail.jpg")
+    assert cmd[0] == "ffmpeg"
+    assert "00:00:05" in cmd
+    assert "/data/videos/1/thumbnail.jpg" in cmd
+
+
+def test_build_slideshow_cmd_single_image():
+    from app.video_producer import _build_slideshow_cmd
+    cmd = _build_slideshow_cmd(
+        image_paths=["/tmp/img0.jpg"],
+        audio_path="/tmp/audio.mp3",
+        output_path="/tmp/out.mp4",
+        duration_per_image=30.0,
+    )
+    assert "ffmpeg" in cmd[0]
+    assert "/tmp/out.mp4" in cmd
+    assert any("copy" in str(c) for c in cmd)
+
+
+def test_build_slideshow_cmd_multiple_images():
+    from app.video_producer import _build_slideshow_cmd
+    cmd = _build_slideshow_cmd(
+        image_paths=["/tmp/img0.jpg", "/tmp/img1.jpg", "/tmp/img2.jpg"],
+        audio_path="/tmp/audio.mp3",
+        output_path="/tmp/out.mp4",
+        duration_per_image=60.0,
+    )
+    assert "ffmpeg" in cmd[0]
+    assert any("xfade" in str(c) for c in cmd)
+    assert "/tmp/out.mp4" in cmd
+
+
+def test_produce_video_visualizer_calls_ffmpeg(tmp_db, tmp_path):
+    """Verify produce_video invokes FFmpeg for the visualizer format."""
+    from app.db import init_db, create_video_project
+    import sqlite3
+
+    init_db()
+
+    # Insert a track directly into music_library
+    import app.db as db_mod
+    with db_mod._conn() as conn:
+        conn.execute(
+            """INSERT INTO music_library (title, genre, audio_url, file_path, provider)
+               VALUES (?, ?, ?, ?, ?)""",
+            ("Test Track", "lo-fi", "/media/music/test.mp3",
+             str(tmp_path / "test.mp3"), "suno"),
+        )
+
+    # Create an empty mp3 file
+    (tmp_path / "test.mp3").write_bytes(b"\x00" * 100)
+
+    create_video_project({"track_id": 1, "format": "visualizer", "target_countries": ["BR"]})
+
+    import app.video_producer as vp
+    vp.VIDEO_DATA_DIR = str(tmp_path / "videos")
+
+    with patch("app.video_producer.subprocess.run") as mock_run, \
+         patch("app.video_producer._generate_metadata", return_value={
+             "yt_title": "Chill Beats", "yt_description": "desc", "yt_tags": ["lofi"]
+         }), \
+         patch("app.video_producer._download_url", return_value=False):
+        mock_run.return_value = MagicMock(returncode=0)
+        vp.produce_video(1)
+
+    from app.db import get_video_project
+    proj = get_video_project(1)
+    assert proj["status"] == "done"
+    assert mock_run.called