Files
web-page-backend/co-gahusb/tests/test_locks.py

52 lines
1.7 KiB
Python

# co-gahusb/tests/test_locks.py
from app import locks
async def test_acquire_succeeds_then_blocks_other(r):
res = await locks.acquire_lock(r, "nas-deploy", "BE", ttl_sec=300)
assert res["acquired"] is True
res2 = await locks.acquire_lock(r, "nas-deploy", "FE", ttl_sec=300)
assert res2["acquired"] is False
assert res2["held_by"] == "BE"
assert res2["ttl_remaining"] > 0
async def test_release_only_by_owner(r):
await locks.acquire_lock(r, "compose", "BE", ttl_sec=300)
bad = await locks.release_lock(r, "compose", "FE")
assert bad["released"] is False
ok = await locks.release_lock(r, "compose", "BE")
assert ok["released"] is True
again = await locks.acquire_lock(r, "compose", "FE", ttl_sec=300)
assert again["acquired"] is True
async def test_heartbeat_only_by_owner_renews_ttl(r):
await locks.acquire_lock(r, "nginx-conf", "BE", ttl_sec=10)
bad = await locks.heartbeat_lock(r, "nginx-conf", "FE", ttl_sec=300)
assert bad["renewed"] is False
ok = await locks.heartbeat_lock(r, "nginx-conf", "BE", ttl_sec=300)
assert ok["renewed"] is True
assert await r.ttl("co:lock:nginx-conf") > 100
async def test_expired_lock_is_reacquirable(r):
await locks.acquire_lock(r, "memory-mirror", "AI", ttl_sec=1)
await r.delete("co:lock:memory-mirror")
res = await locks.acquire_lock(r, "memory-mirror", "FE", ttl_sec=300)
assert res["acquired"] is True
async def test_list_locks(r):
await locks.acquire_lock(r, "nas-deploy", "BE", ttl_sec=300)
await locks.acquire_lock(r, "compose", "FE", ttl_sec=300)
listed = await locks.list_locks(r)
held = {l["resource"]: l["held_by"] for l in listed["locks"]}
assert held == {"nas-deploy": "BE", "compose": "FE"}