feat(stock-lab): FDR 종목 마스터+일봉 + naver 외국인 수급 (snapshot)

This commit is contained in:
2026-05-12 07:41:40 +09:00
parent d7e235c008
commit 6c5481971b
2 changed files with 376 additions and 0 deletions

View File

@@ -0,0 +1,247 @@
"""KRX daily snapshot loader (FDR + naver finance scraping)."""
from __future__ import annotations
import datetime as dt
import logging
import re
import sqlite3
import time
from dataclasses import dataclass
import FinanceDataReader as fdr
import httpx
import pandas as pd
from bs4 import BeautifulSoup
log = logging.getLogger(__name__)
NAVER_FRGN_URL = "https://finance.naver.com/item/frgn.naver"
NAVER_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Referer": "https://finance.naver.com/",
}
DEFAULT_FLOW_TOP_N = 500
DEFAULT_RATE_LIMIT_SEC = 0.2
@dataclass
class RefreshSummary:
    """Outcome of one daily refresh run: per-table row counts plus failures."""
    asof: dt.date          # trading date the snapshot represents
    master_count: int      # rows upserted into krx_master
    prices_count: int      # rows written to krx_daily_prices
    flow_count: int        # rows written to krx_flow
    failures: list[str]    # human-readable failure descriptions

    def asdict(self) -> dict:
        """Serialize to a JSON-friendly dict (date rendered as an ISO string)."""
        payload: dict = {"asof": self.asof.isoformat()}
        payload["master_count"] = self.master_count
        payload["prices_count"] = self.prices_count
        payload["flow_count"] = self.flow_count
        payload["failures"] = self.failures
        return payload
def _iso(d: dt.date) -> str:
return d.isoformat()
def _is_preferred(name: str) -> int:
"""우선주 휴리스틱: 종목명이 ''로 끝나거나 '우[A-Z]?'/'\\d?' 패턴."""
n = name or ""
return 1 if re.search(r"우[A-Z]?$|우\d?$", n) else 0
def _is_spac(name: str) -> int:
return 1 if "스팩" in (name or "") else 0
def fetch_master_listing() -> pd.DataFrame:
    """Return the full KRX listing via fdr.StockListing('KRX').

    Thin module-level wrapper so tests can monkeypatch it with a stub frame.
    """
    return fdr.StockListing("KRX")
def fetch_ohlcv_for_ticker(ticker: str, start: str, end: str) -> pd.DataFrame:
    """Return daily OHLCV bars for *ticker* between ISO dates start/end.

    Thin wrapper around fdr.DataReader, used by backfill; kept as a
    module-level function so tests can monkeypatch it.
    """
    return fdr.DataReader(ticker, start, end)
def fetch_flow_naver(ticker: str, *, client) -> dict | None:
    """Scrape naver finance's foreign/institution page for *ticker*.

    Scans page 1 of the frgn table and returns a dict for the most recent
    trading day found: {'date': 'YYYY-MM-DD', 'foreign_net', 'institution_net'}.
    Returns None on a non-200 response, a malformed row, or no data rows.
    """
    resp = client.get(NAVER_FRGN_URL, params={"code": ticker, "page": 1})
    if resp.status_code != 200:
        return None
    doc = BeautifulSoup(resp.text, "lxml")
    date_pattern = re.compile(r"\d{4}\.\d{2}\.\d{2}")
    for tr in doc.select("table.type2 tr"):
        values = [td.get_text(strip=True).replace(",", "") for td in tr.select("td")]
        # Skip header/spacer rows and rows whose first cell is not a date.
        if not values or not values[0]:
            continue
        if not date_pattern.match(values[0]):
            continue
        try:
            # Column 5 = institution net, column 6 = foreign net;
            # blank or '-' cells mean zero flow.
            institution = 0 if values[5] in ("", "-") else int(values[5])
            foreigner = 0 if values[6] in ("", "-") else int(values[6])
        except (IndexError, ValueError):
            # Malformed first data row: abort rather than return stale data.
            return None
        return {
            "date": values[0].replace(".", "-"),
            "foreign_net": foreigner,
            "institution_net": institution,
        }
    return None
