- _build_slideshow_cmd: offset 공식을 `duration_per_image * i - xd * i`로 수정 (누적 전환 오차 제거) - _generate_metadata: genre 빈 문자열일 때 yt_tags에 빈 문자열 삽입 방지 - test: VIDEO_DATA_DIR 패치를 monkeypatch로 교체 (자동 복원 보장) - test: xfade offset 값 검증 테스트 추가 (29.00, 58.00) - test: 미사용 import 제거 (pytest, sqlite3) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
257 lines
8.9 KiB
Python
257 lines
8.9 KiB
Python
import json
|
|
import os
|
|
import subprocess
|
|
from typing import Optional
|
|
|
|
import requests
|
|
|
|
from .db import get_video_project, get_track_by_id, update_video_project_status
|
|
|
|
# Runtime configuration; every value can be overridden through the
# environment variable of the same name.
# Root directory that receives one sub-directory per video project.
VIDEO_DATA_DIR = os.environ.get("VIDEO_DATA_DIR", "/app/data/videos")
# Prefix used when building the output URL stored for a finished project.
VIDEO_MEDIA_BASE = os.environ.get("VIDEO_MEDIA_BASE", "/media/videos")
# An empty key makes the Pexels image lookup return no results.
PEXELS_API_KEY = os.environ.get("PEXELS_API_KEY", "")
# An empty key makes metadata generation use its static fallback.
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "")
|
|
|
|
# Gradient palette per genre as a (top RGB, bottom RGB) pair.
# The "default" entry is used for genres that have no entry of their own.
GENRE_COLORS = {
    "lo-fi": (
        (26, 26, 46),
        (22, 33, 62),
    ),
    "phonk": (
        (26, 10, 10),
        (45, 0, 0),
    ),
    "ambient": (
        (13, 33, 55),
        (10, 22, 40),
    ),
    "pop": (
        (26, 10, 46),
        (45, 27, 78),
    ),
    "default": (
        (17, 24, 39),
        (31, 41, 55),
    ),
}
|
|
|
|
|
|
def _make_gradient_bg(width: int, height: int, genre: str, output_path: str) -> None:
    """Write a vertical two-colour gradient as a JPEG to *output_path*.

    The colour pair is looked up in ``GENRE_COLORS`` by the lower-cased
    genre; an unknown, empty or ``None`` genre uses the ``"default"`` entry.
    Row ``y`` is the top colour blended towards the bottom colour by the
    factor ``y / height``.

    Args:
        width: Image width in pixels.
        height: Image height in pixels.
        genre: Genre name used to pick the palette.
        output_path: Destination file; saved as JPEG with quality 95.
    """
    from PIL import Image

    # A missing genre must fall back to the default palette instead of
    # failing on ``.lower()``.
    palette_key = (genre or "default").lower()
    top_rgb, bot_rgb = GENRE_COLORS.get(palette_key, GENRE_COLORS["default"])

    img = Image.new("RGB", (width, height))
    for y in range(height):
        t = y / height
        row_color = tuple(
            int(top + (bot - top) * t) for top, bot in zip(top_rgb, bot_rgb)
        )
        # Every pixel in a row has the same colour, so fill the row with one
        # call rather than assigning each pixel from Python.
        img.paste(row_color, (0, y, width, y + 1))
    img.save(output_path, "JPEG", quality=95)
