근본원인 실험 확정: redis-py 블로킹 read에서 socket_timeout이 BLMOVE 블록(5s) 이하/None이면 read_timeout 경계 경합으로 간헐 "Timeout reading" → dequeue 실패 → 슬레이트 draft 정지. socket_timeout 10/30은 모든 실험에서 안정. 블록보다 큰 30으로 명시(직전 None 커밋은 단독 테스트만 통과시켜 오도 — 재사용 패턴서 깨짐). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
233 lines
8.1 KiB
Python
233 lines
8.1 KiB
Python
"""worker.py — Redis BLPOP + webhook 단위 테스트."""
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
import pytest
|
|
import httpx
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import worker
|
|
from card_renderer import render_slate, init_browser, shutdown_browser
|
|
|
|
|
|
@pytest.fixture
def fake_slate():
    """Provide a sample slate dict whose copy fields are JSON-encoded strings.

    ``cover_copy`` and ``cta_copy`` each hold one encoded object;
    ``body_copies`` holds an encoded list of eight identical body cards.
    """
    cover = {"headline": "테스트 H", "body": "테스트 B", "accent_color": "#FF0000"}
    body_cards = [{"headline": "본문1", "body": "..."} for _ in range(8)]
    call_to_action = {"headline": "CTA", "body": "...", "cta": "Click"}

    slate = {"id": 42}
    slate["cover_copy"] = json.dumps(cover)
    slate["body_copies"] = json.dumps(body_cards)
    slate["cta_copy"] = json.dumps(call_to_action)
    return slate
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_post_update_sends_correct_payload(monkeypatch):
    """worker._post_update posts status/progress to the internal update URL.

    ``httpx.AsyncClient.post`` is replaced with a recorder, then the test
    checks the URL, the ``X-Internal-Key`` header and the JSON body that
    ``_post_update`` passed to it.
    """
    monkeypatch.setenv("INTERNAL_API_KEY", "test-secret")
    monkeypatch.setenv("NAS_BASE_URL", "http://nas.test")
    # Override the worker module's own settings as well (presumably read from
    # the environment at import time — confirm in worker.py).
    # monkeypatch.setattr restores the previous values at teardown, so this
    # test cannot leak configuration into tests that run after it.
    # raising=False keeps the old "create if missing" behaviour of assignment.
    monkeypatch.setattr(worker, "NAS_BASE_URL", "http://nas.test", raising=False)
    monkeypatch.setattr(worker, "INTERNAL_API_KEY", "test-secret", raising=False)

    captured = {}

    async def fake_post(self, url, headers=None, json=None, **kw):
        # The parameter must be called `json` to match the keyword used by
        # httpx callers; it shadows the json module inside this helper only.
        captured["url"] = url
        captured["headers"] = headers
        captured["json"] = json

        class R:
            status_code = 200
            text = "ok"

        return R()

    monkeypatch.setattr(httpx.AsyncClient, "post", fake_post)

    async with httpx.AsyncClient() as client:
        await worker._post_update(client, "t-1", "processing", 30)

    assert captured["url"] == "http://nas.test/api/internal/insta/update"
    assert captured["headers"]["X-Internal-Key"] == "test-secret"
    assert captured["json"]["status"] == "processing"
    assert captured["json"]["progress"] == 30
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_process_one_success_calls_webhook_twice(monkeypatch, fake_slate):
    """A successful job reports ``processing`` and then ``succeeded``.

    Intended flow per the original note: processing(50) -> succeeded(100),
    i.e. two webhook calls and one render. The assertions below only verify
    that both statuses were posted and that the last post carries
    progress 100.
    """
    calls: list = []

    async def fake_post(self, url, headers=None, json=None, **kw):
        # Record only the fields the assertions look at.
        calls.append({"status": json["status"], "progress": json["progress"]})

        class R:
            status_code = 200
            text = "ok"

        return R()

    async def fake_get(self, url, **kw):
        # Every GET answers with the fixture slate.
        class R:
            status_code = 200

            def json(self_inner):
                return fake_slate

            def raise_for_status(self_inner):
                pass

        return R()

    async def fake_render(slate, slate_id, template="default/card.html.j2"):
        # Pretend ten cards were rendered: 01.png .. 10.png.
        return [f"/tmp/{slate_id}/{i:02d}.png" for i in range(1, 11)]

    monkeypatch.setattr(httpx.AsyncClient, "post", fake_post)
    monkeypatch.setattr(httpx.AsyncClient, "get", fake_get)
    monkeypatch.setattr(worker, "render_slate", fake_render)
    # Use monkeypatch instead of plain assignment so the worker module's
    # settings are restored at teardown and do not leak into other tests.
    # raising=False keeps the old "create if missing" behaviour of assignment.
    monkeypatch.setattr(worker, "INTERNAL_API_KEY", "test", raising=False)
    monkeypatch.setattr(worker, "NAS_BASE_URL", "http://nas.test", raising=False)

    async with httpx.AsyncClient() as client:
        await worker._process_one(client, {
            "task_id": "t-2",
            "params": {"slate_id": 42, "theme": "default"},
        })

    statuses = [c["status"] for c in calls]
    assert "processing" in statuses
    assert "succeeded" in statuses
    assert calls[-1]["progress"] == 100
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_process_one_render_failure_reports_failed(monkeypatch, fake_slate):
    """When rendering raises, a ``failed`` webhook is posted and the error propagates.

    The last JSON body posted must have status ``failed`` and an ``error``
    field mentioning the render exception.
    """
    calls: list = []

    async def fake_post(self, url, headers=None, json=None, **kw):
        # Keep the whole JSON body so the error text can be inspected.
        calls.append(json)

        class R:
            status_code = 200
            text = "ok"

        return R()

    async def fake_get(self, url, **kw):
        # Every GET answers with the fixture slate.
        class R:
            status_code = 200

            def json(self_inner):
                return fake_slate

            def raise_for_status(self_inner):
                pass

        return R()

    async def fake_render(*a, **k):
        raise RuntimeError("Chromium crashed")

    monkeypatch.setattr(httpx.AsyncClient, "post", fake_post)
    monkeypatch.setattr(httpx.AsyncClient, "get", fake_get)
    monkeypatch.setattr(worker, "render_slate", fake_render)
    # Use monkeypatch instead of plain assignment so the worker module's
    # settings are restored at teardown and do not leak into other tests.
    # raising=False keeps the old "create if missing" behaviour of assignment.
    monkeypatch.setattr(worker, "INTERNAL_API_KEY", "test", raising=False)
    monkeypatch.setattr(worker, "NAS_BASE_URL", "http://nas.test", raising=False)

    async with httpx.AsyncClient() as client:
        # F6: _process_one posts the failed webhook and then re-raises, so that
        # poll_once can call fail(raw) for retry / dead-lettering.
        with pytest.raises(RuntimeError, match="Chromium"):
            await worker._process_one(client, {
                "task_id": "t-3",
                "params": {"slate_id": 99},
            })

    last = calls[-1]
    assert last["status"] == "failed"
    assert "Chromium" in last["error"]
