"""보유종목 인텔리전스 — 순수연산 중심 (advisory). KIS 실주문 미사용.""" from __future__ import annotations import datetime as dt from typing import Any, Optional import pandas as pd from . import db from . import price_fetcher from .screener.engine import combine def _krx_tickers() -> set: """krx_master ticker 집합 (KRX 판별용).""" return db.get_krx_tickers() def get_holdings() -> list[dict]: """portfolio + 현재가 + pnl_rate + is_krx.""" items = db.get_all_portfolio() tickers = [it["ticker"] for it in items] prices = price_fetcher.get_current_prices(tickers) if tickers else {} krx = _krx_tickers() out = [] for it in items: cur = prices.get(it["ticker"]) avg = it["avg_price"] pnl = ((cur - avg) / avg * 100.0) if (cur and avg) else None out.append({ **it, "current_price": cur, "pnl_rate": pnl, "is_krx": it["ticker"] in krx, }) return out # ---- Task 2.1: technical_posture ---- def _score_nodes_and_weights(): """NODE_REGISTRY에서 보유종목 매수강도 계산용 노드 인스턴스화.""" from .screener.registry import NODE_REGISTRY weights = {"ma_alignment": 0.4, "momentum": 0.3, "rs_rating": 0.3} nodes = [NODE_REGISTRY[k]() for k in weights] return nodes, weights def technical_posture(ctx, tickers: list[str]) -> dict[str, float]: """보유종목 restrict 후 score 노드 → 매수강도(0~100).""" scoped = ctx.restrict(tickers) if scoped.prices.empty: return {} nodes, weights = _score_nodes_and_weights() scores = {} for n in nodes: try: scores[n.name] = n.compute(scoped, {}) except Exception: scores[n.name] = pd.Series(0.0, index=scoped.master.index) scores_ne = {k: s for k, s in scores.items() if not s.empty} weights_ne = {k: w for k, w in weights.items() if k in scores_ne} if not weights_ne: return {} total = combine(scores_ne, weights_ne) return {t: float(total.get(t, 0.0)) for t in tickers if t in total.index} # ---- Task 2.2: exit_rules ---- _DEFAULT_EXIT_PARAMS = {"stop_pct": 0.08, "take_pct": 0.25, "climax_vol_x": 3.0} def _ma(closes: "pd.Series", window: int) -> Optional[float]: if len(closes) < window: return None val = closes.rolling(window).mean().iloc[-1] return float(val) if pd.notna(val) else None def exit_rules(holding: dict, ticker_prices: "pd.DataFrame", params: dict) -> dict: """가격 기반 청산/리스크 flag (stop_loss/ma50_break/ma200_break/take_profit/climax). Note: momentum_loss는 compute_and_store 단계에서 집계하므로 여기서 설정하지 않는다. """ p = {**_DEFAULT_EXIT_PARAMS, **(params or {})} flags = {"stop_loss": False, "ma50_break": False, "ma200_break": False, "take_profit": False, "climax": False} avg = holding.get("avg_price") cur = holding.get("current_price") if ticker_prices is None or ticker_prices.empty: closes = pd.Series(dtype=float) else: closes = ticker_prices.sort_values("date")["close"].astype(float).reset_index(drop=True) last_close = float(closes.iloc[-1]) if len(closes) else cur if cur is None: cur = last_close if cur is not None and avg: if cur < avg * (1 - p["stop_pct"]): flags["stop_loss"] = True if avg > 0 and (cur - avg) / avg >= p["take_pct"]: flags["take_profit"] = True ma50 = _ma(closes, 50) ma200 = _ma(closes, 200) if ma50 is not None and last_close is not None and last_close < ma50: flags["ma50_break"] = True if ma200 is not None and last_close is not None and last_close < ma200: flags["ma200_break"] = True # climax: 최근 거래량이 20일 평균의 climax_vol_x배 이상 + 종가가 당일 고점 대비 하단(상단꼬리) if ticker_prices is not None and not ticker_prices.empty and len(ticker_prices) >= 21: tp = ticker_prices.sort_values("date") vol = tp["volume"].astype(float).reset_index(drop=True) avg_vol = vol.iloc[-21:-1].mean() last_vol = vol.iloc[-1] hi_ = float(tp["high"].astype(float).iloc[-1]) cl_ = float(tp["close"].astype(float).iloc[-1]) if avg_vol and last_vol >= avg_vol * p["climax_vol_x"] and hi_ > 0 and cl_ < hi_ * 0.97: flags["climax"] = True return flags # ---- Task 2.3: decide_action ---- ADD_SCORE = 70.0 # 이 이상이면 추가매수 후보 # ---- Task 3.1: market_events ---- _DEFAULT_EVENT_PARAMS = {"move_pct": 7.0, "vol_z": 2.5} def market_events(ticker: str, ticker_prices: "pd.DataFrame", ticker_flow: "pd.DataFrame | None", params: dict) -> list[dict]: """일봉/flow 기반 시장 이벤트 (급변·거래량 Z·외인 순매도).""" p = {**_DEFAULT_EVENT_PARAMS, **(params or {})} events = [] if ticker_prices is None or ticker_prices.empty or len(ticker_prices) < 2: return events tp = ticker_prices.sort_values("date").reset_index(drop=True) close = tp["close"].astype(float) pct = (close.iloc[-1] - close.iloc[-2]) / close.iloc[-2] * 100.0 if close.iloc[-2] else 0.0 if abs(pct) >= p["move_pct"]: events.append({ "type": "price_move", "severity": "high" if abs(pct) >= p["move_pct"] * 1.5 else "med", "summary": f"전일 대비 {pct:+.1f}%", }) vol = tp["volume"].astype(float) if len(vol) >= 21: base = vol.iloc[-21:-1] mu, sd = base.mean(), base.std(ddof=0) last_vol = vol.iloc[-1] if mu > 0 and ( (sd and (last_vol - mu) / sd >= p["vol_z"]) or (not sd and last_vol >= mu * p["vol_z"]) # sd=0 (평탄 기준선): vol_z를 Z-score가 아닌 단순 배수로 사용 ): z_txt = f"{(last_vol - mu) / sd:.1f}" if sd else f"ratio={last_vol / mu:.1f}x" events.append({ "type": "volume_surge", "severity": "med", "summary": f"거래량 평소 대비 급증(Z={z_txt})", }) if ticker_flow is not None and not ticker_flow.empty: tf = ticker_flow.sort_values("date") recent = tf["foreign_net"].astype(float).iloc[-3:] if len(recent) >= 3 and (recent < 0).all(): events.append({ "type": "foreign_selling", "severity": "med", "summary": "외국인 3일 연속 순매도", }) return events # ---- Task 3.2: news_issues ---- NEG_SENTIMENT = -0.3 # 이하면 악재 후보 def _news_sentiment_map(date: str) -> dict: """date 기준 news_sentiment 테이블에서 ticker → {score_raw, news_count} 맵 반환.""" with db._conn() as conn: try: rows = conn.execute( "SELECT ticker, score_raw, news_count FROM news_sentiment WHERE date=?", (date,), ).fetchall() except Exception: return {} return {r["ticker"]: {"score_raw": r["score_raw"], "news_count": r["news_count"]} for r in rows} def news_issues(tickers: list[str], date: str, use_llm: bool = True) -> dict[str, list]: """news_sentiment 음수 → 악재 flag. (LLM 요약은 best-effort; 단위 테스트는 use_llm=False로.)""" senti = _news_sentiment_map(date) out: dict[str, list] = {} for t in tickers: s = senti.get(t) if not s or s["score_raw"] is None: continue if s["score_raw"] <= NEG_SENTIMENT: sev = "high" if s["score_raw"] <= NEG_SENTIMENT * 2 else "med" out.setdefault(t, []).append({ "type": "news", "severity": sev, "summary": f"부정 뉴스 감성({s['score_raw']:+.2f}, {s.get('news_count', 0)}건)", }) return out # ---- Task 3.3: portfolio_health ---- def portfolio_health(holdings: list[dict], total_cash: int = 0) -> dict: """비중 집중도(최대비중·HHI) + 현금비중 + 총손익 요약.""" evals, buys = [], [] for h in holdings: cur = h.get("current_price") or h.get("avg_price") or 0 ev = cur * h.get("quantity", 0) bu = (h.get("avg_price") or 0) * h.get("quantity", 0) evals.append(ev) buys.append(bu) total_eval = sum(evals) total_buy = sum(buys) weights = [e / total_eval for e in evals] if total_eval else [] hhi = sum(w * w for w in weights) total_assets = total_eval + (total_cash or 0) return { "positions": len(holdings), "total_eval": total_eval, "total_buy": total_buy, "total_pnl": total_eval - total_buy, "total_pnl_rate": ((total_eval - total_buy) / total_buy * 100.0) if total_buy else 0.0, "max_weight": max(weights) if weights else 0.0, "hhi": round(hhi, 4), "cash_ratio": ((total_cash or 0) / total_assets) if total_assets else 0.0, } DEFAULT_PARAMS = { "stop_pct": 0.08, "take_pct": 0.25, "climax_vol_x": 3.0, "move_pct": 7.0, "vol_z": 2.5, "momentum_drop": 15.0, "momentum_low": 35.0, } def _load_ctx(asof: dt.date): """ScreenContext.load를 감싸는 thin wrapper (테스트에서 monkeypatch 대상).""" from .screener.engine import ScreenContext with db._conn() as conn: return ScreenContext.load(conn, asof) def _today_kst() -> dt.date: return (dt.datetime.utcnow() + dt.timedelta(hours=9)).date() def compute_and_store(asof: Optional[dt.date] = None, use_llm: bool = True, params: dict | None = None) -> dict: """보유종목 시그널 계산 → holdings_signals upsert (멱등). Returns: {"stored": N, "date": "YYYY-MM-DD"} or {"stored": 0, "reason": "..."} """ asof = asof or _today_kst() p = {**DEFAULT_PARAMS, **(params or {})} holdings = get_holdings() if not holdings: return {"stored": 0, "reason": "no_holdings"} krx = [h for h in holdings if h.get("is_krx")] ctx = _load_ctx(asof) posture = technical_posture(ctx, [h["ticker"] for h in krx]) if krx else {} date_iso = asof.isoformat() issues_map = news_issues([h["ticker"] for h in holdings], date_iso, use_llm=use_llm) stored = 0 for h in holdings: t = h["ticker"] tp = ctx.prices[ctx.prices["ticker"] == t] if h.get("is_krx") else None tf = ctx.flow[ctx.flow["ticker"] == t] if h.get("is_krx") else None flags = exit_rules(h, tp, p) if h.get("is_krx") else {} tech = posture.get(t) # momentum_loss: 직전 저장 시그널 대비 하락 or 낮은 강도 prev = db.get_holdings_signal_history(t, limit=2) prev_score = next((r["tech_score"] for r in prev if r["date"] != date_iso), None) if tech is not None and ( (prev_score is not None and tech < prev_score - p["momentum_drop"]) or tech < p["momentum_low"] ): flags["momentum_loss"] = True evts = market_events(t, tp, tf, p) if h.get("is_krx") else [] issues = list(issues_map.get(t, [])) + evts action, reasons = decide_action(tech if tech is not None else 0.0, flags, h.get("pnl_rate")) db.upsert_holdings_signal( date=date_iso, ticker=t, name=h.get("name"), action=action, tech_score=tech, exit_flags=flags, issues=issues, close=h.get("current_price"), pnl_rate=h.get("pnl_rate"), reasons=reasons, ) stored += 1 return {"stored": stored, "date": date_iso} def build_holdings_brief(date: Optional[str] = None) -> dict: """최신 시그널 + 포트 건강 조립 (브리핑/UI payload).""" date = date or db.get_latest_holdings_date() if not date: return {"date": None, "holdings": [], "portfolio_health": {}} signals = db.get_holdings_signals(date) holdings = get_holdings() total_cash = sum(c.get("cash", 0) for c in db.get_all_broker_cash()) health = portfolio_health(holdings, total_cash=total_cash) return {"date": date, "holdings": signals, "portfolio_health": health} def decide_action(tech_score: float, exit_flags: dict, pnl: float | None, add_score: float = ADD_SCORE) -> tuple[str, str]: """액션 결정 매트릭스: sell > trim > add > hold (우선순위 순). Returns: (action, reasons_text) action ∈ {"sell","trim","add","hold"} """ reasons = [] # 청산 (최우선) if exit_flags.get("stop_loss"): reasons.append("손절선 이탈") if exit_flags.get("ma200_break"): reasons.append("MA200 이탈") if reasons: return "sell", " · ".join(reasons) # 축소 if exit_flags.get("ma50_break"): reasons.append("MA50 이탈") if exit_flags.get("momentum_loss"): reasons.append("모멘텀 소멸") if exit_flags.get("take_profit"): reasons.append(f"목표 수익 도달(+{pnl:.0f}%)" if pnl is not None else "목표 수익 도달") if exit_flags.get("climax"): reasons.append("거래량 급증 분산 의심") if reasons: return "trim", " · ".join(reasons) # 추가매수 if tech_score is not None and tech_score >= add_score: return "add", f"기술적 강도 양호({tech_score:.0f})" return "hold", "특이 신호 없음"