Files
web-page-backend/co-gahusb/tests/test_messages.py

48 lines
1.7 KiB
Python

# co-gahusb/tests/test_messages.py
from app import store
async def test_post_and_read_ordering(r):
id1 = (await store.post_message(r, "Producer", "BE", "first"))["message_id"]
id2 = (await store.post_message(r, "Producer", "BE", "second"))["message_id"]
assert id2 > id1
res = await store.read_inbox(r, "BE")
bodies = [m["body"] for m in res["messages"]]
assert bodies == ["first", "second"]
assert res["cursor"] == id2
async def test_read_inbox_after_id(r):
id1 = (await store.post_message(r, "Producer", "BE", "first"))["message_id"]
await store.post_message(r, "Producer", "BE", "second")
res = await store.read_inbox(r, "BE", after_id=id1)
assert [m["body"] for m in res["messages"]] == ["second"]
async def test_inboxes_isolated_per_role(r):
await store.post_message(r, "Producer", "BE", "for-be")
await store.post_message(r, "Producer", "FE", "for-fe")
be = await store.read_inbox(r, "BE")
fe = await store.read_inbox(r, "FE")
assert [m["body"] for m in be["messages"]] == ["for-be"]
assert [m["body"] for m in fe["messages"]] == ["for-fe"]
async def test_mark_read_advances_cursor(r):
await store.post_message(r, "Producer", "BE", "first")
res = await store.read_inbox(r, "BE", mark_read=True)
last = res["cursor"]
await store.post_message(r, "Producer", "BE", "second")
res2 = await store.read_inbox(r, "BE", after_id=last)
assert [m["body"] for m in res2["messages"]] == ["second"]
async def test_message_fields(r):
await store.post_message(r, "Producer", "BE", "hi", thread_id="t1")
res = await store.read_inbox(r, "BE")
m = res["messages"][0]
assert m["from_role"] == "Producer"
assert m["thread_id"] == "t1"
assert "ts" in m and "id" in m