Files
web-page-backend/insta-lab/app/db.py
gahusb b3982c8f72 feat(insta-lab): db migration — trending_keywords.source + account_preferences + CRUD
- Idempotent ALTER TABLE adds source column (default 'manual') + idx_tk_source index
- New account_preferences table seeded with economy/psychology/celebrity at weight=1.0
- add_trending_keyword now accepts optional source param
- New helpers: add_external_trend, list_trends, get_preferences, upsert_preferences
- test_db updated: six→seven tables; test_preferences_crud.py (7 new tests, all pass)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-16 17:44:01 +09:00

353 lines
15 KiB
Python

import os
import sqlite3
import json
import uuid
from typing import Any, Dict, List, Optional
from .config import DB_PATH
def _conn() -> sqlite3.Connection:
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH, timeout=120.0)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=120000")
conn.execute("PRAGMA foreign_keys=ON")
return conn
def init_db() -> None:
with _conn() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS news_articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
category TEXT NOT NULL,
title TEXT NOT NULL,
link TEXT NOT NULL UNIQUE,
summary TEXT NOT NULL DEFAULT '',
pub_date TEXT,
fetched_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_na_category_fetched ON news_articles(category, fetched_at DESC)")
conn.execute("""
CREATE TABLE IF NOT EXISTS trending_keywords (
id INTEGER PRIMARY KEY AUTOINCREMENT,
keyword TEXT NOT NULL,
category TEXT NOT NULL,
score REAL NOT NULL DEFAULT 0,
articles_count INTEGER NOT NULL DEFAULT 0,
suggested_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
used INTEGER NOT NULL DEFAULT 0
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_tk_score ON trending_keywords(category, score DESC)")
conn.execute("""
CREATE TABLE IF NOT EXISTS card_slates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
keyword TEXT NOT NULL,
category TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'draft',
cover_copy TEXT NOT NULL DEFAULT '{}',
body_copies TEXT NOT NULL DEFAULT '[]',
cta_copy TEXT NOT NULL DEFAULT '{}',
suggested_caption TEXT NOT NULL DEFAULT '',
hashtags TEXT NOT NULL DEFAULT '[]',
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_cs_created ON card_slates(created_at DESC)")
conn.execute("""
CREATE TABLE IF NOT EXISTS card_assets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
slate_id INTEGER NOT NULL REFERENCES card_slates(id) ON DELETE CASCADE,
page_index INTEGER NOT NULL,
file_path TEXT NOT NULL,
file_hash TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
UNIQUE (slate_id, page_index)
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_ca_slate ON card_assets(slate_id, page_index)")
conn.execute("""
CREATE TABLE IF NOT EXISTS generation_tasks (
id TEXT PRIMARY KEY,
type TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'queued',
progress INTEGER NOT NULL DEFAULT 0,
message TEXT NOT NULL DEFAULT '',
result_id INTEGER,
error TEXT,
params TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_gt_created ON generation_tasks(created_at DESC)")
conn.execute("""
CREATE TABLE IF NOT EXISTS prompt_templates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
description TEXT NOT NULL DEFAULT '',
template TEXT NOT NULL DEFAULT '',
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
# source column for trending_keywords (idempotent ALTER)
cols = [r[1] for r in conn.execute("PRAGMA table_info(trending_keywords)").fetchall()]
if "source" not in cols:
conn.execute("ALTER TABLE trending_keywords ADD COLUMN source TEXT NOT NULL DEFAULT 'manual'")
conn.execute("CREATE INDEX IF NOT EXISTS idx_tk_source ON trending_keywords(source, suggested_at DESC)")
# account_preferences — 카테고리 가중치
conn.execute("""
CREATE TABLE IF NOT EXISTS account_preferences (
category TEXT PRIMARY KEY,
weight REAL NOT NULL DEFAULT 1.0,
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
# seed defaults if table empty
existing = conn.execute("SELECT COUNT(*) FROM account_preferences").fetchone()[0]
if existing == 0:
for cat in ("economy", "psychology", "celebrity"):
conn.execute(
"INSERT INTO account_preferences(category, weight) VALUES(?,?)",
(cat, 1.0),
)
# ── news_articles ────────────────────────────────────────────────
def add_news_article(row: Dict[str, Any]) -> int:
with _conn() as conn:
try:
cur = conn.execute(
"INSERT INTO news_articles(category, title, link, summary, pub_date) VALUES(?,?,?,?,?)",
(row["category"], row["title"], row["link"], row.get("summary", ""), row.get("pub_date")),
)
return cur.lastrowid
except sqlite3.IntegrityError:
existing = conn.execute("SELECT id FROM news_articles WHERE link=?", (row["link"],)).fetchone()
return existing["id"] if existing else 0
def list_news_articles(category: Optional[str] = None, days: int = 1) -> List[Dict[str, Any]]:
sql = "SELECT * FROM news_articles WHERE fetched_at >= datetime('now', ?)"
params: List[Any] = [f"-{int(days)} days"]
if category:
sql += " AND category=?"
params.append(category)
sql += " ORDER BY fetched_at DESC"
with _conn() as conn:
rows = conn.execute(sql, params).fetchall()
return [dict(r) for r in rows]
# ── trending_keywords ───────────────────────────────────────────
def add_trending_keyword(row: Dict[str, Any]) -> int:
with _conn() as conn:
cur = conn.execute(
"INSERT INTO trending_keywords(keyword, category, score, articles_count, source) VALUES(?,?,?,?,?)",
(
row["keyword"], row["category"],
float(row.get("score", 0.0)), int(row.get("articles_count", 0)),
row.get("source", "manual"),
),
)
return cur.lastrowid
def list_trending_keywords(category: Optional[str] = None, used: Optional[bool] = None) -> List[Dict[str, Any]]:
sql = "SELECT * FROM trending_keywords WHERE 1=1"
params: List[Any] = []
if category:
sql += " AND category=?"
params.append(category)
if used is not None:
sql += " AND used=?"
params.append(1 if used else 0)
sql += " ORDER BY score DESC, suggested_at DESC"
with _conn() as conn:
rows = conn.execute(sql, params).fetchall()
return [dict(r) for r in rows]
def mark_keyword_used(keyword_id: int) -> None:
with _conn() as conn:
conn.execute("UPDATE trending_keywords SET used=1 WHERE id=?", (keyword_id,))
def get_trending_keyword(keyword_id: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM trending_keywords WHERE id=?", (keyword_id,)).fetchone()
return dict(row) if row else None
# ── card_slates ─────────────────────────────────────────────────
def add_card_slate(row: Dict[str, Any]) -> int:
with _conn() as conn:
cur = conn.execute("""
INSERT INTO card_slates(keyword, category, status, cover_copy, body_copies, cta_copy,
suggested_caption, hashtags)
VALUES(?,?,?,?,?,?,?,?)
""", (
row["keyword"], row["category"], row.get("status", "draft"),
json.dumps(row.get("cover_copy", {}), ensure_ascii=False),
json.dumps(row.get("body_copies", []), ensure_ascii=False),
json.dumps(row.get("cta_copy", {}), ensure_ascii=False),
row.get("suggested_caption", ""),
json.dumps(row.get("hashtags", []), ensure_ascii=False),
))
return cur.lastrowid
def update_slate_status(slate_id: int, status: str) -> None:
with _conn() as conn:
conn.execute(
"UPDATE card_slates SET status=?, updated_at=strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id=?",
(status, slate_id),
)
def get_card_slate(slate_id: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM card_slates WHERE id=?", (slate_id,)).fetchone()
return dict(row) if row else None
def list_card_slates(limit: int = 50) -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM card_slates ORDER BY created_at DESC LIMIT ?",
(limit,),
).fetchall()
return [dict(r) for r in rows]
def delete_card_slate(slate_id: int) -> None:
with _conn() as conn:
conn.execute("DELETE FROM card_slates WHERE id=?", (slate_id,))
# ── card_assets ─────────────────────────────────────────────────
def add_card_asset(slate_id: int, page_index: int, file_path: str, file_hash: str = "") -> int:
with _conn() as conn:
cur = conn.execute("""
INSERT INTO card_assets(slate_id, page_index, file_path, file_hash)
VALUES(?,?,?,?)
ON CONFLICT(slate_id, page_index) DO UPDATE SET
file_path=excluded.file_path, file_hash=excluded.file_hash
""", (slate_id, page_index, file_path, file_hash))
return cur.lastrowid
def list_card_assets(slate_id: int) -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM card_assets WHERE slate_id=? ORDER BY page_index ASC",
(slate_id,),
).fetchall()
return [dict(r) for r in rows]
# ── generation_tasks ────────────────────────────────────────────
def create_task(task_type: str, params: Dict[str, Any]) -> str:
tid = uuid.uuid4().hex
with _conn() as conn:
conn.execute(
"INSERT INTO generation_tasks(id, type, params) VALUES(?,?,?)",
(tid, task_type, json.dumps(params, ensure_ascii=False)),
)
return tid
def update_task(task_id: str, status: str, progress: int = 0, message: str = "",
result_id: Optional[int] = None, error: Optional[str] = None) -> None:
with _conn() as conn:
conn.execute("""
UPDATE generation_tasks
SET status=?, progress=?, message=?, result_id=?, error=?,
updated_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')
WHERE id=?
""", (status, progress, message, result_id, error, task_id))
def get_task(task_id: str) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM generation_tasks WHERE id=?", (task_id,)).fetchone()
return dict(row) if row else None
# ── prompt_templates ────────────────────────────────────────────
def upsert_prompt_template(name: str, template: str, description: str = "") -> None:
with _conn() as conn:
conn.execute("""
INSERT INTO prompt_templates(name, description, template)
VALUES(?,?,?)
ON CONFLICT(name) DO UPDATE SET
template=excluded.template,
description=excluded.description,
updated_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')
""", (name, description, template))
def get_prompt_template(name: str) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM prompt_templates WHERE name=?", (name,)).fetchone()
return dict(row) if row else None
# ── external trends ─────────────────────────────────────────────
def add_external_trend(row: Dict[str, Any]) -> int:
"""`source` 필수 — naver_popular | google_trends. trending_keywords에 인서트."""
if "source" not in row:
raise ValueError("add_external_trend requires 'source' field")
return add_trending_keyword(row)
def list_trends(source: Optional[str] = None, category: Optional[str] = None,
days: int = 1) -> List[Dict[str, Any]]:
sql = "SELECT * FROM trending_keywords WHERE suggested_at >= datetime('now', ?)"
params: List[Any] = [f"-{int(days)} days"]
if source and source != "all":
sql += " AND source=?"
params.append(source)
if category:
sql += " AND category=?"
params.append(category)
sql += " ORDER BY suggested_at DESC, score DESC"
with _conn() as conn:
rows = conn.execute(sql, params).fetchall()
return [dict(r) for r in rows]
# ── account_preferences ─────────────────────────────────────────
def get_preferences() -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT category, weight, updated_at FROM account_preferences ORDER BY category ASC"
).fetchall()
return [dict(r) for r in rows]
def upsert_preferences(weights: Dict[str, float]) -> None:
"""전체 upsert. 기존에 있던 카테고리는 weight 갱신, 신규는 INSERT.
명시되지 않은 기존 카테고리는 그대로 둔다 (삭제 X). 삭제 필요 시 별도 API로."""
with _conn() as conn:
for cat, w in weights.items():
conn.execute("""
INSERT INTO account_preferences(category, weight)
VALUES(?,?)
ON CONFLICT(category) DO UPDATE SET
weight=excluded.weight,
updated_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')
""", (cat, float(w)))