import os import json import sqlite3 import uuid from typing import Any, Dict, List, Optional from .config import DB_PATH def _conn() -> sqlite3.Connection: os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") return conn def init_db() -> None: with _conn() as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS agent_config ( agent_id TEXT PRIMARY KEY, display_name TEXT NOT NULL, enabled INTEGER NOT NULL DEFAULT 1, schedule_config TEXT NOT NULL DEFAULT '{}', custom_config TEXT NOT NULL DEFAULT '{}', created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS agent_tasks ( id TEXT PRIMARY KEY, agent_id TEXT NOT NULL, task_type TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', input_data TEXT NOT NULL DEFAULT '{}', result_data TEXT, requires_approval INTEGER NOT NULL DEFAULT 0, approved_at TEXT, approved_via TEXT, created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), completed_at TEXT ) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_tasks_agent ON agent_tasks(agent_id, created_at DESC) """) conn.execute(""" CREATE TABLE IF NOT EXISTS agent_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, agent_id TEXT NOT NULL, task_id TEXT, level TEXT NOT NULL DEFAULT 'info', message TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS telegram_state ( callback_id TEXT PRIMARY KEY, task_id TEXT NOT NULL, agent_id TEXT NOT NULL, action TEXT, responded INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) # Seed default agent configs for agent_id, name in [("stock", "주식 트레이더"), ("music", "음악 프로듀서")]: conn.execute( "INSERT OR IGNORE INTO agent_config(agent_id, display_name) VALUES(?,?)", (agent_id, name), ) # --- agent_config CRUD --- def get_all_agents() -> List[Dict[str, Any]]: with _conn() as conn: rows = conn.execute("SELECT * FROM agent_config ORDER BY agent_id").fetchall() return [_config_to_dict(r) for r in rows] def get_agent_config(agent_id: str) -> Optional[Dict[str, Any]]: with _conn() as conn: r = conn.execute("SELECT * FROM agent_config WHERE agent_id=?", (agent_id,)).fetchone() return _config_to_dict(r) if r else None def update_agent_config(agent_id: str, **kwargs) -> None: sets, vals = [], [] for k in ("enabled", "schedule_config", "custom_config"): if k in kwargs and kwargs[k] is not None: if k in ("schedule_config", "custom_config"): sets.append(f"{k}=?") vals.append(json.dumps(kwargs[k])) else: sets.append(f"{k}=?") vals.append(kwargs[k]) if not sets: return sets.append("updated_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')") vals.append(agent_id) with _conn() as conn: conn.execute(f"UPDATE agent_config SET {','.join(sets)} WHERE agent_id=?", vals) def _config_to_dict(r) -> Dict[str, Any]: return { "agent_id": r["agent_id"], "display_name": r["display_name"], "enabled": bool(r["enabled"]), "schedule_config": json.loads(r["schedule_config"]), "custom_config": json.loads(r["custom_config"]), "created_at": r["created_at"], "updated_at": r["updated_at"], } # --- agent_tasks CRUD --- def create_task(agent_id: str, task_type: str, input_data: dict, requires_approval: bool = False) -> str: task_id = str(uuid.uuid4()) status = "pending" if requires_approval else "working" with _conn() as conn: conn.execute( "INSERT INTO agent_tasks(id,agent_id,task_type,status,input_data,requires_approval) VALUES(?,?,?,?,?,?)", (task_id, agent_id, task_type, status, json.dumps(input_data), int(requires_approval)), ) return task_id def update_task_status(task_id: str, status: str, result_data: dict = None) -> None: with _conn() as conn: if result_data is not None: conn.execute( "UPDATE agent_tasks SET status=?, result_data=?, completed_at=strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id=?", (status, json.dumps(result_data), task_id), ) else: conn.execute("UPDATE agent_tasks SET status=? WHERE id=?", (status, task_id)) def approve_task(task_id: str, via: str = "web") -> None: with _conn() as conn: conn.execute( "UPDATE agent_tasks SET status='approved', approved_at=strftime('%Y-%m-%dT%H:%M:%fZ','now'), approved_via=? WHERE id=?", (via, task_id), ) def reject_task(task_id: str) -> None: with _conn() as conn: conn.execute( "UPDATE agent_tasks SET status='failed', completed_at=strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id=?", (task_id,), ) def get_task(task_id: str) -> Optional[Dict[str, Any]]: with _conn() as conn: r = conn.execute("SELECT * FROM agent_tasks WHERE id=?", (task_id,)).fetchone() return _task_to_dict(r) if r else None def get_agent_tasks(agent_id: str, limit: int = 20) -> List[Dict[str, Any]]: with _conn() as conn: rows = conn.execute( "SELECT * FROM agent_tasks WHERE agent_id=? ORDER BY created_at DESC LIMIT ?", (agent_id, limit), ).fetchall() return [_task_to_dict(r) for r in rows] def get_pending_approvals() -> List[Dict[str, Any]]: with _conn() as conn: rows = conn.execute( "SELECT * FROM agent_tasks WHERE status='pending' AND requires_approval=1 ORDER BY created_at DESC" ).fetchall() return [_task_to_dict(r) for r in rows] def _task_to_dict(r) -> Dict[str, Any]: return { "id": r["id"], "agent_id": r["agent_id"], "task_type": r["task_type"], "status": r["status"], "input_data": json.loads(r["input_data"]) if r["input_data"] else {}, "result_data": json.loads(r["result_data"]) if r["result_data"] else None, "requires_approval": bool(r["requires_approval"]), "approved_at": r["approved_at"], "approved_via": r["approved_via"], "created_at": r["created_at"], "completed_at": r["completed_at"], } # --- agent_logs --- def add_log(agent_id: str, message: str, level: str = "info", task_id: str = None) -> None: with _conn() as conn: conn.execute( "INSERT INTO agent_logs(agent_id,task_id,level,message) VALUES(?,?,?,?)", (agent_id, task_id, level, message), ) def get_logs(agent_id: str, limit: int = 50) -> List[Dict[str, Any]]: with _conn() as conn: rows = conn.execute( "SELECT * FROM agent_logs WHERE agent_id=? ORDER BY created_at DESC LIMIT ?", (agent_id, limit), ).fetchall() return [ { "id": r["id"], "agent_id": r["agent_id"], "task_id": r["task_id"], "level": r["level"], "message": r["message"], "created_at": r["created_at"], } for r in rows ] # --- telegram_state --- def save_telegram_callback(callback_id: str, task_id: str, agent_id: str) -> None: with _conn() as conn: conn.execute( "INSERT OR REPLACE INTO telegram_state(callback_id,task_id,agent_id) VALUES(?,?,?)", (callback_id, task_id, agent_id), ) def get_telegram_callback(callback_id: str) -> Optional[Dict[str, Any]]: with _conn() as conn: r = conn.execute( "SELECT * FROM telegram_state WHERE callback_id=? AND responded=0", (callback_id,), ).fetchone() if not r: return None return { "callback_id": r["callback_id"], "task_id": r["task_id"], "agent_id": r["agent_id"], "responded": bool(r["responded"]), } def mark_telegram_responded(callback_id: str, action: str) -> None: with _conn() as conn: conn.execute( "UPDATE telegram_state SET responded=1, action=? WHERE callback_id=?", (action, callback_id), )