Files
web-page-backend/music-lab/tests/test_pipeline_endpoints.py

177 lines
6.1 KiB
Python

import sqlite3
import pytest
from unittest.mock import AsyncMock, patch
from fastapi.testclient import TestClient
from app.main import app
from app import db
@pytest.fixture
def client(monkeypatch, tmp_path):
monkeypatch.setattr(db, "DB_PATH", str(tmp_path / "music.db"))
db.init_db()
# 최소 트랙 1개 — music_library 테이블에 직접 삽입
conn = sqlite3.connect(db.DB_PATH)
cur = conn.cursor()
cur.execute(
"""INSERT INTO music_library
(id, title, genre, moods, instruments, duration_sec, bpm, key, scale,
prompt, audio_url, file_path, task_id, tags)
VALUES (1, 'T', 'lo-fi', '["chill"]', '["piano"]', 120, 85, 'C', 'maj',
'p', '/media/music/x.mp3', '/app/data/music/x.mp3', NULL, '[]')""",
)
conn.commit()
conn.close()
return TestClient(app)
def test_create_pipeline(client):
r = client.post("/api/music/pipeline", json={"track_id": 1})
assert r.status_code == 201
assert r.json()["state"] == "created"
def test_create_duplicate_pipeline_returns_409(client):
client.post("/api/music/pipeline", json={"track_id": 1})
r = client.post("/api/music/pipeline", json={"track_id": 1})
assert r.status_code == 409
def test_get_pipeline_returns_jobs_and_feedback(client):
pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
r = client.get(f"/api/music/pipeline/{pid}")
assert "jobs" in r.json()
assert "feedback" in r.json()
def test_list_pipelines_active_filter(client):
pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
db.update_pipeline_state(pid, "published")
r = client.get("/api/music/pipeline?status=active")
assert all(p["state"] != "published" for p in r.json()["pipelines"])
def test_feedback_reject_records_feedback_and_increments_count(client):
pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
db.update_pipeline_state(pid, "cover_pending")
# orchestrator.run_step를 mock해서 백그라운드 작업이 cover_pending을 변경하지 않도록
with patch("app.main.orchestrator.run_step", new=AsyncMock()):
r = client.post(
f"/api/music/pipeline/{pid}/feedback",
json={"step": "cover", "intent": "reject", "feedback_text": "더 어둡게"},
)
assert r.status_code == 202
p = db.get_pipeline(pid)
assert p["feedback_count_per_step"]["cover"] == 1
history = db.get_feedback_history(pid)
assert history[0]["feedback_text"] == "더 어둡게"
def test_feedback_after_5_rejects_marks_awaiting_manual(client):
pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
db.update_pipeline_state(pid, "cover_pending")
with patch("app.main.orchestrator.run_step", new=AsyncMock()):
for i in range(5):
client.post(
f"/api/music/pipeline/{pid}/feedback",
json={"step": "cover", "intent": "reject", "feedback_text": f"again {i}"},
)
r = client.post(
f"/api/music/pipeline/{pid}/feedback",
json={"step": "cover", "intent": "reject", "feedback_text": "6th"},
)
assert r.status_code == 409
assert db.get_pipeline(pid)["state"] == "awaiting_manual"
def test_cancel_pipeline(client):
pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
r = client.post(f"/api/music/pipeline/{pid}/cancel")
assert r.status_code == 200
assert db.get_pipeline(pid)["state"] == "cancelled"
def test_setup_get_returns_defaults(client):
r = client.get("/api/music/setup")
assert r.status_code == 200
assert r.json()["review_threshold"] == 60
def test_setup_put_updates(client):
r = client.put("/api/music/setup", json={"review_threshold": 70})
assert r.status_code == 200
assert r.json()["review_threshold"] == 70
def test_youtube_status_when_disconnected(client):
r = client.get("/api/music/youtube/status")
assert r.status_code == 200
assert r.json() == {"connected": False}
def test_create_pipeline_with_compile_job(client, monkeypatch):
import sqlite3
conn = sqlite3.connect(db.DB_PATH)
cur = conn.cursor()
try:
cur.execute("""
INSERT INTO compile_jobs (title, track_ids_json, crossfade_sec,
audio_path, status, created_at)
VALUES ('Test Mix', '[1,2,3]', 3, '/app/data/compiles/9.mp3',
'succeeded', datetime())
""")
except sqlite3.OperationalError:
pytest.skip("compile_jobs schema mismatch")
conn.commit()
cid = cur.lastrowid
conn.close()
r = client.post("/api/music/pipeline", json={"compile_job_id": cid})
assert r.status_code == 201
body = r.json()
assert body["track_id"] is None
assert body["compile_job_id"] == cid
assert body["visual_style"] == "essential"
def test_create_pipeline_rejects_both_inputs(client):
r = client.post("/api/music/pipeline", json={"track_id": 1, "compile_job_id": 1})
assert r.status_code == 400
def test_create_pipeline_rejects_neither(client):
r = client.post("/api/music/pipeline", json={})
assert r.status_code == 400
def test_create_pipeline_rejects_compile_not_ready(client):
import sqlite3
conn = sqlite3.connect(db.DB_PATH)
cur = conn.cursor()
try:
cur.execute("""
INSERT INTO compile_jobs (title, status, created_at)
VALUES ('Pending', 'rendering', datetime())
""")
except sqlite3.OperationalError:
pytest.skip("compile_jobs schema mismatch")
conn.commit()
cid = cur.lastrowid
conn.close()
r = client.post("/api/music/pipeline", json={"compile_job_id": cid})
assert r.status_code == 400
def test_create_pipeline_with_visual_options(client):
r = client.post("/api/music/pipeline", json={
"track_id": 1, "visual_style": "single",
"background_mode": "video_loop", "background_keyword": "rain",
})
assert r.status_code == 201
body = r.json()
assert body["visual_style"] == "single"
assert body["background_mode"] == "video_loop"
assert body["background_keyword"] == "rain"