# co-gahusb/app/store.py
"""Redis-backed storage helpers for role inboxes, tasks and the team log.

Every coroutine takes ``r`` as its first argument and awaits Redis commands
on it (``incr``, ``set``, ``rpush``, ``hset``, ``pipeline`` ...).

NOTE(review): values read back from Redis are compared against ``str``
literals (e.g. ``status != "open"``), so ``r`` is presumably created with
``decode_responses=True`` -- confirm against the client factory.
"""
import json
import time

from app.config import TEAM_LOG_MAXLEN

# --- messages ---------------------------------------------------------------

MSG_SEQ = "co:msgseq"
INBOX_PREFIX = "co:inbox:"  # list of message ids per role
MSG_PREFIX = "co:msg:"  # JSON string per message
READ_PREFIX = "co:read:"  # last-read cursor per role


def _now_iso():
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())


async def post_message(r, from_role, to_role, body, thread_id=None):
    """Store a message and append its id to the recipient's inbox list.

    Returns ``{"message_id": <int>}`` with the id allocated from ``MSG_SEQ``.
    A missing ``thread_id`` is stored as the empty string.
    """
    mid = await r.incr(MSG_SEQ)
    payload = {
        "id": str(mid),
        "from_role": from_role,
        "to_role": to_role,
        "body": body,
        "thread_id": thread_id or "",
        "ts": _now_iso(),
    }
    await r.set(MSG_PREFIX + str(mid), json.dumps(payload))
    await r.rpush(INBOX_PREFIX + to_role, mid)
    return {"message_id": mid}


async def read_inbox(r, role, after_id=0, mark_read=False):
    """Return the messages in ``role``'s inbox whose id is above ``after_id``.

    Returns ``{"messages": [...], "cursor": <int>}``. ``cursor`` is the last
    matching id in inbox order, or ``after_id`` when nothing matched. When
    ``mark_read`` is true and at least one id matched, the cursor is also
    written to ``READ_PREFIX + role``. Ids whose message body is missing are
    skipped but still advance the cursor.
    """
    after = int(after_id)
    # Parse every stored id once, then filter on the parsed value.
    inbox_ids = (int(x) for x in await r.lrange(INBOX_PREFIX + role, 0, -1))
    ids = [mid for mid in inbox_ids if mid > after]

    messages = []
    for mid in ids:
        raw = await r.get(MSG_PREFIX + str(mid))
        if raw:
            d = json.loads(raw)
            d["id"] = int(d["id"])  # stored as a string, returned as an int
            messages.append(d)

    cursor = ids[-1] if ids else after
    if mark_read and ids:
        await r.set(READ_PREFIX + role, cursor)
    return {"messages": messages, "cursor": cursor}


# --- tasks ------------------------------------------------------------------

TASK_SEQ = "co:taskseq"
TASK_PREFIX = "co:task:"  # hash per task
TASK_SET = "co:tasks"  # set of task ids
VALID_STATUS = ("open", "in_progress", "blocked", "done")


async def create_task(r, title, assignee_role, created_by, detail=None):
    """Create a task with status ``"open"`` and register its id in ``TASK_SET``.

    Returns ``{"task_id": <int>}``. A missing ``detail`` is stored as the
    empty string; ``note`` starts empty.
    """
    tid = await r.incr(TASK_SEQ)
    task = {
        "id": str(tid),
        "title": title,
        "assignee_role": assignee_role,
        "status": "open",
        "detail": detail or "",
        "created_by": created_by,
        "note": "",
        "ts": _now_iso(),
    }
    await r.hset(TASK_PREFIX + str(tid), mapping=task)
    await r.sadd(TASK_SET, tid)
    return {"task_id": tid}


async def _get_task(r, task_id):
    """Load a task hash, or return ``None`` if it is empty or absent.

    The ``id`` field is converted to ``int``; other fields are returned as
    read from Redis.
    """
    d = await r.hgetall(TASK_PREFIX + str(task_id))
    if not d:
        return None
    d["id"] = int(d["id"])
    return d


