diff --git a/co-gahusb/app/store.py b/co-gahusb/app/store.py new file mode 100644 index 0000000..cc66279 --- /dev/null +++ b/co-gahusb/app/store.py @@ -0,0 +1,155 @@ +# co-gahusb/app/store.py +import json +import time + +from app.config import TEAM_LOG_MAXLEN + +MSG_SEQ = "co:msgseq" +INBOX_PREFIX = "co:inbox:" # list of message ids per role +MSG_PREFIX = "co:msg:" # hash per message +READ_PREFIX = "co:read:" # last-read cursor per role + + +def _now_iso(): + return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + + +async def post_message(r, from_role, to_role, body, thread_id=None): + mid = await r.incr(MSG_SEQ) + payload = { + "id": str(mid), + "from_role": from_role, + "to_role": to_role, + "body": body, + "thread_id": thread_id or "", + "ts": _now_iso(), + } + await r.set(MSG_PREFIX + str(mid), json.dumps(payload)) + await r.rpush(INBOX_PREFIX + to_role, mid) + return {"message_id": mid} + + +async def read_inbox(r, role, after_id=0, mark_read=False): + ids = await r.lrange(INBOX_PREFIX + role, 0, -1) + ids = [int(x) for x in ids if int(x) > int(after_id)] + messages = [] + for mid in ids: + raw = await r.get(MSG_PREFIX + str(mid)) + if raw: + d = json.loads(raw) + d["id"] = int(d["id"]) + messages.append(d) + cursor = ids[-1] if ids else int(after_id) + if mark_read and ids: + await r.set(READ_PREFIX + role, cursor) + return {"messages": messages, "cursor": cursor} + + +TASK_SEQ = "co:taskseq" +TASK_PREFIX = "co:task:" # hash per task +TASK_SET = "co:tasks" # set of task ids + +VALID_STATUS = ("open", "in_progress", "blocked", "done") + + +async def create_task(r, title, assignee_role, created_by, detail=None): + tid = await r.incr(TASK_SEQ) + task = { + "id": str(tid), + "title": title, + "assignee_role": assignee_role, + "status": "open", + "detail": detail or "", + "created_by": created_by, + "note": "", + "ts": _now_iso(), + } + await r.hset(TASK_PREFIX + str(tid), mapping=task) + await r.sadd(TASK_SET, tid) + return {"task_id": tid} + + +async def _get_task(r, task_id): + d = await r.hgetall(TASK_PREFIX + str(task_id)) + if not d: + return None + d["id"] = int(d["id"]) + return d + + +async def claim_task(r, task_id, role): + key = TASK_PREFIX + str(task_id) + async with r.pipeline() as pipe: + while True: + try: + await pipe.watch(key) + status = await pipe.hget(key, "status") + if status is None: + await pipe.unwatch() + return {"ok": False, "error": "not_found"} + if status != "open": + held = await pipe.hget(key, "assignee_role") + await pipe.unwatch() + return {"ok": False, "held_by": held} + pipe.multi() + pipe.hset(key, mapping={"status": "in_progress", "assignee_role": role}) + await pipe.execute() + return {"ok": True, "task": await _get_task(r, task_id)} + except Exception as e: + from redis.exceptions import WatchError + if isinstance(e, WatchError): + continue + raise + + +async def update_task(r, task_id, status, role, note=None): + if status not in VALID_STATUS: + raise ValueError(f"invalid status: {status}") + key = TASK_PREFIX + str(task_id) + mapping = {"status": status} + if note is not None: + mapping["note"] = note + await r.hset(key, mapping=mapping) + return {"ok": True, "task": await _get_task(r, task_id)} + + +async def list_tasks(r, status=None, assignee_role=None): + ids = sorted(int(x) for x in await r.smembers(TASK_SET)) + tasks = [] + for tid in ids: + t = await _get_task(r, tid) + if t is None: + continue + if status and t["status"] != status: + continue + if assignee_role and t["assignee_role"] != assignee_role: + continue + tasks.append(t) + return {"tasks": tasks} + + +LOG_SEQ = "co:logseq" +LOG_LIST = "co:log" # list of event ids (capped) +LOG_PREFIX = "co:logitem:" + + +async def log_event(r, kind, text): + eid = await r.incr(LOG_SEQ) + item = {"id": eid, "kind": kind, "text": text, "ts": _now_iso()} + await r.set(LOG_PREFIX + str(eid), json.dumps(item)) + await r.rpush(LOG_LIST, eid) + await r.ltrim(LOG_LIST, -TEAM_LOG_MAXLEN, -1) + return {"event_id": eid} + + +async def read_team_log(r, after_id=0, limit=100): + ids = [int(x) for x in await r.lrange(LOG_LIST, 0, -1)] + ids = [i for i in ids if i > int(after_id)] + ids = ids[-limit:] + events = [] + for eid in ids: + raw = await r.get(LOG_PREFIX + str(eid)) + if raw: + events.append(json.loads(raw)) + cursor = ids[-1] if ids else int(after_id) + return {"events": events, "cursor": cursor} diff --git a/co-gahusb/tests/test_messages.py b/co-gahusb/tests/test_messages.py new file mode 100644 index 0000000..8c2eb41 --- /dev/null +++ b/co-gahusb/tests/test_messages.py @@ -0,0 +1,47 @@ +# co-gahusb/tests/test_messages.py +from app import store + + +async def test_post_and_read_ordering(r): + id1 = (await store.post_message(r, "Producer", "BE", "first"))["message_id"] + id2 = (await store.post_message(r, "Producer", "BE", "second"))["message_id"] + assert id2 > id1 + + res = await store.read_inbox(r, "BE") + bodies = [m["body"] for m in res["messages"]] + assert bodies == ["first", "second"] + assert res["cursor"] == id2 + + +async def test_read_inbox_after_id(r): + id1 = (await store.post_message(r, "Producer", "BE", "first"))["message_id"] + await store.post_message(r, "Producer", "BE", "second") + res = await store.read_inbox(r, "BE", after_id=id1) + assert [m["body"] for m in res["messages"]] == ["second"] + + +async def test_inboxes_isolated_per_role(r): + await store.post_message(r, "Producer", "BE", "for-be") + await store.post_message(r, "Producer", "FE", "for-fe") + be = await store.read_inbox(r, "BE") + fe = await store.read_inbox(r, "FE") + assert [m["body"] for m in be["messages"]] == ["for-be"] + assert [m["body"] for m in fe["messages"]] == ["for-fe"] + + +async def test_mark_read_advances_cursor(r): + await store.post_message(r, "Producer", "BE", "first") + res = await store.read_inbox(r, "BE", mark_read=True) + last = res["cursor"] + await store.post_message(r, "Producer", "BE", "second") + res2 = await store.read_inbox(r, "BE", after_id=last) + assert [m["body"] for m in res2["messages"]] == ["second"] + + +async def test_message_fields(r): + await store.post_message(r, "Producer", "BE", "hi", thread_id="t1") + res = await store.read_inbox(r, "BE") + m = res["messages"][0] + assert m["from_role"] == "Producer" + assert m["thread_id"] == "t1" + assert "ts" in m and "id" in m