merge: 주식 보유종목 인텔리전스 (Phase 1-5)

스크리너 엔진을 보유종목에 restrict + 매도/리스크 룰 + 이슈 감지
(급변·거래량·외인·뉴스감성) + 포트 건강 → 매일 advisory 브리핑.
EOD(16:50)+아침(08:30) cron. KIS 실주문 미사용.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-31 22:56:00 +09:00
13 changed files with 2338 additions and 1 deletions

View File

@@ -336,7 +336,48 @@ class StockAgent(BaseAgent):
await self.transition("idle", "AI 뉴스 완료") await self.transition("idle", "AI 뉴스 완료")
async def run_holdings_eod(self) -> dict:
"""평일 16:50 — 보유종목 시그널 계산·저장."""
# idle 가드 없음(의도적): 스크리너 진행 중에도 EOD/브리핑은 독립적으로 실행되어야 함
from ..service_proxy import stock_holdings_run
from ..db import create_task, update_task_status, add_log
task_id = create_task(self.agent_id, "holdings_eod", {})
try:
res = await stock_holdings_run()
update_task_status(task_id, "succeeded", res)
add_log(self.agent_id, f"holdings_eod: {res}", "info", task_id)
return {"ok": True, **res}
except Exception as e:
update_task_status(task_id, "failed", {"error": str(e)})
add_log(self.agent_id, f"holdings_eod 실패: {e}", "error", task_id)
return {"ok": False, "message": str(e)}
async def run_holdings_brief(self) -> dict:
"""평일 08:30 — 저장된 시그널 브리핑 텔레그램."""
# idle 가드 없음(의도적): 스크리너 진행 중에도 EOD/브리핑은 독립적으로 실행되어야 함
from ..service_proxy import stock_holdings_brief
from ..notifiers.telegram_stock import send_holdings_brief
from ..db import create_task, update_task_status, add_log
task_id = create_task(self.agent_id, "holdings_brief", {})
try:
payload = await stock_holdings_brief()
await send_holdings_brief(payload)
update_task_status(task_id, "succeeded", {"date": payload.get("date"),
"count": len(payload.get("holdings", []))})
add_log(self.agent_id, f"holdings_brief 발송: {payload.get('date')}", "info", task_id)
return {"ok": True}
except Exception as e:
update_task_status(task_id, "failed", {"error": str(e)})
add_log(self.agent_id, f"holdings_brief 실패: {e}", "error", task_id)
return {"ok": False, "message": str(e)}
async def on_command(self, command: str, params: dict) -> dict: async def on_command(self, command: str, params: dict) -> dict:
if command == "holdings_eod":
return await self.run_holdings_eod()
if command == "holdings_brief":
return await self.run_holdings_brief()
if command == "run_screener": if command == "run_screener":
await self.on_screener_schedule() await self.on_screener_schedule()
return {"ok": True, "message": "스크리너 실행 트리거 완료"} return {"ok": True, "message": "스크리너 실행 트리거 완료"}

View File

@@ -0,0 +1,42 @@
"""보유종목 인텔리전스 텔레그램 포매터 (advisory)."""
import logging
from typing import Any, Dict
from ..telegram.messaging import send_raw
logger = logging.getLogger("agent-office")
_ACTION_KR = {"add": "🟢 추가매수", "hold": "⚪ 보유", "trim": "🟡 축소", "sell": "🔴 매도"}
_SEV = {"high": "🔴", "med": "🟠", "low": "🟡"}
def format_holdings_brief(payload: Dict[str, Any]) -> str:
date = payload.get("date") or "?"
lines = [f"📊 <b>보유종목 인텔리전스</b> ({date})", ""]
ph = payload.get("portfolio_health") or {}
if ph:
lines.append(f"포트 손익 {ph.get('total_pnl_rate',0):+.1f}% · "
f"종목 {ph.get('positions',0)} · 최대비중 {ph.get('max_weight',0)*100:.0f}% · "
f"현금 {ph.get('cash_ratio',0)*100:.0f}%")
lines.append("")
for h in payload.get("holdings", []):
act = _ACTION_KR.get(h.get("action"), h.get("action", "?"))
pnl = h.get("pnl_rate")
pnl_txt = f"{pnl:+.1f}%" if pnl is not None else ""
line = f"{act} <b>{h.get('name') or h.get('ticker')}</b> ({pnl_txt})"
if h.get("reasons"):
line += f"{h['reasons']}"
lines.append(line)
for iss in (h.get("issues") or [])[:3]:
lines.append(f" {_SEV.get(iss.get('severity'),'')} {iss.get('summary','')}")
lines.append("")
lines.append(" 투자 판단 보조용 제안입니다(자동매매 아님).")
return "\n".join(lines)
async def send_holdings_brief(payload: Dict[str, Any]) -> None:
text = format_holdings_brief(payload)
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_stock] holdings brief send failed: {e}")

View File

@@ -22,6 +22,16 @@ async def _run_stock_ai_news():
if agent: if agent:
await agent.on_ai_news_schedule() await agent.on_ai_news_schedule()
async def _run_stock_holdings_eod():
agent = AGENT_REGISTRY.get("stock")
if agent:
await agent.run_holdings_eod()
async def _run_stock_holdings_brief():
agent = AGENT_REGISTRY.get("stock")
if agent:
await agent.run_holdings_brief()
async def _run_insta_schedule(): async def _run_insta_schedule():
agent = AGENT_REGISTRY.get("insta") agent = AGENT_REGISTRY.get("insta")
if agent: if agent:
@@ -111,6 +121,8 @@ def init_scheduler():
minute=0, minute=0,
id="stock_ai_news_sentiment", id="stock_ai_news_sentiment",
) )
scheduler.add_job(_run_stock_holdings_eod, "cron", day_of_week="mon-fri", hour=16, minute=50, id="stock_holdings_eod") # 16:50: 스크리너 snapshot(16:30) 완료 후 — 부분 일봉 읽기 방지
scheduler.add_job(_run_stock_holdings_brief, "cron", day_of_week="mon-fri", hour=8, minute=30, id="stock_holdings_brief")
scheduler.add_job(_run_insta_schedule, "cron", hour=9, minute=30, id="insta_pipeline") scheduler.add_job(_run_insta_schedule, "cron", hour=9, minute=30, id="insta_pipeline")
# 외부 트렌드 수집은 장 마감 후 16:40 — 9시 주식 활발 시간대 NAS 자원 회피. # 외부 트렌드 수집은 장 마감 후 16:40 — 9시 주식 활발 시간대 NAS 자원 회피.
# screener(16:30)와 10분 스태거: Celeron 2C/2.0GHz 동시 실행 시 CPU 폭주 방지 (CHECK_POINT FU-A) # screener(16:30)와 10분 스태거: Celeron 2C/2.0GHz 동시 실행 시 CPU 폭주 방지 (CHECK_POINT FU-A)

View File

@@ -88,6 +88,29 @@ async def scrape_stock_news() -> Dict[str, Any]:
resp.raise_for_status() resp.raise_for_status()
return resp.json() return resp.json()
async def stock_holdings_run() -> Dict[str, Any]:
"""보유종목 시그널 계산 트리거 (EOD, use_llm=True).
stock BackgroundTask 등록 후 즉시 {ok, queued} 반환.
실제 계산은 stock 컨테이너 백그라운드에서 진행 — 여유있게 120s.
"""
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(
f"{STOCK_URL}/api/stock/holdings/intel/run",
params={"use_llm": True},
)
resp.raise_for_status()
return resp.json()
async def stock_holdings_brief() -> Dict[str, Any]:
"""보유종목 최신 브리핑 payload 조회 (GET, 모듈 레벨 _client 사용)."""
resp = await _client.get(f"{STOCK_URL}/api/stock/holdings/intel")
resp.raise_for_status()
return resp.json()
async def generate_music(payload: dict) -> Dict[str, Any]: async def generate_music(payload: dict) -> Dict[str, Any]:
resp = await _client.post(f"{MUSIC_LAB_URL}/api/music/generate", json=payload) resp = await _client.post(f"{MUSIC_LAB_URL}/api/music/generate", json=payload)
resp.raise_for_status() resp.raise_for_status()