def _master_and_prices_rows(asof: dt.date,
                            df: pd.DataFrame) -> tuple[list[tuple], list[tuple]]:
    """Transform an FDR KRX listing frame into insert-ready row tuples.

    Returns (master_rows, price_rows):
      * master_rows — one per ticker: (ticker, name, market, market_cap,
        is_managed, is_preferred, is_spac, listed_date, updated_at).
        is_managed and listed_date are not available from this source and
        are emitted as 0 / None.
      * price_rows — (ticker, asof-date, open, high, low, close, volume,
        amount); rows missing close or volume are skipped.
    """
    iso = _iso(asof)
    # Naive UTC timestamp; NOTE(review): datetime.utcnow() is deprecated
    # in 3.12+ — consider dt.datetime.now(dt.timezone.utc) (adds "+00:00").
    now_iso = dt.datetime.utcnow().isoformat()
    master_rows: list[tuple] = []
    price_rows: list[tuple] = []
    for _, row in df.iterrows():
        ticker = str(row.get("Code") or "").strip()
        name = str(row.get("Name") or "").strip()
        if not ticker or not name:
            continue  # unusable listing row
        market_raw = str(row.get("Market") or "").upper()
        # Anything labeled *KOSDAQ* (incl. "KOSDAQ GLOBAL") is bucketed as
        # KOSDAQ; everything else falls back to KOSPI.
        market = "KOSDAQ" if "KOSDAQ" in market_raw else "KOSPI"
        try:
            market_cap = int(row["Marcap"]) if pd.notna(row.get("Marcap")) else None
        except (TypeError, ValueError):
            market_cap = None
        master_rows.append((
            ticker, name, market, market_cap,
            0, _is_preferred(name), _is_spac(name),
            None, now_iso,
        ))
        try:
            o = int(row["Open"]) if pd.notna(row.get("Open")) else None
            h = int(row["High"]) if pd.notna(row.get("High")) else None
            l = int(row["Low"]) if pd.notna(row.get("Low")) else None
            c = int(row["Close"]) if pd.notna(row.get("Close")) else None
            v = int(row["Volume"]) if pd.notna(row.get("Volume")) else None
            amt = row.get("Amount")
            a = int(amt) if pd.notna(amt) else None
            if c is not None and v is not None:
                price_rows.append((ticker, iso, o, h, l, c, v, a))
        except (TypeError, ValueError, KeyError):
            # ValueError added: a non-numeric cell (e.g. "-") passes
            # pd.notna and previously crashed the whole refresh; now that
            # ticker's price row is simply skipped.
            pass
    return master_rows, price_rows
def _gather_flow_naver(asof: dt.date, tickers: list[str],
                       *, rate_limit_sec: float = DEFAULT_RATE_LIMIT_SEC) -> list[tuple]:
    """Scrape foreign/institution net flow for each ticker via naver.

    Only rows whose scraped date equals *asof* are kept. Per-ticker
    failures are logged and skipped; a sleep of rate_limit_sec between
    requests keeps the scraper polite.
    """
    iso = _iso(asof)
    collected: list[tuple] = []
    if not tickers:
        return collected
    with httpx.Client(timeout=10, headers=NAVER_HEADERS) as client:
        for ticker in tickers:
            try:
                flow = fetch_flow_naver(ticker, client=client)
                if flow and flow["date"] == iso:
                    collected.append(
                        (ticker, iso, flow["foreign_net"], flow["institution_net"])
                    )
            except Exception as exc:
                log.warning("flow scrape failed for %s: %s", ticker, exc)
            if rate_limit_sec > 0:
                time.sleep(rate_limit_sec)
    return collected
def refresh_daily(conn: sqlite3.Connection, asof: dt.date,
                  flow_top_n: int = DEFAULT_FLOW_TOP_N,
                  rate_limit_sec: float = DEFAULT_RATE_LIMIT_SEC) -> dict:
    """Pull master + prices (FDR) + flow (naver scraping for top N by market cap).

    Parameters:
        conn: open SQLite connection with the screener schema applied.
        asof: trading date the snapshot represents.
        flow_top_n: scrape flow only for this many largest tickers by
            market cap (<= 0 disables flow scraping entirely).
        rate_limit_sec: delay between naver requests.

    Returns a RefreshSummary-shaped dict of row counts. Commits once at
    the end, covering all three tables.

    NOTE(review): the summary's 'failures' list is always empty here —
    per-ticker flow failures are only logged inside _gather_flow_naver.
    """
    df = fetch_master_listing()
    master_rows, price_rows = _master_and_prices_rows(asof, df)
    # Upsert master; listed_date is inserted as NULL and deliberately not
    # touched on conflict, so a previously stored value survives.
    conn.executemany("""
        INSERT INTO krx_master (
            ticker, name, market, market_cap,
            is_managed, is_preferred, is_spac,
            listed_date, updated_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(ticker) DO UPDATE SET
            name=excluded.name, market=excluded.market,
            market_cap=excluded.market_cap,
            is_managed=excluded.is_managed,
            is_preferred=excluded.is_preferred,
            is_spac=excluded.is_spac,
            updated_at=excluded.updated_at
    """, master_rows)
    # REPLACE keyed on (ticker, date) makes a same-day re-run idempotent.
    conn.executemany("""
        INSERT OR REPLACE INTO krx_daily_prices
        (ticker, date, open, high, low, close, volume, value)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """, price_rows)
    # Foreign/institution flow: top-N tickers by market cap only (rate-limit protection).
    if flow_top_n > 0:
        # master_rows[3] is market_cap; None market caps sort as 0.
        top = sorted(master_rows, key=lambda r: r[3] or 0, reverse=True)[:flow_top_n]
        flow_tickers = [r[0] for r in top]
    else:
        flow_tickers = []
    flow_rows = _gather_flow_naver(asof, flow_tickers, rate_limit_sec=rate_limit_sec)
    conn.executemany("""
        INSERT OR REPLACE INTO krx_flow
        (ticker, date, foreign_net, institution_net)
        VALUES (?, ?, ?, ?)
    """, flow_rows)
    conn.commit()  # single commit for all three tables
    return RefreshSummary(
        asof=asof, master_count=len(master_rows),
        prices_count=len(price_rows), flow_count=len(flow_rows),
        failures=[],
    ).asdict()
