Files
gahusb 2b5009f864 fix(sqlite): WAL + busy_timeout 120s standardized across all labs
8개 lab의 _conn() 함수에 표준 동시성 패턴 통일:
- timeout=120.0 (connection 획득)
- PRAGMA journal_mode=WAL (reader/writer 분리)
- PRAGMA busy_timeout=120000 (트랜잭션 충돌 시 120초 대기)

stock-lab/screener/router.py 의 검증된 패턴(d9b6122) 을 lotto, stock-lab(메인),
music-lab, blog-lab, realestate-lab, agent-office, personal, travel-proxy 로 확산.
기존 'database is locked' 오류 윈도우를 흡수.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 22:12:01 +09:00

484 lines
20 KiB
Python

import sqlite3
import json
import logging
from typing import Dict, Any, List, Optional
logger = logging.getLogger("personal")
DB_PATH = "/app/data/personal.db"
def _conn():
c = sqlite3.connect(DB_PATH, timeout=120.0)
c.row_factory = sqlite3.Row
c.execute("PRAGMA journal_mode=WAL;")
c.execute("PRAGMA busy_timeout=120000;")
c.execute("PRAGMA foreign_keys=ON;")
return c
def _row_to_dict(r) -> Dict[str, Any]:
if r is None:
return None
d = {c: r[c] for c in r.keys()}
if "tech_stack" in d and isinstance(d["tech_stack"], str):
try:
d["tech_stack"] = json.loads(d["tech_stack"])
except (json.JSONDecodeError, TypeError):
d["tech_stack"] = []
return d
def init_db():
with _conn() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS profile (
id INTEGER PRIMARY KEY CHECK (id = 1),
name TEXT NOT NULL DEFAULT '',
name_en TEXT NOT NULL DEFAULT '',
role TEXT NOT NULL DEFAULT '',
role_en TEXT NOT NULL DEFAULT '',
email TEXT NOT NULL DEFAULT '',
phone TEXT NOT NULL DEFAULT '',
github_url TEXT NOT NULL DEFAULT '',
blog_url TEXT NOT NULL DEFAULT '',
photo_url TEXT NOT NULL DEFAULT '',
bio TEXT NOT NULL DEFAULT '',
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("""
INSERT OR IGNORE INTO profile (id) VALUES (1)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS careers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
category TEXT NOT NULL DEFAULT 'company',
organization TEXT NOT NULL DEFAULT '',
role TEXT NOT NULL DEFAULT '',
description TEXT NOT NULL DEFAULT '',
start_date TEXT NOT NULL DEFAULT '',
end_date TEXT NOT NULL DEFAULT '',
sort_order INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
category TEXT NOT NULL DEFAULT 'personal',
title TEXT NOT NULL DEFAULT '',
description TEXT NOT NULL DEFAULT '',
tech_stack TEXT NOT NULL DEFAULT '[]',
role TEXT NOT NULL DEFAULT '',
start_date TEXT NOT NULL DEFAULT '',
end_date TEXT NOT NULL DEFAULT '',
url TEXT NOT NULL DEFAULT '',
image_url TEXT NOT NULL DEFAULT '',
sort_order INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS skills (
id INTEGER PRIMARY KEY AUTOINCREMENT,
category TEXT NOT NULL DEFAULT 'language',
name TEXT NOT NULL DEFAULT '',
level INTEGER NOT NULL DEFAULT 3,
sort_order INTEGER NOT NULL DEFAULT 0
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS introductions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL DEFAULT '',
content TEXT NOT NULL DEFAULT '',
is_main INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
# ── todos 테이블 ──
conn.execute("""
CREATE TABLE IF NOT EXISTS todos (
id TEXT PRIMARY KEY
DEFAULT (lower(hex(randomblob(4))) || '-' || lower(hex(randomblob(2)))),
title TEXT NOT NULL,
description TEXT,
status TEXT NOT NULL DEFAULT 'todo'
CHECK(status IN ('todo','in_progress','done')),
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_todos_created ON todos(created_at DESC)")
# ── blog_posts 테이블 ──
conn.execute("""
CREATE TABLE IF NOT EXISTS blog_posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
body TEXT NOT NULL DEFAULT '',
excerpt TEXT NOT NULL DEFAULT '',
tags TEXT NOT NULL DEFAULT '[]',
date TEXT NOT NULL DEFAULT (date('now','localtime')),
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_blog_date ON blog_posts(date DESC)")
logger.info("personal DB initialized")
# ── Profile ──
def get_profile() -> Dict[str, Any]:
with _conn() as conn:
row = conn.execute("SELECT * FROM profile WHERE id = 1").fetchone()
return _row_to_dict(row)
def update_profile(data: Dict[str, Any]) -> Dict[str, Any]:
fields = {k: v for k, v in data.items() if k != "id" and v is not None}
if not fields:
return get_profile()
set_clauses = ", ".join(f"{k} = ?" for k in fields)
set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
with _conn() as conn:
conn.execute(
f"UPDATE profile SET {set_clauses} WHERE id = 1",
list(fields.values()),
)
return get_profile()
# ── Careers ──
def get_careers() -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute("SELECT * FROM careers ORDER BY sort_order, start_date DESC").fetchall()
return [_row_to_dict(r) for r in rows]
def create_career(data: Dict[str, Any]) -> Dict[str, Any]:
with _conn() as conn:
conn.execute(
"""INSERT INTO careers (category, organization, role, description, start_date, end_date, sort_order)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(data.get("category", "company"), data.get("organization", ""),
data.get("role", ""), data.get("description", ""),
data.get("start_date", ""), data.get("end_date", ""),
data.get("sort_order", 0)),
)
row = conn.execute("SELECT * FROM careers ORDER BY id DESC LIMIT 1").fetchone()
return _row_to_dict(row)
def update_career(career_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
fields = {k: v for k, v in data.items() if k not in ("id", "created_at") and v is not None}
if not fields:
return get_career(career_id)
set_clauses = ", ".join(f"{k} = ?" for k in fields)
set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
with _conn() as conn:
existing = conn.execute("SELECT id FROM careers WHERE id = ?", (career_id,)).fetchone()
if not existing:
return None
conn.execute(f"UPDATE careers SET {set_clauses} WHERE id = ?", list(fields.values()) + [career_id])
row = conn.execute("SELECT * FROM careers WHERE id = ?", (career_id,)).fetchone()
return _row_to_dict(row)
def delete_career(career_id: int) -> bool:
with _conn() as conn:
cur = conn.execute("DELETE FROM careers WHERE id = ?", (career_id,))
return cur.rowcount > 0
def get_career(career_id: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM careers WHERE id = ?", (career_id,)).fetchone()
return _row_to_dict(row)
# ── Projects ──
def get_projects() -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute("SELECT * FROM projects ORDER BY sort_order, start_date DESC").fetchall()
return [_row_to_dict(r) for r in rows]
def create_project(data: Dict[str, Any]) -> Dict[str, Any]:
tech = json.dumps(data.get("tech_stack", []), ensure_ascii=False)
with _conn() as conn:
conn.execute(
"""INSERT INTO projects (category, title, description, tech_stack, role, start_date, end_date, url, image_url, sort_order)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(data.get("category", "personal"), data.get("title", ""),
data.get("description", ""), tech,
data.get("role", ""), data.get("start_date", ""),
data.get("end_date", ""), data.get("url", ""),
data.get("image_url", ""), data.get("sort_order", 0)),
)
row = conn.execute("SELECT * FROM projects ORDER BY id DESC LIMIT 1").fetchone()
return _row_to_dict(row)
def update_project(project_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
fields = {k: v for k, v in data.items() if k not in ("id", "created_at") and v is not None}
if "tech_stack" in fields and isinstance(fields["tech_stack"], list):
fields["tech_stack"] = json.dumps(fields["tech_stack"], ensure_ascii=False)
if not fields:
return get_project(project_id)
set_clauses = ", ".join(f"{k} = ?" for k in fields)
set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
with _conn() as conn:
existing = conn.execute("SELECT id FROM projects WHERE id = ?", (project_id,)).fetchone()
if not existing:
return None
conn.execute(f"UPDATE projects SET {set_clauses} WHERE id = ?", list(fields.values()) + [project_id])
row = conn.execute("SELECT * FROM projects WHERE id = ?", (project_id,)).fetchone()
return _row_to_dict(row)
def delete_project(project_id: int) -> bool:
with _conn() as conn:
cur = conn.execute("DELETE FROM projects WHERE id = ?", (project_id,))
return cur.rowcount > 0
def get_project(project_id: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM projects WHERE id = ?", (project_id,)).fetchone()
return _row_to_dict(row)
# ── Skills ──
def get_skills() -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute("SELECT * FROM skills ORDER BY sort_order, category, name").fetchall()
return [_row_to_dict(r) for r in rows]
def create_skill(data: Dict[str, Any]) -> Dict[str, Any]:
with _conn() as conn:
conn.execute(
"INSERT INTO skills (category, name, level, sort_order) VALUES (?, ?, ?, ?)",
(data.get("category", "language"), data.get("name", ""),
data.get("level", 3), data.get("sort_order", 0)),
)
row = conn.execute("SELECT * FROM skills ORDER BY id DESC LIMIT 1").fetchone()
return _row_to_dict(row)
def update_skill(skill_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
fields = {k: v for k, v in data.items() if k != "id" and v is not None}
if not fields:
return get_skill(skill_id)
set_clauses = ", ".join(f"{k} = ?" for k in fields)
with _conn() as conn:
existing = conn.execute("SELECT id FROM skills WHERE id = ?", (skill_id,)).fetchone()
if not existing:
return None
conn.execute(f"UPDATE skills SET {set_clauses} WHERE id = ?", list(fields.values()) + [skill_id])
row = conn.execute("SELECT * FROM skills WHERE id = ?", (skill_id,)).fetchone()
return _row_to_dict(row)
def delete_skill(skill_id: int) -> bool:
with _conn() as conn:
cur = conn.execute("DELETE FROM skills WHERE id = ?", (skill_id,))
return cur.rowcount > 0
def get_skill(skill_id: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM skills WHERE id = ?", (skill_id,)).fetchone()
return _row_to_dict(row)
# ── Introductions ──
def get_introductions() -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute("SELECT * FROM introductions ORDER BY is_main DESC, updated_at DESC").fetchall()
return [_row_to_dict(r) for r in rows]
def create_introduction(data: Dict[str, Any]) -> Dict[str, Any]:
with _conn() as conn:
conn.execute(
"INSERT INTO introductions (title, content, is_main) VALUES (?, ?, ?)",
(data.get("title", ""), data.get("content", ""), data.get("is_main", 0)),
)
row = conn.execute("SELECT * FROM introductions ORDER BY id DESC LIMIT 1").fetchone()
return _row_to_dict(row)
def update_introduction(intro_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
fields = {k: v for k, v in data.items() if k not in ("id", "created_at") and v is not None}
if not fields:
return get_introduction(intro_id)
set_clauses = ", ".join(f"{k} = ?" for k in fields)
set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
with _conn() as conn:
existing = conn.execute("SELECT id FROM introductions WHERE id = ?", (intro_id,)).fetchone()
if not existing:
return None
conn.execute(f"UPDATE introductions SET {set_clauses} WHERE id = ?", list(fields.values()) + [intro_id])
row = conn.execute("SELECT * FROM introductions WHERE id = ?", (intro_id,)).fetchone()
return _row_to_dict(row)
def delete_introduction(intro_id: int) -> bool:
with _conn() as conn:
cur = conn.execute("DELETE FROM introductions WHERE id = ?", (intro_id,))
return cur.rowcount > 0
def get_introduction(intro_id: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM introductions WHERE id = ?", (intro_id,)).fetchone()
return _row_to_dict(row)
def set_main_introduction(intro_id: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
existing = conn.execute("SELECT id FROM introductions WHERE id = ?", (intro_id,)).fetchone()
if not existing:
return None
conn.execute("UPDATE introductions SET is_main = 0 WHERE is_main = 1")
conn.execute("UPDATE introductions SET is_main = 1, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id = ?", (intro_id,))
row = conn.execute("SELECT * FROM introductions WHERE id = ?", (intro_id,)).fetchone()
return _row_to_dict(row)
# ── Public (일괄 조회) ──
# ── Todos ──
def _todo_row_to_dict(r) -> Dict[str, Any]:
return {
"id": r["id"], "title": r["title"], "description": r["description"],
"status": r["status"], "created_at": r["created_at"], "updated_at": r["updated_at"],
}
def get_all_todos() -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute("SELECT * FROM todos ORDER BY created_at DESC").fetchall()
return [_todo_row_to_dict(r) for r in rows]
def create_todo(title: str, description: Optional[str], status: str) -> Dict[str, Any]:
with _conn() as conn:
conn.execute("INSERT INTO todos (title, description, status) VALUES (?, ?, ?)", (title, description, status))
row = conn.execute("SELECT * FROM todos WHERE rowid = last_insert_rowid()").fetchone()
return _todo_row_to_dict(row)
def update_todo(todo_id: str, fields: Dict[str, Any]) -> Optional[Dict[str, Any]]:
allowed = {"title", "description", "status"}
updates = {k: v for k, v in fields.items() if k in allowed}
if not updates:
with _conn() as conn:
row = conn.execute("SELECT * FROM todos WHERE id = ?", (todo_id,)).fetchone()
return _todo_row_to_dict(row) if row else None
set_clauses = ", ".join(f"{k} = ?" for k in updates)
set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
args = list(updates.values()) + [todo_id]
with _conn() as conn:
conn.execute(f"UPDATE todos SET {set_clauses} WHERE id = ?", args)
row = conn.execute("SELECT * FROM todos WHERE id = ?", (todo_id,)).fetchone()
return _todo_row_to_dict(row) if row else None
def delete_todo(todo_id: str) -> bool:
with _conn() as conn:
cur = conn.execute("DELETE FROM todos WHERE id = ?", (todo_id,))
return cur.rowcount > 0
def delete_done_todos() -> int:
with _conn() as conn:
cur = conn.execute("DELETE FROM todos WHERE status = 'done'")
return cur.rowcount
# ── Blog Posts ──
def _post_row_to_dict(r) -> Dict[str, Any]:
return {
"id": r["id"], "title": r["title"], "body": r["body"],
"excerpt": r["excerpt"], "tags": json.loads(r["tags"]) if r["tags"] else [],
"date": r["date"], "created_at": r["created_at"], "updated_at": r["updated_at"],
}
def get_all_posts() -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute("SELECT * FROM blog_posts ORDER BY date DESC, id DESC").fetchall()
return [_post_row_to_dict(r) for r in rows]
def create_post(title: str, body: str, excerpt: str, tags: List[str], date: str) -> Dict[str, Any]:
with _conn() as conn:
conn.execute(
"INSERT INTO blog_posts (title, body, excerpt, tags, date) VALUES (?, ?, ?, ?, ?)",
(title, body, excerpt, json.dumps(tags), date),
)
row = conn.execute("SELECT * FROM blog_posts WHERE rowid = last_insert_rowid()").fetchone()
return _post_row_to_dict(row)
def update_post(post_id: int, fields: Dict[str, Any]) -> Optional[Dict[str, Any]]:
allowed = {"title", "body", "excerpt", "tags", "date"}
updates = {k: v for k, v in fields.items() if k in allowed}
if not updates:
with _conn() as conn:
row = conn.execute("SELECT * FROM blog_posts WHERE id = ?", (post_id,)).fetchone()
return _post_row_to_dict(row) if row else None
if "tags" in updates:
updates["tags"] = json.dumps(updates["tags"])
set_clauses = ", ".join(f"{k} = ?" for k in updates)
set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
args = list(updates.values()) + [post_id]
with _conn() as conn:
conn.execute(f"UPDATE blog_posts SET {set_clauses} WHERE id = ?", args)
row = conn.execute("SELECT * FROM blog_posts WHERE id = ?", (post_id,)).fetchone()
return _post_row_to_dict(row) if row else None
def delete_post(post_id: int) -> bool:
with _conn() as conn:
cur = conn.execute("DELETE FROM blog_posts WHERE id = ?", (post_id,))
return cur.rowcount > 0
# ── Public (일괄 조회) ──
def get_public_data() -> Dict[str, Any]:
with _conn() as conn:
profile = _row_to_dict(conn.execute("SELECT * FROM profile WHERE id = 1").fetchone())
careers = [_row_to_dict(r) for r in conn.execute("SELECT * FROM careers ORDER BY sort_order, start_date DESC").fetchall()]
projects = [_row_to_dict(r) for r in conn.execute("SELECT * FROM projects ORDER BY sort_order, start_date DESC").fetchall()]
skills = [_row_to_dict(r) for r in conn.execute("SELECT * FROM skills ORDER BY sort_order, category, name").fetchall()]
main_intro_row = conn.execute("SELECT * FROM introductions WHERE is_main = 1 LIMIT 1").fetchone()
main_introduction = _row_to_dict(main_intro_row) if main_intro_row else None
return {
"profile": profile,
"careers": careers,
"projects": projects,
"skills": skills,
"main_introduction": main_introduction,
}