View File

@@ -0,0 +1,82 @@
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from app.notifiers import telegram_stock as ts
def test_format_holdings_brief():
payload = {
"date": "2026-05-29",
"holdings": [
{"ticker": "005930", "name": "삼성전자", "action": "trim", "tech_score": 60.0,
"exit_flags": {"ma50_break": True}, "issues": [{"type":"news","severity":"high","summary":"악재"}],
"pnl_rate": 5.2, "reasons": "MA50 이탈"},
{"ticker": "000660", "name": "SK하이닉스", "action": "hold", "tech_score": 75.0,
"exit_flags": {}, "issues": [], "pnl_rate": -2.0, "reasons": "특이 신호 없음"},
],
"portfolio_health": {"positions": 2, "total_pnl_rate": 3.1, "max_weight": 0.6, "cash_ratio": 0.2},
}
txt = ts.format_holdings_brief(payload)
assert "삼성전자" in txt
assert "축소" in txt or "trim" in txt
assert "%" in txt
def test_format_holdings_brief_empty_holdings():
"""빈 holdings + None portfolio_health에도 크래시 없음."""
payload = {"date": "2026-05-29", "holdings": [], "portfolio_health": None}
txt = ts.format_holdings_brief(payload)
assert "보유종목 인텔리전스" in txt
assert "자동매매" in txt
def test_format_holdings_brief_missing_fields():
"""pnl_rate None·name None·issues None 방어적 처리."""
payload = {
"date": None,
"holdings": [
{"ticker": "005930", "name": None, "action": "sell",
"pnl_rate": None, "reasons": None, "issues": None},
],
"portfolio_health": {},
}
txt = ts.format_holdings_brief(payload)
assert "005930" in txt # ticker fallback
assert "🔴 매도" in txt
def test_format_holdings_brief_sell_action():
"""sell 액션은 🔴 매도로 표시."""
payload = {
"date": "2026-05-29",
"holdings": [
{"ticker": "000660", "name": "SK하이닉스", "action": "sell",
"pnl_rate": -12.5, "reasons": "손절선 이탈", "issues": []},
],
"portfolio_health": {"positions": 1, "total_pnl_rate": -12.5,
"max_weight": 1.0, "cash_ratio": 0.0},
}
txt = ts.format_holdings_brief(payload)
assert "🔴 매도" in txt
assert "-12.5%" in txt
def test_format_holdings_brief_issue_severity_icons():
"""이슈 심각도별 이모지 매핑 확인."""
payload = {
"date": "2026-05-29",
"holdings": [
{"ticker": "005930", "name": "삼성전자", "action": "hold", "pnl_rate": 2.0,
"reasons": "특이 신호 없음",
"issues": [
{"type": "news", "severity": "high", "summary": "심각 악재"},
{"type": "volume_surge", "severity": "med", "summary": "거래량 급증"},
{"type": "price_move", "severity": "low", "summary": "소폭 변동"},
]},
],
"portfolio_health": {},
}
txt = ts.format_holdings_brief(payload)
assert "🔴" in txt # high severity
assert "🟠" in txt # med severity
assert "🟡" in txt # low severity

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,122 @@
# 주식 보유종목 인텔리전스 — 설계 Spec
- **작성일**: 2026-05-31
- **상태**: 설계 승인 (구현 plan 대기)
- **대상 서비스**: `stock` + `agent-office`(StockAgent) + `web-ui`(stock/포트폴리오 페이지)
- **사이클**: 스마트 에이전트 고도화 3종 중 **2번 주식**. (1번 로또 완료, 3번 인스타 후속)
---
## 1. 배경 & 목표
현재 StockAgent는 아침 뉴스 요약(07:30) · KRX 강세주 스크리너(16:30) · AI 뉴스 sentiment(08:00)를 브리핑한다. CEO는 여기서 더 나아가 **내 보유종목을 집중 분석**해 ①종목별 매수/매도 자세 ②이슈 정리 ③포트폴리오 건강을 매일 advisory로 브리핑받길 원한다.
### 핵심 결정 (2026-05-31 brainstorming)
1. **실행 수준 = 브리핑 전용(advisory)**. `/api/trade/order`(KIS 실주문) 미사용. 매수/매도는 "제안"만, 실제 주문은 사용자 수동. (로또와 동일한 정직·관찰 철학)
2. **분석 주기 = 일봉 EOD + 장중 경량 가드**. 장마감 후 일봉으로 기술분석 → 다음날 아침 브리핑. 장중엔 현재가로 손절·급변(±N%)만 경도 알림. 인트라데이 분봉 파이프라인 신설 안 함.
3. **브리핑 범위 = 보유종목 + 포트 레벨**. 종목별 액션 + 포트폴리오 건강(집중도·비중·현금·손익).
4. **이슈 소스 = 기존 뉴스+감성+LLM 요약 + 급변·거래량·외인수급 이벤트**. 신규 스크래핑 0 (DART·실적 일정 제외).
### 기존 자산 (100% 재활용, 신규 ML/데이터소스 없음)
- `stock/app/screener/snapshot.py``krx_daily_prices`(일봉 OHLCV) + `krx_master`(listing) + naver 외인 flow. 스크리너 잡(평일 16:30)이 갱신.
- `stock/app/screener/engine.py` + `nodes/`(ma_alignment·momentum·rs_rating·vcp_lite·volume_surge·foreign_buy·high52w·hygiene). **`ScreenContext.restrict(tickers)`** + `latest_close()`/`latest_high()`로 보유종목 한정 분석 가능.
- `portfolio` 테이블(broker·ticker·name·quantity·avg_price·purchase_price) + `/api/portfolio`(현재가·손익 계산) + `broker_cash`(예수금).
- `price_fetcher`(현재가 3분 TTL) · `news_sentiment` 테이블(종목별 감성) · `ai_summarizer`(Claude Haiku).
### 알려진 제약 (설계 반영)
- **섹터 필드 없음**: `portfolio`·`krx_master`에 sector 없음 → 섹터 편중은 best-effort(FDR `StockListing`의 Sector/Industry가 있으면 사용, 없으면 생략)이고, **시장(KOSPI/KOSDAQ)·종목 비중 집중도**를 기본 지표로 사용.
- **KRX 외 종목**(미국주 등): krx_daily_prices 밖 → 기술분석 불가, **뉴스·현재가·손익만** graceful 처리.
- **snapshot 히스토리 의존**: MA200·52주 고점 노드는 ~1년 일봉 필요. 스크리너가 이미 이 노드들을 쓰므로 윈도우는 충족 가정(plan에서 lookback 확인 단계 포함).
---
## 2. 데이터 모델 & 컴포넌트
### 신규 테이블 `holdings_signals` (stock.db, 일별 종목 시그널 이력)
```
date TEXT NOT NULL -- KST 거래일
ticker TEXT NOT NULL
name TEXT
action TEXT NOT NULL -- 'add' | 'hold' | 'trim' | 'sell'
tech_score REAL -- 매수강도(score 노드 가중합, 0~1 정규화)
exit_flags TEXT NOT NULL DEFAULT '{}' -- JSON {stop_loss,ma50_break,ma200_break,momentum_loss,take_profit,climax}
issues TEXT NOT NULL DEFAULT '[]' -- JSON [{type, severity, summary}]
close INTEGER
pnl_rate REAL -- 평단 대비 % (스냅샷 시점)
reasons TEXT -- 액션 근거 텍스트
created_at TEXT NOT NULL DEFAULT (datetime('now'))
PRIMARY KEY(date, ticker) -- 멱등 upsert
```
> 추세/이력은 이 테이블에서 조회. 포트 레벨 요약은 on-the-fly 계산(별도 테이블 불필요).
### 신규 `stock/app/holdings_intel.py` (순수연산 중심, FastAPI 의존성 최소)
- `get_holdings() -> list[dict]``portfolio` 행 + 현재가(price_fetcher) + pnl_rate. KRX 여부 플래그(`is_krx`).
- `technical_posture(ctx_restricted, tickers) -> dict[ticker, score]``ScreenContext.restrict(tickers)`에 score 노드 실행 → 매수강도.
- `exit_rules(holding, prices_df, params) -> dict`**신규**: 손절·MA이탈·모멘텀소멸·익절·클라이맥스 flag 산출 (§3).
- `decide_action(tech_score, exit_flags, pnl) -> (action, reasons)`**신규**: 매수강도+exit 조합 → add/hold/trim/sell + 근거.
- `market_events(prices_df, flow, params) -> dict[ticker, list]` — 급변(±N%)·거래량 Z-score·외인 순매도.
- `news_issues(tickers) -> dict[ticker, list]` — news+news_sentiment 필터 → Claude Haiku 악재·심각도 요약(악재 있는 종목만).
- `portfolio_health(holdings, cash) -> dict` — 종목 비중 집중도(HHI/최대비중)·시장 mix·현금 비중·총 손익.
- `compute_and_store(asof) -> dict` — 위를 조합해 holdings_signals upsert (멱등).
- `build_holdings_brief(asof) -> dict` — 브리핑/UI payload 조립(종목별 action+issues + portfolio_health + 추세).
### API (stock)
| 메서드 | 경로 | 설명 |
|--------|------|------|
| GET | `/api/stock/holdings/intel` | 최신 브리핑 payload |
| GET | `/api/stock/holdings/intel/history?ticker=&days=` | 종목 시그널 추세 |
| POST | `/api/stock/holdings/intel/run` | 수동 계산 트리거(BackgroundTask) |
---
## 3. 매도/리스크 룰 & 이슈 (설정 가능 임계값 — 기본값 제시)
### exit_flags (각 boolean + 값)
- **stop_loss**: `current < avg_price × (1 STOP_PCT)` (기본 STOP_PCT=0.08, Minervini식)
- **ma50_break / ma200_break**: 종가 < MA50 / MA200
- **momentum_loss**: momentum/RS 노드 점수가 직전 대비 임계 하락 (or 음전환)
- **take_profit**: `pnl_rate ≥ TAKE_PCT` (기본 25%) — 부분 익절 후보
- **climax**: 거래량 급증(vol > avg×CLIMAX_VOL_X) + 종가 상단 꼬리 (분산 의심)
### decide_action 매트릭스
- tech_score 高 + exit_flags 無 → **add**(추가매수 후보)
- exit_flags 無 (강건) → **hold**
- ma50_break 또는 momentum_loss 또는 take_profit → **trim**(일부 축소)
- stop_loss 또는 ma200_break → **sell**(청산 후보)
- 각 결정에 trigger된 flag를 근거 텍스트로 동봉. (advisory — "제안")
### issues
- **시장이벤트** (기존 데이터): 일봉 ±EVENT_PCT% 급변 / 거래량 Z-score>임계 / naver flow 외인 순매도 N일 연속.
- **뉴스이슈**: 보유종목 최근 뉴스 + news_sentiment 음수 → Claude Haiku로 `{type, severity(low/med/high), summary}` 요약. 악재 있는 종목만 호출(비용 bounded).
---
## 4. 플로우 · 에이전트 · UI
1. **EOD 계산 (평일 16:40)**: 기존 스크리너/뉴스 잡과 동일하게 **agent-office cron이 orchestrate**`_run_stock_holdings_eod()``StockAgent.run_holdings_eod()` → stock `POST /api/stock/holdings/intel/run``holdings_intel.compute_and_store(today)` → holdings_signals upsert. 스크리너 snapshot 갱신(16:30) 직후라 일봉 준비됨.
2. **아침 브리핑 (평일 08:30, agent-office StockAgent.run_holdings_brief)**: 저장된 최신 시그널 + 야간 갭(현재가) → 텔레그램 1통(종목별 액션 + 포트 건강 + 상위 이슈). AI 뉴스(08:00) 다음 슬롯.
3. **장중 경량 가드 (평일 09:00~15:30, 30분 간격)**: 현재가로 손절선 이탈·급변(±N%)만 점검 → 발생 시 텔레그램 alert. throttle(종목·유형별 재발화 억제) + daily cap (로또 시그널 패턴 재활용).
4. **agent-office**: `service_proxy`에 holdings intel 호출 추가 + StockAgent 메서드(run_holdings_brief / intraday_guard) + scheduler cron.
5. **UI (web-ui)**: stock/포트폴리오 페이지에 **"보유종목 인텔리전스" 탭/섹션 통합** — 종목별 액션 카드(자세·exit flags·근거) + 포트 건강 위젯 + 이슈 피드 + 종목 시그널 추세(history).
---
## 5. 에러·성능·테스트·리스크
- **멱등성**: holdings_signals PRIMARY KEY(date,ticker) upsert → 재계산 안전.
- **성능 (NAS Celeron)**: 보유종목만 restrict(소수 종목)이라 전체 스크리너 대비 매우 가벼움. LLM 이슈 요약은 악재 종목만(bounded). EOD 1회 + 장중 가드는 현재가만(경량).
- **graceful degrade**: price_fetcher/KIS/news 실패 시 부분 데이터로 진행 + 경고 로그. KRX 외 종목은 기술분석 skip(뉴스·손익만). 텔레그램 실패는 로그만(job 성공 유지).
- **테스트**: exit_rules 각 flag, decide_action 매트릭스 전 분기, market_events 검출, portfolio_health 계산, holdings_signals 멱등, KRX 외 종목 graceful, 뉴스 0건 경로.
- **리스크**: ①기술적 시그널은 휴리스틱이지 보장 아님 → advisory 프레이밍·자동매매 없음 ②섹터 데이터 갭 → 시장·비중 집중도로 대체 ③snapshot 히스토리 의존 → plan에 lookback 확인 ④보유종목 출처는 portfolio 테이블(사용자/KIS 동기화) — 누락 시 빈 브리핑 graceful.
---
## 6. 결정 로그 (2026-05-31)
1. 실행 수준 = **advisory 전용** (KIS 실주문 미사용)
2. 주기 = **일봉 EOD + 장중 경량 가드**
3. 범위 = **보유종목 + 포트 레벨**
4. 이슈 소스 = **기존 뉴스+감성+LLM + 급변·거래량·외인 이벤트**
## 7. 스코프 밖 / 향후
- 자동매매(승인후/완전자동), 인트라데이 분봉, DART 공시·실적 일정, 신규 매수후보 발굴(기존 16:30 스크리너가 담당), 교체(rotation) 제안 — 향후 사이클.
- 인스타 에이전트(자율 카드 발급) — 다음 사이클.

