feat(co-gahusb): 어드바이저리 락 (acquire/release/heartbeat/list, TDD)
This commit is contained in:
51
co-gahusb/tests/test_locks.py
Normal file
51
co-gahusb/tests/test_locks.py
Normal file
@@ -0,0 +1,51 @@
|
||||
# co-gahusb/tests/test_locks.py
|
||||
from app import locks
|
||||
|
||||
|
||||
async def test_acquire_succeeds_then_blocks_other(r):
|
||||
res = await locks.acquire_lock(r, "nas-deploy", "BE", ttl_sec=300)
|
||||
assert res["acquired"] is True
|
||||
|
||||
res2 = await locks.acquire_lock(r, "nas-deploy", "FE", ttl_sec=300)
|
||||
assert res2["acquired"] is False
|
||||
assert res2["held_by"] == "BE"
|
||||
assert res2["ttl_remaining"] > 0
|
||||
|
||||
|
||||
async def test_release_only_by_owner(r):
|
||||
await locks.acquire_lock(r, "compose", "BE", ttl_sec=300)
|
||||
|
||||
bad = await locks.release_lock(r, "compose", "FE")
|
||||
assert bad["released"] is False
|
||||
|
||||
ok = await locks.release_lock(r, "compose", "BE")
|
||||
assert ok["released"] is True
|
||||
|
||||
again = await locks.acquire_lock(r, "compose", "FE", ttl_sec=300)
|
||||
assert again["acquired"] is True
|
||||
|
||||
|
||||
async def test_heartbeat_only_by_owner_renews_ttl(r):
|
||||
await locks.acquire_lock(r, "nginx-conf", "BE", ttl_sec=10)
|
||||
|
||||
bad = await locks.heartbeat_lock(r, "nginx-conf", "FE", ttl_sec=300)
|
||||
assert bad["renewed"] is False
|
||||
|
||||
ok = await locks.heartbeat_lock(r, "nginx-conf", "BE", ttl_sec=300)
|
||||
assert ok["renewed"] is True
|
||||
assert await r.ttl("co:lock:nginx-conf") > 100
|
||||
|
||||
|
||||
async def test_expired_lock_is_reacquirable(r):
|
||||
await locks.acquire_lock(r, "memory-mirror", "AI", ttl_sec=1)
|
||||
await r.delete("co:lock:memory-mirror")
|
||||
res = await locks.acquire_lock(r, "memory-mirror", "FE", ttl_sec=300)
|
||||
assert res["acquired"] is True
|
||||
|
||||
|
||||
async def test_list_locks(r):
|
||||
await locks.acquire_lock(r, "nas-deploy", "BE", ttl_sec=300)
|
||||
await locks.acquire_lock(r, "compose", "FE", ttl_sec=300)
|
||||
listed = await locks.list_locks(r)
|
||||
held = {l["resource"]: l["held_by"] for l in listed["locks"]}
|
||||
assert held == {"nas-deploy": "BE", "compose": "FE"}
|
||||
Reference in New Issue
Block a user