|
|
|
|
|
|
# ----- F6: ReliableQueue (ack on success, fail on exception) -----
|
|
|
|
@pytest.mark.asyncio
async def test_poll_once_acks_on_success(monkeypatch):
    """F6 — on success poll_once awaits queue.ack(raw) and never queue.fail."""
    payload = {
        "task_id": "t-ok",
        "params": {"slate_id": 7, "theme": "default"},
    }
    raw = json.dumps(payload).encode()

    # Keyword arguments to a mock constructor become attributes on the mock.
    queue = AsyncMock(
        dequeue=AsyncMock(return_value=(payload, raw)),
        ack=AsyncMock(),
        fail=AsyncMock(),
    )

    # Stand-in for the real job handler: completes without raising.
    process_stub = AsyncMock()
    monkeypatch.setattr(worker, "_process_one", process_stub)

    async with httpx.AsyncClient() as http_client:
        handled = await worker.poll_once(queue, http_client)

    assert handled is True
    process_stub.assert_awaited_once()
    queue.ack.assert_awaited_once_with(raw)
    queue.fail.assert_not_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_poll_once_calls_fail_on_exception(monkeypatch):
    """F6 — if _process_one raises, poll_once awaits queue.fail(raw, payload)."""
    payload = {
        "task_id": "t-err",
        "params": {"slate_id": 9, "theme": "default"},
    }
    raw = json.dumps(payload).encode()

    # Keyword arguments to a mock constructor become attributes on the mock.
    queue = AsyncMock(
        dequeue=AsyncMock(return_value=(payload, raw)),
        ack=AsyncMock(),
        fail=AsyncMock(),
    )

    async def raise_dispatch_error(client, payload):
        # Job handler replacement that always fails.
        raise RuntimeError("simulated dispatch failure")

    monkeypatch.setattr(worker, "_process_one", raise_dispatch_error)

    async with httpx.AsyncClient() as http_client:
        handled = await worker.poll_once(queue, http_client)

    # The item still counts as handled; it is failed rather than acked.
    assert handled is True
    queue.fail.assert_awaited_once_with(raw, payload)
    queue.ack.assert_not_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_render_produces_nonempty_1080x1350(tmp_path, monkeypatch):
    """Phase 2 — fonts.ready wait + non-empty PNG check.

    Renders one slate between init_browser() and shutdown_browser() and
    expects ten output files, each larger than 1000 bytes.
    """
    import card_renderer as renderer_module

    # Templates live in ../templates relative to this test file's directory;
    # rendered media is redirected into pytest's temporary directory.
    base_dir = Path(__file__).resolve().parent.parent
    monkeypatch.setattr(renderer_module, "CARD_TEMPLATE_DIR", str(base_dir / "templates"))
    monkeypatch.setattr(renderer_module, "INSTA_MEDIA_ROOT", str(tmp_path))

    await init_browser()
    try:
        cover = {"headline": "헤드라인", "body": "서브", "accent_color": "#0F62FE"}
        body_cards = [{"headline": f"포인트{i}", "body": "본문"} for i in range(8)]
        closing = {"headline": "요약", "body": "마무리", "cta": "팔로우"}
        slate = {
            "cover_copy": cover,
            "body_copies": body_cards,
            "cta_copy": closing,
        }

        paths = await render_slate(slate, slate_id=99999)

        assert len(paths) == 10
        # Not empty: every rendered file must exceed 1000 bytes.
        assert all(os.path.getsize(png) > 1000 for png in paths)
    finally:
        # Always release the browser, even when an assertion fails.
        await shutdown_browser()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_poll_once_returns_false_on_timeout(monkeypatch):
    """F6 — when dequeue returns None (timeout), poll_once returns False.

    Neither the job handler nor queue.ack / queue.fail may be awaited.
    """
    # Keyword arguments to a mock constructor become attributes on the mock.
    queue = AsyncMock(
        dequeue=AsyncMock(return_value=None),
        ack=AsyncMock(),
        fail=AsyncMock(),
    )

    process_stub = AsyncMock()
    monkeypatch.setattr(worker, "_process_one", process_stub)

    async with httpx.AsyncClient() as http_client:
        handled = await worker.poll_once(queue, http_client)

    assert handled is False
    for untouched in (process_stub, queue.ack, queue.fail):
        untouched.assert_not_awaited()
|
|
|
|
|
|
def test_make_queue_redis_socket_timeout_exceeds_block():
    """Regression guard for the queue client's socket_timeout.

    The BLMOVE dequeue blocks for 5s; socket_timeout must be set and larger
    than that block so the dequeue does not break on a read-timeout boundary
    race.
    """
    blmove_block_seconds = 5
    client = worker.make_queue_redis()
    pool_kwargs = client.connection_pool.connection_kwargs
    socket_timeout = pool_kwargs.get("socket_timeout")
    # Must exceed the BLMOVE block (5s) to be stable; None is rejected too.
    assert socket_timeout is not None and socket_timeout > blmove_block_seconds
|