View File

@@ -1,6 +1,7 @@
import sqlite3 import sqlite3
import os import os
import hashlib import hashlib
import json
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
from app.screener.schema import ensure_screener_schema from app.screener.schema import ensure_screener_schema
@@ -103,6 +104,27 @@ def init_db():
if "commission" not in sh_cols: if "commission" not in sh_cols:
conn.execute("ALTER TABLE sell_history ADD COLUMN commission REAL NOT NULL DEFAULT 0") conn.execute("ALTER TABLE sell_history ADD COLUMN commission REAL NOT NULL DEFAULT 0")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS holdings_signals (
date TEXT NOT NULL,
ticker TEXT NOT NULL,
name TEXT,
action TEXT NOT NULL,
tech_score REAL,
exit_flags TEXT NOT NULL DEFAULT '{}',
issues TEXT NOT NULL DEFAULT '[]',
close INTEGER,
pnl_rate REAL,
reasons TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now','localtime')),
PRIMARY KEY (date, ticker)
);
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_holdings_sig_ticker "
"ON holdings_signals(ticker, date DESC);")
# Screener 스키마 부트스트랩 (7테이블 + 디폴트 설정 시드) # Screener 스키마 부트스트랩 (7테이블 + 디폴트 설정 시드)
ensure_screener_schema(conn) ensure_screener_schema(conn)
@@ -297,3 +319,63 @@ def get_asset_snapshots(days: int = 30) -> List[Dict[str, Any]]:
).fetchall() ).fetchall()
rows = list(reversed(rows)) rows = list(reversed(rows))
return [dict(r) for r in rows] return [dict(r) for r in rows]
# --- KRX Master ---
def get_krx_tickers() -> set:
with _conn() as conn:
try:
rows = conn.execute("SELECT ticker FROM krx_master").fetchall()
except Exception:
return set()
return {r["ticker"] for r in rows}
# --- Holdings Signals CRUD ---
def upsert_holdings_signal(
date: str, ticker: str, name: Optional[str], action: str,
tech_score: Optional[float], exit_flags: dict, issues: list,
close: Optional[int], pnl_rate: Optional[float], reasons: Optional[str],
) -> None:
with _conn() as conn:
conn.execute(
"""
INSERT INTO holdings_signals
(date, ticker, name, action, tech_score, exit_flags, issues, close, pnl_rate, reasons)
VALUES (?,?,?,?,?,?,?,?,?,?)
ON CONFLICT(date, ticker) DO UPDATE SET
name=excluded.name, action=excluded.action, tech_score=excluded.tech_score,
exit_flags=excluded.exit_flags, issues=excluded.issues, close=excluded.close,
pnl_rate=excluded.pnl_rate, reasons=excluded.reasons
""",
(date, ticker, name, action, tech_score,
json.dumps(exit_flags, ensure_ascii=False),
json.dumps(issues, ensure_ascii=False), close, pnl_rate, reasons),
)
def _row_to_signal(r) -> dict:
d = dict(r)
d["exit_flags"] = json.loads(d.get("exit_flags") or "{}")
d["issues"] = json.loads(d.get("issues") or "[]")
return d
def get_holdings_signals(date: str) -> list:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM holdings_signals WHERE date=? ORDER BY ticker", (date,)).fetchall()
return [_row_to_signal(r) for r in rows]
def get_latest_holdings_date() -> str | None:
with _conn() as conn:
r = conn.execute("SELECT MAX(date) AS d FROM holdings_signals").fetchone()
return r["d"] if r and r["d"] else None
def get_holdings_signal_history(ticker: str, limit: int = 30) -> list:
"""최근 N개 시그널 행 (시그널은 거래일당 1행이라 ≈ N 거래일)."""
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM holdings_signals WHERE ticker=? ORDER BY date DESC LIMIT ?",
(ticker, limit)).fetchall()
return [_row_to_signal(r) for r in rows]

