import json
import os
import sqlite3
import uuid
from contextlib import contextmanager
from datetime import datetime
from typing import Any, Dict, Iterator, List, Optional

from .config import DB_PATH

# Every *_at column is written as strftime('%Y-%m-%dT%H:%M:%fZ','now') (ISO-8601,
# 'T' separator, trailing 'Z'). Time-window filters must build their cutoff with the
# SAME format: SQLite compares these TEXT values lexicographically, and datetime('now')
# uses a space separator, which sorts before 'T' and silently widens the window.


def _conn() -> sqlite3.Connection:
    """Open a new connection to DB_PATH with WAL journaling and a busy timeout.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name.
    The caller owns the connection and must close it; prefer ``_db()``.
    """
    db_dir = os.path.dirname(DB_PATH)
    # os.makedirs("") raises, so only create a directory when DB_PATH has one.
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    conn = sqlite3.connect(DB_PATH, timeout=120.0)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=120000")
    except Exception:
        conn.close()
        raise
    return conn


@contextmanager
def _db() -> Iterator[sqlite3.Connection]:
    """Yield a connection that commits on success, rolls back on error, and is always closed.

    ``with sqlite3.connect(...) as conn`` only manages the transaction; it does not
    close the connection, so the connection is closed explicitly here.
    """
    conn = _conn()
    try:
        with conn:
            yield conn
    finally:
        conn.close()


