769 lines
27 KiB
Python
769 lines
27 KiB
Python
import os
|
|
import json
|
|
import sqlite3
|
|
import uuid
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from .config import DB_PATH
|
|
|
|
|
|
def _conn() -> sqlite3.Connection:
|
|
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
|
|
conn = sqlite3.connect(DB_PATH, timeout=120.0)
|
|
conn.row_factory = sqlite3.Row
|
|
conn.execute("PRAGMA journal_mode=WAL")
|
|
conn.execute("PRAGMA busy_timeout=120000")
|
|
return conn
|
|
|
|
|
|
def init_db() -> None:
|
|
with _conn() as conn:
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS agent_config (
|
|
agent_id TEXT PRIMARY KEY,
|
|
display_name TEXT NOT NULL,
|
|
enabled INTEGER NOT NULL DEFAULT 1,
|
|
schedule_config TEXT NOT NULL DEFAULT '{}',
|
|
custom_config TEXT NOT NULL DEFAULT '{}',
|
|
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
|
|
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
|
|
)
|
|
""")
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS agent_tasks (
|
|
id TEXT PRIMARY KEY,
|
|
agent_id TEXT NOT NULL,
|
|
task_type TEXT NOT NULL,
|
|
status TEXT NOT NULL DEFAULT 'pending',
|
|
input_data TEXT NOT NULL DEFAULT '{}',
|
|
result_data TEXT,
|
|
requires_approval INTEGER NOT NULL DEFAULT 0,
|
|
approved_at TEXT,
|
|
approved_via TEXT,
|
|
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
|
|
completed_at TEXT
|
|
)
|
|
""")
|
|
conn.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_tasks_agent
|
|
ON agent_tasks(agent_id, created_at DESC)
|
|
""")
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS agent_logs (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
agent_id TEXT NOT NULL,
|
|
task_id TEXT,
|
|
level TEXT NOT NULL DEFAULT 'info',
|
|
message TEXT NOT NULL,
|
|
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
|
|
)
|
|
""")
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS telegram_state (
|
|
callback_id TEXT PRIMARY KEY,
|
|
task_id TEXT NOT NULL,
|
|
agent_id TEXT NOT NULL,
|
|
action TEXT,
|
|
responded INTEGER NOT NULL DEFAULT 0,
|
|
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
|
|
)
|
|
""")
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS conversation_messages (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
chat_id TEXT NOT NULL,
|
|
role TEXT NOT NULL,
|
|
content TEXT NOT NULL,
|
|
model TEXT,
|
|
tokens_input INTEGER DEFAULT 0,
|
|
tokens_output INTEGER DEFAULT 0,
|
|
cache_read INTEGER DEFAULT 0,
|
|
cache_write INTEGER DEFAULT 0,
|
|
latency_ms INTEGER DEFAULT 0,
|
|
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
|
|
)
|
|
""")
|
|
conn.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_conv_chat
|
|
ON conversation_messages(chat_id, created_at DESC)
|
|
""")
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS youtube_research_jobs (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
status TEXT NOT NULL DEFAULT 'running',
|
|
countries TEXT NOT NULL DEFAULT '[]',
|
|
trends_collected INTEGER NOT NULL DEFAULT 0,
|
|
error TEXT,
|
|
started_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
|
|
completed_at TEXT
|
|
)
|
|
""")
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS lotto_signals (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
triggered_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
|
|
source TEXT NOT NULL,
|
|
metric TEXT NOT NULL,
|
|
value REAL NOT NULL,
|
|
baseline_mu REAL,
|
|
baseline_sigma REAL,
|
|
z_score REAL,
|
|
fire_level TEXT NOT NULL,
|
|
notified_at TEXT,
|
|
payload TEXT
|
|
)
|
|
""")
|
|
conn.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_ls_triggered
|
|
ON lotto_signals(triggered_at DESC)
|
|
""")
|
|
conn.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_ls_fire
|
|
ON lotto_signals(fire_level, notified_at)
|
|
""")
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS lotto_baselines (
|
|
metric TEXT PRIMARY KEY,
|
|
window_values TEXT NOT NULL DEFAULT '[]',
|
|
mu REAL NOT NULL DEFAULT 0.0,
|
|
sigma REAL NOT NULL DEFAULT 0.0,
|
|
last_pushed_draw_no INTEGER,
|
|
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
|
|
)
|
|
""")
|
|
# Seed default agent configs
|
|
for agent_id, name in [
|
|
("stock", "주식 트레이더"),
|
|
("music", "음악 프로듀서"),
|
|
("blog", "블로그 마케터"),
|
|
("realestate", "청약 애널리스트"),
|
|
("lotto", "로또 큐레이터"),
|
|
("youtube", "YouTube 리서치"),
|
|
]:
|
|
conn.execute(
|
|
"INSERT OR IGNORE INTO agent_config(agent_id, display_name) VALUES(?,?)",
|
|
(agent_id, name),
|
|
)
|
|
|
|
|
|
# --- agent_config CRUD ---
|
|
|
|
def get_all_agents() -> List[Dict[str, Any]]:
|
|
with _conn() as conn:
|
|
rows = conn.execute("SELECT * FROM agent_config ORDER BY agent_id").fetchall()
|
|
return [_config_to_dict(r) for r in rows]
|
|
|
|
|
|
def get_agent_config(agent_id: str) -> Optional[Dict[str, Any]]:
|
|
with _conn() as conn:
|
|
r = conn.execute("SELECT * FROM agent_config WHERE agent_id=?", (agent_id,)).fetchone()
|
|
return _config_to_dict(r) if r else None
|
|
|
|
|
|
def update_agent_config(agent_id: str, **kwargs) -> None:
|
|
sets, vals = [], []
|
|
for k in ("enabled", "schedule_config", "custom_config"):
|
|
if k in kwargs and kwargs[k] is not None:
|
|
if k in ("schedule_config", "custom_config"):
|
|
sets.append(f"{k}=?")
|
|
vals.append(json.dumps(kwargs[k]))
|
|
else:
|
|
sets.append(f"{k}=?")
|
|
vals.append(kwargs[k])
|
|
if not sets:
|
|
return
|
|
sets.append("updated_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')")
|
|
vals.append(agent_id)
|
|
with _conn() as conn:
|
|
conn.execute(f"UPDATE agent_config SET {','.join(sets)} WHERE agent_id=?", vals)
|
|
|
|
|
|
def _config_to_dict(r) -> Dict[str, Any]:
|
|
return {
|
|
"agent_id": r["agent_id"],
|
|
"display_name": r["display_name"],
|
|
"enabled": bool(r["enabled"]),
|
|
"schedule_config": json.loads(r["schedule_config"]),
|
|
"custom_config": json.loads(r["custom_config"]),
|
|
"created_at": r["created_at"],
|
|
"updated_at": r["updated_at"],
|
|
}
|
|
|
|
|
|
# --- agent_tasks CRUD ---
|
|
|
|
def create_task(agent_id: str, task_type: str, input_data: dict, requires_approval: bool = False) -> str:
|
|
task_id = str(uuid.uuid4())
|
|
status = "pending" if requires_approval else "working"
|
|
with _conn() as conn:
|
|
conn.execute(
|
|
"INSERT INTO agent_tasks(id,agent_id,task_type,status,input_data,requires_approval) VALUES(?,?,?,?,?,?)",
|
|
(task_id, agent_id, task_type, status, json.dumps(input_data), int(requires_approval)),
|
|
)
|
|
return task_id
|
|
|
|
|
|
def update_task_status(task_id: str, status: str, result_data: dict = None) -> None:
|
|
with _conn() as conn:
|
|
if result_data is not None:
|
|
conn.execute(
|
|
"UPDATE agent_tasks SET status=?, result_data=?, completed_at=strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id=?",
|
|
(status, json.dumps(result_data), task_id),
|
|
)
|
|
else:
|
|
conn.execute("UPDATE agent_tasks SET status=? WHERE id=?", (status, task_id))
|
|
|
|
|
|
def approve_task(task_id: str, via: str = "web") -> None:
|
|
with _conn() as conn:
|
|
conn.execute(
|
|
"UPDATE agent_tasks SET status='approved', approved_at=strftime('%Y-%m-%dT%H:%M:%fZ','now'), approved_via=? WHERE id=?",
|
|
(via, task_id),
|
|
)
|
|
|
|
|
|
def reject_task(task_id: str) -> None:
|
|
with _conn() as conn:
|
|
conn.execute(
|
|
"UPDATE agent_tasks SET status='rejected', completed_at=strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id=?",
|
|
(task_id,),
|
|
)
|
|
|
|
|
|
def get_task(task_id: str) -> Optional[Dict[str, Any]]:
|
|
with _conn() as conn:
|
|
r = conn.execute("SELECT * FROM agent_tasks WHERE id=?", (task_id,)).fetchone()
|
|
return _task_to_dict(r) if r else None
|
|
|
|
|
|
def get_agent_tasks(
|
|
agent_id: str,
|
|
limit: int = 20,
|
|
task_type: Optional[str] = None,
|
|
days: Optional[int] = None,
|
|
) -> List[Dict[str, Any]]:
|
|
sql = "SELECT * FROM agent_tasks WHERE agent_id=?"
|
|
params: List[Any] = [agent_id]
|
|
if task_type is not None:
|
|
sql += " AND task_type=?"
|
|
params.append(task_type)
|
|
if days is not None and days > 0:
|
|
sql += " AND created_at >= datetime('now', ?)"
|
|
params.append(f"-{int(days)} days")
|
|
sql += " ORDER BY created_at DESC LIMIT ?"
|
|
params.append(limit)
|
|
with _conn() as conn:
|
|
rows = conn.execute(sql, params).fetchall()
|
|
return [_task_to_dict(r) for r in rows]
|
|
|
|
|
|
def get_pending_approvals() -> List[Dict[str, Any]]:
|
|
with _conn() as conn:
|
|
rows = conn.execute(
|
|
"SELECT * FROM agent_tasks WHERE status='pending' AND requires_approval=1 ORDER BY created_at DESC"
|
|
).fetchall()
|
|
return [_task_to_dict(r) for r in rows]
|
|
|
|
|
|
def _task_to_dict(r) -> Dict[str, Any]:
|
|
return {
|
|
"id": r["id"],
|
|
"agent_id": r["agent_id"],
|
|
"task_type": r["task_type"],
|
|
"status": r["status"],
|
|
"input_data": json.loads(r["input_data"]) if r["input_data"] else {},
|
|
"result_data": json.loads(r["result_data"]) if r["result_data"] else None,
|
|
"requires_approval": bool(r["requires_approval"]),
|
|
"approved_at": r["approved_at"],
|
|
"approved_via": r["approved_via"],
|
|
"created_at": r["created_at"],
|
|
"completed_at": r["completed_at"],
|
|
}
|
|
|
|
|
|
# --- agent_logs ---
|
|
|
|
def add_log(agent_id: str, message: str, level: str = "info", task_id: str = None) -> None:
|
|
with _conn() as conn:
|
|
conn.execute(
|
|
"INSERT INTO agent_logs(agent_id,task_id,level,message) VALUES(?,?,?,?)",
|
|
(agent_id, task_id, level, message),
|
|
)
|
|
|
|
|
|
def get_logs(agent_id: str, limit: int = 50) -> List[Dict[str, Any]]:
|
|
with _conn() as conn:
|
|
rows = conn.execute(
|
|
"SELECT * FROM agent_logs WHERE agent_id=? ORDER BY created_at DESC LIMIT ?",
|
|
(agent_id, limit),
|
|
).fetchall()
|
|
return [
|
|
{
|
|
"id": r["id"],
|
|
"agent_id": r["agent_id"],
|
|
"task_id": r["task_id"],
|
|
"level": r["level"],
|
|
"message": r["message"],
|
|
"created_at": r["created_at"],
|
|
}
|
|
for r in rows
|
|
]
|
|
|
|
|
|
# --- telegram_state ---
|
|
|
|
def save_telegram_callback(callback_id: str, task_id: str, agent_id: str) -> None:
|
|
with _conn() as conn:
|
|
conn.execute(
|
|
"INSERT OR REPLACE INTO telegram_state(callback_id,task_id,agent_id) VALUES(?,?,?)",
|
|
(callback_id, task_id, agent_id),
|
|
)
|
|
|
|
|
|
def get_telegram_callback(callback_id: str) -> Optional[Dict[str, Any]]:
|
|
with _conn() as conn:
|
|
r = conn.execute(
|
|
"SELECT * FROM telegram_state WHERE callback_id=? AND responded=0",
|
|
(callback_id,),
|
|
).fetchone()
|
|
if not r:
|
|
return None
|
|
return {
|
|
"callback_id": r["callback_id"],
|
|
"task_id": r["task_id"],
|
|
"agent_id": r["agent_id"],
|
|
"responded": bool(r["responded"]),
|
|
}
|
|
|
|
|
|
def mark_telegram_responded(callback_id: str, action: str) -> None:
|
|
with _conn() as conn:
|
|
conn.execute(
|
|
"UPDATE telegram_state SET responded=1, action=? WHERE callback_id=?",
|
|
(action, callback_id),
|
|
)
|
|
|
|
|
|
def get_token_usage_stats(agent_id: str, days: int = 1) -> dict:
|
|
"""지정 에이전트의 최근 N일 토큰 사용량 집계.
|
|
|
|
agent_tasks 테이블의 result_data JSON에서 tokens.total을 합산.
|
|
반환: {"total_tokens": int, "task_count": int, "by_day": [{"date": "YYYY-MM-DD", "tokens": int}]}
|
|
"""
|
|
with _conn() as conn:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT completed_at, result_data
|
|
FROM agent_tasks
|
|
WHERE agent_id = ?
|
|
AND status = 'succeeded'
|
|
AND completed_at IS NOT NULL
|
|
AND completed_at >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?)
|
|
""",
|
|
(agent_id, f"-{int(days)} days"),
|
|
).fetchall()
|
|
|
|
total_tokens = 0
|
|
task_count = 0
|
|
by_day_map: Dict[str, int] = {}
|
|
for r in rows:
|
|
result_data = r["result_data"]
|
|
if not result_data:
|
|
continue
|
|
try:
|
|
parsed = json.loads(result_data)
|
|
except Exception:
|
|
continue
|
|
tokens = parsed.get("tokens") if isinstance(parsed, dict) else None
|
|
total = 0
|
|
if isinstance(tokens, dict):
|
|
total = int(tokens.get("total", 0) or 0)
|
|
if total <= 0:
|
|
continue
|
|
total_tokens += total
|
|
task_count += 1
|
|
completed_at = r["completed_at"] or ""
|
|
day = completed_at[:10] if completed_at else "unknown"
|
|
by_day_map[day] = by_day_map.get(day, 0) + total
|
|
|
|
by_day = [
|
|
{"date": d, "tokens": t}
|
|
for d, t in sorted(by_day_map.items())
|
|
]
|
|
return {
|
|
"total_tokens": total_tokens,
|
|
"task_count": task_count,
|
|
"by_day": by_day,
|
|
}
|
|
|
|
|
|
def save_conversation_message(
|
|
chat_id: str,
|
|
role: str,
|
|
content: str,
|
|
model: Optional[str] = None,
|
|
tokens_input: int = 0,
|
|
tokens_output: int = 0,
|
|
cache_read: int = 0,
|
|
cache_write: int = 0,
|
|
latency_ms: int = 0,
|
|
) -> None:
|
|
with _conn() as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO conversation_messages
|
|
(chat_id, role, content, model, tokens_input, tokens_output,
|
|
cache_read, cache_write, latency_ms)
|
|
VALUES (?,?,?,?,?,?,?,?,?)
|
|
""",
|
|
(str(chat_id), role, content, model, tokens_input, tokens_output,
|
|
cache_read, cache_write, latency_ms),
|
|
)
|
|
|
|
|
|
def get_conversation_history(chat_id: str, limit: int = 20) -> List[Dict[str, Any]]:
|
|
"""최근 N개를 시간순(오래된 → 최신)으로 반환."""
|
|
with _conn() as conn:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT role, content FROM conversation_messages
|
|
WHERE chat_id=? ORDER BY id DESC LIMIT ?
|
|
""",
|
|
(str(chat_id), limit),
|
|
).fetchall()
|
|
return [{"role": r["role"], "content": r["content"]} for r in reversed(rows)]
|
|
|
|
|
|
def count_recent_user_messages(chat_id: str, seconds: int = 60) -> int:
|
|
with _conn() as conn:
|
|
r = conn.execute(
|
|
"""
|
|
SELECT COUNT(*) AS c FROM conversation_messages
|
|
WHERE chat_id=? AND role='user'
|
|
AND created_at >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?)
|
|
""",
|
|
(str(chat_id), f"-{int(seconds)} seconds"),
|
|
).fetchone()
|
|
return r["c"] if r else 0
|
|
|
|
|
|
def get_conversation_stats(days: int = 7) -> Dict[str, Any]:
|
|
with _conn() as conn:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT chat_id,
|
|
COUNT(*) AS msg_count,
|
|
SUM(tokens_input) AS in_tokens,
|
|
SUM(tokens_output) AS out_tokens,
|
|
SUM(cache_read) AS cache_read,
|
|
SUM(cache_write) AS cache_write,
|
|
AVG(latency_ms) AS avg_latency
|
|
FROM conversation_messages
|
|
WHERE role='assistant'
|
|
AND created_at >= strftime('%Y-%m-%dT%H:%M:%fZ','now', ?)
|
|
GROUP BY chat_id
|
|
""",
|
|
(f"-{int(days)} days",),
|
|
).fetchall()
|
|
|
|
by_chat = []
|
|
tot_in = tot_out = tot_r = tot_w = tot_msgs = 0
|
|
for r in rows:
|
|
ci = int(r["in_tokens"] or 0)
|
|
co = int(r["out_tokens"] or 0)
|
|
cr = int(r["cache_read"] or 0)
|
|
cw = int(r["cache_write"] or 0)
|
|
mc = int(r["msg_count"] or 0)
|
|
hit_rate = (cr / (cr + cw)) if (cr + cw) > 0 else 0.0
|
|
by_chat.append({
|
|
"chat_id": r["chat_id"],
|
|
"message_count": mc,
|
|
"tokens_input": ci,
|
|
"tokens_output": co,
|
|
"cache_read": cr,
|
|
"cache_write": cw,
|
|
"cache_hit_rate": round(hit_rate, 3),
|
|
"avg_latency_ms": round(float(r["avg_latency"] or 0), 1),
|
|
})
|
|
tot_in += ci; tot_out += co; tot_r += cr; tot_w += cw; tot_msgs += mc
|
|
|
|
overall_hit = (tot_r / (tot_r + tot_w)) if (tot_r + tot_w) > 0 else 0.0
|
|
return {
|
|
"days": days,
|
|
"total_messages": tot_msgs,
|
|
"tokens_input": tot_in,
|
|
"tokens_output": tot_out,
|
|
"cache_read": tot_r,
|
|
"cache_write": tot_w,
|
|
"cache_hit_rate": round(overall_hit, 3),
|
|
"by_chat": by_chat,
|
|
}
|
|
|
|
|
|
def get_activity_feed(limit: int = 50, offset: int = 0) -> dict:
|
|
with _conn() as conn:
|
|
total_row = conn.execute("""
|
|
SELECT (SELECT COUNT(*) FROM agent_tasks) + (SELECT COUNT(*) FROM agent_logs) AS total
|
|
""").fetchone()
|
|
total = total_row["total"] if total_row else 0
|
|
|
|
rows = conn.execute("""
|
|
SELECT 'task' AS type, agent_id, id AS task_id, task_type,
|
|
status, NULL AS level,
|
|
COALESCE(
|
|
json_extract(result_data, '$.summary'),
|
|
task_type
|
|
) AS message,
|
|
created_at, completed_at,
|
|
result_data
|
|
FROM agent_tasks
|
|
UNION ALL
|
|
SELECT 'log' AS type, agent_id, task_id, NULL AS task_type,
|
|
NULL AS status, level,
|
|
message,
|
|
created_at, NULL AS completed_at,
|
|
NULL AS result_data
|
|
FROM agent_logs
|
|
ORDER BY created_at DESC
|
|
LIMIT ? OFFSET ?
|
|
""", (limit, offset)).fetchall()
|
|
|
|
items = []
|
|
for r in rows:
|
|
item = {
|
|
"type": r["type"],
|
|
"agent_id": r["agent_id"],
|
|
"task_id": r["task_id"],
|
|
"message": r["message"],
|
|
"created_at": r["created_at"],
|
|
}
|
|
if r["type"] == "task":
|
|
item["task_type"] = r["task_type"]
|
|
item["status"] = r["status"]
|
|
item["completed_at"] = r["completed_at"]
|
|
if r["created_at"] and r["completed_at"]:
|
|
try:
|
|
from datetime import datetime
|
|
start = datetime.fromisoformat(r["created_at"].replace("Z", "+00:00"))
|
|
end = datetime.fromisoformat(r["completed_at"].replace("Z", "+00:00"))
|
|
item["duration_seconds"] = round((end - start).total_seconds())
|
|
except Exception:
|
|
item["duration_seconds"] = None
|
|
else:
|
|
item["duration_seconds"] = None
|
|
result_data = json.loads(r["result_data"]) if r["result_data"] else None
|
|
if result_data and "telegram_sent" in result_data:
|
|
item["telegram_sent"] = result_data["telegram_sent"]
|
|
else:
|
|
item["level"] = r["level"]
|
|
items.append(item)
|
|
|
|
return {"items": items, "total": total}
|
|
|
|
|
|
# ── youtube_research_jobs CRUD ────────────────────────────────────────────────
|
|
|
|
def add_youtube_research_job(countries: list) -> int:
|
|
with _conn() as conn:
|
|
conn.execute(
|
|
"INSERT INTO youtube_research_jobs (countries) VALUES (?)",
|
|
(json.dumps(countries),),
|
|
)
|
|
return conn.execute("SELECT last_insert_rowid()").fetchone()[0]
|
|
|
|
|
|
def update_youtube_research_job(
|
|
job_id: int, status: str, trends_collected: int, error: Optional[str] = None
|
|
) -> None:
|
|
with _conn() as conn:
|
|
conn.execute(
|
|
"""UPDATE youtube_research_jobs
|
|
SET status=?, trends_collected=?, error=?,
|
|
completed_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')
|
|
WHERE id=?""",
|
|
(status, trends_collected, error, job_id),
|
|
)
|
|
|
|
|
|
def get_latest_youtube_research_job() -> Optional[Dict[str, Any]]:
|
|
with _conn() as conn:
|
|
row = conn.execute(
|
|
"SELECT * FROM youtube_research_jobs ORDER BY id DESC LIMIT 1"
|
|
).fetchone()
|
|
if not row:
|
|
return None
|
|
return {
|
|
"id": row["id"],
|
|
"status": row["status"],
|
|
"countries": json.loads(row["countries"]),
|
|
"trends_collected": row["trends_collected"],
|
|
"error": row["error"],
|
|
"started_at": row["started_at"],
|
|
"completed_at": row["completed_at"],
|
|
}
|
|
|
|
|
|
# --- lotto_signals / lotto_baselines CRUD ---
|
|
|
|
def insert_lotto_signal(
|
|
source: str,
|
|
metric: str,
|
|
value: float,
|
|
baseline_mu: Optional[float],
|
|
baseline_sigma: Optional[float],
|
|
z_score: Optional[float],
|
|
fire_level: str,
|
|
payload: Optional[Dict[str, Any]] = None,
|
|
) -> int:
|
|
with _conn() as conn:
|
|
cur = conn.execute(
|
|
"""
|
|
INSERT INTO lotto_signals
|
|
(source, metric, value, baseline_mu, baseline_sigma, z_score, fire_level, payload)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
source, metric, value,
|
|
baseline_mu, baseline_sigma, z_score, fire_level,
|
|
json.dumps(payload or {}, ensure_ascii=False),
|
|
),
|
|
)
|
|
return cur.lastrowid
|
|
|
|
|
|
def mark_signal_notified(signal_id: int) -> None:
|
|
with _conn() as conn:
|
|
conn.execute(
|
|
"UPDATE lotto_signals SET notified_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id = ?",
|
|
(signal_id,),
|
|
)
|
|
|
|
|
|
def get_recent_lotto_signals(hours: int = 24, min_fire: str = "normal") -> List[Dict[str, Any]]:
|
|
"""지난 N시간 발화 시그널. min_fire='normal'이면 normal+urgent."""
|
|
levels = ("urgent",) if min_fire == "urgent" else ("normal", "urgent")
|
|
placeholders = ",".join("?" * len(levels))
|
|
with _conn() as conn:
|
|
rows = conn.execute(
|
|
f"""
|
|
SELECT * FROM lotto_signals
|
|
WHERE triggered_at >= datetime('now', ?)
|
|
AND fire_level IN ({placeholders})
|
|
ORDER BY triggered_at DESC
|
|
""",
|
|
(f"-{int(hours)} hours", *levels),
|
|
).fetchall()
|
|
return [dict(r) for r in rows]
|
|
|
|
|
|
def get_signals_history(days: int = 7) -> List[Dict[str, Any]]:
|
|
"""차트/이력 페이지용 — 모든 fire_level 포함."""
|
|
with _conn() as conn:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT * FROM lotto_signals
|
|
WHERE triggered_at >= datetime('now', ?)
|
|
ORDER BY triggered_at DESC
|
|
""",
|
|
(f"-{int(days)} days",),
|
|
).fetchall()
|
|
return [dict(r) for r in rows]
|
|
|
|
|
|
def get_recent_urgent_count(hours: int = 24) -> int:
|
|
with _conn() as conn:
|
|
row = conn.execute(
|
|
"""
|
|
SELECT COUNT(*) AS c FROM lotto_signals
|
|
WHERE triggered_at >= datetime('now', ?)
|
|
AND fire_level = 'urgent'
|
|
AND notified_at IS NOT NULL
|
|
""",
|
|
(f"-{int(hours)} hours",),
|
|
).fetchone()
|
|
return int(row["c"]) if row else 0
|
|
|
|
|
|
def get_last_signal_notification(metric: str, fire_level: str, hours: int) -> Optional[str]:
|
|
"""같은 metric+fire_level이 hours 내에 알림 발송된 마지막 시각. throttle용."""
|
|
with _conn() as conn:
|
|
row = conn.execute(
|
|
"""
|
|
SELECT notified_at FROM lotto_signals
|
|
WHERE metric = ?
|
|
AND fire_level = ?
|
|
AND notified_at IS NOT NULL
|
|
AND notified_at >= datetime('now', ?)
|
|
ORDER BY notified_at DESC LIMIT 1
|
|
""",
|
|
(metric, fire_level, f"-{int(hours)} hours"),
|
|
).fetchone()
|
|
return row["notified_at"] if row else None
|
|
|
|
|
|
def get_baseline(metric: str) -> Optional[Dict[str, Any]]:
|
|
with _conn() as conn:
|
|
row = conn.execute(
|
|
"SELECT * FROM lotto_baselines WHERE metric = ?",
|
|
(metric,),
|
|
).fetchone()
|
|
if not row:
|
|
return None
|
|
d = dict(row)
|
|
d["window_values"] = json.loads(d["window_values"])
|
|
return d
|
|
|
|
|
|
def upsert_baseline(
|
|
metric: str,
|
|
window_values: List[float],
|
|
mu: float,
|
|
sigma: float,
|
|
last_pushed_draw_no: Optional[int],
|
|
) -> None:
|
|
with _conn() as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO lotto_baselines
|
|
(metric, window_values, mu, sigma, last_pushed_draw_no, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
|
|
ON CONFLICT(metric) DO UPDATE SET
|
|
window_values = excluded.window_values,
|
|
mu = excluded.mu,
|
|
sigma = excluded.sigma,
|
|
last_pushed_draw_no = excluded.last_pushed_draw_no,
|
|
updated_at = excluded.updated_at
|
|
""",
|
|
(
|
|
metric,
|
|
json.dumps(window_values),
|
|
mu, sigma, last_pushed_draw_no,
|
|
),
|
|
)
|
|
|
|
|
|
def get_all_baselines() -> List[Dict[str, Any]]:
|
|
with _conn() as conn:
|
|
rows = conn.execute("SELECT * FROM lotto_baselines ORDER BY metric").fetchall()
|
|
out = []
|
|
for r in rows:
|
|
d = dict(r)
|
|
d["window_values"] = json.loads(d["window_values"])
|
|
out.append(d)
|
|
return out
|
|
|
|
|
|
def get_tasks_by_agent_date_kind(agent_id: str, date_iso: str, task_type: str) -> List[Dict[str, Any]]:
|
|
"""같은 (agent, date, task_type)으로 이미 생성된 task 조회. 멱등 guard."""
|
|
with _conn() as conn:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT * FROM agent_tasks
|
|
WHERE agent_id = ? AND task_type = ?
|
|
AND substr(created_at, 1, 10) = ?
|
|
ORDER BY created_at DESC
|
|
""",
|
|
(agent_id, task_type, date_iso),
|
|
).fetchall()
|
|
return [_task_to_dict(r) for r in rows]
|