344
stock/app/holdings_intel.py Normal file
View File

@@ -0,0 +1,344 @@
"""보유종목 인텔리전스 — 순수연산 중심 (advisory). KIS 실주문 미사용."""
from __future__ import annotations
import datetime as dt
from typing import Any, Optional
import pandas as pd
from . import db
from . import price_fetcher
from .screener.engine import combine
def _krx_tickers() -> set:
"""krx_master ticker 집합 (KRX 판별용)."""
return db.get_krx_tickers()
def get_holdings() -> list[dict]:
"""portfolio + 현재가 + pnl_rate + is_krx."""
items = db.get_all_portfolio()
tickers = [it["ticker"] for it in items]
prices = price_fetcher.get_current_prices(tickers) if tickers else {}
krx = _krx_tickers()
out = []
for it in items:
cur = prices.get(it["ticker"])
avg = it["avg_price"]
pnl = ((cur - avg) / avg * 100.0) if (cur and avg) else None
out.append({
**it,
"current_price": cur,
"pnl_rate": pnl,
"is_krx": it["ticker"] in krx,
})
return out
# ---- Task 2.1: technical_posture ----
def _score_nodes_and_weights():
"""NODE_REGISTRY에서 보유종목 매수강도 계산용 노드 인스턴스화."""
from .screener.registry import NODE_REGISTRY
weights = {"ma_alignment": 0.4, "momentum": 0.3, "rs_rating": 0.3}
nodes = [NODE_REGISTRY[k]() for k in weights]
return nodes, weights
def technical_posture(ctx, tickers: list[str]) -> dict[str, float]:
"""보유종목 restrict 후 score 노드 → 매수강도(0~100)."""
scoped = ctx.restrict(tickers)
if scoped.prices.empty:
return {}
nodes, weights = _score_nodes_and_weights()
scores = {}
for n in nodes:
try:
scores[n.name] = n.compute(scoped, {})
except Exception:
scores[n.name] = pd.Series(0.0, index=scoped.master.index)
scores_ne = {k: s for k, s in scores.items() if not s.empty}
weights_ne = {k: w for k, w in weights.items() if k in scores_ne}
if not weights_ne:
return {}
total = combine(scores_ne, weights_ne)
return {t: float(total.get(t, 0.0)) for t in tickers if t in total.index}
# ---- Task 2.2: exit_rules ----
_DEFAULT_EXIT_PARAMS = {"stop_pct": 0.08, "take_pct": 0.25, "climax_vol_x": 3.0}
def _ma(closes: "pd.Series", window: int) -> Optional[float]:
if len(closes) < window:
return None
val = closes.rolling(window).mean().iloc[-1]
return float(val) if pd.notna(val) else None
def exit_rules(holding: dict, ticker_prices: "pd.DataFrame", params: dict) -> dict:
"""가격 기반 청산/리스크 flag (stop_loss/ma50_break/ma200_break/take_profit/climax).
Note: momentum_loss는 compute_and_store 단계에서 집계하므로 여기서 설정하지 않는다.
"""
p = {**_DEFAULT_EXIT_PARAMS, **(params or {})}
flags = {"stop_loss": False, "ma50_break": False, "ma200_break": False,
"take_profit": False, "climax": False}
avg = holding.get("avg_price")
cur = holding.get("current_price")
if ticker_prices is None or ticker_prices.empty:
closes = pd.Series(dtype=float)
else:
closes = ticker_prices.sort_values("date")["close"].astype(float).reset_index(drop=True)
last_close = float(closes.iloc[-1]) if len(closes) else cur
if cur is None:
cur = last_close
if cur is not None and avg:
if cur < avg * (1 - p["stop_pct"]):
flags["stop_loss"] = True
if avg > 0 and (cur - avg) / avg >= p["take_pct"]:
flags["take_profit"] = True
ma50 = _ma(closes, 50)
ma200 = _ma(closes, 200)
if ma50 is not None and last_close is not None and last_close < ma50:
flags["ma50_break"] = True
if ma200 is not None and last_close is not None and last_close < ma200:
flags["ma200_break"] = True
# climax: 최근 거래량이 20일 평균의 climax_vol_x배 이상 + 종가가 당일 고점 대비 하단(상단꼬리)
if ticker_prices is not None and not ticker_prices.empty and len(ticker_prices) >= 21:
tp = ticker_prices.sort_values("date")
vol = tp["volume"].astype(float).reset_index(drop=True)
avg_vol = vol.iloc[-21:-1].mean()
last_vol = vol.iloc[-1]
hi_ = float(tp["high"].astype(float).iloc[-1])
cl_ = float(tp["close"].astype(float).iloc[-1])
if avg_vol and last_vol >= avg_vol * p["climax_vol_x"] and hi_ > 0 and cl_ < hi_ * 0.97:
flags["climax"] = True
return flags
# ---- Task 2.3: decide_action ----
ADD_SCORE = 70.0 # 이 이상이면 추가매수 후보
# ---- Task 3.1: market_events ----
_DEFAULT_EVENT_PARAMS = {"move_pct": 7.0, "vol_z": 2.5}
def market_events(ticker: str, ticker_prices: "pd.DataFrame",
ticker_flow: "pd.DataFrame | None", params: dict) -> list[dict]:
"""일봉/flow 기반 시장 이벤트 (급변·거래량 Z·외인 순매도)."""
p = {**_DEFAULT_EVENT_PARAMS, **(params or {})}
events = []
if ticker_prices is None or ticker_prices.empty or len(ticker_prices) < 2:
return events
tp = ticker_prices.sort_values("date").reset_index(drop=True)
close = tp["close"].astype(float)
pct = (close.iloc[-1] - close.iloc[-2]) / close.iloc[-2] * 100.0 if close.iloc[-2] else 0.0
if abs(pct) >= p["move_pct"]:
events.append({
"type": "price_move",
"severity": "high" if abs(pct) >= p["move_pct"] * 1.5 else "med",
"summary": f"전일 대비 {pct:+.1f}%",
})
vol = tp["volume"].astype(float)
if len(vol) >= 21:
base = vol.iloc[-21:-1]
mu, sd = base.mean(), base.std(ddof=0)
last_vol = vol.iloc[-1]
if mu > 0 and (
(sd and (last_vol - mu) / sd >= p["vol_z"])
or (not sd and last_vol >= mu * p["vol_z"]) # sd=0 (평탄 기준선): vol_z를 Z-score가 아닌 단순 배수로 사용
):
z_txt = f"{(last_vol - mu) / sd:.1f}" if sd else f"ratio={last_vol / mu:.1f}x"
events.append({
"type": "volume_surge",
"severity": "med",
"summary": f"거래량 평소 대비 급증(Z={z_txt})",
})
if ticker_flow is not None and not ticker_flow.empty:
tf = ticker_flow.sort_values("date")
recent = tf["foreign_net"].astype(float).iloc[-3:]
if len(recent) >= 3 and (recent < 0).all():
events.append({
"type": "foreign_selling",
"severity": "med",
"summary": "외국인 3일 연속 순매도",
})
return events
# ---- Task 3.2: news_issues ----
NEG_SENTIMENT = -0.3 # 이하면 악재 후보
def _news_sentiment_map(date: str) -> dict:
"""date 기준 news_sentiment 테이블에서 ticker → {score_raw, news_count} 맵 반환."""
with db._conn() as conn:
try:
rows = conn.execute(
"SELECT ticker, score_raw, news_count FROM news_sentiment WHERE date=?",
(date,),
).fetchall()
except Exception:
return {}
return {r["ticker"]: {"score_raw": r["score_raw"], "news_count": r["news_count"]}
for r in rows}
def news_issues(tickers: list[str], date: str, use_llm: bool = True) -> dict[str, list]:
"""news_sentiment 음수 → 악재 flag. (LLM 요약은 best-effort; 단위 테스트는 use_llm=False로.)"""
senti = _news_sentiment_map(date)
out: dict[str, list] = {}
for t in tickers:
s = senti.get(t)
if not s or s["score_raw"] is None:
continue
if s["score_raw"] <= NEG_SENTIMENT:
sev = "high" if s["score_raw"] <= NEG_SENTIMENT * 2 else "med"
out.setdefault(t, []).append({
"type": "news",
"severity": sev,
"summary": f"부정 뉴스 감성({s['score_raw']:+.2f}, {s.get('news_count', 0)}건)",
})
return out
# ---- Task 3.3: portfolio_health ----
def portfolio_health(holdings: list[dict], total_cash: int = 0) -> dict:
"""비중 집중도(최대비중·HHI) + 현금비중 + 총손익 요약."""
evals, buys = [], []
for h in holdings:
cur = h.get("current_price") or h.get("avg_price") or 0
ev = cur * h.get("quantity", 0)
bu = (h.get("avg_price") or 0) * h.get("quantity", 0)
evals.append(ev)
buys.append(bu)
total_eval = sum(evals)
total_buy = sum(buys)
weights = [e / total_eval for e in evals] if total_eval else []
hhi = sum(w * w for w in weights)
total_assets = total_eval + (total_cash or 0)
return {
"positions": len(holdings),
"total_eval": total_eval,
"total_buy": total_buy,
"total_pnl": total_eval - total_buy,
"total_pnl_rate": ((total_eval - total_buy) / total_buy * 100.0) if total_buy else 0.0,
"max_weight": max(weights) if weights else 0.0,
"hhi": round(hhi, 4),
"cash_ratio": ((total_cash or 0) / total_assets) if total_assets else 0.0,
}
DEFAULT_PARAMS = {
"stop_pct": 0.08, "take_pct": 0.25, "climax_vol_x": 3.0,
"move_pct": 7.0, "vol_z": 2.5,
"momentum_drop": 15.0, "momentum_low": 35.0,
}
def _load_ctx(asof: dt.date):
"""ScreenContext.load를 감싸는 thin wrapper (테스트에서 monkeypatch 대상)."""
from .screener.engine import ScreenContext
with db._conn() as conn:
return ScreenContext.load(conn, asof)
def _today_kst() -> dt.date:
return (dt.datetime.utcnow() + dt.timedelta(hours=9)).date()
def compute_and_store(asof: Optional[dt.date] = None, use_llm: bool = True,
params: dict | None = None) -> dict:
"""보유종목 시그널 계산 → holdings_signals upsert (멱등).
Returns:
{"stored": N, "date": "YYYY-MM-DD"} or {"stored": 0, "reason": "..."}
"""
asof = asof or _today_kst()
p = {**DEFAULT_PARAMS, **(params or {})}
holdings = get_holdings()
if not holdings:
return {"stored": 0, "reason": "no_holdings"}
krx = [h for h in holdings if h.get("is_krx")]
ctx = _load_ctx(asof)
posture = technical_posture(ctx, [h["ticker"] for h in krx]) if krx else {}
date_iso = asof.isoformat()
issues_map = news_issues([h["ticker"] for h in holdings], date_iso, use_llm=use_llm)
stored = 0
for h in holdings:
t = h["ticker"]
tp = ctx.prices[ctx.prices["ticker"] == t] if h.get("is_krx") else None
tf = ctx.flow[ctx.flow["ticker"] == t] if h.get("is_krx") else None
flags = exit_rules(h, tp, p) if h.get("is_krx") else {}
tech = posture.get(t)
# momentum_loss: 직전 저장 시그널 대비 하락 or 낮은 강도
prev = db.get_holdings_signal_history(t, limit=2)
prev_score = next((r["tech_score"] for r in prev if r["date"] != date_iso), None)
if tech is not None and (
(prev_score is not None and tech < prev_score - p["momentum_drop"])
or tech < p["momentum_low"]
):
flags["momentum_loss"] = True
evts = market_events(t, tp, tf, p) if h.get("is_krx") else []
issues = list(issues_map.get(t, [])) + evts
action, reasons = decide_action(tech if tech is not None else 0.0, flags, h.get("pnl_rate"))
db.upsert_holdings_signal(
date=date_iso, ticker=t, name=h.get("name"), action=action,
tech_score=tech, exit_flags=flags, issues=issues,
close=h.get("current_price"), pnl_rate=h.get("pnl_rate"), reasons=reasons,
)
stored += 1
return {"stored": stored, "date": date_iso}
def build_holdings_brief(date: Optional[str] = None) -> dict:
"""최신 시그널 + 포트 건강 조립 (브리핑/UI payload)."""
date = date or db.get_latest_holdings_date()
if not date:
return {"date": None, "holdings": [], "portfolio_health": {}}
signals = db.get_holdings_signals(date)
holdings = get_holdings()
total_cash = sum(c.get("cash", 0) for c in db.get_all_broker_cash())
health = portfolio_health(holdings, total_cash=total_cash)
return {"date": date, "holdings": signals, "portfolio_health": health}
def decide_action(tech_score: float, exit_flags: dict, pnl: float | None,
add_score: float = ADD_SCORE) -> tuple[str, str]:
"""액션 결정 매트릭스: sell > trim > add > hold (우선순위 순).
Returns:
(action, reasons_text) action ∈ {"sell","trim","add","hold"}
"""
reasons = []
# 청산 (최우선)
if exit_flags.get("stop_loss"):
reasons.append("손절선 이탈")
if exit_flags.get("ma200_break"):
reasons.append("MA200 이탈")
if reasons:
return "sell", " · ".join(reasons)
# 축소
if exit_flags.get("ma50_break"):
reasons.append("MA50 이탈")
if exit_flags.get("momentum_loss"):
reasons.append("모멘텀 소멸")
if exit_flags.get("take_profit"):
reasons.append(f"목표 수익 도달(+{pnl:.0f}%)" if pnl is not None else "목표 수익 도달")
if exit_flags.get("climax"):
reasons.append("거래량 급증 분산 의심")
if reasons:
return "trim", " · ".join(reasons)
# 추가매수
if tech_score is not None and tech_score >= add_score:
return "add", f"기술적 강도 양호({tech_score:.0f})"
return "hold", "특이 신호 없음"