def init_db() -> None:
    """Create all tables and indexes if missing and seed the default agent_config rows."""
    with _db() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS agent_config (
                agent_id TEXT PRIMARY KEY,
                display_name TEXT NOT NULL,
                enabled INTEGER NOT NULL DEFAULT 1,
                schedule_config TEXT NOT NULL DEFAULT '{}',
                custom_config TEXT NOT NULL DEFAULT '{}',
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS agent_tasks (
                id TEXT PRIMARY KEY,
                agent_id TEXT NOT NULL,
                task_type TEXT NOT NULL,
                status TEXT NOT NULL DEFAULT 'pending',
                input_data TEXT NOT NULL DEFAULT '{}',
                result_data TEXT,
                requires_approval INTEGER NOT NULL DEFAULT 0,
                approved_at TEXT,
                approved_via TEXT,
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                completed_at TEXT
            )
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_tasks_agent ON agent_tasks(agent_id, created_at DESC)
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS agent_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                agent_id TEXT NOT NULL,
                task_id TEXT,
                level TEXT NOT NULL DEFAULT 'info',
                message TEXT NOT NULL,
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        # get_logs() filters by agent_id and sorts by created_at; index that access path.
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_logs_agent ON agent_logs(agent_id, created_at DESC)
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS telegram_state (
                callback_id TEXT PRIMARY KEY,
                task_id TEXT NOT NULL,
                agent_id TEXT NOT NULL,
                action TEXT,
                responded INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS conversation_messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                chat_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                model TEXT,
                tokens_input INTEGER DEFAULT 0,
                tokens_output INTEGER DEFAULT 0,
                cache_read INTEGER DEFAULT 0,
                cache_write INTEGER DEFAULT 0,
                latency_ms INTEGER DEFAULT 0,
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_conv_chat ON conversation_messages(chat_id, created_at DESC)
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS youtube_research_jobs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                status TEXT NOT NULL DEFAULT 'running',
                countries TEXT NOT NULL DEFAULT '[]',
                trends_collected INTEGER NOT NULL DEFAULT 0,
                error TEXT,
                started_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                completed_at TEXT
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS lotto_signals (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                triggered_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                source TEXT NOT NULL,
                metric TEXT NOT NULL,
                value REAL NOT NULL,
                baseline_mu REAL,
                baseline_sigma REAL,
                z_score REAL,
                fire_level TEXT NOT NULL,
                notified_at TEXT,
                payload TEXT
            )
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_ls_triggered ON lotto_signals(triggered_at DESC)
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_ls_fire ON lotto_signals(fire_level, notified_at)
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS lotto_baselines (
                metric TEXT PRIMARY KEY,
                window_values TEXT NOT NULL DEFAULT '[]',
                mu REAL NOT NULL DEFAULT 0.0,
                sigma REAL NOT NULL DEFAULT 0.0,
                last_pushed_draw_no INTEGER,
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        # Seed default agent configs (INSERT OR IGNORE keeps existing rows untouched).
        for agent_id, name in [
            ("stock", "주식 트레이더"),
            ("music", "음악 프로듀서"),
            ("blog", "블로그 마케터"),
            ("realestate", "청약 애널리스트"),
            ("lotto", "로또 큐레이터"),
            ("youtube", "YouTube 리서치"),
        ]:
            conn.execute(
                "INSERT OR IGNORE INTO agent_config(agent_id, display_name) VALUES(?,?)",
                (agent_id, name),
            )


# --- agent_config CRUD ---

def get_all_agents() -> List[Dict[str, Any]]:
    """Return every agent_config row as a dict, ordered by agent_id."""
    with _db() as conn:
        rows = conn.execute("SELECT * FROM agent_config ORDER BY agent_id").fetchall()
    return [_config_to_dict(r) for r in rows]


def get_agent_config(agent_id: str) -> Optional[Dict[str, Any]]:
    """Return the config dict for ``agent_id``, or None if it does not exist."""
    with _db() as conn:
        r = conn.execute("SELECT * FROM agent_config WHERE agent_id=?", (agent_id,)).fetchone()
    return _config_to_dict(r) if r else None


def update_agent_config(agent_id: str, **kwargs) -> None:
    """Update ``enabled``, ``schedule_config`` and/or ``custom_config`` for an agent.

    Keyword arguments that are missing or None are left unchanged. The two config
    fields are stored as JSON text. ``updated_at`` is refreshed whenever anything changes.
    Column names come only from the fixed whitelist below, so the f-string SQL is safe.
    """
    sets, vals = [], []
    for k in ("enabled", "schedule_config", "custom_config"):
        if k in kwargs and kwargs[k] is not None:
            sets.append(f"{k}=?")
            if k in ("schedule_config", "custom_config"):
                vals.append(json.dumps(kwargs[k]))
            else:
                vals.append(kwargs[k])
    if not sets:
        return
    sets.append("updated_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')")
    vals.append(agent_id)
    with _db() as conn:
        conn.execute(f"UPDATE agent_config SET {','.join(sets)} WHERE agent_id=?", vals)


def _config_to_dict(r) -> Dict[str, Any]:
    """Convert an agent_config row to a dict, decoding the JSON config columns."""
    return {
        "agent_id": r["agent_id"],
        "display_name": r["display_name"],
        "enabled": bool(r["enabled"]),
        "schedule_config": json.loads(r["schedule_config"]),
        "custom_config": json.loads(r["custom_config"]),
        "created_at": r["created_at"],
        "updated_at": r["updated_at"],
    }


# --- agent_tasks CRUD ---

def create_task(agent_id: str, task_type: str, input_data: dict, requires_approval: bool = False) -> str:
    """Insert a new task and return its UUID.

    Tasks that need approval start as 'pending'; all others start as 'working'.
    """
    task_id = str(uuid.uuid4())
    status = "pending" if requires_approval else "working"
    with _db() as conn:
        conn.execute(
            "INSERT INTO agent_tasks(id,agent_id,task_type,status,input_data,requires_approval) VALUES(?,?,?,?,?,?)",
            (task_id, agent_id, task_type, status, json.dumps(input_data), int(requires_approval)),
        )
    return task_id


def update_task_status(task_id: str, status: str, result_data: Optional[dict] = None) -> None:
    """Set a task's status.

    When ``result_data`` is given, it is also stored as JSON and ``completed_at`` is
    stamped. Without it, only the status column changes.
    """
    with _db() as conn:
        if result_data is not None:
            conn.execute(
                "UPDATE agent_tasks SET status=?, result_data=?, completed_at=strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id=?",
                (status, json.dumps(result_data), task_id),
            )
        else:
            conn.execute("UPDATE agent_tasks SET status=? WHERE id=?", (status, task_id))


def approve_task(task_id: str, via: str = "web") -> None:
    """Mark a task 'approved', recording when it was approved and through which channel."""
    with _db() as conn:
        conn.execute(
            "UPDATE agent_tasks SET status='approved', approved_at=strftime('%Y-%m-%dT%H:%M:%fZ','now'), approved_via=? WHERE id=?",
            (via, task_id),
        )


def reject_task(task_id: str) -> None:
    """Mark a task 'rejected' and stamp completed_at."""
    with _db() as conn:
        conn.execute(
            "UPDATE agent_tasks SET status='rejected', completed_at=strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id=?",
            (task_id,),
        )


def get_task(task_id: str) -> Optional[Dict[str, Any]]:
    """Return one task as a dict, or None if the id is unknown."""
    with _db() as conn:
        r = conn.execute("SELECT * FROM agent_tasks WHERE id=?", (task_id,)).fetchone()
    return _task_to_dict(r) if r else None


def get_agent_tasks(agent_id: str, limit: int = 20) -> List[Dict[str, Any]]:
    """Return up to ``limit`` of an agent's tasks, newest first."""
    with _db() as conn:
        rows = conn.execute(
            "SELECT * FROM agent_tasks WHERE agent_id=? ORDER BY created_at DESC LIMIT ?",
            (agent_id, limit),
        ).fetchall()
    return [_task_to_dict(r) for r in rows]


def get_pending_approvals() -> List[Dict[str, Any]]:
    """Return all tasks that are 'pending' and require approval, newest first."""
    with _db() as conn:
        rows = conn.execute(
            "SELECT * FROM agent_tasks WHERE status='pending' AND requires_approval=1 ORDER BY created_at DESC"
        ).fetchall()
    return [_task_to_dict(r) for r in rows]


def _task_to_dict(r) -> Dict[str, Any]:
    """Convert an agent_tasks row to a dict, decoding the JSON columns.

    An empty input_data becomes {}; an empty result_data becomes None.
    """
    return {
        "id": r["id"],
        "agent_id": r["agent_id"],
        "task_type": r["task_type"],
        "status": r["status"],
        "input_data": json.loads(r["input_data"]) if r["input_data"] else {},
        "result_data": json.loads(r["result_data"]) if r["result_data"] else None,
        "requires_approval": bool(r["requires_approval"]),
        "approved_at": r["approved_at"],
        "approved_via": r["approved_via"],
        "created_at": r["created_at"],
        "completed_at": r["completed_at"],
    }


# --- agent_logs ---

def add_log(agent_id: str, message: str, level: str = "info", task_id: Optional[str] = None) -> None:
    """Append a log line for an agent, optionally linked to a task."""
    with _db() as conn:
        conn.execute(
            "INSERT INTO agent_logs(agent_id,task_id,level,message) VALUES(?,?,?,?)",
            (agent_id, task_id, level, message),
        )


def get_logs(agent_id: str, limit: int = 50) -> List[Dict[str, Any]]:
    """Return up to ``limit`` of an agent's log lines, newest first."""
    with _db() as conn:
        rows = conn.execute(
            "SELECT * FROM agent_logs WHERE agent_id=? ORDER BY created_at DESC LIMIT ?",
            (agent_id, limit),
        ).fetchall()
    return [
        {
            "id": r["id"],
            "agent_id": r["agent_id"],
            "task_id": r["task_id"],
            "level": r["level"],
            "message": r["message"],
            "created_at": r["created_at"],
        }
        for r in rows
    ]


# --- telegram_state ---

def save_telegram_callback(callback_id: str, task_id: str, agent_id: str) -> None:
    """Store, or replace, the task/agent that a Telegram callback id refers to."""
    with _db() as conn:
        conn.execute(
            "INSERT OR REPLACE INTO telegram_state(callback_id,task_id,agent_id) VALUES(?,?,?)",
            (callback_id, task_id, agent_id),
        )


def get_telegram_callback(callback_id: str) -> Optional[Dict[str, Any]]:
    """Return an unanswered callback record, or None if it is missing or already responded to."""
    with _db() as conn:
        r = conn.execute(
            "SELECT * FROM telegram_state WHERE callback_id=? AND responded=0",
            (callback_id,),
        ).fetchone()
    if not r:
        return None
    return {
        "callback_id": r["callback_id"],
        "task_id": r["task_id"],
        "agent_id": r["agent_id"],
        "responded": bool(r["responded"]),
    }


def mark_telegram_responded(callback_id: str, action: str) -> None:
    """Flag a callback as responded to and record the chosen action."""
    with _db() as conn:
        conn.execute(
            "UPDATE telegram_state SET responded=1, action=? WHERE callback_id=?",
            (action, callback_id),
        )


def get_token_usage_stats(agent_id: str, days: int = 1) -> dict:
    """Aggregate an agent's token usage over the last N days.

    Sums ``tokens.total`` from the result_data JSON of the agent's 'succeeded' tasks
    in agent_tasks. Rows with missing, malformed, non-numeric or non-positive totals
    are skipped.

    Returns:
        {"total_tokens": int, "task_count": int,
         "by_day": [{"date": "YYYY-MM-DD", "tokens": int}]}, with by_day sorted by date.
    """
    with _db() as conn:
        rows = conn.execute(
            """
            SELECT completed_at, result_data
            FROM agent_tasks
            WHERE agent_id = ?
              AND status = 'succeeded'
              AND completed_at IS NOT NULL
              AND completed_at >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?)
            """,
            (agent_id, f"-{int(days)} days"),
        ).fetchall()

    total_tokens = 0
    task_count = 0
    by_day_map: Dict[str, int] = {}
    for r in rows:
        result_data = r["result_data"]
        if not result_data:
            continue
        try:
            parsed = json.loads(result_data)
        except ValueError:
            continue
        tokens = parsed.get("tokens") if isinstance(parsed, dict) else None
        if not isinstance(tokens, dict):
            continue
        try:
            total = int(tokens.get("total", 0) or 0)
        except (TypeError, ValueError):
            # A non-numeric "total" must not abort the whole aggregation.
            continue
        if total <= 0:
            continue
        total_tokens += total
        task_count += 1
        completed_at = r["completed_at"] or ""
        day = completed_at[:10] if completed_at else "unknown"
        by_day_map[day] = by_day_map.get(day, 0) + total

    by_day = [{"date": d, "tokens": t} for d, t in sorted(by_day_map.items())]
    return {
        "total_tokens": total_tokens,
        "task_count": task_count,
        "by_day": by_day,
    }


def save_conversation_message(
    chat_id: str,
    role: str,
    content: str,
    model: Optional[str] = None,
    tokens_input: int = 0,
    tokens_output: int = 0,
    cache_read: int = 0,
    cache_write: int = 0,
    latency_ms: int = 0,
) -> None:
    """Persist one chat message with its model/token/latency metadata (chat_id is stored as text)."""
    with _db() as conn:
        conn.execute(
            """
            INSERT INTO conversation_messages
                (chat_id, role, content, model, tokens_input, tokens_output,
                 cache_read, cache_write, latency_ms)
            VALUES (?,?,?,?,?,?,?,?,?)
            """,
            (str(chat_id), role, content, model, tokens_input, tokens_output,
             cache_read, cache_write, latency_ms),
        )


def get_conversation_history(chat_id: str, limit: int = 20) -> List[Dict[str, Any]]:
    """Return the most recent ``limit`` messages in chronological order (oldest -> newest)."""
    with _db() as conn:
        rows = conn.execute(
            """
            SELECT role, content FROM conversation_messages
            WHERE chat_id=?
            ORDER BY id DESC
            LIMIT ?
            """,
            (str(chat_id), limit),
        ).fetchall()
    return [{"role": r["role"], "content": r["content"]} for r in reversed(rows)]


def count_recent_user_messages(chat_id: str, seconds: int = 60) -> int:
    """Count 'user' messages in a chat within the last ``seconds`` seconds."""
    with _db() as conn:
        r = conn.execute(
            """
            SELECT COUNT(*) AS c FROM conversation_messages
            WHERE chat_id=? AND role='user'
              AND created_at >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?)
            """,
            (str(chat_id), f"-{int(seconds)} seconds"),
        ).fetchone()
    return r["c"] if r else 0


def get_conversation_stats(days: int = 7) -> Dict[str, Any]:
    """Summarize assistant-message token/cache/latency usage over the last N days.

    The cache hit rate is cache_read / (cache_read + cache_write), or 0.0 when both
    are zero. Totals are reported overall and per chat.
    """
    with _db() as conn:
        rows = conn.execute(
            """
            SELECT chat_id,
                   COUNT(*) AS msg_count,
                   SUM(tokens_input) AS in_tokens,
                   SUM(tokens_output) AS out_tokens,
                   SUM(cache_read) AS cache_read,
                   SUM(cache_write) AS cache_write,
                   AVG(latency_ms) AS avg_latency
            FROM conversation_messages
            WHERE role='assistant'
              AND created_at >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?)
            GROUP BY chat_id
            """,
            (f"-{int(days)} days",),
        ).fetchall()

    by_chat = []
    tot_in = tot_out = tot_r = tot_w = tot_msgs = 0
    for r in rows:
        ci = int(r["in_tokens"] or 0)
        co = int(r["out_tokens"] or 0)
        cr = int(r["cache_read"] or 0)
        cw = int(r["cache_write"] or 0)
        mc = int(r["msg_count"] or 0)
        hit_rate = (cr / (cr + cw)) if (cr + cw) > 0 else 0.0
        by_chat.append({
            "chat_id": r["chat_id"],
            "message_count": mc,
            "tokens_input": ci,
            "tokens_output": co,
            "cache_read": cr,
            "cache_write": cw,
            "cache_hit_rate": round(hit_rate, 3),
            "avg_latency_ms": round(float(r["avg_latency"] or 0), 1),
        })
        tot_in += ci
        tot_out += co
        tot_r += cr
        tot_w += cw
        tot_msgs += mc

    overall_hit = (tot_r / (tot_r + tot_w)) if (tot_r + tot_w) > 0 else 0.0
    return {
        "days": days,
        "total_messages": tot_msgs,
        "tokens_input": tot_in,
        "tokens_output": tot_out,
        "cache_read": tot_r,
        "cache_write": tot_w,
        "cache_hit_rate": round(overall_hit, 3),
        "by_chat": by_chat,
    }


def _duration_seconds(start_ts: Optional[str], end_ts: Optional[str]) -> Optional[int]:
    """Return whole seconds between two stored '...Z' timestamps, or None if either is missing or unparseable."""
    if not (start_ts and end_ts):
        return None
    try:
        # fromisoformat() does not accept a trailing 'Z' before Python 3.11.
        start = datetime.fromisoformat(start_ts.replace("Z", "+00:00"))
        end = datetime.fromisoformat(end_ts.replace("Z", "+00:00"))
    except (AttributeError, TypeError, ValueError):
        return None
    return round((end - start).total_seconds())


def get_activity_feed(limit: int = 50, offset: int = 0) -> dict:
    """Return one newest-first page of tasks and logs merged into a single feed.

    Returns {"items": [...], "total": int}, where total is the count of all tasks plus
    all logs. Task items also carry task_type, status, completed_at and
    duration_seconds, plus telegram_sent when result_data contains it. Log items
    carry level.
    """
    with _db() as conn:
        total_row = conn.execute("""
            SELECT
                (SELECT COUNT(*) FROM agent_tasks) +
                (SELECT COUNT(*) FROM agent_logs) AS total
        """).fetchone()
        total = total_row["total"] if total_row else 0

        rows = conn.execute("""
            SELECT 'task' AS type, agent_id, id AS task_id, task_type, status,
                   NULL AS level,
                   COALESCE(
                       json_extract(result_data, '$.summary'),
                       task_type
                   ) AS message,
                   created_at, completed_at, result_data
            FROM agent_tasks
            UNION ALL
            SELECT 'log' AS type, agent_id, task_id, NULL AS task_type,
                   NULL AS status, level, message, created_at,
                   NULL AS completed_at, NULL AS result_data
            FROM agent_logs
            ORDER BY created_at DESC
            LIMIT ? OFFSET ?
        """, (limit, offset)).fetchall()

    items = []
    for r in rows:
        item = {
            "type": r["type"],
            "agent_id": r["agent_id"],
            "task_id": r["task_id"],
            "message": r["message"],
            "created_at": r["created_at"],
        }
        if r["type"] == "task":
            item["task_type"] = r["task_type"]
            item["status"] = r["status"]
            item["completed_at"] = r["completed_at"]
            item["duration_seconds"] = _duration_seconds(r["created_at"], r["completed_at"])
            result_data = None
            if r["result_data"]:
                try:
                    result_data = json.loads(r["result_data"])
                except ValueError:
                    result_data = None
            # Only a JSON object can carry a telegram_sent flag; skip lists or scalars.
            if isinstance(result_data, dict) and "telegram_sent" in result_data:
                item["telegram_sent"] = result_data["telegram_sent"]
        else:
            item["level"] = r["level"]
        items.append(item)

    return {"items": items, "total": total}


# ── youtube_research_jobs CRUD ────────────────────────────────────────────────

def add_youtube_research_job(countries: list) -> int:
    """Insert a new research job (status defaults to 'running') and return its rowid."""
    with _db() as conn:
        cur = conn.execute(
            "INSERT INTO youtube_research_jobs (countries) VALUES (?)",
            (json.dumps(countries),),
        )
        return cur.lastrowid


def update_youtube_research_job(
    job_id: int, status: str, trends_collected: int, error: Optional[str] = None
) -> None:
    """Record the final status, trend count and error for a job, and stamp completed_at."""
    with _db() as conn:
        conn.execute(
            """UPDATE youtube_research_jobs
               SET status=?, trends_collected=?, error=?,
                   completed_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')
               WHERE id=?""",
            (status, trends_collected, error, job_id),
        )


def get_latest_youtube_research_job() -> Optional[Dict[str, Any]]:
    """Return the job with the highest id as a dict, or None if there are no jobs."""
    with _db() as conn:
        row = conn.execute(
            "SELECT * FROM youtube_research_jobs ORDER BY id DESC LIMIT 1"
        ).fetchone()
    if not row:
        return None
    return {
        "id": row["id"],
        "status": row["status"],
        "countries": json.loads(row["countries"]),
        "trends_collected": row["trends_collected"],
        "error": row["error"],
        "started_at": row["started_at"],
        "completed_at": row["completed_at"],
    }


# --- lotto_signals / lotto_baselines CRUD ---

def insert_lotto_signal(
    source: str,
    metric: str,
    value: float,
    baseline_mu: Optional[float],
    baseline_sigma: Optional[float],
    z_score: Optional[float],
    fire_level: str,
    payload: Optional[Dict[str, Any]] = None,
) -> int:
    """Insert one signal (payload stored as JSON, {} when None) and return its rowid."""
    with _db() as conn:
        cur = conn.execute(
            """
            INSERT INTO lotto_signals
                (source, metric, value, baseline_mu, baseline_sigma, z_score, fire_level, payload)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                source, metric, value, baseline_mu, baseline_sigma, z_score, fire_level,
                json.dumps(payload or {}, ensure_ascii=False),
            ),
        )
        return cur.lastrowid


def mark_signal_notified(signal_id: int) -> None:
    """Stamp notified_at on a signal."""
    with _db() as conn:
        conn.execute(
            "UPDATE lotto_signals SET notified_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id = ?",
            (signal_id,),
        )


def get_recent_lotto_signals(hours: int = 24, min_fire: str = "normal") -> List[Dict[str, Any]]:
    """Return signals fired in the last N hours, newest first.

    min_fire='urgent' returns only urgent signals; any other value returns normal
    and urgent signals.
    """
    levels = ("urgent",) if min_fire == "urgent" else ("normal", "urgent")
    placeholders = ",".join("?" * len(levels))
    with _db() as conn:
        rows = conn.execute(
            f"""
            SELECT * FROM lotto_signals
            WHERE triggered_at >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?)
              AND fire_level IN ({placeholders})
            ORDER BY triggered_at DESC
            """,
            (f"-{int(hours)} hours", *levels),
        ).fetchall()
    return [dict(r) for r in rows]


def get_signals_history(days: int = 7) -> List[Dict[str, Any]]:
    """Return signals from the last N days at every fire_level (for chart/history pages)."""
    with _db() as conn:
        rows = conn.execute(
            """
            SELECT * FROM lotto_signals
            WHERE triggered_at >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?)
            ORDER BY triggered_at DESC
            """,
            (f"-{int(days)} days",),
        ).fetchall()
    return [dict(r) for r in rows]


def get_recent_urgent_count(hours: int = 24) -> int:
    """Count urgent signals fired in the last N hours that were actually notified."""
    with _db() as conn:
        row = conn.execute(
            """
            SELECT COUNT(*) AS c FROM lotto_signals
            WHERE triggered_at >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?)
              AND fire_level = 'urgent'
              AND notified_at IS NOT NULL
            """,
            (f"-{int(hours)} hours",),
        ).fetchone()
    return int(row["c"]) if row else 0


def get_last_signal_notification(metric: str, fire_level: str, hours: int) -> Optional[str]:
    """Return when a notification was last sent for this metric+fire_level within ``hours``.

    Used for throttling. Returns the notified_at text, or None if nothing was sent
    in that window.
    """
    with _db() as conn:
        row = conn.execute(
            """
            SELECT notified_at FROM lotto_signals
            WHERE metric = ? AND fire_level = ?
              AND notified_at IS NOT NULL
              AND notified_at >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?)
            ORDER BY notified_at DESC
            LIMIT 1
            """,
            (metric, fire_level, f"-{int(hours)} hours"),
        ).fetchone()
    return row["notified_at"] if row else None


def get_baseline(metric: str) -> Optional[Dict[str, Any]]:
    """Return a metric's baseline row (window_values decoded from JSON), or None."""
    with _db() as conn:
        row = conn.execute(
            "SELECT * FROM lotto_baselines WHERE metric = ?",
            (metric,),
        ).fetchone()
    if not row:
        return None
    d = dict(row)
    d["window_values"] = json.loads(d["window_values"])
    return d


def upsert_baseline(
    metric: str,
    window_values: List[float],
    mu: float,
    sigma: float,
    last_pushed_draw_no: Optional[int],
) -> None:
    """Insert a metric's baseline, or overwrite every field of the existing row (stamping updated_at)."""
    with _db() as conn:
        conn.execute(
            """
            INSERT INTO lotto_baselines (metric, window_values, mu, sigma, last_pushed_draw_no, updated_at)
            VALUES (?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            ON CONFLICT(metric) DO UPDATE SET
                window_values = excluded.window_values,
                mu = excluded.mu,
                sigma = excluded.sigma,
                last_pushed_draw_no = excluded.last_pushed_draw_no,
                updated_at = excluded.updated_at
            """,
            (metric, json.dumps(window_values), mu, sigma, last_pushed_draw_no),
        )


def get_all_baselines() -> List[Dict[str, Any]]:
    """Return every baseline row ordered by metric, with window_values decoded from JSON."""
    with _db() as conn:
        rows = conn.execute("SELECT * FROM lotto_baselines ORDER BY metric").fetchall()
    out = []
    for r in rows:
        d = dict(r)
        d["window_values"] = json.loads(d["window_values"])
        out.append(d)
    return out