async def claim_task(r, task_id, role):
    """Move an ``"open"`` task to ``"in_progress"`` and assign it to ``role``.

    The status check and the write run under ``WATCH``/``MULTI``; if the
    task key changes in between, the attempt is retried.

    Returns one of:
      * ``{"ok": True, "task": {...}}`` on success,
      * ``{"ok": False, "error": "not_found"}`` if the task has no status,
      * ``{"ok": False, "held_by": <assignee_role>}`` if it is not open.
    """
    # Function-scope import, as in the original, so that importing this
    # module does not itself require ``redis.exceptions``.
    from redis.exceptions import WatchError

    key = TASK_PREFIX + str(task_id)
    async with r.pipeline() as pipe:
        while True:
            try:
                await pipe.watch(key)
                status = await pipe.hget(key, "status")
                if status is None:
                    await pipe.unwatch()
                    return {"ok": False, "error": "not_found"}
                if status != "open":
                    held = await pipe.hget(key, "assignee_role")
                    await pipe.unwatch()
                    return {"ok": False, "held_by": held}
                pipe.multi()
                pipe.hset(
                    key,
                    mapping={"status": "in_progress", "assignee_role": role},
                )
                await pipe.execute()
            except WatchError:
                # The watched key changed before EXEC: re-read and try again.
                continue
            return {"ok": True, "task": await _get_task(r, task_id)}


async def update_task(r, task_id, status, role, note=None):
    """Set a task's status and, when ``note`` is not ``None``, its note.

    Returns ``{"ok": True, "task": {...}}``, or
    ``{"ok": False, "error": "not_found"}`` if the task key does not exist.

    Raises:
        ValueError: if ``status`` is not one of ``VALID_STATUS``.

    NOTE(review): ``role`` is accepted but not used or recorded here.
    """
    if status not in VALID_STATUS:
        raise ValueError(f"invalid status: {status}")
    key = TASK_PREFIX + str(task_id)
    if not await r.exists(key):
        return {"ok": False, "error": "not_found"}
    mapping = {"status": status}
    if note is not None:
        mapping["note"] = note
    await r.hset(key, mapping=mapping)
    return {"ok": True, "task": await _get_task(r, task_id)}


async def list_tasks(r, status=None, assignee_role=None):
    """Return ``{"tasks": [...]}`` in ascending id order.

    A truthy ``status`` and/or ``assignee_role`` keeps only tasks whose
    field equals it. Ids in ``TASK_SET`` with no stored hash are skipped.
    """
    ids = sorted(int(x) for x in await r.smembers(TASK_SET))
    tasks = []
    for tid in ids:
        t = await _get_task(r, tid)
        if t is None:
            continue
        if status and t["status"] != status:
            continue
        if assignee_role and t["assignee_role"] != assignee_role:
            continue
        tasks.append(t)
    return {"tasks": tasks}


# --- team log ---------------------------------------------------------------

LOG_SEQ = "co:logseq"
LOG_LIST = "co:log"  # list of event ids (capped)
LOG_PREFIX = "co:logitem:"


async def log_event(r, kind, text):
    """Append an event to the team log and cap the id list.

    Returns ``{"event_id": <int>}``. After the push, ``LOG_LIST`` is trimmed
    to its last ``TEAM_LOG_MAXLEN`` entries.
    """
    eid = await r.incr(LOG_SEQ)
    item = {"id": eid, "kind": kind, "text": text, "ts": _now_iso()}
    await r.set(LOG_PREFIX + str(eid), json.dumps(item))
    await r.rpush(LOG_LIST, eid)
    # NOTE(review): only the id list is capped; the LOG_PREFIX + id keys of
    # trimmed events are never deleted here, so they accumulate unless
    # something else expires them -- confirm whether that is intended.
    await r.ltrim(LOG_LIST, -TEAM_LOG_MAXLEN, -1)
    return {"event_id": eid}


async def read_team_log(r, after_id=0, limit=100):
    """Return up to ``limit`` of the newest log events with id above ``after_id``.

    Returns ``{"events": [...], "cursor": <int>}``. ``cursor`` is the last
    selected id, or ``after_id`` when none was selected. A ``limit`` of zero
    or less selects no events.
    """
    after = int(after_id)
    ids = [int(x) for x in await r.lrange(LOG_LIST, 0, -1)]
    ids = [i for i in ids if i > after]
    # ``ids[-0:]`` is the whole list, so a non-positive limit needs its own
    # branch instead of being fed to the slice.
    ids = ids[-limit:] if limit > 0 else []

    events = []
    for eid in ids:
        raw = await r.get(LOG_PREFIX + str(eid))
        if raw:
            events.append(json.loads(raw))

    cursor = ids[-1] if ids else after
    return {"events": events, "cursor": cursor}