# co-gahusb/tests/test_locks.py
# Tests for the lock helpers in app.locks. The `r` argument is a fixture
# defined outside this file (presumably an async Redis client - confirm in
# conftest).
from app import locks


async def test_acquire_succeeds_then_blocks_other(r):
    """A second owner is refused while the first still holds the lock."""
    first = await locks.acquire_lock(r, "nas-deploy", "BE", ttl_sec=300)
    assert first["acquired"] is True

    # The refusal must report who holds the lock and how long it has left.
    second = await locks.acquire_lock(r, "nas-deploy", "FE", ttl_sec=300)
    assert second["acquired"] is False
    assert second["held_by"] == "BE"
    assert second["ttl_remaining"] > 0


async def test_release_only_by_owner(r):
    """Release by a non-owner fails; release by the owner frees the lock."""
    await locks.acquire_lock(r, "compose", "BE", ttl_sec=300)

    denied = await locks.release_lock(r, "compose", "FE")
    assert denied["released"] is False

    granted = await locks.release_lock(r, "compose", "BE")
    assert granted["released"] is True

    # Once released, a different owner can take the lock.
    retry = await locks.acquire_lock(r, "compose", "FE", ttl_sec=300)
    assert retry["acquired"] is True


async def test_heartbeat_only_by_owner_renews_ttl(r):
    """Heartbeat from a non-owner is rejected; the owner's one renews the TTL."""
    await locks.acquire_lock(r, "nginx-conf", "BE", ttl_sec=10)

    rejected = await locks.heartbeat_lock(r, "nginx-conf", "FE", ttl_sec=300)
    assert rejected["renewed"] is False

    accepted = await locks.heartbeat_lock(r, "nginx-conf", "BE", ttl_sec=300)
    assert accepted["renewed"] is True

    # The lock started with a 10 s TTL, so a value above 100 shows that the
    # owner's heartbeat replaced it with the longer ttl_sec.
    remaining = await r.ttl("co:lock:nginx-conf")
    assert remaining > 100


async def test_expired_lock_is_reacquirable(r):
    """Once the lock key is gone, another owner can acquire the resource."""
    await locks.acquire_lock(r, "memory-mirror", "AI", ttl_sec=1)

    # Deleting the key stands in for expiry instead of waiting out the TTL.
    await r.delete("co:lock:memory-mirror")

    outcome = await locks.acquire_lock(r, "memory-mirror", "FE", ttl_sec=300)
    assert outcome["acquired"] is True


async def test_list_locks(r):
    """list_locks reports every held resource together with its owner."""
    await locks.acquire_lock(r, "nas-deploy", "BE", ttl_sec=300)
    await locks.acquire_lock(r, "compose", "FE", ttl_sec=300)

    listing = await locks.list_locks(r)
    owners = {
        entry["resource"]: entry["held_by"]
        for entry in listing["locks"]
    }
    assert owners == {"nas-deploy": "BE", "compose": "FE"}