Files
ai-trade/services/insta-render/tests/test_worker.py

225 lines
7.7 KiB
Python

"""worker.py — Redis BLPOP + webhook 단위 테스트."""
import json
import os
from pathlib import Path
import pytest
import httpx
from unittest.mock import AsyncMock, patch
import worker
from card_renderer import render_slate, init_browser, shutdown_browser
@pytest.fixture
def fake_slate():
return {
"id": 42,
"cover_copy": json.dumps({"headline": "테스트 H", "body": "테스트 B", "accent_color": "#FF0000"}),
"body_copies": json.dumps([{"headline": "본문1", "body": "..."} for _ in range(8)]),
"cta_copy": json.dumps({"headline": "CTA", "body": "...", "cta": "Click"}),
}
@pytest.mark.asyncio
async def test_post_update_sends_correct_payload(monkeypatch):
monkeypatch.setenv("INTERNAL_API_KEY", "test-secret")
monkeypatch.setenv("NAS_BASE_URL", "http://nas.test")
# worker 모듈 환경변수 재로딩
worker.NAS_BASE_URL = "http://nas.test"
worker.INTERNAL_API_KEY = "test-secret"
captured = {}
async def fake_post(self, url, headers=None, json=None, **kw):
captured["url"] = url
captured["headers"] = headers
captured["json"] = json
class R:
status_code = 200
text = "ok"
return R()
monkeypatch.setattr(httpx.AsyncClient, "post", fake_post)
async with httpx.AsyncClient() as client:
await worker._post_update(client, "t-1", "processing", 30)
assert captured["url"] == "http://nas.test/api/internal/insta/update"
assert captured["headers"]["X-Internal-Key"] == "test-secret"
assert captured["json"]["status"] == "processing"
assert captured["json"]["progress"] == 30
@pytest.mark.asyncio
async def test_process_one_success_calls_webhook_twice(monkeypatch, fake_slate):
"""processing(50) → succeeded(100) 두 번 호출 + render 한 번."""
calls: list = []
async def fake_post(self, url, headers=None, json=None, **kw):
calls.append({"status": json["status"], "progress": json["progress"]})
class R:
status_code = 200
text = "ok"
return R()
async def fake_get(self, url, **kw):
class R:
status_code = 200
def json(self_inner): return fake_slate
def raise_for_status(self_inner): pass
return R()
async def fake_render(slate, slate_id, template="default/card.html.j2"):
return [f"/tmp/{slate_id}/{i:02d}.png" for i in range(1, 11)]
monkeypatch.setattr(httpx.AsyncClient, "post", fake_post)
monkeypatch.setattr(httpx.AsyncClient, "get", fake_get)
monkeypatch.setattr(worker, "render_slate", fake_render)
worker.INTERNAL_API_KEY = "test"
worker.NAS_BASE_URL = "http://nas.test"
async with httpx.AsyncClient() as client:
await worker._process_one(client, {
"task_id": "t-2",
"params": {"slate_id": 42, "theme": "default"},
})
statuses = [c["status"] for c in calls]
assert "processing" in statuses
assert "succeeded" in statuses
assert calls[-1]["progress"] == 100
@pytest.mark.asyncio
async def test_process_one_render_failure_reports_failed(monkeypatch, fake_slate):
"""render 예외 시 failed webhook 호출."""
calls: list = []
async def fake_post(self, url, headers=None, json=None, **kw):
calls.append(json)
class R: status_code = 200; text = "ok"
return R()
async def fake_get(self, url, **kw):
class R:
status_code = 200
def json(self_inner): return fake_slate
def raise_for_status(self_inner): pass
return R()
async def fake_render(*a, **k):
raise RuntimeError("Chromium crashed")
monkeypatch.setattr(httpx.AsyncClient, "post", fake_post)
monkeypatch.setattr(httpx.AsyncClient, "get", fake_get)
monkeypatch.setattr(worker, "render_slate", fake_render)
worker.INTERNAL_API_KEY = "test"
worker.NAS_BASE_URL = "http://nas.test"
async with httpx.AsyncClient() as client:
# F6: _process_one은 webhook(failed) 호출 후 raise — poll_once가 fail(raw)로 retry/dead-letter.
with pytest.raises(RuntimeError, match="Chromium"):
await worker._process_one(client, {
"task_id": "t-3",
"params": {"slate_id": 99},
})
last = calls[-1]
assert last["status"] == "failed"
assert "Chromium" in last["error"]
# ----- F6: ReliableQueue (ack on success, fail on exception) -----
@pytest.mark.asyncio
async def test_poll_once_acks_on_success(monkeypatch):
"""F6 — 성공 시 queue.ack(raw) 호출 + fail 안 부름."""
fake_payload = {
"task_id": "t-ok",
"params": {"slate_id": 7, "theme": "default"},
}
fake_raw = json.dumps(fake_payload).encode()
fake_queue = AsyncMock()
fake_queue.dequeue = AsyncMock(return_value=(fake_payload, fake_raw))
fake_queue.ack = AsyncMock()
fake_queue.fail = AsyncMock()
process_mock = AsyncMock()
monkeypatch.setattr(worker, "_process_one", process_mock)
async with httpx.AsyncClient() as client:
handled = await worker.poll_once(fake_queue, client)
assert handled is True
process_mock.assert_awaited_once()
fake_queue.ack.assert_awaited_once_with(fake_raw)
fake_queue.fail.assert_not_awaited()
@pytest.mark.asyncio
async def test_poll_once_calls_fail_on_exception(monkeypatch):
"""F6 — _process_one 예외 시 queue.fail(raw, payload) 호출."""
fake_payload = {
"task_id": "t-err",
"params": {"slate_id": 9, "theme": "default"},
}
fake_raw = json.dumps(fake_payload).encode()
fake_queue = AsyncMock()
fake_queue.dequeue = AsyncMock(return_value=(fake_payload, fake_raw))
fake_queue.ack = AsyncMock()
fake_queue.fail = AsyncMock()
async def boom(client, payload):
raise RuntimeError("simulated dispatch failure")
monkeypatch.setattr(worker, "_process_one", boom)
async with httpx.AsyncClient() as client:
handled = await worker.poll_once(fake_queue, client)
assert handled is True
fake_queue.fail.assert_awaited_once_with(fake_raw, fake_payload)
fake_queue.ack.assert_not_awaited()
@pytest.mark.asyncio
async def test_render_produces_nonempty_1080x1350(tmp_path, monkeypatch):
"""Phase 2 — fonts.ready 대기 + PNG 비어있음 검증: 10장 모두 > 1000 bytes."""
import card_renderer as _cr
templates_dir = str(Path(__file__).resolve().parent.parent / "templates")
monkeypatch.setattr(_cr, "CARD_TEMPLATE_DIR", templates_dir)
monkeypatch.setattr(_cr, "INSTA_MEDIA_ROOT", str(tmp_path))
await init_browser()
try:
slate = {
"cover_copy": {"headline": "헤드라인", "body": "서브", "accent_color": "#0F62FE"},
"body_copies": [{"headline": f"포인트{i}", "body": "본문"} for i in range(8)],
"cta_copy": {"headline": "요약", "body": "마무리", "cta": "팔로우"},
}
paths = await render_slate(slate, slate_id=99999)
assert len(paths) == 10
for p in paths:
assert os.path.getsize(p) > 1000 # 비어있지 않음
finally:
await shutdown_browser()
@pytest.mark.asyncio
async def test_poll_once_returns_false_on_timeout(monkeypatch):
"""F6 — dequeue가 None 반환(타임아웃)이면 False 리턴, ack/fail 안 부름."""
fake_queue = AsyncMock()
fake_queue.dequeue = AsyncMock(return_value=None)
fake_queue.ack = AsyncMock()
fake_queue.fail = AsyncMock()
process_mock = AsyncMock()
monkeypatch.setattr(worker, "_process_one", process_mock)
async with httpx.AsyncClient() as client:
handled = await worker.poll_once(fake_queue, client)
assert handled is False
process_mock.assert_not_awaited()
fake_queue.ack.assert_not_awaited()
fake_queue.fail.assert_not_awaited()