229 lines
8.8 KiB
Python
229 lines
8.8 KiB
Python
import datetime as dt
|
|
import pandas as pd
|
|
|
|
from app import holdings_intel as hi
|
|
|
|
|
|
def test_get_holdings_merges_price_and_pnl(monkeypatch):
|
|
monkeypatch.setattr(hi.db, "get_all_portfolio", lambda: [
|
|
{"id": 1, "broker": "kis", "ticker": "005930", "name": "삼성전자",
|
|
"quantity": 10, "avg_price": 70000, "purchase_price": 70000},
|
|
{"id": 2, "broker": "kis", "ticker": "AAPL", "name": "Apple",
|
|
"quantity": 5, "avg_price": 200, "purchase_price": 200},
|
|
])
|
|
monkeypatch.setattr(hi.price_fetcher, "get_current_prices",
|
|
lambda tickers: {"005930": 77000}) # AAPL 미조회(비KRX)
|
|
monkeypatch.setattr(hi, "_krx_tickers", lambda: {"005930"})
|
|
hs = hi.get_holdings()
|
|
s = {h["ticker"]: h for h in hs}
|
|
assert s["005930"]["is_krx"] is True
|
|
assert round(s["005930"]["pnl_rate"], 1) == 10.0 # (77000-70000)/70000
|
|
assert s["AAPL"]["is_krx"] is False # KRX 외
|
|
|
|
|
|
def test_get_holdings_zero_avg_price(monkeypatch):
|
|
"""avg_price=0인 종목은 pnl_rate가 None이어야 한다 (ZeroDivisionError 없음)."""
|
|
monkeypatch.setattr(hi.db, "get_all_portfolio", lambda: [
|
|
{"id": 1, "broker": "kis", "ticker": "005930", "name": "삼성전자",
|
|
"quantity": 10, "avg_price": 0, "purchase_price": 0},
|
|
])
|
|
monkeypatch.setattr(hi.price_fetcher, "get_current_prices",
|
|
lambda tickers: {"005930": 80000})
|
|
monkeypatch.setattr(hi, "_krx_tickers", lambda: {"005930"})
|
|
hs = hi.get_holdings()
|
|
assert hs[0]["pnl_rate"] is None
|
|
|
|
|
|
def test_get_holdings_empty_portfolio(monkeypatch):
|
|
"""포트폴리오가 비어있으면 빈 리스트를 반환하고 가격 조회를 호출하지 않는다."""
|
|
monkeypatch.setattr(hi.db, "get_all_portfolio", lambda: [])
|
|
called = []
|
|
monkeypatch.setattr(hi.price_fetcher, "get_current_prices",
|
|
lambda tickers: called.append(tickers) or {})
|
|
monkeypatch.setattr(hi, "_krx_tickers", lambda: set())
|
|
result = hi.get_holdings()
|
|
assert result == []
|
|
assert called == [] # get_current_prices must NOT have been called
|
|
|
|
|
|
def test_get_holdings_price_missing(monkeypatch):
|
|
"""prices dict에 ticker가 없으면 current_price와 pnl_rate는 None이다."""
|
|
monkeypatch.setattr(hi.db, "get_all_portfolio", lambda: [
|
|
{"id": 1, "broker": "kis", "ticker": "000660", "name": "SK하이닉스",
|
|
"quantity": 5, "avg_price": 150000, "purchase_price": 150000},
|
|
])
|
|
monkeypatch.setattr(hi.price_fetcher, "get_current_prices",
|
|
lambda tickers: {}) # 가격 없음
|
|
monkeypatch.setattr(hi, "_krx_tickers", lambda: {"000660"})
|
|
hs = hi.get_holdings()
|
|
assert hs[0]["current_price"] is None
|
|
assert hs[0]["pnl_rate"] is None
|
|
|
|
|
|
# ---- Phase 2 tests ----
|
|
|
|
def _toy_ctx(tickers=("005930",), n=300):
|
|
"""결정적 일봉으로 ScreenContext 유사 객체 구성."""
|
|
from app.screener.engine import ScreenContext
|
|
rows = []
|
|
base = dt.date(2025, 1, 1)
|
|
for t in tickers:
|
|
price = 1000
|
|
for i in range(n):
|
|
price = int(price * 1.002) # 완만한 상승 → 정배열
|
|
d = (base + dt.timedelta(days=i)).isoformat()
|
|
rows.append({"ticker": t, "date": d, "open": price, "high": price,
|
|
"low": price, "close": price, "volume": 1000, "value": price*1000})
|
|
prices = pd.DataFrame(rows)
|
|
master = pd.DataFrame({"name": [f"n{t}" for t in tickers],
|
|
"market": ["KOSPI"]*len(tickers),
|
|
"market_cap": [1e12]*len(tickers)},
|
|
index=pd.Index(tickers, name="ticker"))
|
|
flow = pd.DataFrame(columns=["ticker","date","foreign_net","institution_net"])
|
|
return ScreenContext(master=master, prices=prices, flow=flow,
|
|
kospi=pd.Series(dtype=float), asof=base+dt.timedelta(days=n-1))
|
|
|
|
|
|
def test_technical_posture_returns_scores():
|
|
ctx = _toy_ctx(("005930",))
|
|
scores = hi.technical_posture(ctx, ["005930"])
|
|
assert "005930" in scores
|
|
assert 0.0 <= scores["005930"] <= 100.0 # 상승추세 → 양수 점수
|
|
|
|
|
|
# ---- Task 2.2 tests ----
|
|
|
|
def _ticker_prices(closes, vols=None):
|
|
n = len(closes)
|
|
base = dt.date(2025, 1, 1)
|
|
vols = vols or [1000]*n
|
|
return pd.DataFrame({
|
|
"ticker": ["005930"]*n,
|
|
"date": [(base+dt.timedelta(days=i)).isoformat() for i in range(n)],
|
|
"open": closes, "high": closes, "low": closes, "close": closes, "volume": vols,
|
|
})
|
|
|
|
|
|
DEFAULT_EXIT = {"stop_pct": 0.08, "take_pct": 0.25, "climax_vol_x": 3.0}
|
|
|
|
|
|
def test_exit_rules_stop_and_ma():
|
|
closes = [1000]*60 + [1100]*200 # 충분한 길이, 최근 평탄
|
|
df = _ticker_prices(closes)
|
|
# 현재가가 평단(2000) 대비 -45% → stop_loss
|
|
flags = hi.exit_rules({"avg_price": 2000, "current_price": 1100}, df, DEFAULT_EXIT)
|
|
assert flags["stop_loss"] is True
|
|
# 종가 1100 > MA50≈1100, MA200은 더 낮음 → ma 이탈 아님
|
|
assert flags["ma200_break"] is False
|
|
|
|
|
|
def test_exit_rules_take_profit():
|
|
df = _ticker_prices([1000]*260)
|
|
flags = hi.exit_rules({"avg_price": 1000, "current_price": 1300}, df, DEFAULT_EXIT)
|
|
assert flags["take_profit"] is True # +30% ≥ 25%
|
|
|
|
|
|
# ---- Task 2.3 tests ----
|
|
|
|
def test_decide_action_matrix():
|
|
# 강건 + 이탈 없음 + 높은 강도 → add
|
|
a, r = hi.decide_action(tech_score=80, exit_flags={}, pnl=5)
|
|
assert a == "add"
|
|
# ma200 이탈 → sell
|
|
a, r = hi.decide_action(70, {"ma200_break": True}, 2)
|
|
assert a == "sell"
|
|
# stop_loss → sell
|
|
a, _ = hi.decide_action(70, {"stop_loss": True}, -10)
|
|
assert a == "sell"
|
|
# ma50 이탈만 → trim
|
|
a, _ = hi.decide_action(60, {"ma50_break": True}, 3)
|
|
assert a == "trim"
|
|
# 이탈 없음 보통 강도 → hold
|
|
a, _ = hi.decide_action(50, {}, 1)
|
|
assert a == "hold"
|
|
|
|
|
|
# ---- Phase 2 hardening tests (m3) ----
|
|
|
|
def _ticker_prices_hl(closes, highs, vols):
|
|
n = len(closes)
|
|
base = dt.date(2025, 1, 1)
|
|
return pd.DataFrame({
|
|
"ticker": ["005930"] * n,
|
|
"date": [(base + dt.timedelta(days=i)).isoformat() for i in range(n)],
|
|
"open": closes,
|
|
"high": highs,
|
|
"low": closes,
|
|
"close": closes,
|
|
"volume": vols,
|
|
})
|
|
|
|
|
|
def test_exit_rules_climax():
|
|
closes = [1000] * 30
|
|
highs = [1000] * 29 + [1100] # 마지막날 상단꼬리(종가1000 < 고가1100*0.97)
|
|
vols = [1000] * 29 + [5000] # 거래량 5x
|
|
flags = hi.exit_rules({"avg_price": 900, "current_price": 1000},
|
|
_ticker_prices_hl(closes, highs, vols), {})
|
|
assert flags["climax"] is True
|
|
|
|
|
|
def test_exit_rules_ma200_break():
|
|
closes = list(range(1000, 1000 + 260))[::-1] # 하락 추세 → 종가 < MA200
|
|
df = _ticker_prices(closes)
|
|
flags = hi.exit_rules({"avg_price": 2000, "current_price": closes[-1]}, df, {})
|
|
assert flags["ma200_break"] is True
|
|
|
|
|
|
def test_technical_posture_short_history_returns_low_not_crash():
|
|
ctx = _toy_ctx(("005930",), n=100) # <252 → MA 노드 NaN→0, but no crash
|
|
scores = hi.technical_posture(ctx, ["005930"])
|
|
assert "005930" in scores
|
|
assert 0.0 <= scores["005930"] <= 100.0
|
|
|
|
|
|
def test_technical_posture_empty_kospi_not_penalized():
|
|
# rs_rating는 빈 kospi에서 빈 Series → combine에서 제외되어야 (C1)
|
|
ctx = _toy_ctx(("005930",), n=300) # kospi 빈 fixture
|
|
scores = hi.technical_posture(ctx, ["005930"])
|
|
# ma_alignment+momentum만으로 정규화 → 상승추세면 충분히 높은 점수
|
|
assert scores["005930"] > 50.0
|
|
|
|
|
|
# ---- Phase 3 tests ----
|
|
|
|
DEFAULT_EVENT = {"move_pct": 7.0, "vol_z": 2.5}
|
|
|
|
|
|
def test_market_events_detects_move_and_volume():
|
|
closes = [1000]*30 + [1100] # 마지막날 +10%
|
|
vols = [1000]*30 + [10000] # 거래량 급증
|
|
df = _ticker_prices(closes, vols)
|
|
evts = hi.market_events("005930", df, None, DEFAULT_EVENT)
|
|
types = {e["type"] for e in evts}
|
|
assert "price_move" in types
|
|
assert "volume_surge" in types
|
|
|
|
|
|
def test_news_issues_flags_negative_sentiment(monkeypatch):
|
|
# news_sentiment: 005930 음수 점수 → 악재 flag
|
|
monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: {
|
|
"005930": {"score_raw": -0.6, "news_count": 8}})
|
|
issues = hi.news_issues(["005930"], date="2026-05-29", use_llm=False)
|
|
assert "005930" in issues
|
|
assert issues["005930"][0]["type"] == "news"
|
|
assert issues["005930"][0]["severity"] in ("med", "high")
|
|
|
|
|
|
def test_portfolio_health():
|
|
holdings = [
|
|
{"ticker": "005930", "quantity": 10, "avg_price": 70000, "current_price": 77000,
|
|
"is_krx": True},
|
|
{"ticker": "000660", "quantity": 5, "avg_price": 100000, "current_price": 90000,
|
|
"is_krx": True},
|
|
]
|
|
h = hi.portfolio_health(holdings, total_cash=1000000)
|
|
assert h["positions"] == 2
|
|
assert 0 <= h["max_weight"] <= 1.0
|
|
assert "total_eval" in h and "total_pnl" in h and "cash_ratio" in h
|