Files
web-page-backend/insta-lab/tests/test_internal_router.py
gahusb 0c12c3527f feat(insta-lab): internal webhook /api/internal/insta/update (SP-4)
Windows insta-render worker가 작업 진행률·완료·실패를 보고할 수신부.
X-Internal-Key 인증 필수. 4건의 단위 테스트로 status·error·result_path 검증.

Plan-B-Insta Phase 1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 01:57:17 +09:00

81 lines
2.3 KiB
Python

"""POST /api/internal/insta/update — Windows worker webhook."""
import os
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from app.internal_router import router
from app import db
@pytest.fixture(autouse=True)
def _set_key(monkeypatch):
    """Set INTERNAL_API_KEY to ``test-secret`` for every test in this module.

    The fixture is autouse, so tests do not need to request it explicitly;
    monkeypatch restores the previous environment when each test finishes.
    """
    env_name, env_value = "INTERNAL_API_KEY", "test-secret"
    monkeypatch.setenv(env_name, env_value)
@pytest.fixture
def client(tmp_path, monkeypatch):
    """Return a TestClient for a bare FastAPI app that mounts only the internal router.

    INSTA_DATA_PATH is pointed at pytest's per-test ``tmp_path`` and
    ``db.init_db()`` is called before the app is built.
    """
    # NOTE(review): presumably app.db resolves its storage location from
    # INSTA_DATA_PATH, giving each test a fresh database — confirm in app.db.
    data_dir = str(tmp_path)
    monkeypatch.setenv("INSTA_DATA_PATH", data_dir)
    db.init_db()

    test_app = FastAPI()
    test_app.include_router(router)
    return TestClient(test_app)
def _make_task():
    """Create a ``slate_render`` task for slate 42 and return ``db.create_task``'s result.

    The tests in this module use the returned value as the task id.
    """
    task_input = {"slate_id": 42}
    return db.create_task("slate_render", task_input)
def test_update_with_valid_key_updates_db(client):
    """A request with the correct key returns 200 and stores status and progress."""
    task_id = _make_task()
    auth_headers = {"X-Internal-Key": "test-secret"}
    payload = {"task_id": task_id, "status": "processing", "progress": 30}

    response = client.post(
        "/api/internal/insta/update",
        headers=auth_headers,
        json=payload,
    )

    assert response.status_code == 200

    # Re-read the task to check what was actually persisted.
    stored = db.get_task(task_id)
    assert stored["status"] == "processing"
    assert stored["progress"] == 30
def test_update_with_invalid_key_returns_401(client):
    """A request whose X-Internal-Key does not match is answered with 401."""
    task_id = _make_task()
    bad_headers = {"X-Internal-Key": "wrong"}
    payload = {"task_id": task_id, "status": "processing", "progress": 30}

    response = client.post(
        "/api/internal/insta/update",
        headers=bad_headers,
        json=payload,
    )

    assert response.status_code == 401
def test_update_succeeded_sets_result_path(client):
    """A ``succeeded`` update with a result_path returns 200 and populates result_id."""
    task_id = _make_task()
    auth_headers = {"X-Internal-Key": "test-secret"}
    payload = {
        "task_id": task_id,
        "status": "succeeded",
        "progress": 100,
        "result_path": "/media/insta/42/01.png",
    }

    response = client.post(
        "/api/internal/insta/update",
        headers=auth_headers,
        json=payload,
    )

    assert response.status_code == 200

    stored = db.get_task(task_id)
    assert stored["status"] == "succeeded"
    # Per the original author's note, result_id is expected to be the slate_id
    # taken from the task's input_data — TODO confirm against the router.
    assert stored["result_id"] is not None
def test_update_failed_records_error(client):
    """A ``failed`` update returns 200 and the stored task carries the error text."""
    task_id = _make_task()
    auth_headers = {"X-Internal-Key": "test-secret"}
    payload = {
        "task_id": task_id,
        "status": "failed",
        "progress": 0,
        "error": "Chromium crashed",
    }

    response = client.post(
        "/api/internal/insta/update",
        headers=auth_headers,
        json=payload,
    )

    assert response.status_code == 200

    stored = db.get_task(task_id)
    assert stored["status"] == "failed"
    # A missing or None error is treated as an empty string so the check
    # fails on the assertion rather than raising a TypeError.
    recorded_error = stored.get("error") or ""
    assert "Chromium" in recorded_error