|
|
|
|
|
|
def _build_visualizer_cmd(audio_path: str, bg_path: str, output_path: str) -> list:
|
|
return [
|
|
"ffmpeg", "-y",
|
|
"-loop", "1", "-i", bg_path,
|
|
"-i", audio_path,
|
|
"-filter_complex",
|
|
"[1:a]showwaves=s=1920x200:mode=cline:colors=0xFF4444@0.8[wave];"
|
|
"[0:v][wave]overlay=0:880[out]",
|
|
"-map", "[out]", "-map", "1:a",
|
|
"-c:v", "libx264", "-preset", "fast", "-crf", "23",
|
|
"-c:a", "aac", "-b:a", "192k",
|
|
"-shortest", output_path,
|
|
]
|
|
|
|
|
|
def _build_thumbnail_cmd(video_path: str, thumb_path: str) -> list:
|
|
return [
|
|
"ffmpeg", "-y",
|
|
"-i", video_path,
|
|
"-ss", "00:00:05",
|
|
"-vframes", "1",
|
|
"-q:v", "2",
|
|
thumb_path,
|
|
]
|
|
|
|
|
|
def _build_slideshow_cmd(
|
|
image_paths: list, audio_path: str, output_path: str, duration_per_image: float
|
|
) -> list:
|
|
n = len(image_paths)
|
|
inputs = []
|
|
for p in image_paths:
|
|
inputs += ["-i", p]
|
|
inputs += ["-i", audio_path]
|
|
|
|
scale = (
|
|
"scale=1920:1080:force_original_aspect_ratio=decrease,"
|
|
"pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1"
|
|
)
|
|
filter_parts = [f"[{i}:v]{scale}[v{i}]" for i in range(n)]
|
|
|
|
xd = 1.0
|
|
if n == 1:
|
|
filter_str = ";".join(filter_parts) + ";[v0]copy[out]"
|
|
else:
|
|
filter_str = ";".join(filter_parts)
|
|
prev = "v0"
|
|
for i in range(1, n):
|
|
offset = max(0.0, duration_per_image * i - xd * i)
|
|
nxt = "out" if i == n - 1 else f"xf{i}"
|
|
filter_str += (
|
|
f";[{prev}][v{i}]xfade=transition=fade:"
|
|
f"duration={xd}:offset={offset:.2f}[{nxt}]"
|
|
)
|
|
prev = nxt
|
|
|
|
return [
|
|
"ffmpeg", "-y",
|
|
*inputs,
|
|
"-filter_complex", filter_str,
|
|
"-map", "[out]", "-map", f"{n}:a",
|
|
"-c:v", "libx264", "-preset", "fast", "-crf", "23",
|
|
"-c:a", "aac", "-b:a", "192k",
|
|
"-shortest", output_path,
|
|
]
|
|
|
|
|
|
def _fetch_pexels_images(keywords: list, count: int = 5) -> list:
    """Search Pexels for landscape photos and return their ``large2x`` URLs.

    Best-effort: returns an empty list when no API key is configured, when
    ``keywords`` is empty, when the response status is not 200, or when the
    request or the parsing of its result raises.

    Args:
        keywords: Search terms; falsy entries are skipped and the joined
            query is cut to 60 characters.
        count: Number of results requested (``per_page``).

    Returns:
        A list of image URLs, possibly empty.
    """
    if not (PEXELS_API_KEY and keywords):
        return []

    search_terms = " ".join(filter(None, keywords))[:60]
    request_params = {
        "query": search_terms,
        "per_page": count,
        "orientation": "landscape",
    }
    try:
        response = requests.get(
            "https://api.pexels.com/v1/search",
            headers={"Authorization": PEXELS_API_KEY},
            params=request_params,
            timeout=10,
        )
        if response.status_code == 200:
            photos = response.json().get("photos", [])
            return [photo["src"]["large2x"] for photo in photos]
        return []
    except Exception:
        return []
|
|
|
|
|
|
def _download_url(url: str, dest_path: str) -> bool:
    """Download *url* to *dest_path*, reporting success as a boolean.

    The body is streamed into ``dest_path + ".part"`` and moved into place
    only after the whole download succeeded, so a failed or interrupted
    transfer never leaves a truncated file at ``dest_path``.

    Args:
        url: Address to fetch.
        dest_path: Final location of the downloaded file.

    Returns:
        ``True`` on success; ``False`` if the request, an HTTP error status
        or the file write raised.
    """
    tmp_path = f"{dest_path}.part"
    try:
        # The context manager closes the streamed response (and releases its
        # connection) even when the transfer fails midway.
        with requests.get(url, timeout=30, stream=True) as resp:
            resp.raise_for_status()
            with open(tmp_path, "wb") as f:
                for chunk in resp.iter_content(8192):
                    if chunk:
                        f.write(chunk)
        os.replace(tmp_path, dest_path)
        return True
    except Exception:
        # Deliberately best-effort: callers fall back to other images.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        return False
