68 lines
2.5 KiB
Python
68 lines
2.5 KiB
Python
import sqlite3
|
|
import os
|
|
import hashlib
|
|
from typing import List, Dict, Any
|
|
|
|
# Filesystem path of the SQLite database file; _conn() creates the parent
# directory on demand and opens this file.
DB_PATH = "/app/data/stock.db"
|
|
|
|
def _conn() -> sqlite3.Connection:
    """Open a new connection to the database file at ``DB_PATH``.

    The parent directory of ``DB_PATH`` is created first if it does not
    exist yet. The returned connection uses ``sqlite3.Row`` as its row
    factory, so fetched rows can be indexed by column name.

    Returns:
        A freshly opened ``sqlite3.Connection``; the caller owns it.
    """
    parent_dir = os.path.dirname(DB_PATH)
    os.makedirs(parent_dir, exist_ok=True)

    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
def init_db():
    """Create the ``articles`` table and its index, migrating older tables.

    The table and the ``crawled_at`` index are created with
    ``IF NOT EXISTS``, and the ``category`` column is only added when
    ``PRAGMA table_info`` reports it missing, so calling this function
    repeatedly is harmless.
    """
    conn = _conn()
    try:
        # ``with conn`` commits on success and rolls back on error, but it
        # does NOT close the connection; the ``finally`` below does that.
        with conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS articles (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    hash TEXT UNIQUE NOT NULL,
                    category TEXT DEFAULT 'domestic',
                    title TEXT NOT NULL,
                    link TEXT,
                    summary TEXT,
                    press TEXT,
                    pub_date TEXT,
                    crawled_at TEXT
                )
            """)
            conn.execute("CREATE INDEX IF NOT EXISTS idx_articles_crawled ON articles(crawled_at DESC)")

            # Add the column when migrating a table created before
            # ``category`` existed (CREATE TABLE IF NOT EXISTS leaves an
            # existing table untouched).
            cols = {r["name"] for r in conn.execute("PRAGMA table_info(articles)").fetchall()}
            if "category" not in cols:
                conn.execute("ALTER TABLE articles ADD COLUMN category TEXT DEFAULT 'domestic'")
    finally:
        conn.close()
|
|
|
|
def save_articles(articles: List[Dict[str, str]]) -> int:
    """Insert articles into the ``articles`` table, skipping duplicates.

    Each article dict must contain the keys ``title``, ``link``,
    ``summary``, ``press``, ``date`` and ``crawled_at``; ``category`` is
    optional and defaults to ``'domestic'``. Duplicates are detected via
    the MD5 hex digest of ``"<title>|<link>"``, stored in the ``hash``
    column.

    Args:
        articles: Article dicts to store.

    Returns:
        The number of rows actually inserted.

    Raises:
        KeyError: If an article lacks one of the required keys; the
            surrounding transaction is rolled back in that case.
    """
    count = 0
    conn = _conn()
    try:
        # ``with conn`` only commits / rolls back the transaction; the
        # connection itself is closed in the ``finally`` below.
        with conn:
            for a in articles:
                # Hash used for duplicate detection (title + link).
                unique_str = f"{a['title']}|{a['link']}"
                h = hashlib.md5(unique_str.encode("utf-8")).hexdigest()
                cat = a.get("category", "domestic")
                params = (
                    h, cat, a['title'], a['link'], a['summary'],
                    a['press'], a['date'], a['crawled_at'],
                )

                try:
                    conn.execute("""
                        INSERT INTO articles (hash, category, title, link, summary, press, pub_date, crawled_at)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    """, params)
                except sqlite3.IntegrityError:
                    # Row with this hash already exists; skip it.
                    continue
                count += 1
    finally:
        conn.close()
    return count
|
|
|
|
def get_latest_articles(limit: int = 20, category: str = None) -> List[Dict[str, Any]]:
    """Return the most recently crawled articles as plain dicts.

    Rows are ordered by ``crawled_at`` descending, with ``id`` descending
    as the tie-breaker.

    Args:
        limit: Maximum number of rows to return.
        category: When truthy, only rows whose ``category`` equals this
            value are returned; ``None`` or ``""`` means no filtering.

    Returns:
        A list of dicts keyed by column name, newest first.
    """
    if category:
        query = "SELECT * FROM articles WHERE category = ? ORDER BY crawled_at DESC, id DESC LIMIT ?"
        params = (category, limit)
    else:
        query = "SELECT * FROM articles ORDER BY crawled_at DESC, id DESC LIMIT ?"
        params = (limit,)

    conn = _conn()
    try:
        # Materialise the rows as dicts before the connection is closed.
        return [dict(r) for r in conn.execute(query, params).fetchall()]
    finally:
        # The sqlite3 connection context manager never closes the
        # connection, so release it explicitly.
        conn.close()
|