"""보유종목 인텔리전스 — 순수연산 중심 (advisory). KIS 실주문 미사용.""" from __future__ import annotations import datetime as dt from typing import Any, Optional import pandas as pd from . import db from . import price_fetcher from .screener.engine import combine def _krx_tickers() -> set: """krx_master ticker 집합 (KRX 판별용).""" return db.get_krx_tickers() def get_holdings() -> list[dict]: """portfolio + 현재가 + pnl_rate + is_krx.""" items = db.get_all_portfolio() tickers = [it["ticker"] for it in items] prices = price_fetcher.get_current_prices(tickers) if tickers else {} krx = _krx_tickers() out = [] for it in items: cur = prices.get(it["ticker"]) avg = it["avg_price"] pnl = ((cur - avg) / avg * 100.0) if (cur and avg) else None out.append({ **it, "current_price": cur, "pnl_rate": pnl, "is_krx": it["ticker"] in krx, }) return out # ---- Task 2.1: technical_posture ---- def _score_nodes_and_weights(): """NODE_REGISTRY에서 보유종목 매수강도 계산용 노드 인스턴스화.""" from .screener.registry import NODE_REGISTRY weights = {"ma_alignment": 0.4, "momentum": 0.3, "rs_rating": 0.3} nodes = [NODE_REGISTRY[k]() for k in weights] return nodes, weights def technical_posture(ctx, tickers: list[str]) -> dict[str, float]: """보유종목 restrict 후 score 노드 → 매수강도(0~100).""" scoped = ctx.restrict(tickers) if scoped.prices.empty: return {} nodes, weights = _score_nodes_and_weights() scores = {} for n in nodes: try: scores[n.name] = n.compute(scoped, {}) except Exception: scores[n.name] = pd.Series(0.0, index=scoped.master.index) scores_ne = {k: s for k, s in scores.items() if not s.empty} weights_ne = {k: w for k, w in weights.items() if k in scores_ne} if not weights_ne: return {} total = combine(scores_ne, weights_ne) return {t: float(total.get(t, 0.0)) for t in tickers if t in total.index} # ---- Task 2.2: exit_rules ---- _DEFAULT_EXIT_PARAMS = {"stop_pct": 0.08, "take_pct": 0.25, "climax_vol_x": 3.0} def _ma(closes: "pd.Series", window: int) -> Optional[float]: if len(closes) < window: return None val = closes.rolling(window).mean().iloc[-1] return float(val) if pd.notna(val) else None def exit_rules(holding: dict, ticker_prices: "pd.DataFrame", params: dict) -> dict: """가격 기반 청산/리스크 flag (stop_loss/ma50_break/ma200_break/take_profit/climax). Note: momentum_loss는 compute_and_store 단계에서 집계하므로 여기서 설정하지 않는다. """ p = {**_DEFAULT_EXIT_PARAMS, **(params or {})} flags = {"stop_loss": False, "ma50_break": False, "ma200_break": False, "take_profit": False, "climax": False} avg = holding.get("avg_price") cur = holding.get("current_price") if ticker_prices is None or ticker_prices.empty: closes = pd.Series(dtype=float) else: closes = ticker_prices.sort_values("date")["close"].astype(float).reset_index(drop=True) last_close = float(closes.iloc[-1]) if len(closes) else cur if cur is None: cur = last_close if cur is not None and avg: if cur < avg * (1 - p["stop_pct"]): flags["stop_loss"] = True if avg > 0 and (cur - avg) / avg >= p["take_pct"]: flags["take_profit"] = True ma50 = _ma(closes, 50) ma200 = _ma(closes, 200) if ma50 is not None and last_close is not None and last_close < ma50: flags["ma50_break"] = True if ma200 is not None and last_close is not None and last_close < ma200: flags["ma200_break"] = True # climax: 최근 거래량이 20일 평균의 climax_vol_x배 이상 + 종가가 당일 고점 대비 하단(상단꼬리) if ticker_prices is not None and not ticker_prices.empty and len(ticker_prices) >= 21: tp = ticker_prices.sort_values("date") vol = tp["volume"].astype(float).reset_index(drop=True) avg_vol = vol.iloc[-21:-1].mean() last_vol = vol.iloc[-1] hi_ = float(tp["high"].astype(float).iloc[-1]) cl_ = float(tp["close"].astype(float).iloc[-1]) if avg_vol and last_vol >= avg_vol * p["climax_vol_x"] and hi_ > 0 and cl_ < hi_ * 0.97: flags["climax"] = True return flags # ---- Task 2.3: decide_action ---- ADD_SCORE = 70.0 # 이 이상이면 추가매수 후보 # ---- Task 3.1: market_events ---- _DEFAULT_EVENT_PARAMS = {"move_pct": 7.0, "vol_z": 2.5} def market_events(ticker: str, ticker_prices: "pd.DataFrame", ticker_flow: "pd.DataFrame | None", params: dict) -> list[dict]: """일봉/flow 기반 시장 이벤트 (급변·거래량 Z·외인 순매도).""" p = {**_DEFAULT_EVENT_PARAMS, **(params or {})} events = [] if ticker_prices is None or ticker_prices.empty or len(ticker_prices) < 2: return events tp = ticker_prices.sort_values("date").reset_index(drop=True) close = tp["close"].astype(float) pct = (close.iloc[-1] - close.iloc[-2]) / close.iloc[-2] * 100.0 if close.iloc[-2] else 0.0 if abs(pct) >= p["move_pct"]: events.append({ "type": "price_move", "severity": "high" if abs(pct) >= p["move_pct"] * 1.5 else "med", "summary": f"전일 대비 {pct:+.1f}%", }) vol = tp["volume"].astype(float) if len(vol) >= 21: base = vol.iloc[-21:-1] mu, sd = base.mean(), base.std(ddof=0) last_vol = vol.iloc[-1] if mu > 0 and ( (sd and (last_vol - mu) / sd >= p["vol_z"]) or (not sd and last_vol >= mu * p["vol_z"]) # sd=0 fallback: plain ratio ): z_txt = f"{(last_vol - mu) / sd:.1f}" if sd else f"ratio={last_vol / mu:.1f}x" events.append({ "type": "volume_surge", "severity": "med", "summary": f"거래량 평소 대비 급증(Z={z_txt})", }) if ticker_flow is not None and not ticker_flow.empty: tf = ticker_flow.sort_values("date") recent = tf["foreign_net"].astype(float).iloc[-3:] if len(recent) >= 3 and (recent < 0).all(): events.append({ "type": "foreign_selling", "severity": "med", "summary": "외국인 3일 연속 순매도", }) return events # ---- Task 3.2: news_issues ---- NEG_SENTIMENT = -0.3 # 이하면 악재 후보 def _news_sentiment_map(date: str) -> dict: """date 기준 news_sentiment 테이블에서 ticker → {score_raw, news_count} 맵 반환.""" with db._conn() as conn: try: rows = conn.execute( "SELECT ticker, score_raw, news_count FROM news_sentiment WHERE date=?", (date,), ).fetchall() except Exception: return {} return {r["ticker"]: {"score_raw": r["score_raw"], "news_count": r["news_count"]} for r in rows} def news_issues(tickers: list[str], date: str, use_llm: bool = True) -> dict[str, list]: """news_sentiment 음수 → 악재 flag. (LLM 요약은 best-effort; 단위 테스트는 use_llm=False로.)""" senti = _news_sentiment_map(date) out: dict[str, list] = {} for t in tickers: s = senti.get(t) if not s or s["score_raw"] is None: continue if s["score_raw"] <= NEG_SENTIMENT: sev = "high" if s["score_raw"] <= NEG_SENTIMENT * 2 else "med" out.setdefault(t, []).append({ "type": "news", "severity": sev, "summary": f"부정 뉴스 감성({s['score_raw']:+.2f}, {s.get('news_count', 0)}건)", }) return out def decide_action(tech_score: float, exit_flags: dict, pnl: float | None, add_score: float = ADD_SCORE) -> tuple[str, str]: """액션 결정 매트릭스: sell > trim > add > hold (우선순위 순). Returns: (action, reasons_text) action ∈ {"sell","trim","add","hold"} """ reasons = [] # 청산 (최우선) if exit_flags.get("stop_loss"): reasons.append("손절선 이탈") if exit_flags.get("ma200_break"): reasons.append("MA200 이탈") if reasons: return "sell", " · ".join(reasons) # 축소 if exit_flags.get("ma50_break"): reasons.append("MA50 이탈") if exit_flags.get("momentum_loss"): reasons.append("모멘텀 소멸") if exit_flags.get("take_profit"): reasons.append(f"목표 수익 도달(+{pnl:.0f}%)" if pnl is not None else "목표 수익 도달") if exit_flags.get("climax"): reasons.append("거래량 급증 분산 의심") if reasons: return "trim", " · ".join(reasons) # 추가매수 if tech_score is not None and tech_score >= add_score: return "add", f"기술적 강도 양호({tech_score:.0f})" return "hold", "특이 신호 없음"