|
|
|
|
|
|
def _generate_metadata(genre: str, moods: list, lyrics: str, target_countries: list) -> dict:
    """Produce YouTube title, description and tags for a track.

    Without an Anthropic API key a static result is built from the genre and
    the first three moods. With a key, the model is asked for a JSON object
    and the text between the first ``{`` and the last ``}`` of its reply is
    parsed. Any failure on that path (missing package, client or API error,
    unparsable reply) yields a static fallback instead of raising.

    Args:
        genre: Track genre; may be empty.
        moods: Mood strings for the track.
        lyrics: Track lyrics; only the first 200 characters are sent.
        target_countries: Target countries; empty means "global".

    Returns:
        A dict with the keys ``yt_title``, ``yt_description`` and
        ``yt_tags`` (a list).
    """
    if not ANTHROPIC_API_KEY:
        tags = [genre] + moods[:3] if genre else moods[:3]
        return {"yt_title": f"{genre or 'Chill'} Music", "yt_description": "", "yt_tags": tags}

    fallback = {
        "yt_title": f"{genre or 'Music'} - Chill Beats",
        "yt_description": "",
        "yt_tags": [genre] if genre else [],
    }

    countries_str = ", ".join(target_countries) if target_countries else "global"
    prompt = (
        f"YouTube 음악 영상 메타데이터를 JSON으로 생성해주세요.\n"
        f"장르: {genre}\n분위기: {', '.join(moods)}\n"
        f"가사 일부: {lyrics[:200] if lyrics else '인스트루멘탈'}\n"
        f"타겟 국가: {countries_str}\n\n"
        '{"yt_title":"제목(최대100자,SEO최적화)","yt_description":"설명(500자이내,해시태그포함)",'
        '"yt_tags":["태그1",...]} 형식으로만 응답.'
    )
    try:
        # Import and client creation are inside the guarded section so a
        # missing package or bad client config also falls back.
        import anthropic

        client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
        msg = client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}],
        )
        text = msg.content[0].text
        # The reply may wrap the JSON in extra text; keep the outermost braces.
        start, end = text.find("{"), text.rfind("}") + 1
        meta = json.loads(text[start:end])
    except Exception:
        return fallback

    if not isinstance(meta, dict):
        return fallback
    # Keys the model left out take their fallback value.
    result = {**fallback, **meta}
    if not isinstance(result["yt_tags"], list):
        result["yt_tags"] = fallback["yt_tags"]
    return result
|
|
|
|
|
|
def _render_visualizer(track: dict, proj: dict, output_path: str) -> None:
    """Render the waveform-visualizer video for *track* to *output_path*.

    The background is the track's first cover image when it can be
    downloaded; otherwise a genre-coloured gradient is generated. The
    background is stored as ``bg.jpg`` next to the output file.

    Args:
        track: Track record; reads ``cover_images``, ``genre`` and
            ``file_path``.
        proj: Video project record (not used here).
        output_path: Destination video file.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    bg_path = os.path.join(os.path.dirname(output_path), "bg.jpg")

    cover_images = track.get("cover_images") or []
    have_cover = bool(cover_images) and _download_url(cover_images[0], bg_path)
    if not have_cover:
        # "genre" may be present but None or empty; ``or`` covers both,
        # matching how the slideshow renderer picks its fallback genre.
        _make_gradient_bg(1920, 1080, track.get("genre") or "default", bg_path)

    cmd = _build_visualizer_cmd(track["file_path"], bg_path, output_path)
    subprocess.run(cmd, check=True, capture_output=True)
|
|
|
|
|
|
def _render_slideshow(track: dict, proj: dict, output_path: str) -> None:
    """Render the image-slideshow video for *track* to *output_path*.

    Images come from a Pexels search (up to five, keyed on genre and moods)
    plus up to two of the track's cover images. They are downloaded into an
    ``imgs`` directory next to the output file. If none can be downloaded, a
    single genre-coloured gradient is used instead.

    Args:
        track: Track record; reads ``moods``, ``genre``, ``cover_images``,
            ``duration_sec`` and ``file_path``.
        proj: Video project record (not used here).
        output_path: Destination video file.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    img_dir = os.path.join(os.path.dirname(output_path), "imgs")
    os.makedirs(img_dir, exist_ok=True)

    moods = track.get("moods") or []
    genre = track.get("genre", "")
    if genre:
        keywords = [genre] + moods[:2]
    else:
        keywords = moods[:3]

    stock_urls = _fetch_pexels_images(keywords, count=5)
    cover_urls = track.get("cover_images") or []
    candidate_urls = stock_urls + cover_urls[:2]

    downloaded = []
    for idx, url in enumerate(candidate_urls):
        target = os.path.join(img_dir, f"img_{idx:02d}.jpg")
        if not url:
            continue
        if _download_url(url, target):
            downloaded.append(target)

    if not downloaded:
        fallback_bg = os.path.join(img_dir, "bg_fallback.jpg")
        _make_gradient_bg(1920, 1080, genre or "default", fallback_bg)
        downloaded = [fallback_bg]

    # A missing or zero duration is treated as 180 seconds; every image is
    # shown for at least three seconds.
    total_sec = track.get("duration_sec") or 180
    per_image = max(3.0, total_sec / len(downloaded))
    ffmpeg_cmd = _build_slideshow_cmd(
        downloaded, track["file_path"], output_path, per_image
    )
    subprocess.run(ffmpeg_cmd, check=True, capture_output=True)
