Files
web-page-backend/co-gahusb/app/store.py

156 lines
4.7 KiB
Python

# co-gahusb/app/store.py
import json
import time
from app.config import TEAM_LOG_MAXLEN
# Redis key names used by the messaging functions below.
MSG_SEQ = "co:msgseq"  # counter incremented to allocate each new message id
INBOX_PREFIX = "co:inbox:"  # + role: list of message ids delivered to that role
MSG_PREFIX = "co:msg:"  # + id: JSON-encoded message payload (plain string key, written with SET)
READ_PREFIX = "co:read:"  # + role: last-read cursor per role
def _now_iso():
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
async def post_message(r, from_role, to_role, body, thread_id=None):
    """Persist a message and append its id to the recipient's inbox list.

    Args:
        r: Async Redis client.
        from_role: Sender role name.
        to_role: Recipient role name; selects the inbox list to append to.
        body: Message text.
        thread_id: Optional thread identifier; stored as "" when falsy.

    Returns:
        ``{"message_id": <int>}`` with the newly allocated id.
    """
    mid = await r.incr(MSG_SEQ)
    mid_text = str(mid)
    # Keyword order fixes the key order of the serialized JSON.
    record = dict(
        id=mid_text,
        from_role=from_role,
        to_role=to_role,
        body=body,
        thread_id=thread_id if thread_id else "",
        ts=_now_iso(),
    )
    encoded = json.dumps(record)
    await r.set(MSG_PREFIX + mid_text, encoded)
    inbox_key = INBOX_PREFIX + to_role
    await r.rpush(inbox_key, mid)
    return {"message_id": mid}
async def read_inbox(r, role, after_id=0, mark_read=False):
    """Return the messages in ``role``'s inbox whose id is greater than ``after_id``.

    Args:
        r: Async Redis client.
        role: Role whose inbox list is read.
        after_id: Only ids strictly greater than this are returned.
        mark_read: When true and at least one id matched, store the new
            cursor under the role's read-cursor key.

    Returns:
        ``{"messages": [...], "cursor": <int>}``. The cursor is the last
        matching id, or ``int(after_id)`` when nothing matched. Ids whose
        payload key is missing are skipped but still advance the cursor.
    """
    raw_ids = await r.lrange(INBOX_PREFIX + role, 0, -1)
    floor = int(after_id)

    fresh_ids = []
    for raw_id in raw_ids:
        candidate = int(raw_id)
        if candidate > floor:
            fresh_ids.append(candidate)

    messages = []
    for mid in fresh_ids:
        blob = await r.get(MSG_PREFIX + str(mid))
        if not blob:
            continue
        message = json.loads(blob)
        # The id is stored as a string in the payload; callers get an int.
        message["id"] = int(message["id"])
        messages.append(message)

    if fresh_ids:
        cursor = fresh_ids[-1]
        if mark_read:
            await r.set(READ_PREFIX + role, cursor)
    else:
        cursor = floor
    return {"messages": messages, "cursor": cursor}
# Redis key names and allowed states used by the task functions below.
TASK_SEQ = "co:taskseq"  # counter incremented to allocate each new task id
TASK_PREFIX = "co:task:"  # + id: hash holding one task's fields
TASK_SET = "co:tasks"  # set of all task ids
VALID_STATUS = ("open", "in_progress", "blocked", "done")  # statuses accepted by update_task
async def create_task(r, title, assignee_role, created_by, detail=None):
    """Create a task in the "open" state and register its id in the task set.

    Args:
        r: Async Redis client.
        title: Task title.
        assignee_role: Role the task is initially assigned to.
        created_by: Role that created the task.
        detail: Optional description; stored as "" when falsy.

    Returns:
        ``{"task_id": <int>}`` with the newly allocated id.
    """
    tid = await r.incr(TASK_SEQ)
    fields = dict(
        id=str(tid),
        title=title,
        assignee_role=assignee_role,
        status="open",
        detail=detail if detail else "",
        created_by=created_by,
        note="",
        ts=_now_iso(),
    )
    task_key = TASK_PREFIX + str(tid)
    await r.hset(task_key, mapping=fields)
    await r.sadd(TASK_SET, tid)
    return {"task_id": tid}
async def _get_task(r, task_id):
    """Load a task hash, returning ``None`` when the key is missing or empty.

    The stored ``id`` field is converted to ``int`` before returning.
    """
    fields = await r.hgetall(TASK_PREFIX + str(task_id))
    if fields:
        fields["id"] = int(fields["id"])
        return fields
    return None
async def claim_task(r, task_id, role):
    """Atomically claim an open task for ``role``.

    Uses WATCH/MULTI/EXEC so that two concurrent claimers cannot both move
    the same task from "open" to "in_progress"; if the task hash changes
    between the status check and EXEC, the attempt is retried.

    Args:
        r: Async Redis client.
        task_id: Id of the task to claim.
        role: Role claiming the task; becomes the task's ``assignee_role``.

    Returns:
        ``{"ok": True, "task": {...}}`` on success,
        ``{"ok": False, "error": "not_found"}`` when the task has no status
        field, or ``{"ok": False, "held_by": <assignee_role>}`` when the
        task is not "open".
    """
    # Function-scope import, as in the original code, so that importing this
    # module does not itself require redis.
    from redis.exceptions import WatchError

    key = TASK_PREFIX + str(task_id)
    async with r.pipeline() as pipe:
        while True:
            try:
                await pipe.watch(key)
                # While watching, commands on the pipeline execute immediately.
                status = await pipe.hget(key, "status")
                if status is None:
                    await pipe.unwatch()
                    return {"ok": False, "error": "not_found"}
                # NOTE(review): comparing against a str assumes the client
                # decodes responses (decode_responses=True) — confirm.
                if status != "open":
                    held = await pipe.hget(key, "assignee_role")
                    await pipe.unwatch()
                    return {"ok": False, "held_by": held}
                pipe.multi()
                pipe.hset(key, mapping={"status": "in_progress", "assignee_role": role})
                await pipe.execute()
            except WatchError:
                # The task hash changed after WATCH; re-read and try again.
                continue
            return {"ok": True, "task": await _get_task(r, task_id)}