def backfill(conn: sqlite3.Connection, start: dt.date, end: dt.date) -> list[dict]:
    """Multi-year daily-bar backfill — one fdr.DataReader call per ticker.

    The master listing is taken as of *end* (FDR exposes no historical
    master). Returns one dict per processed ticker: {'ticker', 'count'}
    on success, {'ticker', 'error'} on failure. Commits once at the end.
    """
    df = fetch_master_listing()
    master_rows, _ = _master_and_prices_rows(end, df)
    # Minimal upsert: only refresh the name here; the other master fields
    # are the daily refresh job's responsibility.
    conn.executemany("""
        INSERT INTO krx_master (
            ticker, name, market, market_cap,
            is_managed, is_preferred, is_spac,
            listed_date, updated_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(ticker) DO UPDATE SET name=excluded.name
    """, master_rows)
    iso_start = start.isoformat()
    iso_end = end.isoformat()
    results = []
    for r in master_rows:
        t = r[0]  # ticker is the first element of a master row tuple
        try:
            ddf = fetch_ohlcv_for_ticker(t, iso_start, iso_end)
            if ddf is None or ddf.empty:
                continue  # no bars in range — nothing to record
            ddf = ddf.reset_index()
            ddf["Date"] = pd.to_datetime(ddf["Date"]).dt.strftime("%Y-%m-%d")
            rows = []
            for _, rr in ddf.iterrows():
                if pd.isna(rr["Close"]) or pd.isna(rr["Volume"]):
                    continue  # unusable bar
                rows.append((
                    t, rr["Date"],
                    int(rr["Open"]) if pd.notna(rr["Open"]) else None,
                    int(rr["High"]) if pd.notna(rr["High"]) else None,
                    int(rr["Low"]) if pd.notna(rr["Low"]) else None,
                    int(rr["Close"]),
                    int(rr["Volume"]),
                    # traded value approximated as close * volume
                    int(rr["Close"] * rr["Volume"]),
                ))
            conn.executemany("""
                INSERT OR REPLACE INTO krx_daily_prices
                (ticker, date, open, high, low, close, volume, value)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, rows)
            results.append({"ticker": t, "count": len(rows)})
        except Exception as e:
            # Keep going on per-ticker failure; surface it to the caller.
            log.error("backfill failed for %s: %s", t, e)
            results.append({"ticker": t, "error": str(e)})
    conn.commit()
    return results

View File

@@ -0,0 +1,129 @@
import datetime as dt
import sqlite3
import pandas as pd
import pytest
from app.screener import snapshot as snap
from app.screener.schema import ensure_screener_schema
@pytest.fixture
def conn(tmp_path):
    """Fresh on-disk SQLite DB with the screener schema applied; closed on teardown."""
    db_path = tmp_path / "snap.db"
    c = sqlite3.connect(db_path)
    ensure_screener_schema(c)
    yield c
    c.close()
def _stub_listing(monkeypatch):
    """Replace snap.fetch_master_listing with a fixed 3-ticker frame.

    The third ticker's name ends in '우' so it exercises the
    preferred-share heuristic; market caps are ordered 005930 > 035420 >
    091990 for top-N flow selection tests.
    """
    df = pd.DataFrame([
        {"Code": "005930", "Name": "삼성전자", "Market": "KOSPI",
         "Marcap": 420_000_000_000_000,
         "Open": 70000, "High": 72000, "Low": 69500, "Close": 71000,
         "Volume": 12_000_000, "Amount": 840_000_000_000},
        {"Code": "035420", "Name": "NAVER", "Market": "KOSPI",
         "Marcap": 30_000_000_000_000,
         "Open": 215000, "High": 220000, "Low": 213000, "Close": 218000,
         "Volume": 1_000_000, "Amount": 218_000_000_000},
        {"Code": "091990", "Name": "셀트리온헬스케어우", "Market": "KOSDAQ",
         "Marcap": 10_000_000_000_000,
         "Open": 60000, "High": 61000, "Low": 59500, "Close": 60500,
         "Volume": 500_000, "Amount": 30_250_000_000},
    ])
    monkeypatch.setattr(snap, "fetch_master_listing", lambda: df)
def _stub_flow(monkeypatch, mapping):
    """Patch snap.fetch_flow_naver with a canned-response stub.

    mapping maps ticker -> {'foreign_net', 'institution_net'}; pass None
    to make every lookup miss. Dates are pinned to 2026-05-12.
    """
    def fake_flow(ticker, *, client):
        entry = None if mapping is None else mapping.get(ticker)
        if entry is None:
            return None
        return {
            "date": dt.date(2026, 5, 12).isoformat(),
            "foreign_net": entry["foreign_net"],
            "institution_net": entry["institution_net"],
        }
    monkeypatch.setattr(snap, "fetch_flow_naver", fake_flow)
def test_refresh_daily_writes_master_and_prices(conn, monkeypatch):
    """Happy path: all 3 stub tickers land in master + prices; no flow hits."""
    _stub_listing(monkeypatch)
    _stub_flow(monkeypatch, None)  # every flow lookup misses
    summary = snap.refresh_daily(conn, dt.date(2026, 5, 12),
                                 flow_top_n=10, rate_limit_sec=0)
    assert summary["master_count"] == 3
    assert summary["prices_count"] == 3
    assert summary["flow_count"] == 0
    row = conn.execute(
        "SELECT close FROM krx_daily_prices WHERE ticker='005930' AND date='2026-05-12'"
    ).fetchone()
    assert row[0] == 71000
def test_refresh_daily_writes_flow_for_top_n(conn, monkeypatch):
    """flow_top_n=2 selects the two largest market caps; both have stub flow."""
    _stub_listing(monkeypatch)
    _stub_flow(monkeypatch, {
        "005930": {"foreign_net": 12_000_000_000, "institution_net": 4_000_000_000},
        "035420": {"foreign_net": -3_000_000_000, "institution_net": 8_000_000_000},
    })
    summary = snap.refresh_daily(conn, dt.date(2026, 5, 12),
                                 flow_top_n=2, rate_limit_sec=0)
    assert summary["flow_count"] == 2
    row = conn.execute(
        "SELECT foreign_net FROM krx_flow WHERE ticker='005930'"
    ).fetchone()
    assert row[0] == 12_000_000_000
def test_master_flags_preferred(conn, monkeypatch):
    """A name ending in '우' (the 091990 stub) must set is_preferred=1."""
    _stub_listing(monkeypatch)
    _stub_flow(monkeypatch, None)
    snap.refresh_daily(conn, dt.date(2026, 5, 12), flow_top_n=0, rate_limit_sec=0)
    pref = conn.execute(
        "SELECT is_preferred FROM krx_master WHERE ticker='091990'"
    ).fetchone()
    assert pref[0] == 1
def test_refresh_daily_is_idempotent(conn, monkeypatch):
    """Running the same day twice must not duplicate price rows (REPLACE semantics)."""
    _stub_listing(monkeypatch)
    _stub_flow(monkeypatch, None)
    snap.refresh_daily(conn, dt.date(2026, 5, 12), flow_top_n=0, rate_limit_sec=0)
    snap.refresh_daily(conn, dt.date(2026, 5, 12), flow_top_n=0, rate_limit_sec=0)
    cnt = conn.execute(
        "SELECT count(*) FROM krx_daily_prices WHERE date='2026-05-12'"
    ).fetchone()[0]
    assert cnt == 3
def test_fetch_flow_naver_parses_html():
    """Real HTML structure parse with synthetic naver-like markup.

    Two data rows are present; the parser must return only the most
    recent (first) one, with commas stripped and the date re-punctuated.
    """
    html = """
    <html><body>
    <table class="type2">
    <tr><th>날짜</th></tr>
    <tr><td>2026.05.12</td><td>71,000</td><td>500</td><td>0.71%</td>
    <td>12,000,000</td><td>4,000,000,000</td><td>12,000,000,000</td>
    <td>1</td><td>53.0</td></tr>
    <tr><td>2026.05.09</td><td>70,500</td><td>-200</td><td>-0.28%</td>
    <td>10,000,000</td><td>2,000,000,000</td><td>5,000,000,000</td>
    <td>1</td><td>52.8</td></tr>
    </table>
    </body></html>
    """
    # Minimal stand-ins for httpx's response/client interface.
    class FakeResp:
        status_code = 200
        text = html
    class FakeClient:
        def get(self, url, params): return FakeResp()
    out = snap.fetch_flow_naver("005930", client=FakeClient())
    assert out == {
        "date": "2026-05-12",
        "foreign_net": 12_000_000_000,
        "institution_net": 4_000_000_000,
    }