feat(music-lab): video_producer — FFmpeg 비주얼라이저·슬라이드쇼 + Claude 메타데이터
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
256
music-lab/app/video_producer.py
Normal file
256
music-lab/app/video_producer.py
Normal file
@@ -0,0 +1,256 @@
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
from typing import Optional
|
||||
|
||||
import requests
|
||||
|
||||
from .db import get_video_project, get_track_by_id, update_video_project_status
|
||||
|
||||
VIDEO_DATA_DIR = os.getenv("VIDEO_DATA_DIR", "/app/data/videos")
|
||||
VIDEO_MEDIA_BASE = os.getenv("VIDEO_MEDIA_BASE", "/media/videos")
|
||||
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY", "")
|
||||
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
|
||||
|
||||
GENRE_COLORS = {
|
||||
"lo-fi": ((26, 26, 46), (22, 33, 62)),
|
||||
"phonk": ((26, 10, 10), (45, 0, 0)),
|
||||
"ambient": ((13, 33, 55), (10, 22, 40)),
|
||||
"pop": ((26, 10, 46), (45, 27, 78)),
|
||||
"default": ((17, 24, 39), (31, 41, 55)),
|
||||
}
|
||||
|
||||
|
||||
def _make_gradient_bg(width: int, height: int, genre: str, output_path: str) -> None:
|
||||
from PIL import Image
|
||||
top_rgb, bot_rgb = GENRE_COLORS.get(genre.lower(), GENRE_COLORS["default"])
|
||||
img = Image.new("RGB", (width, height))
|
||||
pixels = img.load()
|
||||
for y in range(height):
|
||||
t = y / height
|
||||
r = int(top_rgb[0] + (bot_rgb[0] - top_rgb[0]) * t)
|
||||
g = int(top_rgb[1] + (bot_rgb[1] - top_rgb[1]) * t)
|
||||
b = int(top_rgb[2] + (bot_rgb[2] - top_rgb[2]) * t)
|
||||
for x in range(width):
|
||||
pixels[x, y] = (r, g, b)
|
||||
img.save(output_path, "JPEG", quality=95)
|
||||
|
||||
|
||||
def _build_visualizer_cmd(audio_path: str, bg_path: str, output_path: str) -> list:
|
||||
return [
|
||||
"ffmpeg", "-y",
|
||||
"-loop", "1", "-i", bg_path,
|
||||
"-i", audio_path,
|
||||
"-filter_complex",
|
||||
"[1:a]showwaves=s=1920x200:mode=cline:colors=0xFF4444@0.8[wave];"
|
||||
"[0:v][wave]overlay=0:880[out]",
|
||||
"-map", "[out]", "-map", "1:a",
|
||||
"-c:v", "libx264", "-preset", "fast", "-crf", "23",
|
||||
"-c:a", "aac", "-b:a", "192k",
|
||||
"-shortest", output_path,
|
||||
]
|
||||
|
||||
|
||||
def _build_thumbnail_cmd(video_path: str, thumb_path: str) -> list:
|
||||
return [
|
||||
"ffmpeg", "-y",
|
||||
"-i", video_path,
|
||||
"-ss", "00:00:05",
|
||||
"-vframes", "1",
|
||||
"-q:v", "2",
|
||||
thumb_path,
|
||||
]
|
||||
|
||||
|
||||
def _build_slideshow_cmd(
|
||||
image_paths: list, audio_path: str, output_path: str, duration_per_image: float
|
||||
) -> list:
|
||||
n = len(image_paths)
|
||||
inputs = []
|
||||
for p in image_paths:
|
||||
inputs += ["-i", p]
|
||||
inputs += ["-i", audio_path]
|
||||
|
||||
scale = (
|
||||
"scale=1920:1080:force_original_aspect_ratio=decrease,"
|
||||
"pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1"
|
||||
)
|
||||
filter_parts = [f"[{i}:v]{scale}[v{i}]" for i in range(n)]
|
||||
|
||||
xd = 1.0
|
||||
if n == 1:
|
||||
filter_str = ";".join(filter_parts) + ";[v0]copy[out]"
|
||||
else:
|
||||
filter_str = ";".join(filter_parts)
|
||||
prev = "v0"
|
||||
for i in range(1, n):
|
||||
offset = max(0.0, duration_per_image * i - xd)
|
||||
nxt = "out" if i == n - 1 else f"xf{i}"
|
||||
filter_str += (
|
||||
f";[{prev}][v{i}]xfade=transition=fade:"
|
||||
f"duration={xd}:offset={offset:.2f}[{nxt}]"
|
||||
)
|
||||
prev = nxt
|
||||
|
||||
return [
|
||||
"ffmpeg", "-y",
|
||||
*inputs,
|
||||
"-filter_complex", filter_str,
|
||||
"-map", "[out]", "-map", f"{n}:a",
|
||||
"-c:v", "libx264", "-preset", "fast", "-crf", "23",
|
||||
"-c:a", "aac", "-b:a", "192k",
|
||||
"-shortest", output_path,
|
||||
]
|
||||
|
||||
|
||||
def _fetch_pexels_images(keywords: list, count: int = 5) -> list:
|
||||
if not PEXELS_API_KEY or not keywords:
|
||||
return []
|
||||
query = " ".join(k for k in keywords if k)[:60]
|
||||
try:
|
||||
resp = requests.get(
|
||||
"https://api.pexels.com/v1/search",
|
||||
headers={"Authorization": PEXELS_API_KEY},
|
||||
params={"query": query, "per_page": count, "orientation": "landscape"},
|
||||
timeout=10,
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
return []
|
||||
return [p["src"]["large2x"] for p in resp.json().get("photos", [])]
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
def _download_url(url: str, dest_path: str) -> bool:
|
||||
try:
|
||||
resp = requests.get(url, timeout=30, stream=True)
|
||||
resp.raise_for_status()
|
||||
with open(dest_path, "wb") as f:
|
||||
for chunk in resp.iter_content(8192):
|
||||
f.write(chunk)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _generate_metadata(genre: str, moods: list, lyrics: str, target_countries: list) -> dict:
|
||||
if not ANTHROPIC_API_KEY:
|
||||
tags = [genre] + moods[:3] if genre else moods[:3]
|
||||
return {"yt_title": f"{genre or 'Chill'} Music", "yt_description": "", "yt_tags": tags}
|
||||
|
||||
import anthropic
|
||||
client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
|
||||
countries_str = ", ".join(target_countries) if target_countries else "global"
|
||||
prompt = (
|
||||
f"YouTube 음악 영상 메타데이터를 JSON으로 생성해주세요.\n"
|
||||
f"장르: {genre}\n분위기: {', '.join(moods)}\n"
|
||||
f"가사 일부: {lyrics[:200] if lyrics else '인스트루멘탈'}\n"
|
||||
f"타겟 국가: {countries_str}\n\n"
|
||||
'{"yt_title":"제목(최대100자,SEO최적화)","yt_description":"설명(500자이내,해시태그포함)",'
|
||||
'"yt_tags":["태그1",...]} 형식으로만 응답.'
|
||||
)
|
||||
try:
|
||||
msg = client.messages.create(
|
||||
model="claude-haiku-4-5-20251001",
|
||||
max_tokens=1024,
|
||||
messages=[{"role": "user", "content": prompt}],
|
||||
)
|
||||
text = msg.content[0].text
|
||||
start, end = text.find("{"), text.rfind("}") + 1
|
||||
return json.loads(text[start:end])
|
||||
except Exception:
|
||||
return {"yt_title": f"{genre or 'Music'} - Chill Beats", "yt_description": "", "yt_tags": [genre]}
|
||||
|
||||
|
||||
def _render_visualizer(track: dict, proj: dict, output_path: str) -> None:
|
||||
out_dir = os.path.dirname(output_path)
|
||||
bg_path = os.path.join(out_dir, "bg.jpg")
|
||||
|
||||
cover_images = track.get("cover_images") or []
|
||||
if cover_images:
|
||||
ok = _download_url(cover_images[0], bg_path)
|
||||
if not ok:
|
||||
cover_images = []
|
||||
if not cover_images:
|
||||
_make_gradient_bg(1920, 1080, track.get("genre", "default"), bg_path)
|
||||
|
||||
cmd = _build_visualizer_cmd(track["file_path"], bg_path, output_path)
|
||||
subprocess.run(cmd, check=True, capture_output=True)
|
||||
|
||||
|
||||
def _render_slideshow(track: dict, proj: dict, output_path: str) -> None:
|
||||
out_dir = os.path.dirname(output_path)
|
||||
img_dir = os.path.join(out_dir, "imgs")
|
||||
os.makedirs(img_dir, exist_ok=True)
|
||||
|
||||
moods = track.get("moods") or []
|
||||
genre = track.get("genre", "")
|
||||
keywords = [genre] + moods[:2] if genre else moods[:3]
|
||||
|
||||
pexels_urls = _fetch_pexels_images(keywords, count=5)
|
||||
suno_cover_urls = track.get("cover_images") or []
|
||||
all_urls = pexels_urls + suno_cover_urls[:2]
|
||||
|
||||
img_paths = []
|
||||
for i, url in enumerate(all_urls):
|
||||
dest = os.path.join(img_dir, f"img_{i:02d}.jpg")
|
||||
if url and _download_url(url, dest):
|
||||
img_paths.append(dest)
|
||||
|
||||
if not img_paths:
|
||||
bg = os.path.join(img_dir, "bg_fallback.jpg")
|
||||
_make_gradient_bg(1920, 1080, genre or "default", bg)
|
||||
img_paths = [bg]
|
||||
|
||||
duration = track.get("duration_sec") or 180
|
||||
dur_per_img = max(3.0, duration / len(img_paths))
|
||||
cmd = _build_slideshow_cmd(img_paths, track["file_path"], output_path, dur_per_img)
|
||||
subprocess.run(cmd, check=True, capture_output=True)
|
||||
|
||||
|
||||
def produce_video(project_id: int) -> None:
|
||||
proj = get_video_project(project_id)
|
||||
if not proj:
|
||||
return
|
||||
|
||||
update_video_project_status(project_id, "rendering")
|
||||
|
||||
try:
|
||||
track = get_track_by_id(proj["track_id"])
|
||||
if not track or not track.get("file_path"):
|
||||
raise ValueError(f"트랙 파일 없음 (track_id={proj['track_id']})")
|
||||
|
||||
out_dir = os.path.join(VIDEO_DATA_DIR, str(project_id))
|
||||
os.makedirs(out_dir, exist_ok=True)
|
||||
output_path = os.path.join(out_dir, "output.mp4")
|
||||
|
||||
if proj["format"] == "visualizer":
|
||||
_render_visualizer(track, proj, output_path)
|
||||
elif proj["format"] == "slideshow":
|
||||
_render_slideshow(track, proj, output_path)
|
||||
else:
|
||||
raise ValueError(f"Unknown format: {proj['format']}")
|
||||
|
||||
thumb_path = os.path.join(out_dir, "thumbnail.jpg")
|
||||
subprocess.run(_build_thumbnail_cmd(output_path, thumb_path), check=True, capture_output=True)
|
||||
|
||||
meta = _generate_metadata(
|
||||
genre=track.get("genre", ""),
|
||||
moods=track.get("moods") or [],
|
||||
lyrics=track.get("lyrics", ""),
|
||||
target_countries=proj.get("target_countries", []),
|
||||
)
|
||||
with open(os.path.join(out_dir, "metadata.json"), "w", encoding="utf-8") as f:
|
||||
json.dump(meta, f, ensure_ascii=False, indent=2)
|
||||
|
||||
update_video_project_status(
|
||||
project_id, "done",
|
||||
output_path=output_path,
|
||||
output_url=f"{VIDEO_MEDIA_BASE}/{project_id}/output.mp4",
|
||||
thumbnail_path=thumb_path,
|
||||
yt_title=meta.get("yt_title", ""),
|
||||
yt_description=meta.get("yt_description", ""),
|
||||
yt_tags=meta.get("yt_tags", []),
|
||||
)
|
||||
except Exception as e:
|
||||
update_video_project_status(project_id, "failed", error=str(e))
|
||||
98
music-lab/tests/test_video_producer.py
Normal file
98
music-lab/tests/test_video_producer.py
Normal file
@@ -0,0 +1,98 @@
|
||||
# music-lab/tests/test_video_producer.py
|
||||
import os
|
||||
from unittest.mock import patch, MagicMock
|
||||
import pytest
|
||||
|
||||
|
||||
def test_build_visualizer_cmd():
|
||||
from app.video_producer import _build_visualizer_cmd
|
||||
cmd = _build_visualizer_cmd(
|
||||
audio_path="/data/music/test.mp3",
|
||||
bg_path="/tmp/bg.jpg",
|
||||
output_path="/data/videos/1/output.mp4",
|
||||
)
|
||||
assert cmd[0] == "ffmpeg"
|
||||
assert "/data/music/test.mp3" in cmd
|
||||
assert "/data/videos/1/output.mp4" in cmd
|
||||
assert any("showwaves" in str(c) for c in cmd)
|
||||
|
||||
|
||||
def test_make_gradient_bg_uses_pillow(tmp_path):
|
||||
from app.video_producer import _make_gradient_bg
|
||||
out = str(tmp_path / "bg.jpg")
|
||||
_make_gradient_bg(1920, 1080, "lo-fi", out)
|
||||
assert os.path.exists(out)
|
||||
assert os.path.getsize(out) > 0
|
||||
|
||||
|
||||
def test_extract_thumbnail_cmd():
|
||||
from app.video_producer import _build_thumbnail_cmd
|
||||
cmd = _build_thumbnail_cmd("/data/videos/1/output.mp4", "/data/videos/1/thumbnail.jpg")
|
||||
assert cmd[0] == "ffmpeg"
|
||||
assert "00:00:05" in cmd
|
||||
assert "/data/videos/1/thumbnail.jpg" in cmd
|
||||
|
||||
|
||||
def test_build_slideshow_cmd_single_image():
|
||||
from app.video_producer import _build_slideshow_cmd
|
||||
cmd = _build_slideshow_cmd(
|
||||
image_paths=["/tmp/img0.jpg"],
|
||||
audio_path="/tmp/audio.mp3",
|
||||
output_path="/tmp/out.mp4",
|
||||
duration_per_image=30.0,
|
||||
)
|
||||
assert "ffmpeg" in cmd[0]
|
||||
assert "/tmp/out.mp4" in cmd
|
||||
assert any("copy" in str(c) for c in cmd)
|
||||
|
||||
|
||||
def test_build_slideshow_cmd_multiple_images():
|
||||
from app.video_producer import _build_slideshow_cmd
|
||||
cmd = _build_slideshow_cmd(
|
||||
image_paths=["/tmp/img0.jpg", "/tmp/img1.jpg", "/tmp/img2.jpg"],
|
||||
audio_path="/tmp/audio.mp3",
|
||||
output_path="/tmp/out.mp4",
|
||||
duration_per_image=60.0,
|
||||
)
|
||||
assert "ffmpeg" in cmd[0]
|
||||
assert any("xfade" in str(c) for c in cmd)
|
||||
assert "/tmp/out.mp4" in cmd
|
||||
|
||||
|
||||
def test_produce_video_visualizer_calls_ffmpeg(tmp_db, tmp_path):
|
||||
"""produce_video가 visualizer 포맷으로 FFmpeg를 호출하는지 확인."""
|
||||
from app.db import init_db, create_video_project
|
||||
import sqlite3
|
||||
|
||||
init_db()
|
||||
|
||||
# music_library에 직접 트랙 삽입
|
||||
import app.db as db_mod
|
||||
with db_mod._conn() as conn:
|
||||
conn.execute(
|
||||
"""INSERT INTO music_library (title, genre, audio_url, file_path, provider)
|
||||
VALUES (?, ?, ?, ?, ?)""",
|
||||
("Test Track", "lo-fi", "/media/music/test.mp3",
|
||||
str(tmp_path / "test.mp3"), "suno"),
|
||||
)
|
||||
|
||||
# 빈 mp3 파일 생성
|
||||
(tmp_path / "test.mp3").write_bytes(b"\x00" * 100)
|
||||
|
||||
create_video_project({"track_id": 1, "format": "visualizer", "target_countries": ["BR"]})
|
||||
|
||||
import app.video_producer as vp
|
||||
vp.VIDEO_DATA_DIR = str(tmp_path / "videos")
|
||||
|
||||
with patch("app.video_producer.subprocess.run") as mock_run, \
|
||||
patch("app.video_producer._generate_metadata", return_value={
|
||||
"yt_title": "Chill Beats", "yt_description": "desc", "yt_tags": ["lofi"]
|
||||
}), \
|
||||
patch("app.video_producer._download_url", return_value=False):
|
||||
mock_run.return_value = MagicMock(returncode=0)
|
||||
vp.produce_video(1)
|
||||
|
||||
from app.db import get_video_project
|
||||
proj = get_video_project(1)
|
||||
assert proj["status"] == "done"
|
||||
assert mock_run.called
|
||||
Reference in New Issue
Block a user