"""KRX daily snapshot loader (FDR + naver finance scraping)."""
from __future__ import annotations

import datetime as dt
import logging
import re
import sqlite3
import time
from dataclasses import dataclass

import FinanceDataReader as fdr
import httpx
import pandas as pd
from bs4 import BeautifulSoup

log = logging.getLogger(__name__)

NAVER_FRGN_URL = "https://finance.naver.com/item/frgn.naver"
NAVER_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Referer": "https://finance.naver.com/",
}

DEFAULT_FLOW_TOP_N = 100
DEFAULT_RATE_LIMIT_SEC = 0.2
# Top 100 by market cap × 0.2 s = ~20 s of throttling (plus request latency),
# which finishes comfortably within the agent-office httpx timeout (180 s).
# Foreign-buying signals matter most for large caps, so the top 100 names are enough.
# If more names are needed, move the flow scrape into a separate cron job.


@dataclass
class RefreshSummary:
    asof: dt.date
    master_count: int
    prices_count: int
    flow_count: int
    failures: list[str]

    def asdict(self) -> dict:
        return {
            "asof": self.asof.isoformat(),
            "master_count": self.master_count,
            "prices_count": self.prices_count,
            "flow_count": self.flow_count,
            "failures": self.failures,
        }


def _iso(d: dt.date) -> str:
    return d.isoformat()


def _is_preferred(name: str) -> int:
    """Preferred-share heuristic: the name ends in '우', optionally followed by
    one uppercase letter or one digit (e.g. '삼성전자우', '현대차2우B')."""
    n = name or ""
    return 1 if re.search(r"우[A-Z]?$|우\d?$", n) else 0


def _is_spac(name: str) -> int:
    # '스팩' is the Korean term for SPAC.
    return 1 if "스팩" in (name or "") else 0


def fetch_master_listing() -> pd.DataFrame:
    """fdr.StockListing('KRX'), wrapped so tests can stub it."""
    return fdr.StockListing("KRX")


def fetch_ohlcv_for_ticker(ticker: str, start: str, end: str) -> pd.DataFrame:
    """fdr.DataReader for backfill."""
    return fdr.DataReader(ticker, start, end)


def fetch_flow_naver(ticker: str, *, client: httpx.Client) -> dict | None:
    """Scrape the naver frgn page; return the latest day's flow dict, or None."""
    r = client.get(NAVER_FRGN_URL, params={"code": ticker, "page": 1})
    if r.status_code != 200:
        return None
    soup = BeautifulSoup(r.text, "lxml")
    for row in soup.select("table.type2 tr"):
        cells = [c.get_text(strip=True).replace(",", "") for c in row.select("td")]
        if not cells or not cells[0]:
            continue
        # Skip header/spacer rows; data rows start with a YYYY.MM.DD date.
        if not re.match(r"\d{4}\.\d{2}\.\d{2}", cells[0]):
            continue
        # The first dated row is the most recent trading day:
        # cells[5] is the institutional net volume, cells[6] the foreign net volume.
        try:
            inst = int(cells[5]) if cells[5] not in ("", "-") else 0
            foreign = int(cells[6]) if cells[6] not in ("", "-") else 0
            return {
                "date": cells[0].replace(".", "-"),
                "foreign_net": foreign,
                "institution_net": inst,
            }
        except (IndexError, ValueError):
            return None
    return None


def _master_and_prices_rows(
    asof: dt.date, df: pd.DataFrame
) -> tuple[list[tuple], list[tuple]]:
    """Split a StockListing frame into krx_master rows and krx_daily_prices rows.

    StockListing('KRX') is a current snapshot, so price rows are stamped with
    `asof` as given; callers should pass the latest trading day.
    """
    iso = _iso(asof)
    # Naive UTC timestamp, same string format as the deprecated datetime.utcnow().
    now_iso = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None).isoformat()
    master_rows: list[tuple] = []
    price_rows: list[tuple] = []
    for _, row in df.iterrows():
        ticker = str(row.get("Code") or "").strip()
        name = str(row.get("Name") or "").strip()
        if not ticker or not name:
            continue
        market_raw = str(row.get("Market") or "").upper()
        # NOTE: every market that is not KOSDAQ (e.g. KONEX, if present) is stored as KOSPI.
        market = "KOSDAQ" if "KOSDAQ" in market_raw else "KOSPI"
        try:
            market_cap = int(row["Marcap"]) if pd.notna(row.get("Marcap")) else None
        except (TypeError, ValueError):
            market_cap = None
        master_rows.append((
            ticker, name, market, market_cap,
            0,  # is_managed: always 0 here
            _is_preferred(name), _is_spac(name),
            None,  # listed_date: not populated here
            now_iso,
        ))
        try:
            o = int(row["Open"]) if pd.notna(row.get("Open")) else None
            h = int(row["High"]) if pd.notna(row.get("High")) else None
            l = int(row["Low"]) if pd.notna(row.get("Low")) else None
            c = int(row["Close"]) if pd.notna(row.get("Close")) else None
            v = int(row["Volume"]) if pd.notna(row.get("Volume")) else None
            amt = row.get("Amount")
            a = int(amt) if pd.notna(amt) else None
            # Close and volume are required for a price row; the rest may be NULL.
            if c is not None and v is not None:
                price_rows.append((ticker, iso, o, h, l, c, v, a))
        except (TypeError, KeyError, ValueError):
            # Skip rows whose price fields are missing or non-numeric.
            pass
    return master_rows, price_rows


def _gather_flow_naver(
    asof: dt.date,
    tickers: list[str],
    *,
    rate_limit_sec: float = DEFAULT_RATE_LIMIT_SEC,
) -> list[tuple]:
    """Scrape naver flow for each ticker, keeping only rows dated `asof`."""
    iso = _iso(asof)
    rows: list[tuple] = []
    if not tickers:
        return rows
    with httpx.Client(timeout=10, headers=NAVER_HEADERS) as client:
        for t in tickers:
            try:
                data = fetch_flow_naver(t, client=client)
                # Drop stale rows (e.g. `asof` not yet published, or not a trading day).
                if data and data["date"] == iso:
                    rows.append((t, iso, data["foreign_net"], data["institution_net"]))
            except Exception as e:
                log.warning("flow scrape failed for %s: %s", t, e)
            if rate_limit_sec > 0:
                time.sleep(rate_limit_sec)
    return rows


def refresh_daily(
    conn: sqlite3.Connection,
    asof: dt.date,
    flow_top_n: int = DEFAULT_FLOW_TOP_N,
    rate_limit_sec: float = DEFAULT_RATE_LIMIT_SEC,
) -> dict:
    """Pull master + prices (FDR) and flow (naver scraping, top N by market cap)."""
    df = fetch_master_listing()
    master_rows, price_rows = _master_and_prices_rows(asof, df)

    # Upsert the master; listed_date is left untouched on conflict.
    conn.executemany("""
        INSERT INTO krx_master (
            ticker, name, market, market_cap, is_managed, is_preferred, is_spac,
            listed_date, updated_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(ticker) DO UPDATE SET
            name=excluded.name,
            market=excluded.market,
            market_cap=excluded.market_cap,
            is_managed=excluded.is_managed,
            is_preferred=excluded.is_preferred,
            is_spac=excluded.is_spac,
            updated_at=excluded.updated_at
    """, master_rows)
    conn.executemany("""
        INSERT OR REPLACE INTO krx_daily_prices
            (ticker, date, open, high, low, close, volume, value)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """, price_rows)

    # Foreign/institutional flow: only the top N by market cap (rate-limit protection).
    if flow_top_n > 0:
        # r[3] is market_cap; tickers with an unknown cap sort last.
        top = sorted(master_rows, key=lambda r: r[3] or 0, reverse=True)[:flow_top_n]
        flow_tickers = [r[0] for r in top]
    else:
        flow_tickers = []
    flow_rows = _gather_flow_naver(asof, flow_tickers, rate_limit_sec=rate_limit_sec)

    conn.executemany("""
        INSERT OR REPLACE INTO krx_flow (ticker, date, foreign_net, institution_net)
        VALUES (?, ?, ?, ?)
    """, flow_rows)
    conn.commit()

    return RefreshSummary(
        asof=asof,
        master_count=len(master_rows),
        prices_count=len(price_rows),
        flow_count=len(flow_rows),
        # Per-ticker scrape failures are only logged, not collected here.
        failures=[],
    ).asdict()


def backfill(conn: sqlite3.Connection, start: dt.date, end: dt.date) -> list[dict]:
    """Backfill daily bars (e.g. five years) with one fdr.DataReader call per ticker.

    The master is the current FDR listing stamped with `end`, because FDR does
    not provide historical listings; tickers that are no longer listed are
    therefore not backfilled.
    """
    df = fetch_master_listing()
    master_rows, _ = _master_and_prices_rows(end, df)
    # Only the name is refreshed for tickers that already exist.
    conn.executemany("""
        INSERT INTO krx_master (
            ticker, name, market, market_cap, is_managed, is_preferred, is_spac,
            listed_date, updated_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(ticker) DO UPDATE SET name=excluded.name
    """, master_rows)

    iso_start = start.isoformat()
    iso_end = end.isoformat()
    results = []
    for r in master_rows:
        t = r[0]
        try:
            ddf = fetch_ohlcv_for_ticker(t, iso_start, iso_end)
            if ddf is None or ddf.empty:
                continue
            ddf = ddf.reset_index()
            ddf["Date"] = pd.to_datetime(ddf["Date"]).dt.strftime("%Y-%m-%d")
            rows = []
            for _, rr in ddf.iterrows():
                if pd.isna(rr["Close"]) or pd.isna(rr["Volume"]):
                    continue
                rows.append((
                    t,
                    rr["Date"],
                    int(rr["Open"]) if pd.notna(rr["Open"]) else None,
                    int(rr["High"]) if pd.notna(rr["High"]) else None,
                    int(rr["Low"]) if pd.notna(rr["Low"]) else None,
                    int(rr["Close"]),
                    int(rr["Volume"]),
                    # Traded value approximated as close × volume,
                    # not the exchange-reported amount.
                    int(rr["Close"] * rr["Volume"]),
                ))
            conn.executemany("""
                INSERT OR REPLACE INTO krx_daily_prices
                    (ticker, date, open, high, low, close, volume, value)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, rows)
            results.append({"ticker": t, "count": len(rows)})
        except Exception as e:
            log.error("backfill failed for %s: %s", t, e)
            results.append({"ticker": t, "error": str(e)})
    conn.commit()
    return results
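

# ---------------------------------------------------------------------------
# Schema sketch (hypothetical; not part of the original loader).
#
# The upserts above assume krx_master, krx_daily_prices and krx_flow already
# exist. The DDL below is one schema they would work against: the column names
# come from the INSERT statements, while the types and keys are assumptions.
# ON CONFLICT(ticker) ... DO UPDATE needs a PRIMARY KEY or UNIQUE constraint on
# krx_master.ticker (and SQLite 3.24+). INSERT OR REPLACE only overwrites
# instead of appending when (ticker, date) is a key, which is what would make
# re-running refresh_daily for the same day idempotent.
# ---------------------------------------------------------------------------
import sqlite3

_SKETCH_SCHEMA = """
CREATE TABLE IF NOT EXISTS krx_master (
    ticker       TEXT PRIMARY KEY,
    name         TEXT NOT NULL,
    market       TEXT NOT NULL,
    market_cap   INTEGER,
    is_managed   INTEGER NOT NULL DEFAULT 0,
    is_preferred INTEGER NOT NULL DEFAULT 0,
    is_spac      INTEGER NOT NULL DEFAULT 0,
    listed_date  TEXT,
    updated_at   TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS krx_daily_prices (
    ticker TEXT NOT NULL,
    date   TEXT NOT NULL,          -- ISO 'YYYY-MM-DD'
    open   INTEGER,
    high   INTEGER,
    low    INTEGER,
    close  INTEGER NOT NULL,
    volume INTEGER NOT NULL,
    value  INTEGER,
    PRIMARY KEY (ticker, date)
);
CREATE TABLE IF NOT EXISTS krx_flow (
    ticker          TEXT NOT NULL,
    date            TEXT NOT NULL,
    foreign_net     INTEGER,
    institution_net INTEGER,
    PRIMARY KEY (ticker, date)
);
"""


def ensure_schema_sketch(conn: sqlite3.Connection) -> None:
    """Hypothetical helper: create the three tables if they are missing.

    CREATE TABLE IF NOT EXISTS makes this safe to call on every start-up.
    """
    conn.executescript(_SKETCH_SCHEMA)


# Usage (hypothetical): call ensure_schema_sketch(conn) once on a fresh
# database before the first refresh_daily(conn, asof) or backfill(...) call.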