"""Phase 4 — 매수/매도 신호 생성. 순수 함수 generate_signals(state, dedup, settings). state 를 mutate. """ from __future__ import annotations import logging from datetime import datetime from zoneinfo import ZoneInfo logger = logging.getLogger(__name__) KST = ZoneInfo("Asia/Seoul") MOMENTUM_SCORES = { "strong_up": 1.0, "weak_up": 0.7, "neutral": 0.5, "weak_down": 0.3, "strong_down": 0.0, } def generate_signals(state, dedup, settings) -> None: """Phase 4 entry — state-mutating. Evaluation order: sell first (priority), then buy. A ticker receiving a sell signal in this cycle is excluded from buy evaluation to avoid silent overwrite.""" _evaluate_sell_signals(state, dedup, settings) _evaluate_buy_signals(state, dedup, settings) # ----- 매수 ----- def _evaluate_buy_signals(state, dedup, settings) -> None: candidates = _buy_candidates(state) for ticker, name, rank in candidates: existing = state.signals.get(ticker) if existing is not None and existing.get("action") == "sell": logger.debug("buy %s skipped: same-cycle sell precedence", ticker) continue if not _check_buy_hard_gate(state, ticker, settings): logger.debug("buy %s skipped: hard gate failed", ticker) continue confidence = _compute_buy_confidence(state, ticker, rank) if confidence <= settings.confidence_threshold: logger.debug("buy %s skipped: confidence %.3f <= %.3f", ticker, confidence, settings.confidence_threshold) continue if dedup.is_recent(ticker, "buy", within_hours=24): logger.debug("buy %s skipped: dedup 24h", ticker) continue state.signals[ticker] = _build_buy_signal(state, ticker, name, rank, confidence) dedup.record(ticker, "buy", confidence=confidence) logger.info("signal emit %s buy conf=%.3f rank=%s", ticker, confidence, rank) def _buy_candidates(state) -> list[tuple[str, str, int | None]]: """screener Top-N (rank 1..N) + portfolio (rank=None).""" candidates: list[tuple[str, str, int | None]] = [] seen: set[str] = set() if state.screener_preview is not None: for i, item in enumerate(state.screener_preview.get("items", [])): ticker = item.get("ticker") if not ticker or ticker in seen: continue seen.add(ticker) name = item.get("name", ticker) candidates.append((ticker, name, i + 1)) if state.portfolio is not None: for h in state.portfolio.get("holdings", []): ticker = h.get("ticker") if not ticker or ticker in seen: continue seen.add(ticker) candidates.append((ticker, h.get("name", ticker), None)) return candidates def _check_buy_hard_gate(state, ticker: str, settings) -> bool: pred = state.chronos_predictions.get(ticker) if pred is None or pred.get("median", 0) <= 0: return False spread = pred.get("q90", 0) - pred.get("q10", 0) if spread >= settings.chronos_spread_threshold: return False momentum = state.minute_momentum.get(ticker) if momentum != settings.min_momentum_for_buy: return False ap = state.asking_price.get(ticker) if ap is None or ap.get("bid_ratio", 0) < settings.asking_bid_ratio_threshold: return False return True def _compute_buy_confidence(state, ticker: str, rank: int | None) -> float: pred = state.chronos_predictions[ticker] chronos_conf = pred["conf"] minute_score = MOMENTUM_SCORES.get(state.minute_momentum.get(ticker, "neutral"), 0.5) screener_norm = max(0.0, 1 - (rank - 1) / 20) if rank is not None else 0.0 return chronos_conf * 0.5 + minute_score * 0.3 + screener_norm * 0.2 def _build_buy_signal(state, ticker: str, name: str, rank: int | None, confidence: float) -> dict: ap = state.asking_price[ticker] return { "ticker": ticker, "name": name, "action": "buy", "confidence_webai": confidence, "current_price": ap["current_price"], "avg_price": None, "pnl_pct": None, "context": _build_context(state, ticker, rank), "as_of": datetime.now(KST).isoformat(), } # ----- 매도 ----- def _evaluate_sell_signals(state, dedup, settings) -> None: if state.portfolio is None: return for holding in state.portfolio.get("holdings", []): ticker = holding.get("ticker") if not ticker: continue sell = _try_stop_loss(state, holding, settings) if sell is None: sell = _try_anomaly(state, holding, settings) if sell is None: sell = _try_take_profit(state, holding, settings) if sell is None: continue if dedup.is_recent(ticker, "sell", within_hours=24): logger.debug("sell %s skipped: dedup 24h", ticker) continue state.signals[ticker] = sell dedup.record(ticker, "sell", confidence=sell["confidence_webai"]) logger.info("signal emit %s sell conf=%.3f reason=%s", ticker, sell["confidence_webai"], sell.get("context", {}).get("sell_reason")) def _try_stop_loss(state, holding: dict, settings) -> dict | None: pnl = holding.get("pnl_pct") if pnl is None or pnl >= settings.stop_loss_pct: return None return _build_sell_signal(state, holding, confidence=1.0, reason="stop_loss") def _try_take_profit(state, holding: dict, settings) -> dict | None: pnl = holding.get("pnl_pct") if pnl is None or pnl <= settings.take_profit_pct: return None return _build_sell_signal(state, holding, confidence=0.6, reason="take_profit") def _try_anomaly(state, holding: dict, settings) -> dict | None: ticker = holding["ticker"] pred = state.chronos_predictions.get(ticker) if pred is None or pred["median"] >= -0.01: return None momentum = state.minute_momentum.get(ticker) if momentum != "strong_down": return None ap = state.asking_price.get(ticker) if ap is None: return None if ap["bid_ratio"] > (1 - settings.asking_bid_ratio_threshold): return None minute_score = 1.0 - MOMENTUM_SCORES.get(momentum, 0.5) confidence = pred["conf"] * 0.5 + minute_score * 0.3 + 1.0 * 0.2 if confidence <= settings.confidence_threshold: return None return _build_sell_signal(state, holding, confidence=confidence, reason="anomaly") def _build_sell_signal(state, holding: dict, confidence: float, reason: str) -> dict: ticker = holding["ticker"] return { "ticker": ticker, "name": holding.get("name", ticker), "action": "sell", "confidence_webai": confidence, "current_price": holding.get("current_price"), "avg_price": holding.get("avg_price"), "pnl_pct": holding.get("pnl_pct"), "context": _build_context(state, ticker, rank=None, sell_reason=reason), "as_of": datetime.now(KST).isoformat(), } # ----- Context ----- def _build_context(state, ticker: str, rank: int | None, sell_reason: str | None = None) -> dict: pred = state.chronos_predictions.get(ticker) or {} ap = state.asking_price.get(ticker) or {} news_item = _find_news_sentiment(state, ticker) screener_scores = _find_screener_scores(state, ticker) context: dict = { "chronos_pred_1d": pred.get("median"), "chronos_pred_conf": pred.get("conf"), "chronos_q10": pred.get("q10"), "chronos_q90": pred.get("q90"), "screener_rank": rank, "screener_scores": screener_scores, "minute_momentum": state.minute_momentum.get(ticker), "asking_bid_ratio": ap.get("bid_ratio"), "news_sentiment": news_item.get("score") if news_item else None, "news_reason": news_item.get("reason") if news_item else None, } if sell_reason is not None: context["sell_reason"] = sell_reason return context def _find_news_sentiment(state, ticker: str) -> dict | None: if state.news_sentiment is None: return None for item in state.news_sentiment.get("items", []): if item.get("ticker") == ticker: return item return None def _find_screener_scores(state, ticker: str) -> dict | None: if state.screener_preview is None: return None for item in state.screener_preview.get("items", []): if item.get("ticker") == ticker: return item.get("scores") return None