diff --git a/services/image-render/main.py b/services/image-render/main.py index 25dad49..72da45e 100644 --- a/services/image-render/main.py +++ b/services/image-render/main.py @@ -3,11 +3,14 @@ from __future__ import annotations import asyncio import logging +import os from contextlib import asynccontextmanager +import redis.asyncio as aioredis from fastapi import FastAPI import worker +from _shared.heartbeat import heartbeat_loop logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s") logger = logging.getLogger(__name__) @@ -16,15 +19,19 @@ logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): worker_task = asyncio.create_task(worker.worker_loop()) + hb_redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://192.168.45.54:6379"), decode_responses=False) + hb_task = asyncio.create_task(heartbeat_loop(hb_redis, "image-render", "render", worker.stats)) logger.info("image-render lifespan 시작") try: yield finally: - worker_task.cancel() - try: - await worker_task - except asyncio.CancelledError: - pass + for t in (worker_task, hb_task): + t.cancel() + try: + await t + except asyncio.CancelledError: + pass + await hb_redis.aclose() logger.info("image-render lifespan 종료") diff --git a/services/image-render/tests/test_worker.py b/services/image-render/tests/test_worker.py index 207b737..3e551da 100644 --- a/services/image-render/tests/test_worker.py +++ b/services/image-render/tests/test_worker.py @@ -67,3 +67,25 @@ async def test_poll_once_returns_false_on_timeout(monkeypatch): assert handled is False fake_queue.ack.assert_not_awaited() fake_queue.fail.assert_not_awaited() + + +# ----- heartbeat stats 카운터 ----- + +class _OneJobQueue: + def __init__(self): self.acked = False + async def dequeue(self, timeout=5): + if self.acked: return None + return ({"job_type": "flux_generation", "task_id": "t1", "params": {}}, b"raw") + async def ack(self, raw): self.acked = True + async def fail(self, raw, payload): pass + + +@pytest.mark.asyncio +async def test_poll_once_increments_jobs_done(monkeypatch): + worker.stats.jobs_done = 0 + monkeypatch.setattr(worker, "run_flux_generation", lambda task_id, params: None) + handled = await worker.poll_once(_OneJobQueue()) + assert handled is True + assert worker.stats.jobs_done == 1 + assert worker.stats.busy is False + assert worker.stats.last_job_at is not None diff --git a/services/image-render/worker.py b/services/image-render/worker.py index 03ab021..9b43666 100644 --- a/services/image-render/worker.py +++ b/services/image-render/worker.py @@ -18,6 +18,7 @@ from providers.gpt_image import run_gpt_image_generation from providers.nano_banana import run_nano_banana_generation from providers.flux import run_flux_generation from _shared.reliable_queue import ReliableQueue +from _shared.heartbeat import WorkerStats, utc_now_iso logger = logging.getLogger(__name__) @@ -25,6 +26,8 @@ REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379") QUEUE_KEY = "queue:image-render" PAUSED_KEY = "queue:paused" +stats = WorkerStats() + # string names so `unittest.mock.patch` / `monkeypatch.setattr` on `worker.` # is correctly intercepted by getattr(sys.modules[__name__], ...) _DISPATCH_TABLE = { @@ -59,14 +62,21 @@ async def poll_once(queue: ReliableQueue) -> bool: if result is None: return False payload, raw = result + stats.busy = True try: await asyncio.to_thread(_dispatch, payload) except Exception: logger.exception("dispatch unhandled exception task_id=%s", payload.get("task_id")) await queue.fail(raw, payload) + stats.jobs_failed += 1 + stats.last_job_at = utc_now_iso() + stats.busy = False return True await queue.ack(raw) + stats.jobs_done += 1 + stats.last_job_at = utc_now_iso() + stats.busy = False return True diff --git a/services/insta-render/main.py b/services/insta-render/main.py index 0d93b76..ee98c11 100644 --- a/services/insta-render/main.py +++ b/services/insta-render/main.py @@ -3,12 +3,15 @@ from __future__ import annotations import asyncio import logging +import os from contextlib import asynccontextmanager +import redis.asyncio as aioredis from fastapi import FastAPI import card_renderer import worker +from _shared.heartbeat import heartbeat_loop logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s") logger = logging.getLogger(__name__) @@ -20,15 +23,19 @@ async def lifespan(app: FastAPI): await card_renderer.init_browser() # 큐 워커 백그라운드 시작 worker_task = asyncio.create_task(worker.worker_loop()) + hb_redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://192.168.45.54:6379"), decode_responses=False) + hb_task = asyncio.create_task(heartbeat_loop(hb_redis, "insta-render", "render", worker.stats)) logger.info("insta-render lifespan 시작") try: yield finally: - worker_task.cancel() - try: - await worker_task - except asyncio.CancelledError: - pass + for t in (worker_task, hb_task): + t.cancel() + try: + await t + except asyncio.CancelledError: + pass + await hb_redis.aclose() await card_renderer.shutdown_browser() logger.info("insta-render lifespan 종료") diff --git a/services/insta-render/tests/test_worker.py b/services/insta-render/tests/test_worker.py index ef47b47..38bd12b 100644 --- a/services/insta-render/tests/test_worker.py +++ b/services/insta-render/tests/test_worker.py @@ -230,3 +230,27 @@ def test_make_queue_redis_socket_timeout_exceeds_block(): c = worker.make_queue_redis() st = c.connection_pool.connection_kwargs.get("socket_timeout") assert st is not None and st > 5 # blmove 블록(5s)보다 커야 안정 + + +# ----- heartbeat stats 카운터 ----- + +class _OneJobQueueInsta: + def __init__(self): self.acked = False + async def dequeue(self, timeout=5): + if self.acked: return None + return ({"task_id": "t1", "params": {"slate_id": 1, "theme": "default"}}, b"raw") + async def ack(self, raw): self.acked = True + async def fail(self, raw, payload): pass + + +@pytest.mark.asyncio +async def test_poll_once_increments_jobs_done(monkeypatch): + worker.stats.jobs_done = 0 + async def fake_process(client, payload): pass + monkeypatch.setattr(worker, "_process_one", fake_process) + async with httpx.AsyncClient() as client: + handled = await worker.poll_once(_OneJobQueueInsta(), client) + assert handled is True + assert worker.stats.jobs_done == 1 + assert worker.stats.busy is False + assert worker.stats.last_job_at is not None diff --git a/services/insta-render/worker.py b/services/insta-render/worker.py index 86dd207..e47791d 100644 --- a/services/insta-render/worker.py +++ b/services/insta-render/worker.py @@ -14,9 +14,11 @@ import redis.asyncio as aioredis from card_renderer import render_slate from _shared.reliable_queue import ReliableQueue +from _shared.heartbeat import WorkerStats, utc_now_iso logger = logging.getLogger(__name__) +stats = WorkerStats() REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379") NAS_BASE_URL = os.getenv("NAS_BASE_URL", "http://192.168.45.54:18700") @@ -89,12 +91,19 @@ async def poll_once(queue: ReliableQueue, client: httpx.AsyncClient) -> bool: if result is None: return False payload, raw = result + stats.busy = True try: await _process_one(client, payload) except Exception: await queue.fail(raw, payload) + stats.jobs_failed += 1 + stats.last_job_at = utc_now_iso() + stats.busy = False return True await queue.ack(raw) + stats.jobs_done += 1 + stats.last_job_at = utc_now_iso() + stats.busy = False return True diff --git a/services/music-render/main.py b/services/music-render/main.py index 743b48a..285300c 100644 --- a/services/music-render/main.py +++ b/services/music-render/main.py @@ -7,12 +7,15 @@ from __future__ import annotations import asyncio import logging +import os from contextlib import asynccontextmanager +import redis.asyncio as aioredis from fastapi import FastAPI, HTTPException from pydantic import BaseModel import worker +from _shared.heartbeat import heartbeat_loop from providers.sync_ops import ( generate_lyrics, get_credits, get_timestamped_lyrics, generate_style_boost, @@ -25,15 +28,19 @@ logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): worker_task = asyncio.create_task(worker.worker_loop()) + hb_redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://192.168.45.54:6379"), decode_responses=False) + hb_task = asyncio.create_task(heartbeat_loop(hb_redis, "music-render", "render", worker.stats)) logger.info("music-render lifespan 시작") try: yield finally: - worker_task.cancel() - try: - await worker_task - except asyncio.CancelledError: - pass + for t in (worker_task, hb_task): + t.cancel() + try: + await t + except asyncio.CancelledError: + pass + await hb_redis.aclose() logger.info("music-render lifespan 종료") diff --git a/services/music-render/tests/test_worker.py b/services/music-render/tests/test_worker.py index 8c8193b..5510725 100644 --- a/services/music-render/tests/test_worker.py +++ b/services/music-render/tests/test_worker.py @@ -167,3 +167,25 @@ async def test_poll_once_returns_false_on_timeout(monkeypatch): dispatch_mock.assert_not_called() fake_queue.ack.assert_not_awaited() fake_queue.fail.assert_not_awaited() + + +# ----- heartbeat stats 카운터 ----- + +class _OneJobQueue: + def __init__(self): self.acked = False + async def dequeue(self, timeout=5): + if self.acked: return None + return ({"job_type": "suno_generation", "task_id": "t1", "params": {}}, b"raw") + async def ack(self, raw): self.acked = True + async def fail(self, raw, payload): pass + + +@pytest.mark.asyncio +async def test_poll_once_increments_jobs_done(monkeypatch): + worker.stats.jobs_done = 0 + monkeypatch.setattr(worker, "run_suno_generation", lambda task_id, params: None) + handled = await worker.poll_once(_OneJobQueue()) + assert handled is True + assert worker.stats.jobs_done == 1 + assert worker.stats.busy is False + assert worker.stats.last_job_at is not None diff --git a/services/music-render/worker.py b/services/music-render/worker.py index b57807f..ff512c8 100644 --- a/services/music-render/worker.py +++ b/services/music-render/worker.py @@ -21,6 +21,7 @@ from providers.suno import ( ) from providers.local import run_local_generation from _shared.reliable_queue import ReliableQueue +from _shared.heartbeat import WorkerStats, utc_now_iso logger = logging.getLogger(__name__) @@ -28,6 +29,8 @@ REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379") QUEUE_KEY = "queue:music-render" PAUSED_KEY = "queue:paused" +stats = WorkerStats() + # Maps job_type → module-level function name (string). # _dispatch resolves the name via globals() at call time so unittest.mock.patch # on "worker." is correctly intercepted. @@ -74,6 +77,7 @@ async def poll_once(queue: ReliableQueue) -> bool: if result is None: return False payload, raw = result + stats.busy = True try: # sync provider 함수 — thread로 실행해서 이벤트 루프 블로킹 방지 await asyncio.to_thread(_dispatch, payload) @@ -81,8 +85,14 @@ async def poll_once(queue: ReliableQueue) -> bool: logger.exception("dispatch unhandled exception task_id=%s", payload.get("task_id")) await queue.fail(raw, payload) + stats.jobs_failed += 1 + stats.last_job_at = utc_now_iso() + stats.busy = False return True await queue.ack(raw) + stats.jobs_done += 1 + stats.last_job_at = utc_now_iso() + stats.busy = False return True diff --git a/services/video-render/main.py b/services/video-render/main.py index 4641de3..337d5d4 100644 --- a/services/video-render/main.py +++ b/services/video-render/main.py @@ -3,11 +3,14 @@ from __future__ import annotations import asyncio import logging +import os from contextlib import asynccontextmanager +import redis.asyncio as aioredis from fastapi import FastAPI import worker +from _shared.heartbeat import heartbeat_loop logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s") logger = logging.getLogger(__name__) @@ -16,15 +19,19 @@ logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): worker_task = asyncio.create_task(worker.worker_loop()) + hb_redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://192.168.45.54:6379"), decode_responses=False) + hb_task = asyncio.create_task(heartbeat_loop(hb_redis, "video-render", "render", worker.stats)) logger.info("video-render lifespan 시작") try: yield finally: - worker_task.cancel() - try: - await worker_task - except asyncio.CancelledError: - pass + for t in (worker_task, hb_task): + t.cancel() + try: + await t + except asyncio.CancelledError: + pass + await hb_redis.aclose() logger.info("video-render lifespan 종료") diff --git a/services/video-render/tests/test_worker.py b/services/video-render/tests/test_worker.py index 5c69934..c052673 100644 --- a/services/video-render/tests/test_worker.py +++ b/services/video-render/tests/test_worker.py @@ -94,3 +94,25 @@ async def test_poll_once_returns_false_on_timeout(monkeypatch): assert handled is False fake_queue.ack.assert_not_awaited() fake_queue.fail.assert_not_awaited() + + +# ----- heartbeat stats 카운터 ----- + +class _OneJobQueue: + def __init__(self): self.acked = False + async def dequeue(self, timeout=5): + if self.acked: return None + return ({"job_type": "sora_generation", "task_id": "t1", "params": {}}, b"raw") + async def ack(self, raw): self.acked = True + async def fail(self, raw, payload): pass + + +@pytest.mark.asyncio +async def test_poll_once_increments_jobs_done(monkeypatch): + worker.stats.jobs_done = 0 + monkeypatch.setattr(worker, "run_sora_generation", lambda task_id, params: None) + handled = await worker.poll_once(_OneJobQueue()) + assert handled is True + assert worker.stats.jobs_done == 1 + assert worker.stats.busy is False + assert worker.stats.last_job_at is not None diff --git a/services/video-render/worker.py b/services/video-render/worker.py index cb2683f..bbe238c 100644 --- a/services/video-render/worker.py +++ b/services/video-render/worker.py @@ -19,6 +19,7 @@ from providers.veo import run_veo_generation from providers.kling import run_kling_generation from providers.seedance import run_seedance_generation from _shared.reliable_queue import ReliableQueue +from _shared.heartbeat import WorkerStats, utc_now_iso logger = logging.getLogger(__name__) @@ -26,6 +27,8 @@ REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379") QUEUE_KEY = "queue:video-render" PAUSED_KEY = "queue:paused" +stats = WorkerStats() + # string names so `unittest.mock.patch` on `worker.` is correctly intercepted _DISPATCH_TABLE = { "sora_generation": "run_sora_generation", @@ -60,14 +63,21 @@ async def poll_once(queue: ReliableQueue) -> bool: if result is None: return False payload, raw = result + stats.busy = True try: await asyncio.to_thread(_dispatch, payload) except Exception: logger.exception("dispatch unhandled exception task_id=%s", payload.get("task_id")) await queue.fail(raw, payload) + stats.jobs_failed += 1 + stats.last_job_at = utc_now_iso() + stats.busy = False return True await queue.ack(raw) + stats.jobs_done += 1 + stats.last_job_at = utc_now_iso() + stats.busy = False return True