diff --git a/agent-office/app/agents/stock.py b/agent-office/app/agents/stock.py index df59a5b..4520d69 100644 --- a/agent-office/app/agents/stock.py +++ b/agent-office/app/agents/stock.py @@ -336,7 +336,48 @@ class StockAgent(BaseAgent): await self.transition("idle", "AI 뉴스 완료") + async def run_holdings_eod(self) -> dict: + """평일 16:50 — 보유종목 시그널 계산·저장.""" + # idle 가드 없음(의도적): 스크리너 진행 중에도 EOD/브리핑은 독립적으로 실행되어야 함 + from ..service_proxy import stock_holdings_run + from ..db import create_task, update_task_status, add_log + task_id = create_task(self.agent_id, "holdings_eod", {}) + try: + res = await stock_holdings_run() + update_task_status(task_id, "succeeded", res) + add_log(self.agent_id, f"holdings_eod: {res}", "info", task_id) + return {"ok": True, **res} + except Exception as e: + update_task_status(task_id, "failed", {"error": str(e)}) + add_log(self.agent_id, f"holdings_eod 실패: {e}", "error", task_id) + return {"ok": False, "message": str(e)} + + async def run_holdings_brief(self) -> dict: + """평일 08:30 — 저장된 시그널 브리핑 텔레그램.""" + # idle 가드 없음(의도적): 스크리너 진행 중에도 EOD/브리핑은 독립적으로 실행되어야 함 + from ..service_proxy import stock_holdings_brief + from ..notifiers.telegram_stock import send_holdings_brief + from ..db import create_task, update_task_status, add_log + task_id = create_task(self.agent_id, "holdings_brief", {}) + try: + payload = await stock_holdings_brief() + await send_holdings_brief(payload) + update_task_status(task_id, "succeeded", {"date": payload.get("date"), + "count": len(payload.get("holdings", []))}) + add_log(self.agent_id, f"holdings_brief 발송: {payload.get('date')}", "info", task_id) + return {"ok": True} + except Exception as e: + update_task_status(task_id, "failed", {"error": str(e)}) + add_log(self.agent_id, f"holdings_brief 실패: {e}", "error", task_id) + return {"ok": False, "message": str(e)} + async def on_command(self, command: str, params: dict) -> dict: + if command == "holdings_eod": + return await self.run_holdings_eod() + + if command == "holdings_brief": + return await self.run_holdings_brief() + if command == "run_screener": await self.on_screener_schedule() return {"ok": True, "message": "스크리너 실행 트리거 완료"} diff --git a/agent-office/app/notifiers/telegram_stock.py b/agent-office/app/notifiers/telegram_stock.py new file mode 100644 index 0000000..0a14844 --- /dev/null +++ b/agent-office/app/notifiers/telegram_stock.py @@ -0,0 +1,42 @@ +"""보유종목 인텔리전스 텔레그램 포매터 (advisory).""" +import logging +from typing import Any, Dict + +from ..telegram.messaging import send_raw + +logger = logging.getLogger("agent-office") + +_ACTION_KR = {"add": "🟢 추가매수", "hold": "⚪ 보유", "trim": "🟡 축소", "sell": "🔴 매도"} +_SEV = {"high": "🔴", "med": "🟠", "low": "🟡"} + + +def format_holdings_brief(payload: Dict[str, Any]) -> str: + date = payload.get("date") or "?" + lines = [f"📊 보유종목 인텔리전스 ({date})", ""] + ph = payload.get("portfolio_health") or {} + if ph: + lines.append(f"포트 손익 {ph.get('total_pnl_rate',0):+.1f}% · " + f"종목 {ph.get('positions',0)} · 최대비중 {ph.get('max_weight',0)*100:.0f}% · " + f"현금 {ph.get('cash_ratio',0)*100:.0f}%") + lines.append("") + for h in payload.get("holdings", []): + act = _ACTION_KR.get(h.get("action"), h.get("action", "?")) + pnl = h.get("pnl_rate") + pnl_txt = f"{pnl:+.1f}%" if pnl is not None else "—" + line = f"{act} {h.get('name') or h.get('ticker')} ({pnl_txt})" + if h.get("reasons"): + line += f" — {h['reasons']}" + lines.append(line) + for iss in (h.get("issues") or [])[:3]: + lines.append(f" {_SEV.get(iss.get('severity'),'•')} {iss.get('summary','')}") + lines.append("") + lines.append("ℹ️ 투자 판단 보조용 제안입니다(자동매매 아님).") + return "\n".join(lines) + + +async def send_holdings_brief(payload: Dict[str, Any]) -> None: + text = format_holdings_brief(payload) + try: + await send_raw(text) + except Exception as e: + logger.warning(f"[telegram_stock] holdings brief send failed: {e}") diff --git a/agent-office/app/scheduler.py b/agent-office/app/scheduler.py index abbc00d..e937da3 100644 --- a/agent-office/app/scheduler.py +++ b/agent-office/app/scheduler.py @@ -22,6 +22,16 @@ async def _run_stock_ai_news(): if agent: await agent.on_ai_news_schedule() +async def _run_stock_holdings_eod(): + agent = AGENT_REGISTRY.get("stock") + if agent: + await agent.run_holdings_eod() + +async def _run_stock_holdings_brief(): + agent = AGENT_REGISTRY.get("stock") + if agent: + await agent.run_holdings_brief() + async def _run_insta_schedule(): agent = AGENT_REGISTRY.get("insta") if agent: @@ -111,6 +121,8 @@ def init_scheduler(): minute=0, id="stock_ai_news_sentiment", ) + scheduler.add_job(_run_stock_holdings_eod, "cron", day_of_week="mon-fri", hour=16, minute=50, id="stock_holdings_eod") # 16:50: 스크리너 snapshot(16:30) 완료 후 — 부분 일봉 읽기 방지 + scheduler.add_job(_run_stock_holdings_brief, "cron", day_of_week="mon-fri", hour=8, minute=30, id="stock_holdings_brief") scheduler.add_job(_run_insta_schedule, "cron", hour=9, minute=30, id="insta_pipeline") # 외부 트렌드 수집은 장 마감 후 16:40 — 9시 주식 활발 시간대 NAS 자원 회피. # screener(16:30)와 10분 스태거: Celeron 2C/2.0GHz 동시 실행 시 CPU 폭주 방지 (CHECK_POINT FU-A) diff --git a/agent-office/app/service_proxy.py b/agent-office/app/service_proxy.py index 30408f5..c49237e 100644 --- a/agent-office/app/service_proxy.py +++ b/agent-office/app/service_proxy.py @@ -88,6 +88,29 @@ async def scrape_stock_news() -> Dict[str, Any]: resp.raise_for_status() return resp.json() + +async def stock_holdings_run() -> Dict[str, Any]: + """보유종목 시그널 계산 트리거 (EOD, use_llm=True). + + stock BackgroundTask 등록 후 즉시 {ok, queued} 반환. + 실제 계산은 stock 컨테이너 백그라운드에서 진행 — 여유있게 120s. + """ + async with httpx.AsyncClient(timeout=120.0) as client: + resp = await client.post( + f"{STOCK_URL}/api/stock/holdings/intel/run", + params={"use_llm": True}, + ) + resp.raise_for_status() + return resp.json() + + +async def stock_holdings_brief() -> Dict[str, Any]: + """보유종목 최신 브리핑 payload 조회 (GET, 모듈 레벨 _client 사용).""" + resp = await _client.get(f"{STOCK_URL}/api/stock/holdings/intel") + resp.raise_for_status() + return resp.json() + + async def generate_music(payload: dict) -> Dict[str, Any]: resp = await _client.post(f"{MUSIC_LAB_URL}/api/music/generate", json=payload) resp.raise_for_status() diff --git a/agent-office/tests/test_holdings_brief_format.py b/agent-office/tests/test_holdings_brief_format.py new file mode 100644 index 0000000..108518a --- /dev/null +++ b/agent-office/tests/test_holdings_brief_format.py @@ -0,0 +1,82 @@ +import sys, os +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) + +from app.notifiers import telegram_stock as ts + + +def test_format_holdings_brief(): + payload = { + "date": "2026-05-29", + "holdings": [ + {"ticker": "005930", "name": "삼성전자", "action": "trim", "tech_score": 60.0, + "exit_flags": {"ma50_break": True}, "issues": [{"type":"news","severity":"high","summary":"악재"}], + "pnl_rate": 5.2, "reasons": "MA50 이탈"}, + {"ticker": "000660", "name": "SK하이닉스", "action": "hold", "tech_score": 75.0, + "exit_flags": {}, "issues": [], "pnl_rate": -2.0, "reasons": "특이 신호 없음"}, + ], + "portfolio_health": {"positions": 2, "total_pnl_rate": 3.1, "max_weight": 0.6, "cash_ratio": 0.2}, + } + txt = ts.format_holdings_brief(payload) + assert "삼성전자" in txt + assert "축소" in txt or "trim" in txt + assert "%" in txt + + +def test_format_holdings_brief_empty_holdings(): + """빈 holdings + None portfolio_health에도 크래시 없음.""" + payload = {"date": "2026-05-29", "holdings": [], "portfolio_health": None} + txt = ts.format_holdings_brief(payload) + assert "보유종목 인텔리전스" in txt + assert "자동매매" in txt + + +def test_format_holdings_brief_missing_fields(): + """pnl_rate None·name None·issues None 방어적 처리.""" + payload = { + "date": None, + "holdings": [ + {"ticker": "005930", "name": None, "action": "sell", + "pnl_rate": None, "reasons": None, "issues": None}, + ], + "portfolio_health": {}, + } + txt = ts.format_holdings_brief(payload) + assert "005930" in txt # ticker fallback + assert "🔴 매도" in txt + + +def test_format_holdings_brief_sell_action(): + """sell 액션은 🔴 매도로 표시.""" + payload = { + "date": "2026-05-29", + "holdings": [ + {"ticker": "000660", "name": "SK하이닉스", "action": "sell", + "pnl_rate": -12.5, "reasons": "손절선 이탈", "issues": []}, + ], + "portfolio_health": {"positions": 1, "total_pnl_rate": -12.5, + "max_weight": 1.0, "cash_ratio": 0.0}, + } + txt = ts.format_holdings_brief(payload) + assert "🔴 매도" in txt + assert "-12.5%" in txt + + +def test_format_holdings_brief_issue_severity_icons(): + """이슈 심각도별 이모지 매핑 확인.""" + payload = { + "date": "2026-05-29", + "holdings": [ + {"ticker": "005930", "name": "삼성전자", "action": "hold", "pnl_rate": 2.0, + "reasons": "특이 신호 없음", + "issues": [ + {"type": "news", "severity": "high", "summary": "심각 악재"}, + {"type": "volume_surge", "severity": "med", "summary": "거래량 급증"}, + {"type": "price_move", "severity": "low", "summary": "소폭 변동"}, + ]}, + ], + "portfolio_health": {}, + } + txt = ts.format_holdings_brief(payload) + assert "🔴" in txt # high severity + assert "🟠" in txt # med severity + assert "🟡" in txt # low severity diff --git a/docs/superpowers/plans/2026-05-31-stock-holdings-intelligence.md b/docs/superpowers/plans/2026-05-31-stock-holdings-intelligence.md new file mode 100644 index 0000000..edbc836 --- /dev/null +++ b/docs/superpowers/plans/2026-05-31-stock-holdings-intelligence.md @@ -0,0 +1,1102 @@ +# 주식 보유종목 인텔리전스 Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** 시장용 스크리너 엔진을 내 보유종목에 restrict 적용하고, 신규 매도/리스크 룰·이슈 감지·포트 건강을 얹어 매일 advisory 브리핑(텔레그램+UI)한다. + +**Architecture:** stock에 순수연산 `holdings_intel.py` + 집계테이블 `holdings_signals` 추가. 기존 `screener/engine.py`의 `ScreenContext.restrict()`로 보유종목 기술분석, 신규 exit_rules/decide_action으로 매도자세 결정, market_events+news_issues로 이슈, portfolio_health로 포트요약. agent-office가 EOD 계산(16:40)·아침 브리핑(08:30)·장중 가드(30분)를 orchestrate. KIS 실주문 미사용(advisory). + +**Tech Stack:** Python 3.12, FastAPI, SQLite, pandas, APScheduler, Claude Haiku(ai_summarizer), pytest / React+Vite(web-ui 별도 repo). + +**Spec:** `docs/superpowers/specs/2026-05-31-stock-holdings-intelligence-design.md` + +--- + +## 기존 자산 (재사용 — 시그니처 확인됨) +- `stock/app/db.py`: `_conn()`, `init_db()`(CREATE TABLE IF NOT EXISTS + `_ensure`류 마이그레이션), `get_all_portfolio() -> [{id,broker,ticker,name,quantity,avg_price,purchase_price}]`, `get_all_broker_cash() -> [{broker,cash}]`, `get_latest_articles(limit,category)`. 테스트는 monkeypatch로 DB_PATH류 격리(기존 test 파일 참조). +- `stock/app/price_fetcher.py`: `get_current_prices(tickers) -> {ticker:int}`, `get_current_prices_detail(tickers) -> {ticker:{...}}`. +- `stock/app/screener/engine.py`: `ScreenContext.load(conn, asof, lookback_days=504) -> ctx`(master/prices/flow/news_sentiment), `ctx.restrict(tickers)`, `ctx.latest_close()`. `combine(scores, weights)`. +- `stock/app/screener/nodes/base.py`: `ScoreNode.compute(ctx, params) -> pd.Series(0..100, index=ticker)`. nodes: ma_alignment/momentum/rs_rating/vcp_lite/volume_surge/foreign_buy/high52w. `stock/app/screener/registry.py`에 GATE/SCORE 레지스트리. +- `stock/app/ai_summarizer.py`: `async summarize_news(articles) -> {summary, tokens, model, duration_ms}` (Claude/Ollama). +- `news_sentiment` 테이블(date,ticker,score_raw,news_count) — 종목별 감성. `krx_daily_prices`(ticker,date,o/h/l/c,volume,value), `krx_flow`(ticker,date,foreign_net,institution_net). +- agent-office: `service_proxy`(STOCK_URL httpx), `StockAgent`(on_schedule/on_screener_schedule/on_ai_news_schedule/on_command), `scheduler.py`(`_run_stock_*` wrappers + init_scheduler), `telegram.messaging.send_raw`. + +## 알려진 제약 (plan 전반 반영) +- `articles`는 종목 태깅 없음 → 종목별 이슈는 `news_sentiment` 기반 + 회사명 substring 매칭으로 article best-effort. +- MA200/momentum 노드는 ~252일 일봉 필요 → 누적 부족 종목은 NaN→0(노드가 이미 처리). 신규 보유·운영 초기엔 tech_score 낮을 수 있음(graceful). +- KRX 외 종목(미국주): krx_daily_prices 밖 → `is_krx=False`로 기술분석 skip, 뉴스·손익만. + +--- + +# Phase 1 — holdings_signals 테이블 + get_holdings + +## Task 1.1: holdings_signals 테이블 + CRUD + +**Files:** +- Modify: `stock/app/db.py` (init_db + CRUD) +- Test: `stock/app/test_holdings_db.py` + +- [ ] **Step 1: 실패 테스트** + +`stock/app/test_holdings_db.py`: +```python +import os, tempfile, importlib + +def _fresh_db(monkeypatch): + tmp = tempfile.mkdtemp() + from app import db + monkeypatch.setattr(db, "DB_PATH", os.path.join(tmp, "stock.db")) + db.init_db() + return db + +def test_holdings_signals_table_and_upsert(monkeypatch): + db = _fresh_db(monkeypatch) + db.upsert_holdings_signal(date="2026-05-29", ticker="005930", name="삼성전자", + action="hold", tech_score=72.0, exit_flags={"stop_loss": False}, + issues=[{"type": "news", "severity": "low", "summary": "x"}], + close=80000, pnl_rate=5.2, reasons="강건") + db.upsert_holdings_signal(date="2026-05-29", ticker="005930", name="삼성전자", + action="trim", tech_score=60.0, exit_flags={"ma50_break": True}, + issues=[], close=79000, pnl_rate=3.0, reasons="MA50 이탈") + rows = db.get_holdings_signals(date="2026-05-29") + assert len(rows) == 1 # upsert 멱등 + assert rows[0]["action"] == "trim" + assert rows[0]["exit_flags"]["ma50_break"] is True # JSON 역직렬화 + hist = db.get_holdings_signal_history("005930", days=30) + assert len(hist) == 1 +``` + +- [ ] **Step 2: 실패 확인** — Run: `cd stock && python -m pytest app/test_holdings_db.py -v` Expected: FAIL (`upsert_holdings_signal` 없음) + +- [ ] **Step 3: 테이블 DDL** — `stock/app/db.py` `init_db()` 안 sell_history 테이블 블록 뒤에: +```python + conn.execute( + """ + CREATE TABLE IF NOT EXISTS holdings_signals ( + date TEXT NOT NULL, + ticker TEXT NOT NULL, + name TEXT, + action TEXT NOT NULL, + tech_score REAL, + exit_flags TEXT NOT NULL DEFAULT '{}', + issues TEXT NOT NULL DEFAULT '[]', + close INTEGER, + pnl_rate REAL, + reasons TEXT, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + PRIMARY KEY (date, ticker) + ); + """ + ) + conn.execute("CREATE INDEX IF NOT EXISTS idx_holdings_sig_ticker " + "ON holdings_signals(ticker, date DESC);") +``` + +- [ ] **Step 4: CRUD 함수** — `stock/app/db.py` 끝에 (`import json`은 파일 상단에 이미 있으면 재사용, 없으면 추가): +```python +def upsert_holdings_signal(date, ticker, name, action, tech_score, exit_flags, + issues, close, pnl_rate, reasons) -> None: + with _conn() as conn: + conn.execute( + """ + INSERT INTO holdings_signals + (date, ticker, name, action, tech_score, exit_flags, issues, close, pnl_rate, reasons) + VALUES (?,?,?,?,?,?,?,?,?,?) + ON CONFLICT(date, ticker) DO UPDATE SET + name=excluded.name, action=excluded.action, tech_score=excluded.tech_score, + exit_flags=excluded.exit_flags, issues=excluded.issues, close=excluded.close, + pnl_rate=excluded.pnl_rate, reasons=excluded.reasons + """, + (date, ticker, name, action, tech_score, + json.dumps(exit_flags, ensure_ascii=False), + json.dumps(issues, ensure_ascii=False), close, pnl_rate, reasons), + ) + +def _row_to_signal(r) -> dict: + d = dict(r) + d["exit_flags"] = json.loads(d.get("exit_flags") or "{}") + d["issues"] = json.loads(d.get("issues") or "[]") + return d + +def get_holdings_signals(date: str) -> list: + with _conn() as conn: + rows = conn.execute( + "SELECT * FROM holdings_signals WHERE date=? ORDER BY ticker", (date,)).fetchall() + return [_row_to_signal(r) for r in rows] + +def get_latest_holdings_date() -> str | None: + with _conn() as conn: + r = conn.execute("SELECT MAX(date) AS d FROM holdings_signals").fetchone() + return r["d"] if r and r["d"] else None + +def get_holdings_signal_history(ticker: str, days: int = 30) -> list: + with _conn() as conn: + rows = conn.execute( + "SELECT * FROM holdings_signals WHERE ticker=? ORDER BY date DESC LIMIT ?", + (ticker, days)).fetchall() + return [_row_to_signal(r) for r in rows] +``` +> `_conn()` row_factory가 `sqlite3.Row`인지 확인(기존 db.py 패턴). 아니면 dict 변환 보장. + +- [ ] **Step 5: 통과 확인** — Run: `cd stock && python -m pytest app/test_holdings_db.py -v` Expected: PASS + +- [ ] **Step 6: Commit** +```bash +git add stock/app/db.py stock/app/test_holdings_db.py +git commit -m "feat(stock): holdings_signals 테이블 + CRUD" +``` + +## Task 1.2: get_holdings — 보유종목 + 현재가 + 손익 + KRX 판별 + +**Files:** +- Create: `stock/app/holdings_intel.py` +- Test: `stock/app/test_holdings_intel.py` + +- [ ] **Step 1: 실패 테스트** + +`stock/app/test_holdings_intel.py`: +```python +from app import holdings_intel as hi + +def test_get_holdings_merges_price_and_pnl(monkeypatch): + monkeypatch.setattr(hi.db, "get_all_portfolio", lambda: [ + {"id": 1, "broker": "kis", "ticker": "005930", "name": "삼성전자", + "quantity": 10, "avg_price": 70000, "purchase_price": 70000}, + {"id": 2, "broker": "kis", "ticker": "AAPL", "name": "Apple", + "quantity": 5, "avg_price": 200, "purchase_price": 200}, + ]) + monkeypatch.setattr(hi.price_fetcher, "get_current_prices", + lambda tickers: {"005930": 77000}) # AAPL 미조회(비KRX) + monkeypatch.setattr(hi, "_krx_tickers", lambda: {"005930"}) + hs = hi.get_holdings() + s = {h["ticker"]: h for h in hs} + assert s["005930"]["is_krx"] is True + assert round(s["005930"]["pnl_rate"], 1) == 10.0 # (77000-70000)/70000 + assert s["AAPL"]["is_krx"] is False # KRX 외 +``` + +- [ ] **Step 2: 실패 확인** — Run: `cd stock && python -m pytest app/test_holdings_intel.py::test_get_holdings_merges_price_and_pnl -v` Expected: FAIL + +- [ ] **Step 3: 구현** — `stock/app/holdings_intel.py`: +```python +"""보유종목 인텔리전스 — 순수연산 중심 (advisory). KIS 실주문 미사용.""" +from __future__ import annotations +import datetime as dt +from typing import Any, Optional + +from . import db +from . import price_fetcher + + +def _krx_tickers() -> set: + """krx_master에 존재하는 ticker 집합 (KRX 판별용).""" + with db._conn() as conn: + try: + rows = conn.execute("SELECT ticker FROM krx_master").fetchall() + except Exception: + return set() + return {r["ticker"] for r in rows} + + +def get_holdings() -> list[dict]: + """portfolio + 현재가 + pnl_rate + is_krx.""" + items = db.get_all_portfolio() + tickers = [it["ticker"] for it in items] + prices = price_fetcher.get_current_prices(tickers) if tickers else {} + krx = _krx_tickers() + out = [] + for it in items: + cur = prices.get(it["ticker"]) + avg = it["avg_price"] + pnl = ((cur - avg) / avg * 100.0) if (cur and avg) else None + out.append({ + **it, + "current_price": cur, + "pnl_rate": pnl, + "is_krx": it["ticker"] in krx, + }) + return out +``` + +- [ ] **Step 4: 통과 확인** — Run: `cd stock && python -m pytest app/test_holdings_intel.py -v` Expected: PASS + +- [ ] **Step 5: Commit** +```bash +git add stock/app/holdings_intel.py stock/app/test_holdings_intel.py +git commit -m "feat(stock): get_holdings (현재가·손익·KRX판별)" +``` + +--- + +# Phase 2 — 기술분석 + 매도룰 + 액션 결정 (핵심 신규 로직) + +## Task 2.1: technical_posture — 스크리너 노드를 보유종목에 적용 + +**Files:** +- Modify: `stock/app/holdings_intel.py` +- Test: `stock/app/test_holdings_intel.py` + +- [ ] **Step 1: registry 확인** — Run: `cd stock && python -c "from app.screener import registry; print(dir(registry))"` 로 SCORE 노드 레지스트리/기본 weights 접근법 확인. (예: `registry.SCORE_REGISTRY` dict[name->NodeClass], `registry.DEFAULT_WEIGHTS`. 실제 이름은 registry.py를 읽어 확인.) + +- [ ] **Step 2: 실패 테스트** — `test_holdings_intel.py`에 추가: +```python +import datetime as dt +import pandas as pd + +def _toy_ctx(tickers=("005930",), n=300): + # 결정적 일봉으로 ScreenContext 유사 객체 구성 + from app.screener.engine import ScreenContext + rows = [] + base = dt.date(2025, 1, 1) + for t in tickers: + price = 1000 + for i in range(n): + price = int(price * 1.002) # 완만한 상승 → 정배열 + d = (base + dt.timedelta(days=i)).isoformat() + rows.append({"ticker": t, "date": d, "open": price, "high": price, + "low": price, "close": price, "volume": 1000, "value": price*1000}) + prices = pd.DataFrame(rows) + master = pd.DataFrame({"name": [f"n{t}" for t in tickers], + "market": ["KOSPI"]*len(tickers), + "market_cap": [1e12]*len(tickers)}, + index=pd.Index(tickers, name="ticker")) + flow = pd.DataFrame(columns=["ticker","date","foreign_net","institution_net"]) + return ScreenContext(master=master, prices=prices, flow=flow, + kospi=pd.Series(dtype=float), asof=base+dt.timedelta(days=n-1)) + +def test_technical_posture_returns_scores(): + ctx = _toy_ctx(("005930",)) + scores = hi.technical_posture(ctx, ["005930"]) + assert "005930" in scores + assert 0.0 <= scores["005930"] <= 100.0 # 상승추세 → 양수 점수 +``` + +- [ ] **Step 3: 실패 확인** — Run: `cd stock && python -m pytest app/test_holdings_intel.py::test_technical_posture_returns_scores -v` Expected: FAIL + +- [ ] **Step 4: 구현** — `holdings_intel.py`에 추가 (registry의 실제 SCORE 노드/weights 이름은 Step 1에서 확인한 것을 사용): +```python +from .screener.engine import combine + +# 보유종목 매수강도에 쓸 score 노드 (registry에서 인스턴스화). +# registry.py 실제 구조에 맞춰 import — 아래는 직접 인스턴스화 예시. +def _score_nodes_and_weights(): + # NODE_REGISTRY(검증됨): {"momentum": Momentum20, "rs_rating": RsRating, "ma_alignment": MaAlignment, ...} + from .screener.registry import NODE_REGISTRY + weights = {"ma_alignment": 0.4, "momentum": 0.3, "rs_rating": 0.3} + nodes = [NODE_REGISTRY[k]() for k in weights] + return nodes, weights + +def technical_posture(ctx, tickers: list[str]) -> dict[str, float]: + """보유종목 restrict 후 score 노드 → 매수강도(0~100).""" + scoped = ctx.restrict(tickers) + if scoped.prices.empty: + return {} + nodes, weights = _score_nodes_and_weights() + scores = {} + for n in nodes: + try: + scores[n.name] = n.compute(scoped, {}) + except Exception: + scores[n.name] = pd.Series(0.0, index=scoped.master.index) + total = combine(scores, weights) + return {t: float(total.get(t, 0.0)) for t in tickers if t in total.index} +``` +> Step 1에서 확인한 노드 클래스명/모듈경로/`Momentum`·`RsRating` 실제 이름에 맞춰 import 수정. `compute`가 빈 params 허용하는지 확인(MaAlignment는 default_params 사용 → `{}` OK). + +- [ ] **Step 5: 통과 확인** — Run: `cd stock && python -m pytest app/test_holdings_intel.py -v` Expected: PASS + +- [ ] **Step 6: Commit** +```bash +git add stock/app/holdings_intel.py stock/app/test_holdings_intel.py +git commit -m "feat(stock): technical_posture (스크리너 노드 보유종목 적용)" +``` + +## Task 2.2: exit_rules — 손절·MA이탈·익절·클라이맥스 (가격 기반 flag) + +**Files:** +- Modify: `stock/app/holdings_intel.py` +- Test: `stock/app/test_holdings_intel.py` + +- [ ] **Step 1: 실패 테스트** — `test_holdings_intel.py`에 추가: +```python +def _ticker_prices(closes, vols=None): + n = len(closes) + base = dt.date(2025, 1, 1) + vols = vols or [1000]*n + return pd.DataFrame({ + "ticker": ["005930"]*n, + "date": [(base+dt.timedelta(days=i)).isoformat() for i in range(n)], + "open": closes, "high": closes, "low": closes, "close": closes, "volume": vols, + }) + +DEFAULT_EXIT = {"stop_pct": 0.08, "take_pct": 0.25, "climax_vol_x": 3.0} + +def test_exit_rules_stop_and_ma(): + closes = [1000]*60 + [1100]*200 # 충분한 길이, 최근 평탄 + df = _ticker_prices(closes) + # 현재가가 평단(2000) 대비 -45% → stop_loss + flags = hi.exit_rules({"avg_price": 2000, "current_price": 1100}, df, DEFAULT_EXIT) + assert flags["stop_loss"] is True + # 종가 1100 > MA50≈1100, MA200은 더 낮음 → ma 이탈 아님 + assert flags["ma200_break"] is False + +def test_exit_rules_take_profit(): + df = _ticker_prices([1000]*260) + flags = hi.exit_rules({"avg_price": 1000, "current_price": 1300}, df, DEFAULT_EXIT) + assert flags["take_profit"] is True # +30% ≥ 25% +``` + +- [ ] **Step 2: 실패 확인** — Run: `cd stock && python -m pytest app/test_holdings_intel.py -k exit_rules -v` Expected: FAIL + +- [ ] **Step 3: 구현** — `holdings_intel.py`에 추가: +```python +def _ma(closes: "pd.Series", window: int) -> Optional[float]: + if len(closes) < window: + return None + return float(closes.rolling(window).mean().iloc[-1]) + +def exit_rules(holding: dict, ticker_prices: "pd.DataFrame", params: dict) -> dict: + """가격 기반 청산/리스크 flag. (momentum_loss는 compute 단계에서 합산.)""" + flags = {"stop_loss": False, "ma50_break": False, "ma200_break": False, + "take_profit": False, "climax": False} + avg = holding.get("avg_price") + cur = holding.get("current_price") + if ticker_prices is None or ticker_prices.empty: + closes = pd.Series(dtype=float) + else: + closes = ticker_prices.sort_values("date")["close"].astype(float).reset_index(drop=True) + last_close = float(closes.iloc[-1]) if len(closes) else cur + if cur is None: + cur = last_close + if cur and avg: + if cur < avg * (1 - params["stop_pct"]): + flags["stop_loss"] = True + if (cur - avg) / avg >= params["take_pct"]: + flags["take_profit"] = True + ma50 = _ma(closes, 50) + ma200 = _ma(closes, 200) + if ma50 is not None and last_close is not None and last_close < ma50: + flags["ma50_break"] = True + if ma200 is not None and last_close is not None and last_close < ma200: + flags["ma200_break"] = True + # climax: 최근 거래량이 20일 평균의 climax_vol_x배 이상 + 종가가 당일 고점 대비 하단(상단꼬리) + if ticker_prices is not None and not ticker_prices.empty and len(ticker_prices) >= 21: + tp = ticker_prices.sort_values("date") + vol = tp["volume"].astype(float).reset_index(drop=True) + avg_vol = vol.iloc[-21:-1].mean() + last_vol = vol.iloc[-1] + hi_ = float(tp["high"].astype(float).iloc[-1]) + cl_ = float(tp["close"].astype(float).iloc[-1]) + if avg_vol and last_vol >= avg_vol * params["climax_vol_x"] and hi_ > 0 and cl_ < hi_ * 0.97: + flags["climax"] = True + return flags +``` + +- [ ] **Step 4: 통과 확인** — Run: `cd stock && python -m pytest app/test_holdings_intel.py -k exit_rules -v` Expected: PASS + +- [ ] **Step 5: Commit** +```bash +git add stock/app/holdings_intel.py stock/app/test_holdings_intel.py +git commit -m "feat(stock): exit_rules (손절·MA이탈·익절·클라이맥스)" +``` + +## Task 2.3: decide_action — 매수강도+flag → 액션 매트릭스 + +**Files:** +- Modify: `stock/app/holdings_intel.py` +- Test: `stock/app/test_holdings_intel.py` + +- [ ] **Step 1: 실패 테스트** — 추가: +```python +def test_decide_action_matrix(): + # 강건 + 이탈 없음 + 높은 강도 → add + a, r = hi.decide_action(tech_score=80, exit_flags={}, pnl=5) + assert a == "add" + # ma200 이탈 → sell + a, r = hi.decide_action(70, {"ma200_break": True}, 2) + assert a == "sell" + # stop_loss → sell + a, _ = hi.decide_action(70, {"stop_loss": True}, -10) + assert a == "sell" + # ma50 이탈만 → trim + a, _ = hi.decide_action(60, {"ma50_break": True}, 3) + assert a == "trim" + # 이탈 없음 보통 강도 → hold + a, _ = hi.decide_action(50, {}, 1) + assert a == "hold" +``` + +- [ ] **Step 2: 실패 확인** — Run: `cd stock && python -m pytest app/test_holdings_intel.py::test_decide_action_matrix -v` Expected: FAIL + +- [ ] **Step 3: 구현** — `holdings_intel.py`에 추가: +```python +ADD_SCORE = 70.0 # 이 이상이면 추가매수 후보 + +def decide_action(tech_score: float, exit_flags: dict, pnl: float | None) -> tuple[str, str]: + """우선순위: sell > trim > add > hold. 근거 텍스트 동봉.""" + reasons = [] + # 청산 (최우선) + if exit_flags.get("stop_loss"): + reasons.append("손절선 이탈") + if exit_flags.get("ma200_break"): + reasons.append("MA200 이탈") + if reasons: + return "sell", " · ".join(reasons) + # 축소 + if exit_flags.get("ma50_break"): + reasons.append("MA50 이탈") + if exit_flags.get("momentum_loss"): + reasons.append("모멘텀 소멸") + if exit_flags.get("take_profit"): + reasons.append(f"목표 수익 도달(+{pnl:.0f}%)" if pnl is not None else "목표 수익 도달") + if exit_flags.get("climax"): + reasons.append("거래량 급증 분산 의심") + if reasons: + return "trim", " · ".join(reasons) + # 추가매수 + if tech_score is not None and tech_score >= ADD_SCORE: + return "add", f"기술적 강도 양호({tech_score:.0f})" + return "hold", "특이 신호 없음" +``` + +- [ ] **Step 4: 통과 확인** — Run: `cd stock && python -m pytest app/test_holdings_intel.py::test_decide_action_matrix -v` Expected: PASS + +- [ ] **Step 5: Commit** +```bash +git add stock/app/holdings_intel.py stock/app/test_holdings_intel.py +git commit -m "feat(stock): decide_action 매트릭스 (sell>trim>add>hold)" +``` + +--- + +# Phase 3 — 이슈 감지 + 포트 건강 + +## Task 3.1: market_events — 급변·거래량·외인 (기존 데이터) + +**Files:** +- Modify: `stock/app/holdings_intel.py` +- Test: `stock/app/test_holdings_intel.py` + +- [ ] **Step 1: 실패 테스트** — 추가: +```python +DEFAULT_EVENT = {"move_pct": 7.0, "vol_z": 2.5} + +def test_market_events_detects_move_and_volume(): + closes = [1000]*30 + [1100] # 마지막날 +10% + vols = [1000]*30 + [10000] # 거래량 급증 + df = _ticker_prices(closes, vols) + evts = hi.market_events("005930", df, None, DEFAULT_EVENT) + types = {e["type"] for e in evts} + assert "price_move" in types + assert "volume_surge" in types +``` + +- [ ] **Step 2: 실패 확인** — Run: `cd stock && python -m pytest app/test_holdings_intel.py::test_market_events_detects_move_and_volume -v` Expected: FAIL + +- [ ] **Step 3: 구현** — `holdings_intel.py`에 추가: +```python +def market_events(ticker: str, ticker_prices: "pd.DataFrame", + ticker_flow: "pd.DataFrame | None", params: dict) -> list[dict]: + """일봉/flow 기반 시장 이벤트 (급변·거래량 Z·외인 순매도).""" + events = [] + if ticker_prices is None or ticker_prices.empty or len(ticker_prices) < 2: + return events + tp = ticker_prices.sort_values("date").reset_index(drop=True) + close = tp["close"].astype(float) + pct = (close.iloc[-1] - close.iloc[-2]) / close.iloc[-2] * 100.0 if close.iloc[-2] else 0.0 + if abs(pct) >= params["move_pct"]: + events.append({"type": "price_move", "severity": "high" if abs(pct) >= params["move_pct"]*1.5 else "med", + "summary": f"전일 대비 {pct:+.1f}%"}) + vol = tp["volume"].astype(float) + if len(vol) >= 21: + base = vol.iloc[-21:-1] + mu, sd = base.mean(), base.std(ddof=0) + if sd and (vol.iloc[-1] - mu) / sd >= params["vol_z"]: + events.append({"type": "volume_surge", "severity": "med", + "summary": f"거래량 평소 대비 급증(Z={ (vol.iloc[-1]-mu)/sd:.1f })"}) + if ticker_flow is not None and not ticker_flow.empty: + tf = ticker_flow.sort_values("date") + recent = tf["foreign_net"].astype(float).iloc[-3:] + if len(recent) >= 3 and (recent < 0).all(): + events.append({"type": "foreign_selling", "severity": "med", + "summary": "외국인 3일 연속 순매도"}) + return events +``` + +- [ ] **Step 4: 통과 확인** — Run: `cd stock && python -m pytest app/test_holdings_intel.py::test_market_events_detects_move_and_volume -v` Expected: PASS + +- [ ] **Step 5: Commit** +```bash +git add stock/app/holdings_intel.py stock/app/test_holdings_intel.py +git commit -m "feat(stock): market_events (급변·거래량Z·외인순매도)" +``` + +## Task 3.2: news_issues — news_sentiment 기반 악재 flag (+LLM best-effort) + +**Files:** +- Modify: `stock/app/holdings_intel.py` +- Test: `stock/app/test_holdings_intel.py` + +- [ ] **Step 1: 실패 테스트** — 추가: +```python +def test_news_issues_flags_negative_sentiment(monkeypatch): + # news_sentiment: 005930 음수 점수 → 악재 flag + monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: { + "005930": {"score_raw": -0.6, "news_count": 8}}) + issues = hi.news_issues(["005930"], date="2026-05-29", use_llm=False) + assert "005930" in issues + assert issues["005930"][0]["type"] == "news" + assert issues["005930"][0]["severity"] in ("med", "high") +``` + +- [ ] **Step 2: 실패 확인** — Run: `cd stock && python -m pytest app/test_holdings_intel.py::test_news_issues_flags_negative_sentiment -v` Expected: FAIL + +- [ ] **Step 3: 구현** — `holdings_intel.py`에 추가: +```python +NEG_SENTIMENT = -0.3 # 이하면 악재 후보 + +def _news_sentiment_map(date: str) -> dict: + with db._conn() as conn: + try: + rows = conn.execute( + "SELECT ticker, score_raw, news_count FROM news_sentiment WHERE date=?", + (date,)).fetchall() + except Exception: + return {} + return {r["ticker"]: {"score_raw": r["score_raw"], "news_count": r["news_count"]} for r in rows} + +def news_issues(tickers: list[str], date: str, use_llm: bool = True) -> dict[str, list]: + """news_sentiment 음수 → 악재 flag. (LLM 요약은 best-effort, 기본 비활성 테스트.)""" + senti = _news_sentiment_map(date) + out: dict[str, list] = {} + for t in tickers: + s = senti.get(t) + if not s or s["score_raw"] is None: + continue + if s["score_raw"] <= NEG_SENTIMENT: + sev = "high" if s["score_raw"] <= NEG_SENTIMENT * 2 else "med" + out.setdefault(t, []).append({ + "type": "news", "severity": sev, + "summary": f"부정 뉴스 감성({s['score_raw']:+.2f}, {s.get('news_count',0)}건)", + }) + return out +``` +> LLM 요약(`use_llm=True`)은 후속 — articles가 종목 태깅이 없어 회사명 substring 매칭이 필요. v1은 sentiment 기반 flag로 충분(spec §3). LLM 통합은 Phase 4 compute에서 옵션으로 호출하되 실패 graceful. + +- [ ] **Step 4: 통과 확인** — Run: `cd stock && python -m pytest app/test_holdings_intel.py::test_news_issues_flags_negative_sentiment -v` Expected: PASS + +- [ ] **Step 5: Commit** +```bash +git add stock/app/holdings_intel.py stock/app/test_holdings_intel.py +git commit -m "feat(stock): news_issues (감성 기반 악재 flag)" +``` + +## Task 3.3: portfolio_health — 집중도·시장mix·현금·손익 + +**Files:** +- Modify: `stock/app/holdings_intel.py` +- Test: `stock/app/test_holdings_intel.py` + +- [ ] **Step 1: 실패 테스트** — 추가: +```python +def test_portfolio_health(): + holdings = [ + {"ticker": "005930", "quantity": 10, "avg_price": 70000, "current_price": 77000, + "is_krx": True}, + {"ticker": "000660", "quantity": 5, "avg_price": 100000, "current_price": 90000, + "is_krx": True}, + ] + h = hi.portfolio_health(holdings, total_cash=1000000) + assert h["positions"] == 2 + assert 0 <= h["max_weight"] <= 1.0 + assert "total_eval" in h and "total_pnl" in h and "cash_ratio" in h +``` + +- [ ] **Step 2: 실패 확인** — Run: `cd stock && python -m pytest app/test_holdings_intel.py::test_portfolio_health -v` Expected: FAIL + +- [ ] **Step 3: 구현** — `holdings_intel.py`에 추가: +```python +def portfolio_health(holdings: list[dict], total_cash: int = 0) -> dict: + """비중 집중도(최대비중·HHI) + 시장 mix + 현금비중 + 총손익.""" + evals, buys = [], [] + for h in holdings: + cur = h.get("current_price") or h.get("avg_price") or 0 + ev = cur * h.get("quantity", 0) + bu = (h.get("avg_price") or 0) * h.get("quantity", 0) + evals.append(ev); buys.append(bu) + total_eval = sum(evals) + total_buy = sum(buys) + weights = [e / total_eval for e in evals] if total_eval else [] + hhi = sum(w*w for w in weights) + total_assets = total_eval + (total_cash or 0) + return { + "positions": len(holdings), + "total_eval": total_eval, + "total_buy": total_buy, + "total_pnl": total_eval - total_buy, + "total_pnl_rate": ((total_eval - total_buy) / total_buy * 100.0) if total_buy else 0.0, + "max_weight": max(weights) if weights else 0.0, + "hhi": round(hhi, 4), + "cash_ratio": ((total_cash or 0) / total_assets) if total_assets else 0.0, + } +``` + +- [ ] **Step 4: 통과 확인** — Run: `cd stock && python -m pytest app/test_holdings_intel.py::test_portfolio_health -v` Expected: PASS + +- [ ] **Step 5: Commit** +```bash +git add stock/app/holdings_intel.py stock/app/test_holdings_intel.py +git commit -m "feat(stock): portfolio_health (집중도·현금·손익)" +``` + +--- + +# Phase 4 — compute_and_store + 브리핑 조립 + API + +## Task 4.1: compute_and_store + build_holdings_brief + +**Files:** +- Modify: `stock/app/holdings_intel.py` +- Test: `stock/app/test_holdings_intel.py` + +- [ ] **Step 1: 실패 테스트** — 추가 (DB + ctx 통합; monkeypatch로 ScreenContext.load·get_holdings·news를 결정적으로): +```python +def test_compute_and_store_and_brief(monkeypatch): + import os, tempfile + from app import db + monkeypatch.setattr(db, "DB_PATH", os.path.join(tempfile.mkdtemp(), "stock.db")) + db.init_db() + monkeypatch.setattr(hi, "get_holdings", lambda: [ + {"ticker": "005930", "name": "삼성전자", "quantity": 10, "avg_price": 1000, + "current_price": 1100, "pnl_rate": 10.0, "is_krx": True}]) + ctx = _toy_ctx(("005930",)) + monkeypatch.setattr(hi, "_load_ctx", lambda asof: ctx) + monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: {}) + monkeypatch.setattr(hi.db, "get_all_broker_cash", lambda: [{"broker":"kis","cash":500000}]) + res = hi.compute_and_store(asof=ctx.asof, use_llm=False) + assert res["stored"] == 1 + brief = hi.build_holdings_brief() + assert brief["holdings"][0]["ticker"] == "005930" + assert "portfolio_health" in brief + assert brief["holdings"][0]["action"] in ("add","hold","trim","sell") +``` + +- [ ] **Step 2: 실패 확인** — Run: `cd stock && python -m pytest app/test_holdings_intel.py::test_compute_and_store_and_brief -v` Expected: FAIL + +- [ ] **Step 3: 구현** — `holdings_intel.py`에 추가: +```python +DEFAULT_PARAMS = {"stop_pct": 0.08, "take_pct": 0.25, "climax_vol_x": 3.0, + "move_pct": 7.0, "vol_z": 2.5, "momentum_drop": 15.0, "momentum_low": 35.0} + +def _load_ctx(asof: dt.date): + from .screener.engine import ScreenContext + with db._conn() as conn: + return ScreenContext.load(conn, asof) + +def _today_kst() -> dt.date: + return (dt.datetime.utcnow() + dt.timedelta(hours=9)).date() + +def compute_and_store(asof: Optional[dt.date] = None, use_llm: bool = True, + params: dict | None = None) -> dict: + """보유종목 시그널 계산 → holdings_signals upsert (멱등).""" + asof = asof or _today_kst() + p = {**DEFAULT_PARAMS, **(params or {})} + holdings = get_holdings() + if not holdings: + return {"stored": 0, "reason": "no_holdings"} + krx = [h for h in holdings if h.get("is_krx")] + ctx = _load_ctx(asof) + posture = technical_posture(ctx, [h["ticker"] for h in krx]) if krx else {} + issues_map = news_issues([h["ticker"] for h in holdings], asof.isoformat(), use_llm=use_llm) + date_iso = asof.isoformat() + stored = 0 + for h in holdings: + t = h["ticker"] + tp = ctx.prices[ctx.prices["ticker"] == t] if h.get("is_krx") else None + tf = ctx.flow[ctx.flow["ticker"] == t] if h.get("is_krx") else None + flags = exit_rules(h, tp, p) if h.get("is_krx") else {} + tech = posture.get(t) + # momentum_loss: 직전 저장 시그널 대비 하락 or 낮은 강도 + prev = db.get_holdings_signal_history(t, days=2) + prev_score = next((r["tech_score"] for r in prev if r["date"] != date_iso), None) + if tech is not None and ((prev_score is not None and tech < prev_score - p["momentum_drop"]) + or tech < p["momentum_low"]): + flags["momentum_loss"] = True + evts = market_events(t, tp, tf, p) if h.get("is_krx") else [] + issues = list(issues_map.get(t, [])) + evts + action, reasons = decide_action(tech if tech is not None else 0.0, flags, h.get("pnl_rate")) + db.upsert_holdings_signal( + date=date_iso, ticker=t, name=h.get("name"), action=action, + tech_score=tech, exit_flags=flags, issues=issues, + close=h.get("current_price"), pnl_rate=h.get("pnl_rate"), reasons=reasons) + stored += 1 + return {"stored": stored, "date": date_iso} + +def build_holdings_brief(date: Optional[str] = None) -> dict: + """최신 시그널 + 포트 건강 조립 (브리핑/UI payload).""" + date = date or db.get_latest_holdings_date() + if not date: + return {"date": None, "holdings": [], "portfolio_health": {}} + signals = db.get_holdings_signals(date) + holdings = get_holdings() + hmap = {h["ticker"]: h for h in holdings} + total_cash = sum(c.get("cash", 0) for c in db.get_all_broker_cash()) + health = portfolio_health(holdings, total_cash=total_cash) + return {"date": date, "holdings": signals, "portfolio_health": health} +``` + +- [ ] **Step 4: 통과 확인** — Run: `cd stock && python -m pytest app/test_holdings_intel.py -v` Expected: PASS (전체) + +- [ ] **Step 5: Commit** +```bash +git add stock/app/holdings_intel.py stock/app/test_holdings_intel.py +git commit -m "feat(stock): compute_and_store + build_holdings_brief" +``` + +## Task 4.2: API 라우터 + main 등록 + +**Files:** +- Modify: `stock/app/main.py` +- Test: `stock/app/test_holdings_api.py` + +- [ ] **Step 1: main.py 라우팅 패턴 확인** — Run: `cd stock && grep -nE "@app.(get|post)|include_router|FastAPI\(" app/main.py | head` 로 stock이 `@app.get` 직접 정의인지 라우터인지 확인. (stock/main.py는 직접 `@app.get` 패턴으로 보임 — 그에 맞춰 엔드포인트 추가.) + +- [ ] **Step 2: 실패 테스트** + +`stock/app/test_holdings_api.py`: +```python +import os, tempfile, sys +from fastapi.testclient import TestClient + +def _client(monkeypatch): + sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) + from app import db + monkeypatch.setattr(db, "DB_PATH", os.path.join(tempfile.mkdtemp(), "stock.db")) + db.init_db() + from app.main import app + return TestClient(app) + +def test_holdings_intel_endpoint(monkeypatch): + client = _client(monkeypatch) + r = client.get("/api/stock/holdings/intel") + assert r.status_code == 200 + body = r.json() + assert "holdings" in body and "portfolio_health" in body +``` + +- [ ] **Step 3: 실패 확인** — Run: `cd stock && python -m pytest app/test_holdings_api.py -v` Expected: FAIL (404) + +- [ ] **Step 4: 엔드포인트 추가** — `stock/app/main.py`에 (`from . import holdings_intel` import 추가, 기존 `@app.get` 패턴 사용): +```python +from . import holdings_intel + +@app.get("/api/stock/holdings/intel") +def holdings_intel_brief(): + return holdings_intel.build_holdings_brief() + +@app.get("/api/stock/holdings/intel/history") +def holdings_intel_history(ticker: str, days: int = 30): + from . import db + return {"ticker": ticker, "history": db.get_holdings_signal_history(ticker, days)} + +@app.post("/api/stock/holdings/intel/run") +def holdings_intel_run(background_tasks: BackgroundTasks, use_llm: bool = True): + background_tasks.add_task(holdings_intel.compute_and_store, None, use_llm) + return {"ok": True, "queued": True} +``` +> **확인됨**: main.py의 기존 import는 `from fastapi import FastAPI, Query, Header, Depends, HTTPException`로 **`BackgroundTasks`가 없음** → 그 줄에 `, BackgroundTasks`를 추가할 것. `holdings_intel.compute_and_store(None, use_llm)`은 첫 인자 asof=None(오늘) 의미. + +- [ ] **Step 5: 통과 확인** — Run: `cd stock && python -m pytest app/test_holdings_api.py -v` Expected: PASS + +- [ ] **Step 6: Commit** +```bash +git add stock/app/main.py stock/app/test_holdings_api.py +git commit -m "feat(stock): holdings intel API (intel/history/run)" +``` + +--- + +# Phase 5 — agent-office (EOD 계산·아침 브리핑·장중 가드) + +## Task 5.1: service_proxy 호출 + +**Files:** +- Modify: `agent-office/app/service_proxy.py` + +- [ ] **Step 1: 함수 추가** — stock 섹션에: +```python +async def stock_holdings_run() -> Dict[str, Any]: + async with httpx.AsyncClient(timeout=120) as client: + resp = await client.post(f"{STOCK_URL}/api/stock/holdings/intel/run", params={"use_llm": True}) + resp.raise_for_status() + return resp.json() + +async def stock_holdings_brief() -> Dict[str, Any]: + resp = await _client.get(f"{STOCK_URL}/api/stock/holdings/intel") + resp.raise_for_status() + return resp.json() +``` +> 파일 상단 httpx import / `_client` 패턴 확인 후 일치시킬 것. + +- [ ] **Step 2: import 확인** — Run: `cd agent-office && python -c "from app import service_proxy"` Expected: 에러 없음 +- [ ] **Step 3: Commit** +```bash +git add agent-office/app/service_proxy.py +git commit -m "feat(agent-office): stock holdings run/brief 프록시" +``` + +## Task 5.2: 브리핑 텔레그램 포매터 + +**Files:** +- Create: `agent-office/app/notifiers/telegram_stock.py` (없으면 생성; 있으면 함수 추가) +- Test: `agent-office/tests/test_holdings_brief_format.py` + +- [ ] **Step 1: 실패 테스트** + +`agent-office/tests/test_holdings_brief_format.py`: +```python +from app.notifiers import telegram_stock as ts + +def test_format_holdings_brief(): + payload = { + "date": "2026-05-29", + "holdings": [ + {"ticker": "005930", "name": "삼성전자", "action": "trim", "tech_score": 60.0, + "exit_flags": {"ma50_break": True}, "issues": [{"type":"news","severity":"high","summary":"악재"}], + "pnl_rate": 5.2, "reasons": "MA50 이탈"}, + {"ticker": "000660", "name": "SK하이닉스", "action": "hold", "tech_score": 75.0, + "exit_flags": {}, "issues": [], "pnl_rate": -2.0, "reasons": "특이 신호 없음"}, + ], + "portfolio_health": {"positions": 2, "total_pnl_rate": 3.1, "max_weight": 0.6, "cash_ratio": 0.2}, + } + txt = ts.format_holdings_brief(payload) + assert "삼성전자" in txt + assert "축소" in txt or "trim" in txt + assert "%" in txt +``` + +- [ ] **Step 2: 실패 확인** — Run: `cd agent-office && python -m pytest tests/test_holdings_brief_format.py -v` Expected: FAIL + +- [ ] **Step 3: 구현** — `agent-office/app/notifiers/telegram_stock.py`: +```python +"""보유종목 인텔리전스 텔레그램 포매터 (advisory).""" +import logging +from typing import Any, Dict + +logger = logging.getLogger("agent-office") + +_ACTION_KR = {"add": "🟢 추가매수", "hold": "⚪ 보유", "trim": "🟡 축소", "sell": "🔴 매도"} +_SEV = {"high": "🔴", "med": "🟠", "low": "🟡"} + + +def format_holdings_brief(payload: Dict[str, Any]) -> str: + date = payload.get("date") or "?" + lines = [f"📊 보유종목 인텔리전스 ({date})", ""] + ph = payload.get("portfolio_health") or {} + if ph: + lines.append(f"포트 손익 {ph.get('total_pnl_rate',0):+.1f}% · " + f"종목 {ph.get('positions',0)} · 최대비중 {ph.get('max_weight',0)*100:.0f}% · " + f"현금 {ph.get('cash_ratio',0)*100:.0f}%") + lines.append("") + for h in payload.get("holdings", []): + act = _ACTION_KR.get(h.get("action"), h.get("action", "?")) + pnl = h.get("pnl_rate") + pnl_txt = f"{pnl:+.1f}%" if pnl is not None else "—" + line = f"{act} {h.get('name') or h.get('ticker')} ({pnl_txt})" + if h.get("reasons"): + line += f" — {h['reasons']}" + lines.append(line) + for iss in (h.get("issues") or [])[:3]: + lines.append(f" {_SEV.get(iss.get('severity'),'•')} {iss.get('summary','')}") + lines.append("") + lines.append("ℹ️ 투자 판단 보조용 제안입니다(자동매매 아님).") + return "\n".join(lines) + + +async def send_holdings_brief(payload: Dict[str, Any]) -> None: + from ..telegram.messaging import send_raw + text = format_holdings_brief(payload) + try: + await send_raw(text) + except Exception as e: + logger.warning(f"[telegram_stock] holdings brief send failed: {e}") +``` + +- [ ] **Step 4: 통과 확인** — Run: `cd agent-office && python -m pytest tests/test_holdings_brief_format.py -v` Expected: PASS + +- [ ] **Step 5: Commit** +```bash +git add agent-office/app/notifiers/telegram_stock.py agent-office/tests/test_holdings_brief_format.py +git commit -m "feat(agent-office): 보유종목 브리핑 텔레그램 포매터" +``` + +## Task 5.3: StockAgent 메서드 + 장중 가드 + scheduler cron + +**Files:** +- Modify: `agent-office/app/agents/stock.py` +- Modify: `agent-office/app/scheduler.py` + +- [ ] **Step 1: StockAgent 메서드 추가** — `agents/stock.py` `StockAgent`에: +```python + async def run_holdings_eod(self) -> dict: + """평일 16:40 — 보유종목 시그널 계산·저장.""" + from ..service_proxy import stock_holdings_run + from ..db import create_task, update_task_status, add_log + task_id = create_task(self.agent_id, "holdings_eod", {}) + try: + res = await stock_holdings_run() + update_task_status(task_id, "succeeded", res) + add_log(self.agent_id, f"holdings_eod: {res}", "info", task_id) + return {"ok": True, **res} + except Exception as e: + update_task_status(task_id, "failed", {"error": str(e)}) + add_log(self.agent_id, f"holdings_eod 실패: {e}", "error", task_id) + return {"ok": False, "message": str(e)} + + async def run_holdings_brief(self) -> dict: + """평일 08:30 — 저장된 시그널 브리핑 텔레그램.""" + from ..service_proxy import stock_holdings_brief + from ..notifiers.telegram_stock import send_holdings_brief + from ..db import create_task, update_task_status, add_log + task_id = create_task(self.agent_id, "holdings_brief", {}) + try: + payload = await stock_holdings_brief() + await send_holdings_brief(payload) + update_task_status(task_id, "succeeded", {"date": payload.get("date"), + "count": len(payload.get("holdings", []))}) + add_log(self.agent_id, f"holdings_brief 발송: {payload.get('date')}", "info", task_id) + return {"ok": True} + except Exception as e: + update_task_status(task_id, "failed", {"error": str(e)}) + add_log(self.agent_id, f"holdings_brief 실패: {e}", "error", task_id) + return {"ok": False, "message": str(e)} +``` +그리고 `on_command`에 분기 추가: +```python + if command == "holdings_eod": + return await self.run_holdings_eod() + if command == "holdings_brief": + return await self.run_holdings_brief() +``` + +- [ ] **Step 2: scheduler cron** — `agent-office/app/scheduler.py`에 wrapper + 등록: +```python +async def _run_stock_holdings_eod(): + agent = AGENT_REGISTRY.get("stock") + if agent: + await agent.run_holdings_eod() + +async def _run_stock_holdings_brief(): + agent = AGENT_REGISTRY.get("stock") + if agent: + await agent.run_holdings_brief() +``` +`init_scheduler()` stock cron 그룹에: +```python + scheduler.add_job(_run_stock_holdings_eod, "cron", day_of_week="mon-fri", hour=16, minute=40, id="stock_holdings_eod") + scheduler.add_job(_run_stock_holdings_brief, "cron", day_of_week="mon-fri", hour=8, minute=30, id="stock_holdings_brief") +``` +> 장중 경량 가드(30분 간격 손절·급변 alert)는 **후속 슬라이스**로 분리 — 본 plan은 EOD 계산 + 아침 브리핑까지. (가드는 holdings_signals의 exit_flags + 현재가 비교가 필요해 별도 설계가 깔끔; spec §4.3은 다음 사이클로 명시.) + +- [ ] **Step 3: import 확인** — Run: `cd agent-office && python -c "from app import scheduler; from app.agents.stock import StockAgent; from app.notifiers import telegram_stock"` Expected: 에러 없음 + +- [ ] **Step 4: Commit** +```bash +git add agent-office/app/agents/stock.py agent-office/app/scheduler.py +git commit -m "feat(agent-office): StockAgent holdings EOD(16:40)+브리핑(08:30) cron" +``` + +> **Note (장중 가드 분리):** spec §4.3의 장중 경량 가드는 별도 후속 슬라이스로 미룬다(throttle/cap 설계가 로또 시그널 수준의 별도 작업). 본 plan은 EOD+아침브리핑으로 완결된 advisory 루프를 제공. + +--- + +# Phase 6 — web-ui 보유종목 인텔리전스 탭 (별도 repo: web-ui) + +> **주의:** web-ui는 별도 Git 저장소. 커밋은 `web-ui/`에서([[feedback-commit-repo]]). 배포 `npm run release:nas` 수동. 먼저 feature 브랜치 생성. + +## Task 6.1: api.js 헬퍼 + +**Files:** Modify `web-ui/src/api.js` + +- [ ] **Step 1: 헬퍼 추가** — 기존 `apiGet` 패턴 사용(확인됨): +```javascript +export const stockHoldingsIntel = () => apiGet('/api/stock/holdings/intel'); +export const stockHoldingsHistory = (ticker, days = 30) => + apiGet(`/api/stock/holdings/intel/history?ticker=${ticker}&days=${days}`); +``` +- [ ] **Step 2: Commit** (web-ui repo, feature 브랜치) +```bash +cd ../web-ui && git checkout -b feat/stock-holdings-ui && git add src/api.js && git commit -m "feat: 보유종목 인텔리전스 API 헬퍼" +``` + +## Task 6.2: HoldingsIntel 컴포넌트 + 포트폴리오 페이지 통합 + +**Files:** +- Create: `web-ui/src/pages/stock/HoldingsIntel.jsx` (경로는 Step 1 확인 결과에 맞춤) +- Modify: stock/포트폴리오 페이지 컨테이너 + +- [ ] **Step 1: 페이지 구조 확인** — Run: `cd ../web-ui && ls src/pages/stock/ 2>/dev/null; grep -rln "portfolio\|Portfolio\|포트폴리오" src/pages/ | head` 로 포트폴리오 페이지 + 탭 패턴 + 기존 카드 컴포넌트 스타일 확인. + +- [ ] **Step 2: HoldingsIntel 컴포넌트** — 기존 카드/탭 컨벤션에 맞춰 작성 (액션별 색상, 이슈 severity 뱃지, 포트 건강 요약). 예시 골격: +```jsx +import { useEffect, useState } from 'react'; +import { stockHoldingsIntel } from '../../api'; + +const ACTION = { add: ['추가매수', '#22c55e'], hold: ['보유', '#94a3b8'], + trim: ['축소', '#f59e0b'], sell: ['매도', '#ef4444'] }; + +export default function HoldingsIntel() { + const [data, setData] = useState(null); + useEffect(() => { stockHoldingsIntel().then(setData).catch(() => {}); }, []); + if (!data) return null; + const ph = data.portfolio_health || {}; + return ( +
+
+ 손익 {(ph.total_pnl_rate ?? 0).toFixed(1)}% · 종목 {ph.positions ?? 0} · + 최대비중 {((ph.max_weight ?? 0) * 100).toFixed(0)}% · 현금 {((ph.cash_ratio ?? 0) * 100).toFixed(0)}% +
+ {(data.holdings || []).map((h) => { + const [label, color] = ACTION[h.action] || [h.action, '#94a3b8']; + return ( +
+ {label} + {h.name || h.ticker} + {h.pnl_rate != null ? `${h.pnl_rate.toFixed(1)}%` : '—'} +
{h.reasons}
+ {(h.issues || []).slice(0, 3).map((iss, i) => ( +
{iss.summary}
+ ))} +
+ ); + })} +
+ ); +} +``` +> 실제 디자인 토큰·CSS 클래스·탭 통합 지점은 Step 1에서 확인한 기존 포트폴리오 페이지 컨벤션에 맞춰 조정. 신규 라우트보다 **기존 페이지 탭 통합** 선호([[feedback-new-page-or-tab]]). + +- [ ] **Step 3: 페이지 통합** — Step 1에서 찾은 포트폴리오 페이지에 탭/섹션으로 `` 추가. + +- [ ] **Step 4: 빌드 확인** — Run: `cd ../web-ui && npm run build` Expected: exit 0 +- [ ] **Step 5: Commit** (web-ui) +```bash +git add src/ && git commit -m "feat: 보유종목 인텔리전스 탭 (액션·이슈·포트건강)" +``` + +--- + +# Phase 7 — 통합 검증 + +## Task 7.1: 전체 회귀 + +- [ ] **Step 1: stock 테스트** — Run: `cd stock && python -m pytest app/ -q` Expected: 신규 통과 + 기존 회귀 없음(사전 실패가 있으면 별도 식별) +- [ ] **Step 2: agent-office 테스트** — Run: `cd agent-office && python -m pytest -q` Expected: 신규 통과 + 기존 회귀 없음 +- [ ] **Step 3: 배포 후 수동 트리거 안내** — NAS 배포 후 `POST /api/stock/holdings/intel/run`으로 첫 시그널 생성, `GET /api/stock/holdings/intel`로 확인. 평일 EOD/아침 cron이 이후 자동 운영. + +--- + +## Self-Review 체크리스트 결과 +- **Spec 커버리지**: 데이터모델(1.1) / get_holdings(1.2) / technical_posture(2.1) / 매도룰(2.2) / decide_action(2.3) / market_events(3.1) / news_issues(3.2) / portfolio_health(3.3) / compute+brief(4.1) / API(4.2) / agent-office(5.x) / UI(6.x). 장중 가드(spec §4.3)는 Phase 5 Note에서 후속 슬라이스로 명시 분리(스코프 관리). +- **Placeholder**: 모든 코드 step에 실제 코드. registry 노드명·main.py 라우팅 패턴·web-ui 컨벤션은 "Step에서 확인 후 맞춤" 명시(코드베이스 의존, 합리적). +- **타입 일관성**: exit_flags dict 키(stop_loss/ma50_break/ma200_break/momentum_loss/take_profit/climax)가 exit_rules·decide_action·compute·포매터에서 일치. holdings_signals 컬럼 ↔ upsert ↔ build_brief ↔ telegram_stock 키 일치. action 값(add/hold/trim/sell) 전 구간 일치. diff --git a/docs/superpowers/specs/2026-05-31-stock-holdings-intelligence-design.md b/docs/superpowers/specs/2026-05-31-stock-holdings-intelligence-design.md new file mode 100644 index 0000000..ea7afcc --- /dev/null +++ b/docs/superpowers/specs/2026-05-31-stock-holdings-intelligence-design.md @@ -0,0 +1,122 @@ +# 주식 보유종목 인텔리전스 — 설계 Spec + +- **작성일**: 2026-05-31 +- **상태**: 설계 승인 (구현 plan 대기) +- **대상 서비스**: `stock` + `agent-office`(StockAgent) + `web-ui`(stock/포트폴리오 페이지) +- **사이클**: 스마트 에이전트 고도화 3종 중 **2번 주식**. (1번 로또 완료, 3번 인스타 후속) + +--- + +## 1. 배경 & 목표 + +현재 StockAgent는 아침 뉴스 요약(07:30) · KRX 강세주 스크리너(16:30) · AI 뉴스 sentiment(08:00)를 브리핑한다. CEO는 여기서 더 나아가 **내 보유종목을 집중 분석**해 ①종목별 매수/매도 자세 ②이슈 정리 ③포트폴리오 건강을 매일 advisory로 브리핑받길 원한다. + +### 핵심 결정 (2026-05-31 brainstorming) +1. **실행 수준 = 브리핑 전용(advisory)**. `/api/trade/order`(KIS 실주문) 미사용. 매수/매도는 "제안"만, 실제 주문은 사용자 수동. (로또와 동일한 정직·관찰 철학) +2. **분석 주기 = 일봉 EOD + 장중 경량 가드**. 장마감 후 일봉으로 기술분석 → 다음날 아침 브리핑. 장중엔 현재가로 손절·급변(±N%)만 경도 알림. 인트라데이 분봉 파이프라인 신설 안 함. +3. **브리핑 범위 = 보유종목 + 포트 레벨**. 종목별 액션 + 포트폴리오 건강(집중도·비중·현금·손익). +4. **이슈 소스 = 기존 뉴스+감성+LLM 요약 + 급변·거래량·외인수급 이벤트**. 신규 스크래핑 0 (DART·실적 일정 제외). + +### 기존 자산 (100% 재활용, 신규 ML/데이터소스 없음) +- `stock/app/screener/snapshot.py` → `krx_daily_prices`(일봉 OHLCV) + `krx_master`(listing) + naver 외인 flow. 스크리너 잡(평일 16:30)이 갱신. +- `stock/app/screener/engine.py` + `nodes/`(ma_alignment·momentum·rs_rating·vcp_lite·volume_surge·foreign_buy·high52w·hygiene). **`ScreenContext.restrict(tickers)`** + `latest_close()`/`latest_high()`로 보유종목 한정 분석 가능. +- `portfolio` 테이블(broker·ticker·name·quantity·avg_price·purchase_price) + `/api/portfolio`(현재가·손익 계산) + `broker_cash`(예수금). +- `price_fetcher`(현재가 3분 TTL) · `news_sentiment` 테이블(종목별 감성) · `ai_summarizer`(Claude Haiku). + +### 알려진 제약 (설계 반영) +- **섹터 필드 없음**: `portfolio`·`krx_master`에 sector 없음 → 섹터 편중은 best-effort(FDR `StockListing`의 Sector/Industry가 있으면 사용, 없으면 생략)이고, **시장(KOSPI/KOSDAQ)·종목 비중 집중도**를 기본 지표로 사용. +- **KRX 외 종목**(미국주 등): krx_daily_prices 밖 → 기술분석 불가, **뉴스·현재가·손익만** graceful 처리. +- **snapshot 히스토리 의존**: MA200·52주 고점 노드는 ~1년 일봉 필요. 스크리너가 이미 이 노드들을 쓰므로 윈도우는 충족 가정(plan에서 lookback 확인 단계 포함). + +--- + +## 2. 데이터 모델 & 컴포넌트 + +### 신규 테이블 `holdings_signals` (stock.db, 일별 종목 시그널 이력) +``` +date TEXT NOT NULL -- KST 거래일 +ticker TEXT NOT NULL +name TEXT +action TEXT NOT NULL -- 'add' | 'hold' | 'trim' | 'sell' +tech_score REAL -- 매수강도(score 노드 가중합, 0~1 정규화) +exit_flags TEXT NOT NULL DEFAULT '{}' -- JSON {stop_loss,ma50_break,ma200_break,momentum_loss,take_profit,climax} +issues TEXT NOT NULL DEFAULT '[]' -- JSON [{type, severity, summary}] +close INTEGER +pnl_rate REAL -- 평단 대비 % (스냅샷 시점) +reasons TEXT -- 액션 근거 텍스트 +created_at TEXT NOT NULL DEFAULT (datetime('now')) +PRIMARY KEY(date, ticker) -- 멱등 upsert +``` +> 추세/이력은 이 테이블에서 조회. 포트 레벨 요약은 on-the-fly 계산(별도 테이블 불필요). + +### 신규 `stock/app/holdings_intel.py` (순수연산 중심, FastAPI 의존성 최소) +- `get_holdings() -> list[dict]` — `portfolio` 행 + 현재가(price_fetcher) + pnl_rate. KRX 여부 플래그(`is_krx`). +- `technical_posture(ctx_restricted, tickers) -> dict[ticker, score]` — `ScreenContext.restrict(tickers)`에 score 노드 실행 → 매수강도. +- `exit_rules(holding, prices_df, params) -> dict` — **신규**: 손절·MA이탈·모멘텀소멸·익절·클라이맥스 flag 산출 (§3). +- `decide_action(tech_score, exit_flags, pnl) -> (action, reasons)` — **신규**: 매수강도+exit 조합 → add/hold/trim/sell + 근거. +- `market_events(prices_df, flow, params) -> dict[ticker, list]` — 급변(±N%)·거래량 Z-score·외인 순매도. +- `news_issues(tickers) -> dict[ticker, list]` — news+news_sentiment 필터 → Claude Haiku 악재·심각도 요약(악재 있는 종목만). +- `portfolio_health(holdings, cash) -> dict` — 종목 비중 집중도(HHI/최대비중)·시장 mix·현금 비중·총 손익. +- `compute_and_store(asof) -> dict` — 위를 조합해 holdings_signals upsert (멱등). +- `build_holdings_brief(asof) -> dict` — 브리핑/UI payload 조립(종목별 action+issues + portfolio_health + 추세). + +### API (stock) +| 메서드 | 경로 | 설명 | +|--------|------|------| +| GET | `/api/stock/holdings/intel` | 최신 브리핑 payload | +| GET | `/api/stock/holdings/intel/history?ticker=&days=` | 종목 시그널 추세 | +| POST | `/api/stock/holdings/intel/run` | 수동 계산 트리거(BackgroundTask) | + +--- + +## 3. 매도/리스크 룰 & 이슈 (설정 가능 임계값 — 기본값 제시) + +### exit_flags (각 boolean + 값) +- **stop_loss**: `current < avg_price × (1 − STOP_PCT)` (기본 STOP_PCT=0.08, Minervini식) +- **ma50_break / ma200_break**: 종가 < MA50 / MA200 +- **momentum_loss**: momentum/RS 노드 점수가 직전 대비 임계 하락 (or 음전환) +- **take_profit**: `pnl_rate ≥ TAKE_PCT` (기본 25%) — 부분 익절 후보 +- **climax**: 거래량 급증(vol > avg×CLIMAX_VOL_X) + 종가 상단 꼬리 (분산 의심) + +### decide_action 매트릭스 +- tech_score 高 + exit_flags 無 → **add**(추가매수 후보) +- exit_flags 無 (강건) → **hold** +- ma50_break 또는 momentum_loss 또는 take_profit → **trim**(일부 축소) +- stop_loss 또는 ma200_break → **sell**(청산 후보) +- 각 결정에 trigger된 flag를 근거 텍스트로 동봉. (advisory — "제안") + +### issues +- **시장이벤트** (기존 데이터): 일봉 ±EVENT_PCT% 급변 / 거래량 Z-score>임계 / naver flow 외인 순매도 N일 연속. +- **뉴스이슈**: 보유종목 최근 뉴스 + news_sentiment 음수 → Claude Haiku로 `{type, severity(low/med/high), summary}` 요약. 악재 있는 종목만 호출(비용 bounded). + +--- + +## 4. 플로우 · 에이전트 · UI + +1. **EOD 계산 (평일 16:40)**: 기존 스크리너/뉴스 잡과 동일하게 **agent-office cron이 orchestrate** — `_run_stock_holdings_eod()` → `StockAgent.run_holdings_eod()` → stock `POST /api/stock/holdings/intel/run` → `holdings_intel.compute_and_store(today)` → holdings_signals upsert. 스크리너 snapshot 갱신(16:30) 직후라 일봉 준비됨. +2. **아침 브리핑 (평일 08:30, agent-office StockAgent.run_holdings_brief)**: 저장된 최신 시그널 + 야간 갭(현재가) → 텔레그램 1통(종목별 액션 + 포트 건강 + 상위 이슈). AI 뉴스(08:00) 다음 슬롯. +3. **장중 경량 가드 (평일 09:00~15:30, 30분 간격)**: 현재가로 손절선 이탈·급변(±N%)만 점검 → 발생 시 텔레그램 alert. throttle(종목·유형별 재발화 억제) + daily cap (로또 시그널 패턴 재활용). +4. **agent-office**: `service_proxy`에 holdings intel 호출 추가 + StockAgent 메서드(run_holdings_brief / intraday_guard) + scheduler cron. +5. **UI (web-ui)**: stock/포트폴리오 페이지에 **"보유종목 인텔리전스" 탭/섹션 통합** — 종목별 액션 카드(자세·exit flags·근거) + 포트 건강 위젯 + 이슈 피드 + 종목 시그널 추세(history). + +--- + +## 5. 에러·성능·테스트·리스크 + +- **멱등성**: holdings_signals PRIMARY KEY(date,ticker) upsert → 재계산 안전. +- **성능 (NAS Celeron)**: 보유종목만 restrict(소수 종목)이라 전체 스크리너 대비 매우 가벼움. LLM 이슈 요약은 악재 종목만(bounded). EOD 1회 + 장중 가드는 현재가만(경량). +- **graceful degrade**: price_fetcher/KIS/news 실패 시 부분 데이터로 진행 + 경고 로그. KRX 외 종목은 기술분석 skip(뉴스·손익만). 텔레그램 실패는 로그만(job 성공 유지). +- **테스트**: exit_rules 각 flag, decide_action 매트릭스 전 분기, market_events 검출, portfolio_health 계산, holdings_signals 멱등, KRX 외 종목 graceful, 뉴스 0건 경로. +- **리스크**: ①기술적 시그널은 휴리스틱이지 보장 아님 → advisory 프레이밍·자동매매 없음 ②섹터 데이터 갭 → 시장·비중 집중도로 대체 ③snapshot 히스토리 의존 → plan에 lookback 확인 ④보유종목 출처는 portfolio 테이블(사용자/KIS 동기화) — 누락 시 빈 브리핑 graceful. + +--- + +## 6. 결정 로그 (2026-05-31) +1. 실행 수준 = **advisory 전용** (KIS 실주문 미사용) +2. 주기 = **일봉 EOD + 장중 경량 가드** +3. 범위 = **보유종목 + 포트 레벨** +4. 이슈 소스 = **기존 뉴스+감성+LLM + 급변·거래량·외인 이벤트** + +## 7. 스코프 밖 / 향후 +- 자동매매(승인후/완전자동), 인트라데이 분봉, DART 공시·실적 일정, 신규 매수후보 발굴(기존 16:30 스크리너가 담당), 교체(rotation) 제안 — 향후 사이클. +- 인스타 에이전트(자율 카드 발급) — 다음 사이클. diff --git a/stock/app/db.py b/stock/app/db.py index bfb50bf..5d4c9cc 100644 --- a/stock/app/db.py +++ b/stock/app/db.py @@ -1,6 +1,7 @@ import sqlite3 import os import hashlib +import json from typing import List, Dict, Any, Optional from app.screener.schema import ensure_screener_schema @@ -103,6 +104,27 @@ def init_db(): if "commission" not in sh_cols: conn.execute("ALTER TABLE sell_history ADD COLUMN commission REAL NOT NULL DEFAULT 0") + conn.execute( + """ + CREATE TABLE IF NOT EXISTS holdings_signals ( + date TEXT NOT NULL, + ticker TEXT NOT NULL, + name TEXT, + action TEXT NOT NULL, + tech_score REAL, + exit_flags TEXT NOT NULL DEFAULT '{}', + issues TEXT NOT NULL DEFAULT '[]', + close INTEGER, + pnl_rate REAL, + reasons TEXT, + created_at TEXT NOT NULL DEFAULT (datetime('now','localtime')), + PRIMARY KEY (date, ticker) + ); + """ + ) + conn.execute("CREATE INDEX IF NOT EXISTS idx_holdings_sig_ticker " + "ON holdings_signals(ticker, date DESC);") + # Screener 스키마 부트스트랩 (7테이블 + 디폴트 설정 시드) ensure_screener_schema(conn) @@ -297,3 +319,63 @@ def get_asset_snapshots(days: int = 30) -> List[Dict[str, Any]]: ).fetchall() rows = list(reversed(rows)) return [dict(r) for r in rows] + + +# --- KRX Master --- + +def get_krx_tickers() -> set: + with _conn() as conn: + try: + rows = conn.execute("SELECT ticker FROM krx_master").fetchall() + except Exception: + return set() + return {r["ticker"] for r in rows} + + +# --- Holdings Signals CRUD --- + +def upsert_holdings_signal( + date: str, ticker: str, name: Optional[str], action: str, + tech_score: Optional[float], exit_flags: dict, issues: list, + close: Optional[int], pnl_rate: Optional[float], reasons: Optional[str], +) -> None: + with _conn() as conn: + conn.execute( + """ + INSERT INTO holdings_signals + (date, ticker, name, action, tech_score, exit_flags, issues, close, pnl_rate, reasons) + VALUES (?,?,?,?,?,?,?,?,?,?) + ON CONFLICT(date, ticker) DO UPDATE SET + name=excluded.name, action=excluded.action, tech_score=excluded.tech_score, + exit_flags=excluded.exit_flags, issues=excluded.issues, close=excluded.close, + pnl_rate=excluded.pnl_rate, reasons=excluded.reasons + """, + (date, ticker, name, action, tech_score, + json.dumps(exit_flags, ensure_ascii=False), + json.dumps(issues, ensure_ascii=False), close, pnl_rate, reasons), + ) + +def _row_to_signal(r) -> dict: + d = dict(r) + d["exit_flags"] = json.loads(d.get("exit_flags") or "{}") + d["issues"] = json.loads(d.get("issues") or "[]") + return d + +def get_holdings_signals(date: str) -> list: + with _conn() as conn: + rows = conn.execute( + "SELECT * FROM holdings_signals WHERE date=? ORDER BY ticker", (date,)).fetchall() + return [_row_to_signal(r) for r in rows] + +def get_latest_holdings_date() -> str | None: + with _conn() as conn: + r = conn.execute("SELECT MAX(date) AS d FROM holdings_signals").fetchone() + return r["d"] if r and r["d"] else None + +def get_holdings_signal_history(ticker: str, limit: int = 30) -> list: + """최근 N개 시그널 행 (시그널은 거래일당 1행이라 ≈ N 거래일).""" + with _conn() as conn: + rows = conn.execute( + "SELECT * FROM holdings_signals WHERE ticker=? ORDER BY date DESC LIMIT ?", + (ticker, limit)).fetchall() + return [_row_to_signal(r) for r in rows] diff --git a/stock/app/holdings_intel.py b/stock/app/holdings_intel.py new file mode 100644 index 0000000..838d3e8 --- /dev/null +++ b/stock/app/holdings_intel.py @@ -0,0 +1,344 @@ +"""보유종목 인텔리전스 — 순수연산 중심 (advisory). KIS 실주문 미사용.""" +from __future__ import annotations +import datetime as dt +from typing import Any, Optional + +import pandas as pd + +from . import db +from . import price_fetcher +from .screener.engine import combine + + +def _krx_tickers() -> set: + """krx_master ticker 집합 (KRX 판별용).""" + return db.get_krx_tickers() + + +def get_holdings() -> list[dict]: + """portfolio + 현재가 + pnl_rate + is_krx.""" + items = db.get_all_portfolio() + tickers = [it["ticker"] for it in items] + prices = price_fetcher.get_current_prices(tickers) if tickers else {} + krx = _krx_tickers() + out = [] + for it in items: + cur = prices.get(it["ticker"]) + avg = it["avg_price"] + pnl = ((cur - avg) / avg * 100.0) if (cur and avg) else None + out.append({ + **it, + "current_price": cur, + "pnl_rate": pnl, + "is_krx": it["ticker"] in krx, + }) + return out + + +# ---- Task 2.1: technical_posture ---- + + +def _score_nodes_and_weights(): + """NODE_REGISTRY에서 보유종목 매수강도 계산용 노드 인스턴스화.""" + from .screener.registry import NODE_REGISTRY + weights = {"ma_alignment": 0.4, "momentum": 0.3, "rs_rating": 0.3} + nodes = [NODE_REGISTRY[k]() for k in weights] + return nodes, weights + + +def technical_posture(ctx, tickers: list[str]) -> dict[str, float]: + """보유종목 restrict 후 score 노드 → 매수강도(0~100).""" + scoped = ctx.restrict(tickers) + if scoped.prices.empty: + return {} + nodes, weights = _score_nodes_and_weights() + scores = {} + for n in nodes: + try: + scores[n.name] = n.compute(scoped, {}) + except Exception: + scores[n.name] = pd.Series(0.0, index=scoped.master.index) + scores_ne = {k: s for k, s in scores.items() if not s.empty} + weights_ne = {k: w for k, w in weights.items() if k in scores_ne} + if not weights_ne: + return {} + total = combine(scores_ne, weights_ne) + return {t: float(total.get(t, 0.0)) for t in tickers if t in total.index} + + +# ---- Task 2.2: exit_rules ---- + +_DEFAULT_EXIT_PARAMS = {"stop_pct": 0.08, "take_pct": 0.25, "climax_vol_x": 3.0} + + +def _ma(closes: "pd.Series", window: int) -> Optional[float]: + if len(closes) < window: + return None + val = closes.rolling(window).mean().iloc[-1] + return float(val) if pd.notna(val) else None + + +def exit_rules(holding: dict, ticker_prices: "pd.DataFrame", params: dict) -> dict: + """가격 기반 청산/리스크 flag (stop_loss/ma50_break/ma200_break/take_profit/climax). + + Note: momentum_loss는 compute_and_store 단계에서 집계하므로 여기서 설정하지 않는다. + """ + p = {**_DEFAULT_EXIT_PARAMS, **(params or {})} + flags = {"stop_loss": False, "ma50_break": False, "ma200_break": False, + "take_profit": False, "climax": False} + avg = holding.get("avg_price") + cur = holding.get("current_price") + if ticker_prices is None or ticker_prices.empty: + closes = pd.Series(dtype=float) + else: + closes = ticker_prices.sort_values("date")["close"].astype(float).reset_index(drop=True) + last_close = float(closes.iloc[-1]) if len(closes) else cur + if cur is None: + cur = last_close + if cur is not None and avg: + if cur < avg * (1 - p["stop_pct"]): + flags["stop_loss"] = True + if avg > 0 and (cur - avg) / avg >= p["take_pct"]: + flags["take_profit"] = True + ma50 = _ma(closes, 50) + ma200 = _ma(closes, 200) + if ma50 is not None and last_close is not None and last_close < ma50: + flags["ma50_break"] = True + if ma200 is not None and last_close is not None and last_close < ma200: + flags["ma200_break"] = True + # climax: 최근 거래량이 20일 평균의 climax_vol_x배 이상 + 종가가 당일 고점 대비 하단(상단꼬리) + if ticker_prices is not None and not ticker_prices.empty and len(ticker_prices) >= 21: + tp = ticker_prices.sort_values("date") + vol = tp["volume"].astype(float).reset_index(drop=True) + avg_vol = vol.iloc[-21:-1].mean() + last_vol = vol.iloc[-1] + hi_ = float(tp["high"].astype(float).iloc[-1]) + cl_ = float(tp["close"].astype(float).iloc[-1]) + if avg_vol and last_vol >= avg_vol * p["climax_vol_x"] and hi_ > 0 and cl_ < hi_ * 0.97: + flags["climax"] = True + return flags + + +# ---- Task 2.3: decide_action ---- + +ADD_SCORE = 70.0 # 이 이상이면 추가매수 후보 + + +# ---- Task 3.1: market_events ---- + +_DEFAULT_EVENT_PARAMS = {"move_pct": 7.0, "vol_z": 2.5} + + +def market_events(ticker: str, ticker_prices: "pd.DataFrame", + ticker_flow: "pd.DataFrame | None", params: dict) -> list[dict]: + """일봉/flow 기반 시장 이벤트 (급변·거래량 Z·외인 순매도).""" + p = {**_DEFAULT_EVENT_PARAMS, **(params or {})} + events = [] + if ticker_prices is None or ticker_prices.empty or len(ticker_prices) < 2: + return events + tp = ticker_prices.sort_values("date").reset_index(drop=True) + close = tp["close"].astype(float) + pct = (close.iloc[-1] - close.iloc[-2]) / close.iloc[-2] * 100.0 if close.iloc[-2] else 0.0 + if abs(pct) >= p["move_pct"]: + events.append({ + "type": "price_move", + "severity": "high" if abs(pct) >= p["move_pct"] * 1.5 else "med", + "summary": f"전일 대비 {pct:+.1f}%", + }) + vol = tp["volume"].astype(float) + if len(vol) >= 21: + base = vol.iloc[-21:-1] + mu, sd = base.mean(), base.std(ddof=0) + last_vol = vol.iloc[-1] + if mu > 0 and ( + (sd and (last_vol - mu) / sd >= p["vol_z"]) + or (not sd and last_vol >= mu * p["vol_z"]) # sd=0 (평탄 기준선): vol_z를 Z-score가 아닌 단순 배수로 사용 + ): + z_txt = f"{(last_vol - mu) / sd:.1f}" if sd else f"ratio={last_vol / mu:.1f}x" + events.append({ + "type": "volume_surge", + "severity": "med", + "summary": f"거래량 평소 대비 급증(Z={z_txt})", + }) + if ticker_flow is not None and not ticker_flow.empty: + tf = ticker_flow.sort_values("date") + recent = tf["foreign_net"].astype(float).iloc[-3:] + if len(recent) >= 3 and (recent < 0).all(): + events.append({ + "type": "foreign_selling", + "severity": "med", + "summary": "외국인 3일 연속 순매도", + }) + return events + + +# ---- Task 3.2: news_issues ---- + +NEG_SENTIMENT = -0.3 # 이하면 악재 후보 + + +def _news_sentiment_map(date: str) -> dict: + """date 기준 news_sentiment 테이블에서 ticker → {score_raw, news_count} 맵 반환.""" + with db._conn() as conn: + try: + rows = conn.execute( + "SELECT ticker, score_raw, news_count FROM news_sentiment WHERE date=?", + (date,), + ).fetchall() + except Exception: + return {} + return {r["ticker"]: {"score_raw": r["score_raw"], "news_count": r["news_count"]} + for r in rows} + + +def news_issues(tickers: list[str], date: str, use_llm: bool = True) -> dict[str, list]: + """news_sentiment 음수 → 악재 flag. (LLM 요약은 best-effort; 단위 테스트는 use_llm=False로.)""" + senti = _news_sentiment_map(date) + out: dict[str, list] = {} + for t in tickers: + s = senti.get(t) + if not s or s["score_raw"] is None: + continue + if s["score_raw"] <= NEG_SENTIMENT: + sev = "high" if s["score_raw"] <= NEG_SENTIMENT * 2 else "med" + out.setdefault(t, []).append({ + "type": "news", + "severity": sev, + "summary": f"부정 뉴스 감성({s['score_raw']:+.2f}, {s.get('news_count', 0)}건)", + }) + return out + + +# ---- Task 3.3: portfolio_health ---- + + +def portfolio_health(holdings: list[dict], total_cash: int = 0) -> dict: + """비중 집중도(최대비중·HHI) + 현금비중 + 총손익 요약.""" + evals, buys = [], [] + for h in holdings: + cur = h.get("current_price") or h.get("avg_price") or 0 + ev = cur * h.get("quantity", 0) + bu = (h.get("avg_price") or 0) * h.get("quantity", 0) + evals.append(ev) + buys.append(bu) + total_eval = sum(evals) + total_buy = sum(buys) + weights = [e / total_eval for e in evals] if total_eval else [] + hhi = sum(w * w for w in weights) + total_assets = total_eval + (total_cash or 0) + return { + "positions": len(holdings), + "total_eval": total_eval, + "total_buy": total_buy, + "total_pnl": total_eval - total_buy, + "total_pnl_rate": ((total_eval - total_buy) / total_buy * 100.0) if total_buy else 0.0, + "max_weight": max(weights) if weights else 0.0, + "hhi": round(hhi, 4), + "cash_ratio": ((total_cash or 0) / total_assets) if total_assets else 0.0, + } + + +DEFAULT_PARAMS = { + "stop_pct": 0.08, "take_pct": 0.25, "climax_vol_x": 3.0, + "move_pct": 7.0, "vol_z": 2.5, + "momentum_drop": 15.0, "momentum_low": 35.0, +} + + +def _load_ctx(asof: dt.date): + """ScreenContext.load를 감싸는 thin wrapper (테스트에서 monkeypatch 대상).""" + from .screener.engine import ScreenContext + with db._conn() as conn: + return ScreenContext.load(conn, asof) + + +def _today_kst() -> dt.date: + return (dt.datetime.utcnow() + dt.timedelta(hours=9)).date() + + +def compute_and_store(asof: Optional[dt.date] = None, use_llm: bool = True, + params: dict | None = None) -> dict: + """보유종목 시그널 계산 → holdings_signals upsert (멱등). + + Returns: + {"stored": N, "date": "YYYY-MM-DD"} or {"stored": 0, "reason": "..."} + """ + asof = asof or _today_kst() + p = {**DEFAULT_PARAMS, **(params or {})} + holdings = get_holdings() + if not holdings: + return {"stored": 0, "reason": "no_holdings"} + krx = [h for h in holdings if h.get("is_krx")] + ctx = _load_ctx(asof) + posture = technical_posture(ctx, [h["ticker"] for h in krx]) if krx else {} + date_iso = asof.isoformat() + issues_map = news_issues([h["ticker"] for h in holdings], date_iso, use_llm=use_llm) + stored = 0 + for h in holdings: + t = h["ticker"] + tp = ctx.prices[ctx.prices["ticker"] == t] if h.get("is_krx") else None + tf = ctx.flow[ctx.flow["ticker"] == t] if h.get("is_krx") else None + flags = exit_rules(h, tp, p) if h.get("is_krx") else {} + tech = posture.get(t) + # momentum_loss: 직전 저장 시그널 대비 하락 or 낮은 강도 + prev = db.get_holdings_signal_history(t, limit=2) + prev_score = next((r["tech_score"] for r in prev if r["date"] != date_iso), None) + if tech is not None and ( + (prev_score is not None and tech < prev_score - p["momentum_drop"]) + or tech < p["momentum_low"] + ): + flags["momentum_loss"] = True + evts = market_events(t, tp, tf, p) if h.get("is_krx") else [] + issues = list(issues_map.get(t, [])) + evts + action, reasons = decide_action(tech if tech is not None else 0.0, flags, h.get("pnl_rate")) + db.upsert_holdings_signal( + date=date_iso, ticker=t, name=h.get("name"), action=action, + tech_score=tech, exit_flags=flags, issues=issues, + close=h.get("current_price"), pnl_rate=h.get("pnl_rate"), reasons=reasons, + ) + stored += 1 + return {"stored": stored, "date": date_iso} + + +def build_holdings_brief(date: Optional[str] = None) -> dict: + """최신 시그널 + 포트 건강 조립 (브리핑/UI payload).""" + date = date or db.get_latest_holdings_date() + if not date: + return {"date": None, "holdings": [], "portfolio_health": {}} + signals = db.get_holdings_signals(date) + holdings = get_holdings() + total_cash = sum(c.get("cash", 0) for c in db.get_all_broker_cash()) + health = portfolio_health(holdings, total_cash=total_cash) + return {"date": date, "holdings": signals, "portfolio_health": health} + + +def decide_action(tech_score: float, exit_flags: dict, pnl: float | None, + add_score: float = ADD_SCORE) -> tuple[str, str]: + """액션 결정 매트릭스: sell > trim > add > hold (우선순위 순). + + Returns: + (action, reasons_text) action ∈ {"sell","trim","add","hold"} + """ + reasons = [] + # 청산 (최우선) + if exit_flags.get("stop_loss"): + reasons.append("손절선 이탈") + if exit_flags.get("ma200_break"): + reasons.append("MA200 이탈") + if reasons: + return "sell", " · ".join(reasons) + # 축소 + if exit_flags.get("ma50_break"): + reasons.append("MA50 이탈") + if exit_flags.get("momentum_loss"): + reasons.append("모멘텀 소멸") + if exit_flags.get("take_profit"): + reasons.append(f"목표 수익 도달(+{pnl:.0f}%)" if pnl is not None else "목표 수익 도달") + if exit_flags.get("climax"): + reasons.append("거래량 급증 분산 의심") + if reasons: + return "trim", " · ".join(reasons) + # 추가매수 + if tech_score is not None and tech_score >= add_score: + return "add", f"기술적 강도 양호({tech_score:.0f})" + return "hold", "특이 신호 없음" diff --git a/stock/app/main.py b/stock/app/main.py index 8f32835..58a7d95 100644 --- a/stock/app/main.py +++ b/stock/app/main.py @@ -3,7 +3,7 @@ import json import logging from datetime import date as date_type from typing import Optional -from fastapi import FastAPI, Query, Header, Depends, HTTPException +from fastapi import FastAPI, Query, Header, Depends, HTTPException, BackgroundTasks from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware import requests @@ -27,6 +27,7 @@ from .price_fetcher import get_current_prices, get_current_prices_detail from .ai_summarizer import summarize_news, OllamaError from .auth import verify_webai_key from . import webai_cache +from . import holdings_intel app = FastAPI() install_access_log(app) @@ -652,5 +653,25 @@ def remove_sell_history(record_id: int): return {"ok": True} +# --- Holdings Intelligence API --- + +@app.get("/api/stock/holdings/intel") +def holdings_intel_brief(): + """보유종목 인텔리전스 브리핑 (최신 시그널 + 포트 건강)""" + return holdings_intel.build_holdings_brief() + + +@app.get("/api/stock/holdings/intel/history") +def holdings_intel_history(ticker: str, days: int = 30): + """종목별 시그널 이력 조회""" + from . import db + return {"ticker": ticker, "history": db.get_holdings_signal_history(ticker, days)} + + +@app.post("/api/stock/holdings/intel/run") +def holdings_intel_run(background_tasks: BackgroundTasks, use_llm: bool = True): + """보유종목 시그널 계산 트리거 (BackgroundTask)""" + background_tasks.add_task(holdings_intel.compute_and_store, None, use_llm) + return {"ok": True, "queued": True} diff --git a/stock/app/test_holdings_api.py b/stock/app/test_holdings_api.py new file mode 100644 index 0000000..d812364 --- /dev/null +++ b/stock/app/test_holdings_api.py @@ -0,0 +1,42 @@ +import os +import sys +import tempfile + +from fastapi.testclient import TestClient + + +def _client(monkeypatch): + # Add web-backend root to sys.path so _shared can be imported by main.py + sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) + from app import db + monkeypatch.setattr(db, "DB_PATH", os.path.join(tempfile.mkdtemp(), "stock.db")) + db.init_db() + from app.main import app + return TestClient(app) + + +def test_holdings_intel_endpoint(monkeypatch): + client = _client(monkeypatch) + r = client.get("/api/stock/holdings/intel") + assert r.status_code == 200 + body = r.json() + assert "holdings" in body and "portfolio_health" in body + + +def test_holdings_intel_history_endpoint(monkeypatch): + client = _client(monkeypatch) + r = client.get("/api/stock/holdings/intel/history?ticker=005930") + assert r.status_code == 200 + body = r.json() + assert body["ticker"] == "005930" + assert "history" in body + assert isinstance(body["history"], list) + + +def test_holdings_intel_run_endpoint(monkeypatch): + client = _client(monkeypatch) + r = client.post("/api/stock/holdings/intel/run") + assert r.status_code == 200 + body = r.json() + assert body["ok"] is True + assert body["queued"] is True diff --git a/stock/app/test_holdings_db.py b/stock/app/test_holdings_db.py new file mode 100644 index 0000000..a5bd030 --- /dev/null +++ b/stock/app/test_holdings_db.py @@ -0,0 +1,37 @@ +import os, tempfile + +def _fresh_db(monkeypatch): + tmp = tempfile.mkdtemp() + from app import db + monkeypatch.setattr(db, "DB_PATH", os.path.join(tmp, "stock.db")) + db.init_db() + return db + +def test_holdings_signals_table_and_upsert(monkeypatch): + db = _fresh_db(monkeypatch) + db.upsert_holdings_signal(date="2026-05-29", ticker="005930", name="삼성전자", + action="hold", tech_score=72.0, exit_flags={"stop_loss": False}, + issues=[{"type": "news", "severity": "low", "summary": "x"}], + close=80000, pnl_rate=5.2, reasons="강건") + db.upsert_holdings_signal(date="2026-05-29", ticker="005930", name="삼성전자", + action="trim", tech_score=60.0, exit_flags={"ma50_break": True}, + issues=[], close=79000, pnl_rate=3.0, reasons="MA50 이탈") + rows = db.get_holdings_signals(date="2026-05-29") + assert len(rows) == 1 # upsert 멱등 + assert rows[0]["action"] == "trim" + assert rows[0]["exit_flags"]["ma50_break"] is True # JSON 역직렬화 + hist = db.get_holdings_signal_history("005930", limit=30) + assert len(hist) == 1 + + +def test_get_latest_holdings_date(monkeypatch): + db = _fresh_db(monkeypatch) + # empty table → None + assert db.get_latest_holdings_date() is None + # after an upsert → returns that date + db.upsert_holdings_signal( + date="2026-05-30", ticker="005930", name="삼성전자", + action="hold", tech_score=70.0, exit_flags={}, issues=[], + close=80000, pnl_rate=4.0, reasons="테스트", + ) + assert db.get_latest_holdings_date() == "2026-05-30" diff --git a/stock/app/test_holdings_intel.py b/stock/app/test_holdings_intel.py new file mode 100644 index 0000000..ea34580 --- /dev/null +++ b/stock/app/test_holdings_intel.py @@ -0,0 +1,387 @@ +import datetime as dt +import pandas as pd + +from app import holdings_intel as hi + + +def test_get_holdings_merges_price_and_pnl(monkeypatch): + monkeypatch.setattr(hi.db, "get_all_portfolio", lambda: [ + {"id": 1, "broker": "kis", "ticker": "005930", "name": "삼성전자", + "quantity": 10, "avg_price": 70000, "purchase_price": 70000}, + {"id": 2, "broker": "kis", "ticker": "AAPL", "name": "Apple", + "quantity": 5, "avg_price": 200, "purchase_price": 200}, + ]) + monkeypatch.setattr(hi.price_fetcher, "get_current_prices", + lambda tickers: {"005930": 77000}) # AAPL 미조회(비KRX) + monkeypatch.setattr(hi, "_krx_tickers", lambda: {"005930"}) + hs = hi.get_holdings() + s = {h["ticker"]: h for h in hs} + assert s["005930"]["is_krx"] is True + assert round(s["005930"]["pnl_rate"], 1) == 10.0 # (77000-70000)/70000 + assert s["AAPL"]["is_krx"] is False # KRX 외 + + +def test_get_holdings_zero_avg_price(monkeypatch): + """avg_price=0인 종목은 pnl_rate가 None이어야 한다 (ZeroDivisionError 없음).""" + monkeypatch.setattr(hi.db, "get_all_portfolio", lambda: [ + {"id": 1, "broker": "kis", "ticker": "005930", "name": "삼성전자", + "quantity": 10, "avg_price": 0, "purchase_price": 0}, + ]) + monkeypatch.setattr(hi.price_fetcher, "get_current_prices", + lambda tickers: {"005930": 80000}) + monkeypatch.setattr(hi, "_krx_tickers", lambda: {"005930"}) + hs = hi.get_holdings() + assert hs[0]["pnl_rate"] is None + + +def test_get_holdings_empty_portfolio(monkeypatch): + """포트폴리오가 비어있으면 빈 리스트를 반환하고 가격 조회를 호출하지 않는다.""" + monkeypatch.setattr(hi.db, "get_all_portfolio", lambda: []) + called = [] + monkeypatch.setattr(hi.price_fetcher, "get_current_prices", + lambda tickers: called.append(tickers) or {}) + monkeypatch.setattr(hi, "_krx_tickers", lambda: set()) + result = hi.get_holdings() + assert result == [] + assert called == [] # get_current_prices must NOT have been called + + +def test_get_holdings_price_missing(monkeypatch): + """prices dict에 ticker가 없으면 current_price와 pnl_rate는 None이다.""" + monkeypatch.setattr(hi.db, "get_all_portfolio", lambda: [ + {"id": 1, "broker": "kis", "ticker": "000660", "name": "SK하이닉스", + "quantity": 5, "avg_price": 150000, "purchase_price": 150000}, + ]) + monkeypatch.setattr(hi.price_fetcher, "get_current_prices", + lambda tickers: {}) # 가격 없음 + monkeypatch.setattr(hi, "_krx_tickers", lambda: {"000660"}) + hs = hi.get_holdings() + assert hs[0]["current_price"] is None + assert hs[0]["pnl_rate"] is None + + +# ---- Phase 2 tests ---- + +def _toy_ctx(tickers=("005930",), n=300): + """결정적 일봉으로 ScreenContext 유사 객체 구성.""" + from app.screener.engine import ScreenContext + rows = [] + base = dt.date(2025, 1, 1) + for t in tickers: + price = 1000 + for i in range(n): + price = int(price * 1.002) # 완만한 상승 → 정배열 + d = (base + dt.timedelta(days=i)).isoformat() + rows.append({"ticker": t, "date": d, "open": price, "high": price, + "low": price, "close": price, "volume": 1000, "value": price*1000}) + prices = pd.DataFrame(rows) + master = pd.DataFrame({"name": [f"n{t}" for t in tickers], + "market": ["KOSPI"]*len(tickers), + "market_cap": [1e12]*len(tickers)}, + index=pd.Index(tickers, name="ticker")) + flow = pd.DataFrame(columns=["ticker","date","foreign_net","institution_net"]) + return ScreenContext(master=master, prices=prices, flow=flow, + kospi=pd.Series(dtype=float), asof=base+dt.timedelta(days=n-1)) + + +def test_technical_posture_returns_scores(): + ctx = _toy_ctx(("005930",)) + scores = hi.technical_posture(ctx, ["005930"]) + assert "005930" in scores + assert 0.0 <= scores["005930"] <= 100.0 # 상승추세 → 양수 점수 + + +# ---- Task 2.2 tests ---- + +def _ticker_prices(closes, vols=None): + n = len(closes) + base = dt.date(2025, 1, 1) + vols = vols or [1000]*n + return pd.DataFrame({ + "ticker": ["005930"]*n, + "date": [(base+dt.timedelta(days=i)).isoformat() for i in range(n)], + "open": closes, "high": closes, "low": closes, "close": closes, "volume": vols, + }) + + +DEFAULT_EXIT = {"stop_pct": 0.08, "take_pct": 0.25, "climax_vol_x": 3.0} + + +def test_exit_rules_stop_and_ma(): + closes = [1000]*60 + [1100]*200 # 충분한 길이, 최근 평탄 + df = _ticker_prices(closes) + # 현재가가 평단(2000) 대비 -45% → stop_loss + flags = hi.exit_rules({"avg_price": 2000, "current_price": 1100}, df, DEFAULT_EXIT) + assert flags["stop_loss"] is True + # 종가 1100 > MA50≈1100, MA200은 더 낮음 → ma 이탈 아님 + assert flags["ma200_break"] is False + + +def test_exit_rules_take_profit(): + df = _ticker_prices([1000]*260) + flags = hi.exit_rules({"avg_price": 1000, "current_price": 1300}, df, DEFAULT_EXIT) + assert flags["take_profit"] is True # +30% ≥ 25% + + +# ---- Task 2.3 tests ---- + +def test_decide_action_matrix(): + # 강건 + 이탈 없음 + 높은 강도 → add + a, r = hi.decide_action(tech_score=80, exit_flags={}, pnl=5) + assert a == "add" + # ma200 이탈 → sell + a, r = hi.decide_action(70, {"ma200_break": True}, 2) + assert a == "sell" + # stop_loss → sell + a, _ = hi.decide_action(70, {"stop_loss": True}, -10) + assert a == "sell" + # ma50 이탈만 → trim + a, _ = hi.decide_action(60, {"ma50_break": True}, 3) + assert a == "trim" + # 이탈 없음 보통 강도 → hold + a, _ = hi.decide_action(50, {}, 1) + assert a == "hold" + + +# ---- Phase 2 hardening tests (m3) ---- + +def _ticker_prices_hl(closes, highs, vols): + n = len(closes) + base = dt.date(2025, 1, 1) + return pd.DataFrame({ + "ticker": ["005930"] * n, + "date": [(base + dt.timedelta(days=i)).isoformat() for i in range(n)], + "open": closes, + "high": highs, + "low": closes, + "close": closes, + "volume": vols, + }) + + +def test_exit_rules_climax(): + closes = [1000] * 30 + highs = [1000] * 29 + [1100] # 마지막날 상단꼬리(종가1000 < 고가1100*0.97) + vols = [1000] * 29 + [5000] # 거래량 5x + flags = hi.exit_rules({"avg_price": 900, "current_price": 1000}, + _ticker_prices_hl(closes, highs, vols), {}) + assert flags["climax"] is True + + +def test_exit_rules_ma200_break(): + closes = list(range(1000, 1000 + 260))[::-1] # 하락 추세 → 종가 < MA200 + df = _ticker_prices(closes) + flags = hi.exit_rules({"avg_price": 2000, "current_price": closes[-1]}, df, {}) + assert flags["ma200_break"] is True + + +def test_technical_posture_short_history_returns_low_not_crash(): + ctx = _toy_ctx(("005930",), n=100) # <252 → MA 노드 NaN→0, but no crash + scores = hi.technical_posture(ctx, ["005930"]) + assert "005930" in scores + assert 0.0 <= scores["005930"] <= 100.0 + + +def test_technical_posture_empty_kospi_not_penalized(): + # rs_rating는 빈 kospi에서 빈 Series → combine에서 제외되어야 (C1) + ctx = _toy_ctx(("005930",), n=300) # kospi 빈 fixture + scores = hi.technical_posture(ctx, ["005930"]) + # ma_alignment+momentum만으로 정규화 → 상승추세면 충분히 높은 점수 + assert scores["005930"] > 50.0 + + +# ---- Phase 3 tests ---- + +DEFAULT_EVENT = {"move_pct": 7.0, "vol_z": 2.5} + + +def test_market_events_detects_move_and_volume(): + closes = [1000]*30 + [1100] # 마지막날 +10% + vols = [1000]*30 + [10000] # 거래량 급증 + df = _ticker_prices(closes, vols) + evts = hi.market_events("005930", df, None, DEFAULT_EVENT) + types = {e["type"] for e in evts} + assert "price_move" in types + assert "volume_surge" in types + + +def test_news_issues_flags_negative_sentiment(monkeypatch): + # news_sentiment: 005930 음수 점수 → 악재 flag + monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: { + "005930": {"score_raw": -0.6, "news_count": 8}}) + issues = hi.news_issues(["005930"], date="2026-05-29", use_llm=False) + assert "005930" in issues + assert issues["005930"][0]["type"] == "news" + assert issues["005930"][0]["severity"] in ("med", "high") + + +def test_portfolio_health(): + holdings = [ + {"ticker": "005930", "quantity": 10, "avg_price": 70000, "current_price": 77000, + "is_krx": True}, + {"ticker": "000660", "quantity": 5, "avg_price": 100000, "current_price": 90000, + "is_krx": True}, + ] + h = hi.portfolio_health(holdings, total_cash=1000000) + assert h["positions"] == 2 + assert 0 <= h["max_weight"] <= 1.0 + assert "total_eval" in h and "total_pnl" in h and "cash_ratio" in h + + +def test_market_events_volume_surge_zscore_path(): + # 변동 있는 기준선 → Z-score 경로(sd>0) 검증 (sd=0 fallback 아님) + import random as _r + _r.seed(1) + base_vols = [1000 + _r.randint(-50, 50) for _ in range(30)] + closes = [1000] * 30 + [1010] + vols = base_vols + [max(base_vols) * 10] # 마지막날 큰 급증 + df = _ticker_prices(closes, vols) + evts = hi.market_events("005930", df, None, DEFAULT_EVENT) + assert any(e["type"] == "volume_surge" for e in evts) + + +def test_market_events_foreign_selling(): + closes = [1000] * 5 + df = _ticker_prices(closes) + import datetime as _dt + base = _dt.date(2025, 1, 1) + flow = pd.DataFrame({ + "ticker": ["005930"] * 5, + "date": [(base + _dt.timedelta(days=i)).isoformat() for i in range(5)], + "foreign_net": [100, 50, -10, -20, -30], # 최근 3일 연속 순매도 + "institution_net": [0] * 5, + }) + evts = hi.market_events("005930", df, flow, DEFAULT_EVENT) + assert any(e["type"] == "foreign_selling" for e in evts) + + +def test_news_issues_severity_high_boundary(monkeypatch): + monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: { + "005930": {"score_raw": -0.6, "news_count": 5}}) # 정확히 high 경계 + issues = hi.news_issues(["005930"], date="2026-05-29", use_llm=False) + assert issues["005930"][0]["severity"] == "high" + + +def test_portfolio_health_empty_and_zero(): + # 빈 포트 → 0/빈값, 크래시 없음 + h0 = hi.portfolio_health([], total_cash=0) + assert h0["positions"] == 0 + assert h0["max_weight"] == 0.0 + assert h0["total_pnl_rate"] == 0.0 + assert h0["cash_ratio"] == 0.0 + # total_buy=0 (avg_price 0) → div-by-zero 없이 0.0 + h1 = hi.portfolio_health([{"ticker": "X", "quantity": 1, "avg_price": 0, + "current_price": 0, "is_krx": True}], total_cash=0) + assert h1["total_pnl_rate"] == 0.0 + + +# ---- Phase 4 tests ---- + +def test_compute_and_store_and_brief(monkeypatch): + import os, tempfile + from app import db + monkeypatch.setattr(db, "DB_PATH", os.path.join(tempfile.mkdtemp(), "stock.db")) + db.init_db() + monkeypatch.setattr(hi, "get_holdings", lambda: [ + {"ticker": "005930", "name": "삼성전자", "quantity": 10, "avg_price": 1000, + "current_price": 1100, "pnl_rate": 10.0, "is_krx": True}]) + ctx = _toy_ctx(("005930",)) + monkeypatch.setattr(hi, "_load_ctx", lambda asof: ctx) + monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: {}) + monkeypatch.setattr(hi.db, "get_all_broker_cash", lambda: [{"broker": "kis", "cash": 500000}]) + res = hi.compute_and_store(asof=ctx.asof, use_llm=False) + assert res["stored"] == 1 + brief = hi.build_holdings_brief() + assert brief["holdings"][0]["ticker"] == "005930" + assert "portfolio_health" in brief + assert brief["holdings"][0]["action"] in ("add", "hold", "trim", "sell") + + +def test_compute_momentum_loss_flag(monkeypatch): + """직전 시그널 tech_score HIGH → 오늘 LOW → momentum_loss=True.""" + import os, tempfile + from app import db + + monkeypatch.setattr(db, "DB_PATH", os.path.join(tempfile.mkdtemp(), "stock.db")) + db.init_db() + + yesterday = (dt.date.today() - dt.timedelta(days=1)).isoformat() + today = dt.date.today() + today_iso = today.isoformat() + + # 어제 시그널 삽입: tech_score=90 (HIGH) + db.upsert_holdings_signal( + date=yesterday, ticker="005930", name="삼성전자", + action="hold", tech_score=90.0, exit_flags={}, issues=[], + close=1000, pnl_rate=0.0, reasons="x", + ) + + monkeypatch.setattr(hi, "get_holdings", lambda: [ + {"ticker": "005930", "name": "삼성전자", "quantity": 10, "avg_price": 1000, + "current_price": 1000, "pnl_rate": 0.0, "is_krx": True} + ]) + ctx = _toy_ctx(("005930",)) + monkeypatch.setattr(hi, "_load_ctx", lambda asof: ctx) + monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: {}) + monkeypatch.setattr(hi.db, "get_all_broker_cash", lambda: []) + # tech_score=30 → 낮음 (momentum_low=35 미만, 또한 90-30=60 > momentum_drop=15) + monkeypatch.setattr(hi, "technical_posture", lambda ctx, tickers: {"005930": 30.0}) + + res = hi.compute_and_store(asof=today, use_llm=False) + assert res["stored"] == 1 + + signals = db.get_holdings_signals(today_iso) + assert len(signals) == 1 + assert signals[0]["exit_flags"]["momentum_loss"] is True + + +def test_compute_idempotent(monkeypatch): + """동일 입력으로 compute_and_store 두 번 실행 → upsert로 1건만 저장.""" + import os, tempfile + from app import db + + monkeypatch.setattr(db, "DB_PATH", os.path.join(tempfile.mkdtemp(), "stock.db")) + db.init_db() + + monkeypatch.setattr(hi, "get_holdings", lambda: [ + {"ticker": "005930", "name": "삼성전자", "quantity": 10, "avg_price": 1000, + "current_price": 1100, "pnl_rate": 10.0, "is_krx": True} + ]) + ctx = _toy_ctx(("005930",)) + monkeypatch.setattr(hi, "_load_ctx", lambda asof: ctx) + monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: {}) + monkeypatch.setattr(hi.db, "get_all_broker_cash", lambda: []) + + hi.compute_and_store(asof=ctx.asof, use_llm=False) + hi.compute_and_store(asof=ctx.asof, use_llm=False) + + signals = db.get_holdings_signals(ctx.asof.isoformat()) + assert len(signals) == 1, f"upsert 실패: {len(signals)}건 저장됨" + + +def test_compute_non_krx_holding(monkeypatch): + """is_krx=False 종목은 tech_score=None·action='hold'로 저장된다.""" + import os, tempfile + from app import db + + monkeypatch.setattr(db, "DB_PATH", os.path.join(tempfile.mkdtemp(), "stock.db")) + db.init_db() + + monkeypatch.setattr(hi, "get_holdings", lambda: [ + {"ticker": "AAPL", "name": "Apple", "quantity": 5, "avg_price": 200, + "current_price": 220, "pnl_rate": 10.0, "is_krx": False} + ]) + ctx = _toy_ctx(()) # ticker 없는 빈 ctx + monkeypatch.setattr(hi, "_load_ctx", lambda asof: ctx) + monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: {}) + monkeypatch.setattr(hi.db, "get_all_broker_cash", lambda: []) + + res = hi.compute_and_store(asof=ctx.asof, use_llm=False) + assert res["stored"] == 1 + + signals = db.get_holdings_signals(ctx.asof.isoformat()) + assert len(signals) == 1 + sig = signals[0] + assert sig["ticker"] == "AAPL" + assert sig["tech_score"] is None + assert sig["action"] == "hold"