345 lines
13 KiB
Python
345 lines
13 KiB
Python
"""보유종목 인텔리전스 — 순수연산 중심 (advisory). KIS 실주문 미사용."""
|
|
from __future__ import annotations
|
|
import datetime as dt
|
|
from typing import Any, Optional
|
|
|
|
import pandas as pd
|
|
|
|
from . import db
|
|
from . import price_fetcher
|
|
from .screener.engine import combine
|
|
|
|
|
|
def _krx_tickers() -> set:
|
|
"""krx_master ticker 집합 (KRX 판별용)."""
|
|
return db.get_krx_tickers()
|
|
|
|
|
|
def get_holdings() -> list[dict]:
|
|
"""portfolio + 현재가 + pnl_rate + is_krx."""
|
|
items = db.get_all_portfolio()
|
|
tickers = [it["ticker"] for it in items]
|
|
prices = price_fetcher.get_current_prices(tickers) if tickers else {}
|
|
krx = _krx_tickers()
|
|
out = []
|
|
for it in items:
|
|
cur = prices.get(it["ticker"])
|
|
avg = it["avg_price"]
|
|
pnl = ((cur - avg) / avg * 100.0) if (cur and avg) else None
|
|
out.append({
|
|
**it,
|
|
"current_price": cur,
|
|
"pnl_rate": pnl,
|
|
"is_krx": it["ticker"] in krx,
|
|
})
|
|
return out
|
|
|
|
|
|
# ---- Task 2.1: technical_posture ----
|
|
|
|
|
|
def _score_nodes_and_weights():
|
|
"""NODE_REGISTRY에서 보유종목 매수강도 계산용 노드 인스턴스화."""
|
|
from .screener.registry import NODE_REGISTRY
|
|
weights = {"ma_alignment": 0.4, "momentum": 0.3, "rs_rating": 0.3}
|
|
nodes = [NODE_REGISTRY[k]() for k in weights]
|
|
return nodes, weights
|
|
|
|
|
|
def technical_posture(ctx, tickers: list[str]) -> dict[str, float]:
|
|
"""보유종목 restrict 후 score 노드 → 매수강도(0~100)."""
|
|
scoped = ctx.restrict(tickers)
|
|
if scoped.prices.empty:
|
|
return {}
|
|
nodes, weights = _score_nodes_and_weights()
|
|
scores = {}
|
|
for n in nodes:
|
|
try:
|
|
scores[n.name] = n.compute(scoped, {})
|
|
except Exception:
|
|
scores[n.name] = pd.Series(0.0, index=scoped.master.index)
|
|
scores_ne = {k: s for k, s in scores.items() if not s.empty}
|
|
weights_ne = {k: w for k, w in weights.items() if k in scores_ne}
|
|
if not weights_ne:
|
|
return {}
|
|
total = combine(scores_ne, weights_ne)
|
|
return {t: float(total.get(t, 0.0)) for t in tickers if t in total.index}
|
|
|
|
|
|
# ---- Task 2.2: exit_rules ----
|
|
|
|
_DEFAULT_EXIT_PARAMS = {"stop_pct": 0.08, "take_pct": 0.25, "climax_vol_x": 3.0}
|
|
|
|
|
|
def _ma(closes: "pd.Series", window: int) -> Optional[float]:
|
|
if len(closes) < window:
|
|
return None
|
|
val = closes.rolling(window).mean().iloc[-1]
|
|
return float(val) if pd.notna(val) else None
|
|
|
|
|
|
def exit_rules(holding: dict, ticker_prices: "pd.DataFrame", params: dict) -> dict:
|
|
"""가격 기반 청산/리스크 flag (stop_loss/ma50_break/ma200_break/take_profit/climax).
|
|
|
|
Note: momentum_loss는 compute_and_store 단계에서 집계하므로 여기서 설정하지 않는다.
|
|
"""
|
|
p = {**_DEFAULT_EXIT_PARAMS, **(params or {})}
|
|
flags = {"stop_loss": False, "ma50_break": False, "ma200_break": False,
|
|
"take_profit": False, "climax": False}
|
|
avg = holding.get("avg_price")
|
|
cur = holding.get("current_price")
|
|
if ticker_prices is None or ticker_prices.empty:
|
|
closes = pd.Series(dtype=float)
|
|
else:
|
|
closes = ticker_prices.sort_values("date")["close"].astype(float).reset_index(drop=True)
|
|
last_close = float(closes.iloc[-1]) if len(closes) else cur
|
|
if cur is None:
|
|
cur = last_close
|
|
if cur is not None and avg:
|
|
if cur < avg * (1 - p["stop_pct"]):
|
|
flags["stop_loss"] = True
|
|
if avg > 0 and (cur - avg) / avg >= p["take_pct"]:
|
|
flags["take_profit"] = True
|
|
ma50 = _ma(closes, 50)
|
|
ma200 = _ma(closes, 200)
|
|
if ma50 is not None and last_close is not None and last_close < ma50:
|
|
flags["ma50_break"] = True
|
|
if ma200 is not None and last_close is not None and last_close < ma200:
|
|
flags["ma200_break"] = True
|
|
# climax: 최근 거래량이 20일 평균의 climax_vol_x배 이상 + 종가가 당일 고점 대비 하단(상단꼬리)
|
|
if ticker_prices is not None and not ticker_prices.empty and len(ticker_prices) >= 21:
|
|
tp = ticker_prices.sort_values("date")
|
|
vol = tp["volume"].astype(float).reset_index(drop=True)
|
|
avg_vol = vol.iloc[-21:-1].mean()
|
|
last_vol = vol.iloc[-1]
|
|
hi_ = float(tp["high"].astype(float).iloc[-1])
|
|
cl_ = float(tp["close"].astype(float).iloc[-1])
|
|
if avg_vol and last_vol >= avg_vol * p["climax_vol_x"] and hi_ > 0 and cl_ < hi_ * 0.97:
|
|
flags["climax"] = True
|
|
return flags
|
|
|
|
|
|
# ---- Task 2.3: decide_action ----
|
|
|
|
ADD_SCORE = 70.0 # 이 이상이면 추가매수 후보
|
|
|
|
|
|
# ---- Task 3.1: market_events ----
|
|
|
|
_DEFAULT_EVENT_PARAMS = {"move_pct": 7.0, "vol_z": 2.5}
|
|
|
|
|
|
def market_events(ticker: str, ticker_prices: "pd.DataFrame",
|
|
ticker_flow: "pd.DataFrame | None", params: dict) -> list[dict]:
|
|
"""일봉/flow 기반 시장 이벤트 (급변·거래량 Z·외인 순매도)."""
|
|
p = {**_DEFAULT_EVENT_PARAMS, **(params or {})}
|
|
events = []
|
|
if ticker_prices is None or ticker_prices.empty or len(ticker_prices) < 2:
|
|
return events
|
|
tp = ticker_prices.sort_values("date").reset_index(drop=True)
|
|
close = tp["close"].astype(float)
|
|
pct = (close.iloc[-1] - close.iloc[-2]) / close.iloc[-2] * 100.0 if close.iloc[-2] else 0.0
|
|
if abs(pct) >= p["move_pct"]:
|
|
events.append({
|
|
"type": "price_move",
|
|
"severity": "high" if abs(pct) >= p["move_pct"] * 1.5 else "med",
|
|
"summary": f"전일 대비 {pct:+.1f}%",
|
|
})
|
|
vol = tp["volume"].astype(float)
|
|
if len(vol) >= 21:
|
|
base = vol.iloc[-21:-1]
|
|
mu, sd = base.mean(), base.std(ddof=0)
|
|
last_vol = vol.iloc[-1]
|
|
if mu > 0 and (
|
|
(sd and (last_vol - mu) / sd >= p["vol_z"])
|
|
or (not sd and last_vol >= mu * p["vol_z"]) # sd=0 (평탄 기준선): vol_z를 Z-score가 아닌 단순 배수로 사용
|
|
):
|
|
z_txt = f"{(last_vol - mu) / sd:.1f}" if sd else f"ratio={last_vol / mu:.1f}x"
|
|
events.append({
|
|
"type": "volume_surge",
|
|
"severity": "med",
|
|
"summary": f"거래량 평소 대비 급증(Z={z_txt})",
|
|
})
|
|
if ticker_flow is not None and not ticker_flow.empty:
|
|
tf = ticker_flow.sort_values("date")
|
|
recent = tf["foreign_net"].astype(float).iloc[-3:]
|
|
if len(recent) >= 3 and (recent < 0).all():
|
|
events.append({
|
|
"type": "foreign_selling",
|
|
"severity": "med",
|
|
"summary": "외국인 3일 연속 순매도",
|
|
})
|
|
return events
|
|
|
|
|
|
# ---- Task 3.2: news_issues ----
|
|
|
|
NEG_SENTIMENT = -0.3 # 이하면 악재 후보
|
|
|
|
|
|
def _news_sentiment_map(date: str) -> dict:
|
|
"""date 기준 news_sentiment 테이블에서 ticker → {score_raw, news_count} 맵 반환."""
|
|
with db._conn() as conn:
|
|
try:
|
|
rows = conn.execute(
|
|
"SELECT ticker, score_raw, news_count FROM news_sentiment WHERE date=?",
|
|
(date,),
|
|
).fetchall()
|
|
except Exception:
|
|
return {}
|
|
return {r["ticker"]: {"score_raw": r["score_raw"], "news_count": r["news_count"]}
|
|
for r in rows}
|
|
|
|
|
|
def news_issues(tickers: list[str], date: str, use_llm: bool = True) -> dict[str, list]:
|
|
"""news_sentiment 음수 → 악재 flag. (LLM 요약은 best-effort; 단위 테스트는 use_llm=False로.)"""
|
|
senti = _news_sentiment_map(date)
|
|
out: dict[str, list] = {}
|
|
for t in tickers:
|
|
s = senti.get(t)
|
|
if not s or s["score_raw"] is None:
|
|
continue
|
|
if s["score_raw"] <= NEG_SENTIMENT:
|
|
sev = "high" if s["score_raw"] <= NEG_SENTIMENT * 2 else "med"
|
|
out.setdefault(t, []).append({
|
|
"type": "news",
|
|
"severity": sev,
|
|
"summary": f"부정 뉴스 감성({s['score_raw']:+.2f}, {s.get('news_count', 0)}건)",
|
|
})
|
|
return out
|
|
|
|
|
|
# ---- Task 3.3: portfolio_health ----
|
|
|
|
|
|
def portfolio_health(holdings: list[dict], total_cash: int = 0) -> dict:
|
|
"""비중 집중도(최대비중·HHI) + 현금비중 + 총손익 요약."""
|
|
evals, buys = [], []
|
|
for h in holdings:
|
|
cur = h.get("current_price") or h.get("avg_price") or 0
|
|
ev = cur * h.get("quantity", 0)
|
|
bu = (h.get("avg_price") or 0) * h.get("quantity", 0)
|
|
evals.append(ev)
|
|
buys.append(bu)
|
|
total_eval = sum(evals)
|
|
total_buy = sum(buys)
|
|
weights = [e / total_eval for e in evals] if total_eval else []
|
|
hhi = sum(w * w for w in weights)
|
|
total_assets = total_eval + (total_cash or 0)
|
|
return {
|
|
"positions": len(holdings),
|
|
"total_eval": total_eval,
|
|
"total_buy": total_buy,
|
|
"total_pnl": total_eval - total_buy,
|
|
"total_pnl_rate": ((total_eval - total_buy) / total_buy * 100.0) if total_buy else 0.0,
|
|
"max_weight": max(weights) if weights else 0.0,
|
|
"hhi": round(hhi, 4),
|
|
"cash_ratio": ((total_cash or 0) / total_assets) if total_assets else 0.0,
|
|
}
|
|
|
|
|
|
DEFAULT_PARAMS = {
|
|
"stop_pct": 0.08, "take_pct": 0.25, "climax_vol_x": 3.0,
|
|
"move_pct": 7.0, "vol_z": 2.5,
|
|
"momentum_drop": 15.0, "momentum_low": 35.0,
|
|
}
|
|
|
|
|
|
def _load_ctx(asof: dt.date):
|
|
"""ScreenContext.load를 감싸는 thin wrapper (테스트에서 monkeypatch 대상)."""
|
|
from .screener.engine import ScreenContext
|
|
with db._conn() as conn:
|
|
return ScreenContext.load(conn, asof)
|
|
|
|
|
|
def _today_kst() -> dt.date:
|
|
return (dt.datetime.utcnow() + dt.timedelta(hours=9)).date()
|
|
|
|
|
|
def compute_and_store(asof: Optional[dt.date] = None, use_llm: bool = True,
|
|
params: dict | None = None) -> dict:
|
|
"""보유종목 시그널 계산 → holdings_signals upsert (멱등).
|
|
|
|
Returns:
|
|
{"stored": N, "date": "YYYY-MM-DD"} or {"stored": 0, "reason": "..."}
|
|
"""
|
|
asof = asof or _today_kst()
|
|
p = {**DEFAULT_PARAMS, **(params or {})}
|
|
holdings = get_holdings()
|
|
if not holdings:
|
|
return {"stored": 0, "reason": "no_holdings"}
|
|
krx = [h for h in holdings if h.get("is_krx")]
|
|
ctx = _load_ctx(asof)
|
|
posture = technical_posture(ctx, [h["ticker"] for h in krx]) if krx else {}
|
|
date_iso = asof.isoformat()
|
|
issues_map = news_issues([h["ticker"] for h in holdings], date_iso, use_llm=use_llm)
|
|
stored = 0
|
|
for h in holdings:
|
|
t = h["ticker"]
|
|
tp = ctx.prices[ctx.prices["ticker"] == t] if h.get("is_krx") else None
|
|
tf = ctx.flow[ctx.flow["ticker"] == t] if h.get("is_krx") else None
|
|
flags = exit_rules(h, tp, p) if h.get("is_krx") else {}
|
|
tech = posture.get(t)
|
|
# momentum_loss: 직전 저장 시그널 대비 하락 or 낮은 강도
|
|
prev = db.get_holdings_signal_history(t, limit=2)
|
|
prev_score = next((r["tech_score"] for r in prev if r["date"] != date_iso), None)
|
|
if tech is not None and (
|
|
(prev_score is not None and tech < prev_score - p["momentum_drop"])
|
|
or tech < p["momentum_low"]
|
|
):
|
|
flags["momentum_loss"] = True
|
|
evts = market_events(t, tp, tf, p) if h.get("is_krx") else []
|
|
issues = list(issues_map.get(t, [])) + evts
|
|
action, reasons = decide_action(tech if tech is not None else 0.0, flags, h.get("pnl_rate"))
|
|
db.upsert_holdings_signal(
|
|
date=date_iso, ticker=t, name=h.get("name"), action=action,
|
|
tech_score=tech, exit_flags=flags, issues=issues,
|
|
close=h.get("current_price"), pnl_rate=h.get("pnl_rate"), reasons=reasons,
|
|
)
|
|
stored += 1
|
|
return {"stored": stored, "date": date_iso}
|
|
|
|
|
|
def build_holdings_brief(date: Optional[str] = None) -> dict:
|
|
"""최신 시그널 + 포트 건강 조립 (브리핑/UI payload)."""
|
|
date = date or db.get_latest_holdings_date()
|
|
if not date:
|
|
return {"date": None, "holdings": [], "portfolio_health": {}}
|
|
signals = db.get_holdings_signals(date)
|
|
holdings = get_holdings()
|
|
total_cash = sum(c.get("cash", 0) for c in db.get_all_broker_cash())
|
|
health = portfolio_health(holdings, total_cash=total_cash)
|
|
return {"date": date, "holdings": signals, "portfolio_health": health}
|
|
|
|
|
|
def decide_action(tech_score: float, exit_flags: dict, pnl: float | None,
|
|
add_score: float = ADD_SCORE) -> tuple[str, str]:
|
|
"""액션 결정 매트릭스: sell > trim > add > hold (우선순위 순).
|
|
|
|
Returns:
|
|
(action, reasons_text) action ∈ {"sell","trim","add","hold"}
|
|
"""
|
|
reasons = []
|
|
# 청산 (최우선)
|
|
if exit_flags.get("stop_loss"):
|
|
reasons.append("손절선 이탈")
|
|
if exit_flags.get("ma200_break"):
|
|
reasons.append("MA200 이탈")
|
|
if reasons:
|
|
return "sell", " · ".join(reasons)
|
|
# 축소
|
|
if exit_flags.get("ma50_break"):
|
|
reasons.append("MA50 이탈")
|
|
if exit_flags.get("momentum_loss"):
|
|
reasons.append("모멘텀 소멸")
|
|
if exit_flags.get("take_profit"):
|
|
reasons.append(f"목표 수익 도달(+{pnl:.0f}%)" if pnl is not None else "목표 수익 도달")
|
|
if exit_flags.get("climax"):
|
|
reasons.append("거래량 급증 분산 의심")
|
|
if reasons:
|
|
return "trim", " · ".join(reasons)
|
|
# 추가매수
|
|
if tech_score is not None and tech_score >= add_score:
|
|
return "add", f"기술적 강도 양호({tech_score:.0f})"
|
|
return "hold", "특이 신호 없음"
|