refactor: portfolio → personal 리네이밍 + Blog/Todo 통합
- portfolio/ 디렉토리를 personal/로 리네이밍 - lotto-backend의 Blog/Todo 라우트·CRUD를 personal 서비스로 이전 - lotto-backend에서 Blog/Todo 코드 제거 (DB 테이블 스키마는 유지) - nginx: /api/todos, /api/blog/ 라우팅을 personal로 추가 - docker-compose: portfolio → personal 서비스 변�� - deploy 스크립트: portfolio → personal 반영 데이터 마이그레이션은 배포 후 NAS에서 별도 수행 필요: 1. cp data/portfolio/portfolio.db data/personal/personal.db 2. sqlite3 data/lotto.db ".dump todos" | sqlite3 data/personal/personal.db 3. sqlite3 data/lotto.db ".dump blog_posts" | sqlite3 data/personal/personal.db Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -298,133 +298,6 @@ def init_db() -> None:
|
||||
conn.execute("CREATE INDEX IF NOT EXISTS idx_briefings_draw ON lotto_briefings(draw_no DESC)")
|
||||
|
||||
|
||||
# ── todos CRUD ───────────────────────────────────────────────────────────────
|
||||
|
||||
def _todo_row_to_dict(r) -> Dict[str, Any]:
|
||||
return {
|
||||
"id": r["id"],
|
||||
"title": r["title"],
|
||||
"description": r["description"],
|
||||
"status": r["status"],
|
||||
"created_at": r["created_at"],
|
||||
"updated_at": r["updated_at"],
|
||||
}
|
||||
|
||||
|
||||
def get_all_todos() -> List[Dict[str, Any]]:
|
||||
with _conn() as conn:
|
||||
rows = conn.execute(
|
||||
"SELECT * FROM todos ORDER BY created_at DESC"
|
||||
).fetchall()
|
||||
return [_todo_row_to_dict(r) for r in rows]
|
||||
|
||||
|
||||
def create_todo(title: str, description: Optional[str], status: str) -> Dict[str, Any]:
|
||||
with _conn() as conn:
|
||||
conn.execute(
|
||||
"INSERT INTO todos (title, description, status) VALUES (?, ?, ?)",
|
||||
(title, description, status),
|
||||
)
|
||||
row = conn.execute(
|
||||
"SELECT * FROM todos WHERE rowid = last_insert_rowid()"
|
||||
).fetchone()
|
||||
return _todo_row_to_dict(row)
|
||||
|
||||
|
||||
def update_todo(todo_id: str, fields: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
||||
"""fields에 있는 항목만 업데이트 (PATCH 방식), updated_at 자동 갱신"""
|
||||
allowed = {"title", "description", "status"}
|
||||
updates = {k: v for k, v in fields.items() if k in allowed}
|
||||
if not updates:
|
||||
with _conn() as conn:
|
||||
row = conn.execute("SELECT * FROM todos WHERE id = ?", (todo_id,)).fetchone()
|
||||
return _todo_row_to_dict(row) if row else None
|
||||
|
||||
set_clauses = ", ".join(f"{k} = ?" for k in updates)
|
||||
set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
|
||||
args = list(updates.values()) + [todo_id]
|
||||
|
||||
with _conn() as conn:
|
||||
conn.execute(
|
||||
f"UPDATE todos SET {set_clauses} WHERE id = ?",
|
||||
args,
|
||||
)
|
||||
row = conn.execute("SELECT * FROM todos WHERE id = ?", (todo_id,)).fetchone()
|
||||
return _todo_row_to_dict(row) if row else None
|
||||
|
||||
|
||||
def delete_todo(todo_id: str) -> bool:
|
||||
with _conn() as conn:
|
||||
cur = conn.execute("DELETE FROM todos WHERE id = ?", (todo_id,))
|
||||
return cur.rowcount > 0
|
||||
|
||||
|
||||
def delete_done_todos() -> int:
|
||||
with _conn() as conn:
|
||||
cur = conn.execute("DELETE FROM todos WHERE status = 'done'")
|
||||
return cur.rowcount
|
||||
|
||||
|
||||
# ── blog_posts CRUD ──────────────────────────────────────────────────────────
|
||||
|
||||
def _post_row_to_dict(r) -> Dict[str, Any]:
|
||||
return {
|
||||
"id": r["id"],
|
||||
"title": r["title"],
|
||||
"body": r["body"],
|
||||
"excerpt": r["excerpt"],
|
||||
"tags": json.loads(r["tags"]) if r["tags"] else [],
|
||||
"date": r["date"],
|
||||
"created_at": r["created_at"],
|
||||
"updated_at": r["updated_at"],
|
||||
}
|
||||
|
||||
|
||||
def get_all_posts() -> List[Dict[str, Any]]:
|
||||
with _conn() as conn:
|
||||
rows = conn.execute(
|
||||
"SELECT * FROM blog_posts ORDER BY date DESC, id DESC"
|
||||
).fetchall()
|
||||
return [_post_row_to_dict(r) for r in rows]
|
||||
|
||||
|
||||
def create_post(title: str, body: str, excerpt: str, tags: List[str], date: str) -> Dict[str, Any]:
|
||||
with _conn() as conn:
|
||||
conn.execute(
|
||||
"INSERT INTO blog_posts (title, body, excerpt, tags, date) VALUES (?, ?, ?, ?, ?)",
|
||||
(title, body, excerpt, json.dumps(tags), date),
|
||||
)
|
||||
row = conn.execute(
|
||||
"SELECT * FROM blog_posts WHERE rowid = last_insert_rowid()"
|
||||
).fetchone()
|
||||
return _post_row_to_dict(row)
|
||||
|
||||
|
||||
def update_post(post_id: int, fields: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
||||
allowed = {"title", "body", "excerpt", "tags", "date"}
|
||||
updates = {k: v for k, v in fields.items() if k in allowed}
|
||||
if not updates:
|
||||
with _conn() as conn:
|
||||
row = conn.execute("SELECT * FROM blog_posts WHERE id = ?", (post_id,)).fetchone()
|
||||
return _post_row_to_dict(row) if row else None
|
||||
|
||||
if "tags" in updates:
|
||||
updates["tags"] = json.dumps(updates["tags"])
|
||||
|
||||
set_clauses = ", ".join(f"{k} = ?" for k in updates)
|
||||
set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
|
||||
args = list(updates.values()) + [post_id]
|
||||
|
||||
with _conn() as conn:
|
||||
conn.execute(f"UPDATE blog_posts SET {set_clauses} WHERE id = ?", args)
|
||||
row = conn.execute("SELECT * FROM blog_posts WHERE id = ?", (post_id,)).fetchone()
|
||||
return _post_row_to_dict(row) if row else None
|
||||
|
||||
|
||||
def delete_post(post_id: int) -> bool:
|
||||
with _conn() as conn:
|
||||
cur = conn.execute("DELETE FROM blog_posts WHERE id = ?", (post_id,))
|
||||
return cur.rowcount > 0
|
||||
|
||||
|
||||
def upsert_draw(row: Dict[str, Any]) -> None:
|
||||
|
||||
Reference in New Issue
Block a user