View File

@@ -3,7 +3,7 @@ import json
import logging import logging
from datetime import date as date_type from datetime import date as date_type
from typing import Optional from typing import Optional
from fastapi import FastAPI, Query, Header, Depends, HTTPException from fastapi import FastAPI, Query, Header, Depends, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
import requests import requests
@@ -27,6 +27,7 @@ from .price_fetcher import get_current_prices, get_current_prices_detail
from .ai_summarizer import summarize_news, OllamaError from .ai_summarizer import summarize_news, OllamaError
from .auth import verify_webai_key from .auth import verify_webai_key
from . import webai_cache from . import webai_cache
from . import holdings_intel
app = FastAPI() app = FastAPI()
install_access_log(app) install_access_log(app)
@@ -652,5 +653,25 @@ def remove_sell_history(record_id: int):
return {"ok": True} return {"ok": True}
# --- Holdings Intelligence API ---
@app.get("/api/stock/holdings/intel")
def holdings_intel_brief():
"""보유종목 인텔리전스 브리핑 (최신 시그널 + 포트 건강)"""
return holdings_intel.build_holdings_brief()
@app.get("/api/stock/holdings/intel/history")
def holdings_intel_history(ticker: str, days: int = 30):
"""종목별 시그널 이력 조회"""
from . import db
return {"ticker": ticker, "history": db.get_holdings_signal_history(ticker, days)}
@app.post("/api/stock/holdings/intel/run")
def holdings_intel_run(background_tasks: BackgroundTasks, use_llm: bool = True):
"""보유종목 시그널 계산 트리거 (BackgroundTask)"""
background_tasks.add_task(holdings_intel.compute_and_store, None, use_llm)
return {"ok": True, "queued": True}