|
|
|
|
|
|
def _describe_failure(exc: Exception) -> str:
    """Return the message for *exc*, plus the tail of ffmpeg's stderr if any."""
    message = str(exc)
    if isinstance(exc, subprocess.CalledProcessError) and exc.stderr:
        stderr = exc.stderr
        if isinstance(stderr, bytes):
            stderr = stderr.decode("utf-8", errors="replace")
        # The end of ffmpeg's output carries the actual error.
        tail = stderr.strip()[-2000:]
        if tail:
            message = f"{message}\n{tail}"
    return message


def produce_video(project_id: int) -> None:
    """Render the video for a project and record the outcome.

    Steps: mark the project "rendering", render ``output.mp4`` in the
    project's directory according to its format ("visualizer" or
    "slideshow"), extract ``thumbnail.jpg``, generate and save
    ``metadata.json``, then mark the project "done" with the paths, URL and
    metadata. Any exception marks the project "failed" with an error message
    instead of propagating. Does nothing when the project does not exist.

    Args:
        project_id: Identifier of the video project to produce.
    """
    proj = get_video_project(project_id)
    if not proj:
        return

    update_video_project_status(project_id, "rendering")

    try:
        track = get_track_by_id(proj["track_id"])
        if not track or not track.get("file_path"):
            raise ValueError(f"트랙 파일 없음 (track_id={proj['track_id']})")

        out_dir = os.path.join(VIDEO_DATA_DIR, str(project_id))
        os.makedirs(out_dir, exist_ok=True)
        output_path = os.path.join(out_dir, "output.mp4")

        if proj["format"] == "visualizer":
            _render_visualizer(track, proj, output_path)
        elif proj["format"] == "slideshow":
            _render_slideshow(track, proj, output_path)
        else:
            raise ValueError(f"Unknown format: {proj['format']}")

        thumb_path = os.path.join(out_dir, "thumbnail.jpg")
        subprocess.run(_build_thumbnail_cmd(output_path, thumb_path), check=True, capture_output=True)

        meta = _generate_metadata(
            genre=track.get("genre", ""),
            moods=track.get("moods") or [],
            lyrics=track.get("lyrics", ""),
            # The key may be present with a None value.
            target_countries=proj.get("target_countries") or [],
        )
        with open(os.path.join(out_dir, "metadata.json"), "w", encoding="utf-8") as f:
            json.dump(meta, f, ensure_ascii=False, indent=2)

        update_video_project_status(
            project_id, "done",
            output_path=output_path,
            output_url=f"{VIDEO_MEDIA_BASE}/{project_id}/output.mp4",
            thumbnail_path=thumb_path,
            yt_title=meta.get("yt_title", ""),
            yt_description=meta.get("yt_description", ""),
            yt_tags=meta.get("yt_tags", []),
        )
    except Exception as e:
        # Top-level boundary: record the failure, including ffmpeg's own
        # error output, which str(e) alone does not contain.
        update_video_project_status(project_id, "failed", error=_describe_failure(e))
|