# co-gahusb/tests/test_tasks.py
"""Tests for creating, claiming, updating and listing tasks via ``app.store``."""
import pytest

from app import store


async def test_create_and_list(r):
    """A created task is returned by ``list_tasks`` with the expected fields."""
    created = await store.create_task(
        r, "deploy FE", "FE", created_by="Producer", detail="ship it"
    )
    task_id = created["task_id"]

    listing = await store.list_tasks(r)
    matches = [task for task in listing["tasks"] if task["id"] == task_id]
    task = matches[0]

    # Checked one field at a time, in this order, so a failure points at
    # the first field that diverges.
    expected_fields = (
        ("title", "deploy FE"),
        ("assignee_role", "FE"),
        ("status", "open"),
        ("created_by", "Producer"),
    )
    for field, expected in expected_fields:
        assert task[field] == expected


async def test_claim_then_duplicate_claim_rejected(r):
    """The first claim succeeds; a second claim is refused and names the holder."""
    created = await store.create_task(r, "x", "FE", created_by="Producer")
    task_id = created["task_id"]

    first_claim = await store.claim_task(r, task_id, "FE")
    assert first_claim["ok"] is True
    assert first_claim["task"]["status"] == "in_progress"

    second_claim = await store.claim_task(r, task_id, "BE")
    assert second_claim["ok"] is False
    assert second_claim["held_by"] == "FE"


async def test_update_status(r):
    """Updating a claimed task to ``done`` stores the new status and the note."""
    created = await store.create_task(r, "x", "FE", created_by="Producer")
    task_id = created["task_id"]
    await store.claim_task(r, task_id, "FE")

    outcome = await store.update_task(r, task_id, "done", "FE", note="finished")

    assert outcome["ok"] is True
    updated = outcome["task"]
    assert updated["status"] == "done"
    assert updated["note"] == "finished"


async def test_list_filters(r):
    """``list_tasks`` can filter by ``assignee_role`` and by ``status``."""
    first = await store.create_task(r, "a", "FE", created_by="Producer")
    first_id = first["task_id"]
    await store.create_task(r, "b", "BE", created_by="Producer")
    # Claiming only the first task gives the status filter something to match.
    await store.claim_task(r, first_id, "FE")

    by_role = await store.list_tasks(r, assignee_role="FE")
    role_titles = [task["title"] for task in by_role["tasks"]]
    assert role_titles == ["a"]

    by_status = await store.list_tasks(r, status="in_progress")
    status_titles = [task["title"] for task in by_status["tasks"]]
    assert status_titles == ["a"]


async def test_invalid_status_rejected(r):
    """``update_task`` raises ``ValueError`` for the status ``"bogus"``."""
    created = await store.create_task(r, "x", "FE", created_by="Producer")
    task_id = created["task_id"]

    # Only the update call sits inside the context manager, so a ValueError
    # raised during setup cannot satisfy the expectation by accident.
    with pytest.raises(ValueError):
        await store.update_task(r, task_id, "bogus", "FE")


async def test_update_nonexistent_task_returns_not_found(r):
    """Updating an id this test never created reports ``not_found``."""
    outcome = await store.update_task(r, 999, "done", "FE")

    assert outcome["ok"] is False
    assert outcome["error"] == "not_found"