View File

@@ -0,0 +1,42 @@
import os
import sys
import tempfile
from fastapi.testclient import TestClient
def _client(monkeypatch):
# Add web-backend root to sys.path so _shared can be imported by main.py
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from app import db
monkeypatch.setattr(db, "DB_PATH", os.path.join(tempfile.mkdtemp(), "stock.db"))
db.init_db()
from app.main import app
return TestClient(app)
def test_holdings_intel_endpoint(monkeypatch):
client = _client(monkeypatch)
r = client.get("/api/stock/holdings/intel")
assert r.status_code == 200
body = r.json()
assert "holdings" in body and "portfolio_health" in body
def test_holdings_intel_history_endpoint(monkeypatch):
client = _client(monkeypatch)
r = client.get("/api/stock/holdings/intel/history?ticker=005930")
assert r.status_code == 200
body = r.json()
assert body["ticker"] == "005930"
assert "history" in body
assert isinstance(body["history"], list)
def test_holdings_intel_run_endpoint(monkeypatch):
client = _client(monkeypatch)
r = client.post("/api/stock/holdings/intel/run")
assert r.status_code == 200
body = r.json()
assert body["ok"] is True
assert body["queued"] is True

View File

@@ -0,0 +1,37 @@
import os, tempfile
def _fresh_db(monkeypatch):
tmp = tempfile.mkdtemp()
from app import db
monkeypatch.setattr(db, "DB_PATH", os.path.join(tmp, "stock.db"))
db.init_db()
return db
def test_holdings_signals_table_and_upsert(monkeypatch):
db = _fresh_db(monkeypatch)
db.upsert_holdings_signal(date="2026-05-29", ticker="005930", name="삼성전자",
action="hold", tech_score=72.0, exit_flags={"stop_loss": False},
issues=[{"type": "news", "severity": "low", "summary": "x"}],
close=80000, pnl_rate=5.2, reasons="강건")
db.upsert_holdings_signal(date="2026-05-29", ticker="005930", name="삼성전자",
action="trim", tech_score=60.0, exit_flags={"ma50_break": True},
issues=[], close=79000, pnl_rate=3.0, reasons="MA50 이탈")
rows = db.get_holdings_signals(date="2026-05-29")
assert len(rows) == 1 # upsert 멱등
assert rows[0]["action"] == "trim"
assert rows[0]["exit_flags"]["ma50_break"] is True # JSON 역직렬화
hist = db.get_holdings_signal_history("005930", limit=30)
assert len(hist) == 1
def test_get_latest_holdings_date(monkeypatch):
db = _fresh_db(monkeypatch)
# empty table → None
assert db.get_latest_holdings_date() is None
# after an upsert → returns that date
db.upsert_holdings_signal(
date="2026-05-30", ticker="005930", name="삼성전자",
action="hold", tech_score=70.0, exit_flags={}, issues=[],
close=80000, pnl_rate=4.0, reasons="테스트",
)
assert db.get_latest_holdings_date() == "2026-05-30"

View File