async def update_task(r, task_id, status, role, note=None):
    """Set a task's status and, optionally, its note.

    Args:
        r: Async Redis client.
        task_id: Id of the task to update.
        status: New status; must be one of ``VALID_STATUS``.
        role: Role performing the update. Currently unused; kept so the
            signature stays compatible with existing callers.
        note: Optional note; the stored note is left untouched when ``None``.

    Returns:
        ``{"ok": True, "task": {...}}`` on success, or
        ``{"ok": False, "error": "not_found"}`` when the task does not exist
        (the same shape ``claim_task`` uses).

    Raises:
        ValueError: If ``status`` is not in ``VALID_STATUS``.
    """
    if status not in VALID_STATUS:
        raise ValueError(f"invalid status: {status}")
    key = TASK_PREFIX + str(task_id)
    # HSET creates the key when it is missing. Without this check an unknown
    # task_id would leave behind a partial hash with no "id" field, and
    # _get_task would then fail with KeyError.
    if not await r.exists(key):
        return {"ok": False, "error": "not_found"}
    mapping = {"status": status}
    if note is not None:
        mapping["note"] = note
    await r.hset(key, mapping=mapping)
    return {"ok": True, "task": await _get_task(r, task_id)}
async def list_tasks(r, status=None, assignee_role=None):
    """List tasks in ascending id order, optionally filtered.

    Args:
        r: Async Redis client.
        status: When truthy, keep only tasks with this status.
        assignee_role: When truthy, keep only tasks assigned to this role.

    Returns:
        ``{"tasks": [...]}``. Ids in the task set whose hash is missing are
        skipped.
    """
    members = await r.smembers(TASK_SET)
    ordered_ids = [int(member) for member in members]
    ordered_ids.sort()

    selected = []
    for tid in ordered_ids:
        task = await _get_task(r, tid)
        if task is None:
            continue
        # Short-circuiting keeps the field lookups in the original order.
        if (not status or task["status"] == status) and (
            not assignee_role or task["assignee_role"] == assignee_role
        ):
            selected.append(task)
    return {"tasks": selected}
# Redis key names used by the team-log functions below.
LOG_SEQ = "co:logseq"  # counter incremented to allocate each new event id
LOG_LIST = "co:log"  # list of event ids, trimmed to the newest TEAM_LOG_MAXLEN entries
LOG_PREFIX = "co:logitem:"  # + id: JSON-encoded event payload (plain string key)
async def log_event(r, kind, text):
    """Append an event to the capped team log.

    The event payload is stored as JSON under its own key and its id is
    pushed onto the log list, which is then trimmed to the newest
    ``TEAM_LOG_MAXLEN`` ids. Payload keys of ids that fall off the list are
    deleted so they do not accumulate forever.

    Args:
        r: Async Redis client.
        kind: Event kind label.
        text: Event text.

    Returns:
        ``{"event_id": <int>}`` with the newly allocated id.
    """
    eid = await r.incr(LOG_SEQ)
    item = {"id": eid, "kind": kind, "text": text, "ts": _now_iso()}
    await r.set(LOG_PREFIX + str(eid), json.dumps(item))
    await r.rpush(LOG_LIST, eid)

    if TEAM_LOG_MAXLEN > 0:
        # Read the ids about to be trimmed and trim in one MULTI/EXEC so the
        # two commands see the same list. LRANGE 0..-(MAXLEN+1) is exactly
        # the prefix that LTRIM -MAXLEN..-1 discards (empty when the list is
        # not over the cap).
        async with r.pipeline() as pipe:
            pipe.lrange(LOG_LIST, 0, -TEAM_LOG_MAXLEN - 1)
            pipe.ltrim(LOG_LIST, -TEAM_LOG_MAXLEN, -1)
            evicted, _ = await pipe.execute()
        if evicted:
            await r.delete(*(LOG_PREFIX + str(int(x)) for x in evicted))
    else:
        # Non-positive cap: keep the original trim call unchanged and do not
        # delete any payloads.
        await r.ltrim(LOG_LIST, -TEAM_LOG_MAXLEN, -1)
    return {"event_id": eid}
async def read_team_log(r, after_id=0, limit=100):
    """Return the newest team-log events with id greater than ``after_id``.

    Args:
        r: Async Redis client.
        after_id: Only events with an id strictly greater than this are
            considered.
        limit: Maximum number of events to return; when more match, the
            newest ``limit`` are kept. A limit of zero or less returns no
            events.

    Returns:
        ``{"events": [...], "cursor": <int>}``. The cursor is the id of the
        last selected event, or ``int(after_id)`` when none were selected.
        Ids whose payload key is missing are skipped.
    """
    floor = int(after_id)
    ids = [int(x) for x in await r.lrange(LOG_LIST, 0, -1)]
    ids = [i for i in ids if i > floor]
    # ids[-0:] is the whole list and a negative limit would slice from the
    # front, so non-positive limits are handled explicitly.
    ids = ids[-limit:] if limit > 0 else []
    events = []
    for eid in ids:
        raw = await r.get(LOG_PREFIX + str(eid))
        if raw:
            events.append(json.loads(raw))
    cursor = ids[-1] if ids else floor
    return {"events": events, "cursor": cursor}