From 8212a51f9023bac72eae6540abf3032a845ed9f1 Mon Sep 17 00:00:00 2001 From: gahusb Date: Fri, 12 Jun 2026 07:20:30 +0900 Subject: [PATCH] =?UTF-8?q?feat(co-gahusb):=20=EC=96=B4=EB=93=9C=EB=B0=94?= =?UTF-8?q?=EC=9D=B4=EC=A0=80=EB=A6=AC=20=EB=9D=BD=20(acquire/release/hear?= =?UTF-8?q?tbeat/list,=20TDD)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- co-gahusb/app/locks.py | 66 +++++++++++++++++++++++++++++++++++ co-gahusb/pytest.ini | 3 ++ co-gahusb/tests/conftest.py | 11 ++++++ co-gahusb/tests/test_locks.py | 51 +++++++++++++++++++++++++++ 4 files changed, 131 insertions(+) create mode 100644 co-gahusb/app/locks.py create mode 100644 co-gahusb/pytest.ini create mode 100644 co-gahusb/tests/conftest.py create mode 100644 co-gahusb/tests/test_locks.py diff --git a/co-gahusb/app/locks.py b/co-gahusb/app/locks.py new file mode 100644 index 0000000..0bf241a --- /dev/null +++ b/co-gahusb/app/locks.py @@ -0,0 +1,66 @@ +# co-gahusb/app/locks.py +from redis.exceptions import WatchError + +LOCK_PREFIX = "co:lock:" + + +async def acquire_lock(r, resource, role, ttl_sec=300): + key = LOCK_PREFIX + resource + ok = await r.set(key, role, nx=True, ex=ttl_sec) + if ok: + return {"acquired": True} + held_by = await r.get(key) + ttl = await r.ttl(key) + return {"acquired": False, "held_by": held_by, "ttl_remaining": max(ttl, 0)} + + +async def release_lock(r, resource, role): + key = LOCK_PREFIX + resource + async with r.pipeline() as pipe: + while True: + try: + await pipe.watch(key) + owner = await pipe.get(key) + if owner != role: + await pipe.unwatch() + return {"released": False, "held_by": owner} + pipe.multi() + pipe.delete(key) + await pipe.execute() + return {"released": True} + except WatchError: + continue + + +async def heartbeat_lock(r, resource, role, ttl_sec=300): + key = LOCK_PREFIX + resource + async with r.pipeline() as pipe: + while True: + try: + await pipe.watch(key) + owner = await pipe.get(key) + if owner != role: + await pipe.unwatch() + return {"renewed": False, "held_by": owner} + pipe.multi() + pipe.expire(key, ttl_sec) + await pipe.execute() + return {"renewed": True} + except WatchError: + continue + + +async def list_locks(r): + keys = await r.keys(LOCK_PREFIX + "*") + out = [] + for key in keys: + held_by = await r.get(key) + if held_by is None: + continue + ttl = await r.ttl(key) + out.append({ + "resource": key[len(LOCK_PREFIX):], + "held_by": held_by, + "ttl_remaining": max(ttl, 0), + }) + return {"locks": out} diff --git a/co-gahusb/pytest.ini b/co-gahusb/pytest.ini new file mode 100644 index 0000000..78c5011 --- /dev/null +++ b/co-gahusb/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +asyncio_mode = auto +testpaths = tests diff --git a/co-gahusb/tests/conftest.py b/co-gahusb/tests/conftest.py new file mode 100644 index 0000000..a8fc6d1 --- /dev/null +++ b/co-gahusb/tests/conftest.py @@ -0,0 +1,11 @@ +# co-gahusb/tests/conftest.py +import pytest_asyncio +import fakeredis.aioredis + + +@pytest_asyncio.fixture +async def r(): + client = fakeredis.aioredis.FakeRedis(decode_responses=True) + await client.flushall() + yield client + await client.aclose() diff --git a/co-gahusb/tests/test_locks.py b/co-gahusb/tests/test_locks.py new file mode 100644 index 0000000..1779881 --- /dev/null +++ b/co-gahusb/tests/test_locks.py @@ -0,0 +1,51 @@ +# co-gahusb/tests/test_locks.py +from app import locks + + +async def test_acquire_succeeds_then_blocks_other(r): + res = await locks.acquire_lock(r, "nas-deploy", "BE", ttl_sec=300) + assert res["acquired"] is True + + res2 = await locks.acquire_lock(r, "nas-deploy", "FE", ttl_sec=300) + assert res2["acquired"] is False + assert res2["held_by"] == "BE" + assert res2["ttl_remaining"] > 0 + + +async def test_release_only_by_owner(r): + await locks.acquire_lock(r, "compose", "BE", ttl_sec=300) + + bad = await locks.release_lock(r, "compose", "FE") + assert bad["released"] is False + + ok = await locks.release_lock(r, "compose", "BE") + assert ok["released"] is True + + again = await locks.acquire_lock(r, "compose", "FE", ttl_sec=300) + assert again["acquired"] is True + + +async def test_heartbeat_only_by_owner_renews_ttl(r): + await locks.acquire_lock(r, "nginx-conf", "BE", ttl_sec=10) + + bad = await locks.heartbeat_lock(r, "nginx-conf", "FE", ttl_sec=300) + assert bad["renewed"] is False + + ok = await locks.heartbeat_lock(r, "nginx-conf", "BE", ttl_sec=300) + assert ok["renewed"] is True + assert await r.ttl("co:lock:nginx-conf") > 100 + + +async def test_expired_lock_is_reacquirable(r): + await locks.acquire_lock(r, "memory-mirror", "AI", ttl_sec=1) + await r.delete("co:lock:memory-mirror") + res = await locks.acquire_lock(r, "memory-mirror", "FE", ttl_sec=300) + assert res["acquired"] is True + + +async def test_list_locks(r): + await locks.acquire_lock(r, "nas-deploy", "BE", ttl_sec=300) + await locks.acquire_lock(r, "compose", "FE", ttl_sec=300) + listed = await locks.list_locks(r) + held = {l["resource"]: l["held_by"] for l in listed["locks"]} + assert held == {"nas-deploy": "BE", "compose": "FE"}