@@ -0,0 +1,387 @@
import datetime as dt
import pandas as pd
from app import holdings_intel as hi
def test_get_holdings_merges_price_and_pnl(monkeypatch):
monkeypatch.setattr(hi.db, "get_all_portfolio", lambda: [
{"id": 1, "broker": "kis", "ticker": "005930", "name": "삼성전자",
"quantity": 10, "avg_price": 70000, "purchase_price": 70000},
{"id": 2, "broker": "kis", "ticker": "AAPL", "name": "Apple",
"quantity": 5, "avg_price": 200, "purchase_price": 200},
])
monkeypatch.setattr(hi.price_fetcher, "get_current_prices",
lambda tickers: {"005930": 77000}) # AAPL 미조회(비KRX)
monkeypatch.setattr(hi, "_krx_tickers", lambda: {"005930"})
hs = hi.get_holdings()
s = {h["ticker"]: h for h in hs}
assert s["005930"]["is_krx"] is True
assert round(s["005930"]["pnl_rate"], 1) == 10.0 # (77000-70000)/70000
assert s["AAPL"]["is_krx"] is False # KRX 외
def test_get_holdings_zero_avg_price(monkeypatch):
"""avg_price=0인 종목은 pnl_rate가 None이어야 한다 (ZeroDivisionError 없음)."""
monkeypatch.setattr(hi.db, "get_all_portfolio", lambda: [
{"id": 1, "broker": "kis", "ticker": "005930", "name": "삼성전자",
"quantity": 10, "avg_price": 0, "purchase_price": 0},
])
monkeypatch.setattr(hi.price_fetcher, "get_current_prices",
lambda tickers: {"005930": 80000})
monkeypatch.setattr(hi, "_krx_tickers", lambda: {"005930"})
hs = hi.get_holdings()
assert hs[0]["pnl_rate"] is None
def test_get_holdings_empty_portfolio(monkeypatch):
"""포트폴리오가 비어있으면 빈 리스트를 반환하고 가격 조회를 호출하지 않는다."""
monkeypatch.setattr(hi.db, "get_all_portfolio", lambda: [])
called = []
monkeypatch.setattr(hi.price_fetcher, "get_current_prices",
lambda tickers: called.append(tickers) or {})
monkeypatch.setattr(hi, "_krx_tickers", lambda: set())
result = hi.get_holdings()
assert result == []
assert called == [] # get_current_prices must NOT have been called
def test_get_holdings_price_missing(monkeypatch):
"""prices dict에 ticker가 없으면 current_price와 pnl_rate는 None이다."""
monkeypatch.setattr(hi.db, "get_all_portfolio", lambda: [
{"id": 1, "broker": "kis", "ticker": "000660", "name": "SK하이닉스",
"quantity": 5, "avg_price": 150000, "purchase_price": 150000},
])
monkeypatch.setattr(hi.price_fetcher, "get_current_prices",
lambda tickers: {}) # 가격 없음
monkeypatch.setattr(hi, "_krx_tickers", lambda: {"000660"})
hs = hi.get_holdings()
assert hs[0]["current_price"] is None
assert hs[0]["pnl_rate"] is None
# ---- Phase 2 tests ----
def _toy_ctx(tickers=("005930",), n=300):
"""결정적 일봉으로 ScreenContext 유사 객체 구성."""
from app.screener.engine import ScreenContext
rows = []
base = dt.date(2025, 1, 1)
for t in tickers:
price = 1000
for i in range(n):
price = int(price * 1.002) # 완만한 상승 → 정배열
d = (base + dt.timedelta(days=i)).isoformat()
rows.append({"ticker": t, "date": d, "open": price, "high": price,
"low": price, "close": price, "volume": 1000, "value": price*1000})
prices = pd.DataFrame(rows)
master = pd.DataFrame({"name": [f"n{t}" for t in tickers],
"market": ["KOSPI"]*len(tickers),
"market_cap": [1e12]*len(tickers)},
index=pd.Index(tickers, name="ticker"))
flow = pd.DataFrame(columns=["ticker","date","foreign_net","institution_net"])
return ScreenContext(master=master, prices=prices, flow=flow,
kospi=pd.Series(dtype=float), asof=base+dt.timedelta(days=n-1))
def test_technical_posture_returns_scores():
ctx = _toy_ctx(("005930",))
scores = hi.technical_posture(ctx, ["005930"])
assert "005930" in scores
assert 0.0 <= scores["005930"] <= 100.0 # 상승추세 → 양수 점수
# ---- Task 2.2 tests ----
def _ticker_prices(closes, vols=None):
n = len(closes)
base = dt.date(2025, 1, 1)
vols = vols or [1000]*n
return pd.DataFrame({
"ticker": ["005930"]*n,
"date": [(base+dt.timedelta(days=i)).isoformat() for i in range(n)],
"open": closes, "high": closes, "low": closes, "close": closes, "volume": vols,
})
DEFAULT_EXIT = {"stop_pct": 0.08, "take_pct": 0.25, "climax_vol_x": 3.0}
def test_exit_rules_stop_and_ma():
closes = [1000]*60 + [1100]*200 # 충분한 길이, 최근 평탄
df = _ticker_prices(closes)
# 현재가가 평단(2000) 대비 -45% → stop_loss
flags = hi.exit_rules({"avg_price": 2000, "current_price": 1100}, df, DEFAULT_EXIT)
assert flags["stop_loss"] is True
# 종가 1100 > MA50≈1100, MA200은 더 낮음 → ma 이탈 아님
assert flags["ma200_break"] is False
def test_exit_rules_take_profit():
df = _ticker_prices([1000]*260)
flags = hi.exit_rules({"avg_price": 1000, "current_price": 1300}, df, DEFAULT_EXIT)
assert flags["take_profit"] is True # +30% ≥ 25%
# ---- Task 2.3 tests ----
def test_decide_action_matrix():
# 강건 + 이탈 없음 + 높은 강도 → add
a, r = hi.decide_action(tech_score=80, exit_flags={}, pnl=5)
assert a == "add"
# ma200 이탈 → sell
a, r = hi.decide_action(70, {"ma200_break": True}, 2)
assert a == "sell"
# stop_loss → sell
a, _ = hi.decide_action(70, {"stop_loss": True}, -10)
assert a == "sell"
# ma50 이탈만 → trim
a, _ = hi.decide_action(60, {"ma50_break": True}, 3)
assert a == "trim"
# 이탈 없음 보통 강도 → hold
a, _ = hi.decide_action(50, {}, 1)
assert a == "hold"
# ---- Phase 2 hardening tests (m3) ----
def _ticker_prices_hl(closes, highs, vols):
n = len(closes)
base = dt.date(2025, 1, 1)
return pd.DataFrame({
"ticker": ["005930"] * n,
"date": [(base + dt.timedelta(days=i)).isoformat() for i in range(n)],
"open": closes,
"high": highs,
"low": closes,
"close": closes,
"volume": vols,
})
def test_exit_rules_climax():
closes = [1000] * 30
highs = [1000] * 29 + [1100] # 마지막날 상단꼬리(종가1000 < 고가1100*0.97)
vols = [1000] * 29 + [5000] # 거래량 5x
flags = hi.exit_rules({"avg_price": 900, "current_price": 1000},
_ticker_prices_hl(closes, highs, vols), {})
assert flags["climax"] is True
def test_exit_rules_ma200_break():
closes = list(range(1000, 1000 + 260))[::-1] # 하락 추세 → 종가 < MA200
df = _ticker_prices(closes)
flags = hi.exit_rules({"avg_price": 2000, "current_price": closes[-1]}, df, {})
assert flags["ma200_break"] is True
def test_technical_posture_short_history_returns_low_not_crash():
ctx = _toy_ctx(("005930",), n=100) # <252 → MA 노드 NaN→0, but no crash
scores = hi.technical_posture(ctx, ["005930"])
assert "005930" in scores
assert 0.0 <= scores["005930"] <= 100.0
def test_technical_posture_empty_kospi_not_penalized():
# rs_rating는 빈 kospi에서 빈 Series → combine에서 제외되어야 (C1)
ctx = _toy_ctx(("005930",), n=300) # kospi 빈 fixture
scores = hi.technical_posture(ctx, ["005930"])
# ma_alignment+momentum만으로 정규화 → 상승추세면 충분히 높은 점수
assert scores["005930"] > 50.0
# ---- Phase 3 tests ----
DEFAULT_EVENT = {"move_pct": 7.0, "vol_z": 2.5}
def test_market_events_detects_move_and_volume():
closes = [1000]*30 + [1100] # 마지막날 +10%
vols = [1000]*30 + [10000] # 거래량 급증
df = _ticker_prices(closes, vols)
evts = hi.market_events("005930", df, None, DEFAULT_EVENT)
types = {e["type"] for e in evts}
assert "price_move" in types
assert "volume_surge" in types
def test_news_issues_flags_negative_sentiment(monkeypatch):
# news_sentiment: 005930 음수 점수 → 악재 flag
monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: {
"005930": {"score_raw": -0.6, "news_count": 8}})
issues = hi.news_issues(["005930"], date="2026-05-29", use_llm=False)
assert "005930" in issues
assert issues["005930"][0]["type"] == "news"
assert issues["005930"][0]["severity"] in ("med", "high")
def test_portfolio_health():
holdings = [
{"ticker": "005930", "quantity": 10, "avg_price": 70000, "current_price": 77000,
"is_krx": True},
{"ticker": "000660", "quantity": 5, "avg_price": 100000, "current_price": 90000,
"is_krx": True},
]
h = hi.portfolio_health(holdings, total_cash=1000000)
assert h["positions"] == 2
assert 0 <= h["max_weight"] <= 1.0
assert "total_eval" in h and "total_pnl" in h and "cash_ratio" in h
def test_market_events_volume_surge_zscore_path():
# 변동 있는 기준선 → Z-score 경로(sd>0) 검증 (sd=0 fallback 아님)
import random as _r
_r.seed(1)
base_vols = [1000 + _r.randint(-50, 50) for _ in range(30)]
closes = [1000] * 30 + [1010]
vols = base_vols + [max(base_vols) * 10] # 마지막날 큰 급증
df = _ticker_prices(closes, vols)
evts = hi.market_events("005930", df, None, DEFAULT_EVENT)
assert any(e["type"] == "volume_surge" for e in evts)
def test_market_events_foreign_selling():
closes = [1000] * 5
df = _ticker_prices(closes)
import datetime as _dt
base = _dt.date(2025, 1, 1)
flow = pd.DataFrame({
"ticker": ["005930"] * 5,
"date": [(base + _dt.timedelta(days=i)).isoformat() for i in range(5)],
"foreign_net": [100, 50, -10, -20, -30], # 최근 3일 연속 순매도
"institution_net": [0] * 5,
})
evts = hi.market_events("005930", df, flow, DEFAULT_EVENT)
assert any(e["type"] == "foreign_selling" for e in evts)
def test_news_issues_severity_high_boundary(monkeypatch):
monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: {
"005930": {"score_raw": -0.6, "news_count": 5}}) # 정확히 high 경계
issues = hi.news_issues(["005930"], date="2026-05-29", use_llm=False)
assert issues["005930"][0]["severity"] == "high"
def test_portfolio_health_empty_and_zero():
# 빈 포트 → 0/빈값, 크래시 없음
h0 = hi.portfolio_health([], total_cash=0)
assert h0["positions"] == 0
assert h0["max_weight"] == 0.0
assert h0["total_pnl_rate"] == 0.0
assert h0["cash_ratio"] == 0.0
# total_buy=0 (avg_price 0) → div-by-zero 없이 0.0
h1 = hi.portfolio_health([{"ticker": "X", "quantity": 1, "avg_price": 0,
"current_price": 0, "is_krx": True}], total_cash=0)
assert h1["total_pnl_rate"] == 0.0
# ---- Phase 4 tests ----
def test_compute_and_store_and_brief(monkeypatch):
import os, tempfile
from app import db
monkeypatch.setattr(db, "DB_PATH", os.path.join(tempfile.mkdtemp(), "stock.db"))
db.init_db()
monkeypatch.setattr(hi, "get_holdings", lambda: [
{"ticker": "005930", "name": "삼성전자", "quantity": 10, "avg_price": 1000,
"current_price": 1100, "pnl_rate": 10.0, "is_krx": True}])
ctx = _toy_ctx(("005930",))
monkeypatch.setattr(hi, "_load_ctx", lambda asof: ctx)
monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: {})
monkeypatch.setattr(hi.db, "get_all_broker_cash", lambda: [{"broker": "kis", "cash": 500000}])
res = hi.compute_and_store(asof=ctx.asof, use_llm=False)
assert res["stored"] == 1
brief = hi.build_holdings_brief()
assert brief["holdings"][0]["ticker"] == "005930"
assert "portfolio_health" in brief
assert brief["holdings"][0]["action"] in ("add", "hold", "trim", "sell")
def test_compute_momentum_loss_flag(monkeypatch):
"""직전 시그널 tech_score HIGH → 오늘 LOW → momentum_loss=True."""
import os, tempfile
from app import db
monkeypatch.setattr(db, "DB_PATH", os.path.join(tempfile.mkdtemp(), "stock.db"))
db.init_db()
yesterday = (dt.date.today() - dt.timedelta(days=1)).isoformat()
today = dt.date.today()
today_iso = today.isoformat()
# 어제 시그널 삽입: tech_score=90 (HIGH)
db.upsert_holdings_signal(
date=yesterday, ticker="005930", name="삼성전자",
action="hold", tech_score=90.0, exit_flags={}, issues=[],
close=1000, pnl_rate=0.0, reasons="x",
)
monkeypatch.setattr(hi, "get_holdings", lambda: [
{"ticker": "005930", "name": "삼성전자", "quantity": 10, "avg_price": 1000,
"current_price": 1000, "pnl_rate": 0.0, "is_krx": True}
])
ctx = _toy_ctx(("005930",))
monkeypatch.setattr(hi, "_load_ctx", lambda asof: ctx)
monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: {})
monkeypatch.setattr(hi.db, "get_all_broker_cash", lambda: [])
# tech_score=30 → 낮음 (momentum_low=35 미만, 또한 90-30=60 > momentum_drop=15)
monkeypatch.setattr(hi, "technical_posture", lambda ctx, tickers: {"005930": 30.0})
res = hi.compute_and_store(asof=today, use_llm=False)
assert res["stored"] == 1
signals = db.get_holdings_signals(today_iso)
assert len(signals) == 1
assert signals[0]["exit_flags"]["momentum_loss"] is True
def test_compute_idempotent(monkeypatch):
"""동일 입력으로 compute_and_store 두 번 실행 → upsert로 1건만 저장."""
import os, tempfile
from app import db
monkeypatch.setattr(db, "DB_PATH", os.path.join(tempfile.mkdtemp(), "stock.db"))
db.init_db()
monkeypatch.setattr(hi, "get_holdings", lambda: [
{"ticker": "005930", "name": "삼성전자", "quantity": 10, "avg_price": 1000,
"current_price": 1100, "pnl_rate": 10.0, "is_krx": True}
])
ctx = _toy_ctx(("005930",))
monkeypatch.setattr(hi, "_load_ctx", lambda asof: ctx)
monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: {})
monkeypatch.setattr(hi.db, "get_all_broker_cash", lambda: [])
hi.compute_and_store(asof=ctx.asof, use_llm=False)
hi.compute_and_store(asof=ctx.asof, use_llm=False)
signals = db.get_holdings_signals(ctx.asof.isoformat())
assert len(signals) == 1, f"upsert 실패: {len(signals)}건 저장됨"
def test_compute_non_krx_holding(monkeypatch):
"""is_krx=False 종목은 tech_score=None·action='hold'로 저장된다."""
import os, tempfile
from app import db
monkeypatch.setattr(db, "DB_PATH", os.path.join(tempfile.mkdtemp(), "stock.db"))
db.init_db()
monkeypatch.setattr(hi, "get_holdings", lambda: [
{"ticker": "AAPL", "name": "Apple", "quantity": 5, "avg_price": 200,
"current_price": 220, "pnl_rate": 10.0, "is_krx": False}
])
ctx = _toy_ctx(()) # ticker 없는 빈 ctx
monkeypatch.setattr(hi, "_load_ctx", lambda asof: ctx)
monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: {})
monkeypatch.setattr(hi.db, "get_all_broker_cash", lambda: [])
res = hi.compute_and_store(asof=ctx.asof, use_llm=False)
assert res["stored"] == 1
signals = db.get_holdings_signals(ctx.asof.isoformat())
assert len(signals) == 1
sig = signals[0]
assert sig["ticker"] == "AAPL"
assert sig["tech_score"] is None
assert sig["action"] == "hold"