44 Commits

Author SHA1 Message Date
5d9be51dba merge: 주식 보유종목 인텔리전스 (Phase 1-5)
스크리너 엔진을 보유종목에 restrict + 매도/리스크 룰 + 이슈 감지
(급변·거래량·외인·뉴스감성) + 포트 건강 → 매일 advisory 브리핑.
EOD(16:50)+아침(08:30) cron. KIS 실주문 미사용.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 22:56:00 +09:00
cd4fb27d5a fix(agent-office): EOD 16:50 stagger(부분일봉 방지)·idle가드 문서화·proxy/import 정리
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 22:28:12 +09:00
b94b5973d6 feat(agent-office): StockAgent holdings EOD(16:40)+브리핑(08:30) cron
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 22:22:04 +09:00
f54ade2c0d feat(agent-office): 보유종목 브리핑 텔레그램 포매터
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 22:21:58 +09:00
2cbc830004 feat(agent-office): stock holdings run/brief 프록시
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 22:21:54 +09:00
d0c057358a test(stock): Phase 4 회귀 (momentum_loss·멱등·non-KRX 경로)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 22:18:58 +09:00
7d7064ae93 feat(stock): holdings intel API (intel/history/run)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 22:12:28 +09:00
789785fe3a feat(stock): compute_and_store + build_holdings_brief
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 22:11:45 +09:00
c3a3055060 test(stock): Phase 3 커버리지 보강 (volume Z경로·외인매도·severity경계·빈포트)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 22:09:05 +09:00
3056e8d35f feat(stock): portfolio_health (집중도·현금·손익)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 22:03:21 +09:00
4ed3794f71 feat(stock): news_issues (감성 기반 악재 flag)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 22:02:45 +09:00
241c24943f feat(stock): market_events (급변·거래량Z·외인순매도)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 22:02:10 +09:00
c756b20c77 fix(stock): Phase 2 결정엔진 견고화 (빈노드 제외·cur=0 손절·params기본값·NaN MA·테스트)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 22:00:02 +09:00
fba6dbf1fd feat(stock): decide_action 매트릭스 (sell>trim>add>hold)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 21:48:52 +09:00
b13c088739 feat(stock): exit_rules (손절·MA이탈·익절·클라이맥스)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 21:48:37 +09:00
116b2540c2 feat(stock): technical_posture (스크리너 노드 보유종목 적용)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 21:48:01 +09:00
62169ad33f refactor(stock): Phase 1 리뷰 반영 (public get_krx_tickers·타입·limit명명·테스트)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 21:45:19 +09:00
0ef7d414b7 feat(stock): get_holdings (현재가·손익·KRX판별)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 21:37:01 +09:00
885d52d8f5 feat(stock): holdings_signals 테이블 + CRUD
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 21:36:27 +09:00
e3088f7cc6 docs(plan): 주식 보유종목 인텔리전스 구현 plan (7 Phase, TDD)
Phase 1 데이터모델+get_holdings → 2 기술분석·매도룰·decide_action →
3 이슈(market_events·news·portfolio_health) → 4 compute+brief+API →
5 agent-office EOD·아침브리핑 → 6 web-ui 탭 → 7 검증. 장중 가드는 후속.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 21:33:55 +09:00
2996cf16d1 docs(spec): 주식 보유종목 인텔리전스 설계
스크리너 엔진을 보유종목에 restrict 적용 + 신규 매도/리스크 룰 +
이슈 감지(급변·거래량·외인·뉴스 LLM) + 포트 건강 → 매일 advisory 브리핑.
EOD 일봉 + 장중 경량 가드, KIS 실주문 미사용. 기존 screener/snapshot/
news_sentiment/portfolio 재활용, 신규 데이터소스 0.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 21:25:21 +09:00
03ee5ce147 merge: 로또 자가학습 백테스트 & 캘리브레이션 (Phase 1-5)
forward 가상구매(6 engine_w + 6 random_null + coverage) + winner 캘리브레이션
+ evolver lift 학습신호(best-vs-best, ε게이팅) + 일요 회고 텔레그램.
null-model 베이스라인으로 무작위 대비 우위를 정직하게 측정.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 21:04:40 +09:00
11212c4afd fix(agent-office): 일요 회고 견고화 (dead import 제거·send 가드·부분 payload 방어)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 18:02:01 +09:00
1b8548a73f feat(agent-office): LottoAgent 일 09:00 sunday_review cron
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 17:53:01 +09:00
c4ba7e81e6 feat(agent-office): 일요 회고 텔레그램 포매터
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 17:51:34 +09:00
e8270c5a63 feat(agent-office): lotto backtest review/run-forward 프록시
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 17:51:29 +09:00
4063f29cd3 fix(lotto): 학습 게이트 정직화 (engine-best vs random-best 6trial·명시적 gated·정체성 일관)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 17:47:52 +09:00
03056a4747 feat(lotto): evaluate_weekly 학습 신호를 forward lift로 승격
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 17:33:27 +09:00
8e7b4adabd feat(lotto): select_winner_by_lift + ε-게이팅
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 17:32:37 +09:00
add433233a fix(lotto): Phase 3 리뷰 반영 (run-forward 백그라운드·review 404·track_record distinct·테스트 보강)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 17:30:10 +09:00
74f385c7bd feat(lotto): 새 회차 동기화 시 forward+calibration 자동 실행
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 17:20:59 +09:00
3bc4f423db feat(lotto): backtest API 라우터 + main 등록
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 17:20:32 +09:00
a425bb8809 feat(lotto): track_record + build_review_payload 집계
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 17:19:05 +09:00
850638ae58 fix(lotto): Phase 2 리뷰 반영 (engine_w 회차주 기준·누출제거·N+1제거·테스트 보강)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 17:17:09 +09:00
94a94e260c feat(lotto): run_forward_purchase 3전략 구매·채점·저장
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 17:07:26 +09:00
c196da4902 feat(lotto): calibrate_winner + backfill (멱등·청크)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 17:06:00 +09:00
aaba4fbc46 feat(lotto): calibrate_winner_compute 당첨조합 역분석+percentile
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 17:05:06 +09:00
9f897ea4a0 feat(lotto): point_in_time_draws 헬퍼
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 17:04:35 +09:00
77efa9b653 refactor(lotto): Phase 1 코드리뷰 반영 (로컬 RNG·write-once·가드·테스트 보강)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 17:02:16 +09:00
8dbb1abaeb feat(lotto): 티켓 생성 3전략 (engine_w/random_null/coverage)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 16:50:28 +09:00
41ad56e3ef feat(lotto): grade_tickets 매칭 채점 + 등수 매핑
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 16:49:49 +09:00
bb0e771a4a feat(lotto): backtest_runs/winner_calibration 테이블 + CRUD
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 16:49:03 +09:00
160fc27279 docs(plan): 로또 자가학습 백테스트 구현 plan (7 Phase, TDD)
Phase 1 데이터모델+구매/채점 → 2 캘리브레이션+forward+백필 →
3 API+스케줄러 → 4 evolver lift 학습신호 → 5 agent-office 일요회고 →
6 web-ui 자율학습 탭 → 7 통합검증. 각 task TDD bite-sized + 멱등.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 16:44:21 +09:00
f3f6cccd33 docs(spec): 로또 자가학습 백테스트 & 캘리브레이션 설계
3종 스마트 에이전트 고도화 중 로또 1번. forward 가상구매(수천 장/회차)
+ winner 캘리브레이션(역대 백필) + 일요 회고 브리핑 + weight_evolver
학습 신호 강화(W-무관 결함 수정). null-model 베이스라인 내장으로
무작위 대비 우위를 정직하게 측정. NAS-first, Windows WSL 이전 가능 설계.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 16:37:25 +09:00
27 changed files with 5057 additions and 1 deletions

View File

@@ -22,6 +22,8 @@ class LottoAgent(BaseAgent):
return await self.run_signal_check(source=source)
if action == "daily_digest":
return await self.run_daily_digest()
if action == "sunday_review":
return await self.run_sunday_review()
return {"ok": False, "message": f"unknown action: {action}"}
async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
@@ -155,6 +157,29 @@ class LottoAgent(BaseAgent):
add_log("lotto", f"daily_digest 예외: {e}", level="error", task_id=task_id)
return {"ok": False, "message": f"{type(e).__name__}: {e}"}
async def run_sunday_review(self) -> dict:
"""일 09:00 — 최신 회차 forward+calibration 보장 후 회고 텔레그램."""
from ..service_proxy import lotto_latest_draw, lotto_backtest_review
from ..notifiers.telegram_lotto import send_sunday_review
from ..db import create_task, update_task_status, add_log
task_id = create_task("lotto", "sunday_review", {})
try:
draw_no = await lotto_latest_draw()
if not draw_no:
update_task_status(task_id, "failed", result_data={"reason": "no_draw"})
return {"ok": False, "message": "no latest draw"}
# forward는 lotto cron이 이미 돌렸을 수 있으나 멱등이라 안전 — review만 호출
payload = await lotto_backtest_review(draw_no)
await send_sunday_review(payload)
update_task_status(task_id, "succeeded", result_data={"draw_no": draw_no})
add_log("lotto", f"sunday_review 발송: #{draw_no}", task_id=task_id)
return {"ok": True, "draw_no": draw_no}
except Exception as e:
update_task_status(task_id, "failed", result_data={"error": str(e)})
add_log("lotto", f"sunday_review 예외: {e}", level="error", task_id=task_id)
return {"ok": False, "message": f"{type(e).__name__}: {e}"}
async def run_weekly_evolution_report(self) -> dict:
"""토 22:15 — lotto-lab evaluate-now 트리거 후 텔레그램 리포트. task_id wrap."""
from ..service_proxy import lotto_evolver_evaluate, lotto_evolver_status

View File

@@ -336,7 +336,48 @@ class StockAgent(BaseAgent):
await self.transition("idle", "AI 뉴스 완료")
async def run_holdings_eod(self) -> dict:
"""평일 16:50 — 보유종목 시그널 계산·저장."""
# idle 가드 없음(의도적): 스크리너 진행 중에도 EOD/브리핑은 독립적으로 실행되어야 함
from ..service_proxy import stock_holdings_run
from ..db import create_task, update_task_status, add_log
task_id = create_task(self.agent_id, "holdings_eod", {})
try:
res = await stock_holdings_run()
update_task_status(task_id, "succeeded", res)
add_log(self.agent_id, f"holdings_eod: {res}", "info", task_id)
return {"ok": True, **res}
except Exception as e:
update_task_status(task_id, "failed", {"error": str(e)})
add_log(self.agent_id, f"holdings_eod 실패: {e}", "error", task_id)
return {"ok": False, "message": str(e)}
async def run_holdings_brief(self) -> dict:
"""평일 08:30 — 저장된 시그널 브리핑 텔레그램."""
# idle 가드 없음(의도적): 스크리너 진행 중에도 EOD/브리핑은 독립적으로 실행되어야 함
from ..service_proxy import stock_holdings_brief
from ..notifiers.telegram_stock import send_holdings_brief
from ..db import create_task, update_task_status, add_log
task_id = create_task(self.agent_id, "holdings_brief", {})
try:
payload = await stock_holdings_brief()
await send_holdings_brief(payload)
update_task_status(task_id, "succeeded", {"date": payload.get("date"),
"count": len(payload.get("holdings", []))})
add_log(self.agent_id, f"holdings_brief 발송: {payload.get('date')}", "info", task_id)
return {"ok": True}
except Exception as e:
update_task_status(task_id, "failed", {"error": str(e)})
add_log(self.agent_id, f"holdings_brief 실패: {e}", "error", task_id)
return {"ok": False, "message": str(e)}
async def on_command(self, command: str, params: dict) -> dict:
if command == "holdings_eod":
return await self.run_holdings_eod()
if command == "holdings_brief":
return await self.run_holdings_brief()
if command == "run_screener":
await self.on_screener_schedule()
return {"ok": True, "message": "스크리너 실행 트리거 완료"}

View File

@@ -225,3 +225,42 @@ async def send_evolution_report(eval_result: Dict[str, Any], current_base: List[
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] evolution report send failed: {e}")
# ---------- 일요 회고 브리핑 ----------
def format_sunday_review(payload: Dict[str, Any]) -> str:
"""일요 회고 브리핑 텍스트 (HTML parse_mode)."""
wa = payload.get("winner_analysis") or {}
draw_no = payload.get("draw_no") or "?"
pct = wa.get("percentile")
pct_txt = f"{pct*100:.0f}%" if pct is not None else ""
lines = [f"🔍 <b>로또 #{draw_no} 일요 회고</b>", ""]
if wa:
lines.append(f"이번 당첨조합 분석치: <b>{wa.get('score_total',0):.2f}</b> "
f"(무작위 분포 상위 {pct_txt})")
lines.append(f" 빈도 {wa.get('score_frequency',0):.2f} · 지문 {wa.get('score_fingerprint',0):.2f} "
f"· 갭 {wa.get('score_gap',0):.2f} · 공동출현 {wa.get('score_cooccur',0):.2f} "
f"· 다양성 {wa.get('score_diversity',0):.2f}")
lines.append("")
if payload.get("forward"):
lines.append("📊 <b>이번 회차 가상구매 성적</b>")
for f in payload.get("forward", []):
p = f.get("prizes") or {}
name = {"engine_w": f"엔진({f.get('label','')})", "random_null": "무작위", "coverage": "커버리지"}.get(
f.get("strategy", ""), f.get("strategy", "?"))
lines.append(f" {name}: 최고 {f.get('best_match','?')}일치 / "
f"4등 {p.get('4th', 0)} · 5등 {p.get('5th', 0)}")
else:
lines.append("📊 <b>이번 회차 가상구매 성적</b>: 데이터 없음 (아직 집계 전)")
lines.append("")
lines.append(" 무작위 대비 우위가 통계적으로 의미있을 때만 가중치가 진화합니다.")
return "\n".join(lines)
async def send_sunday_review(payload: Dict[str, Any]) -> None:
text = format_sunday_review(payload)
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] sunday review send failed: {e}")

View File

@@ -0,0 +1,42 @@
"""보유종목 인텔리전스 텔레그램 포매터 (advisory)."""
import logging
from typing import Any, Dict
from ..telegram.messaging import send_raw
logger = logging.getLogger("agent-office")
_ACTION_KR = {"add": "🟢 추가매수", "hold": "⚪ 보유", "trim": "🟡 축소", "sell": "🔴 매도"}
_SEV = {"high": "🔴", "med": "🟠", "low": "🟡"}
def format_holdings_brief(payload: Dict[str, Any]) -> str:
date = payload.get("date") or "?"
lines = [f"📊 <b>보유종목 인텔리전스</b> ({date})", ""]
ph = payload.get("portfolio_health") or {}
if ph:
lines.append(f"포트 손익 {ph.get('total_pnl_rate',0):+.1f}% · "
f"종목 {ph.get('positions',0)} · 최대비중 {ph.get('max_weight',0)*100:.0f}% · "
f"현금 {ph.get('cash_ratio',0)*100:.0f}%")
lines.append("")
for h in payload.get("holdings", []):
act = _ACTION_KR.get(h.get("action"), h.get("action", "?"))
pnl = h.get("pnl_rate")
pnl_txt = f"{pnl:+.1f}%" if pnl is not None else ""
line = f"{act} <b>{h.get('name') or h.get('ticker')}</b> ({pnl_txt})"
if h.get("reasons"):
line += f"{h['reasons']}"
lines.append(line)
for iss in (h.get("issues") or [])[:3]:
lines.append(f" {_SEV.get(iss.get('severity'),'')} {iss.get('summary','')}")
lines.append("")
lines.append(" 투자 판단 보조용 제안입니다(자동매매 아님).")
return "\n".join(lines)
async def send_holdings_brief(payload: Dict[str, Any]) -> None:
text = format_holdings_brief(payload)
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_stock] holdings brief send failed: {e}")

View File

@@ -22,6 +22,16 @@ async def _run_stock_ai_news():
if agent:
await agent.on_ai_news_schedule()
async def _run_stock_holdings_eod():
agent = AGENT_REGISTRY.get("stock")
if agent:
await agent.run_holdings_eod()
async def _run_stock_holdings_brief():
agent = AGENT_REGISTRY.get("stock")
if agent:
await agent.run_holdings_brief()
async def _run_insta_schedule():
agent = AGENT_REGISTRY.get("insta")
if agent:
@@ -68,6 +78,11 @@ async def _run_lotto_sync_evolver_activity():
if agent:
await agent.sync_evolver_activity()
async def _run_lotto_sunday_review():
agent = AGENT_REGISTRY.get("lotto")
if agent:
await agent.run_sunday_review()
async def _run_youtube_research():
agent = AGENT_REGISTRY.get("youtube")
if agent:
@@ -106,6 +121,8 @@ def init_scheduler():
minute=0,
id="stock_ai_news_sentiment",
)
scheduler.add_job(_run_stock_holdings_eod, "cron", day_of_week="mon-fri", hour=16, minute=50, id="stock_holdings_eod") # 16:50: 스크리너 snapshot(16:30) 완료 후 — 부분 일봉 읽기 방지
scheduler.add_job(_run_stock_holdings_brief, "cron", day_of_week="mon-fri", hour=8, minute=30, id="stock_holdings_brief")
scheduler.add_job(_run_insta_schedule, "cron", hour=9, minute=30, id="insta_pipeline")
# 외부 트렌드 수집은 장 마감 후 16:40 — 9시 주식 활발 시간대 NAS 자원 회피.
# screener(16:30)와 10분 스태거: Celeron 2C/2.0GHz 동시 실행 시 CPU 폭주 방지 (CHECK_POINT FU-A)
@@ -116,6 +133,7 @@ def init_scheduler():
scheduler.add_job(_run_lotto_deep_check, "cron", day_of_week="sun,wed", hour=21, minute=15, id="lotto_deep_check")
scheduler.add_job(_run_lotto_daily_digest, "cron", hour=9, minute=25, id="lotto_digest")
scheduler.add_job(_run_lotto_weekly_evolution_report, "cron", day_of_week="sat", hour=22, minute=15, id="lotto_evolution_weekly")
scheduler.add_job(_run_lotto_sunday_review, "cron", day_of_week="sun", hour=9, minute=0, id="lotto_sunday_review")
scheduler.add_job(
_run_lotto_sync_evolver_activity,
"cron", hour=9, minute=30,

View File

@@ -88,6 +88,29 @@ async def scrape_stock_news() -> Dict[str, Any]:
resp.raise_for_status()
return resp.json()
async def stock_holdings_run() -> Dict[str, Any]:
"""보유종목 시그널 계산 트리거 (EOD, use_llm=True).
stock BackgroundTask 등록 후 즉시 {ok, queued} 반환.
실제 계산은 stock 컨테이너 백그라운드에서 진행 — 여유있게 120s.
"""
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(
f"{STOCK_URL}/api/stock/holdings/intel/run",
params={"use_llm": True},
)
resp.raise_for_status()
return resp.json()
async def stock_holdings_brief() -> Dict[str, Any]:
"""보유종목 최신 브리핑 payload 조회 (GET, 모듈 레벨 _client 사용)."""
resp = await _client.get(f"{STOCK_URL}/api/stock/holdings/intel")
resp.raise_for_status()
return resp.json()
async def generate_music(payload: dict) -> Dict[str, Any]:
resp = await _client.post(f"{MUSIC_LAB_URL}/api/music/generate", json=payload)
resp.raise_for_status()
@@ -399,6 +422,14 @@ async def lotto_evolver_evaluate() -> Dict[str, Any]:
return resp.json()
async def lotto_backtest_review(draw_no: int) -> Dict[str, Any]:
from .config import LOTTO_BACKEND_URL
resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/backtest/review/{draw_no}")
resp.raise_for_status()
return resp.json()
from .config import AGENT_CONTAINER_MAP

View File

@@ -0,0 +1,82 @@
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from app.notifiers import telegram_stock as ts
def test_format_holdings_brief():
payload = {
"date": "2026-05-29",
"holdings": [
{"ticker": "005930", "name": "삼성전자", "action": "trim", "tech_score": 60.0,
"exit_flags": {"ma50_break": True}, "issues": [{"type":"news","severity":"high","summary":"악재"}],
"pnl_rate": 5.2, "reasons": "MA50 이탈"},
{"ticker": "000660", "name": "SK하이닉스", "action": "hold", "tech_score": 75.0,
"exit_flags": {}, "issues": [], "pnl_rate": -2.0, "reasons": "특이 신호 없음"},
],
"portfolio_health": {"positions": 2, "total_pnl_rate": 3.1, "max_weight": 0.6, "cash_ratio": 0.2},
}
txt = ts.format_holdings_brief(payload)
assert "삼성전자" in txt
assert "축소" in txt or "trim" in txt
assert "%" in txt
def test_format_holdings_brief_empty_holdings():
"""빈 holdings + None portfolio_health에도 크래시 없음."""
payload = {"date": "2026-05-29", "holdings": [], "portfolio_health": None}
txt = ts.format_holdings_brief(payload)
assert "보유종목 인텔리전스" in txt
assert "자동매매" in txt
def test_format_holdings_brief_missing_fields():
"""pnl_rate None·name None·issues None 방어적 처리."""
payload = {
"date": None,
"holdings": [
{"ticker": "005930", "name": None, "action": "sell",
"pnl_rate": None, "reasons": None, "issues": None},
],
"portfolio_health": {},
}
txt = ts.format_holdings_brief(payload)
assert "005930" in txt # ticker fallback
assert "🔴 매도" in txt
def test_format_holdings_brief_sell_action():
"""sell 액션은 🔴 매도로 표시."""
payload = {
"date": "2026-05-29",
"holdings": [
{"ticker": "000660", "name": "SK하이닉스", "action": "sell",
"pnl_rate": -12.5, "reasons": "손절선 이탈", "issues": []},
],
"portfolio_health": {"positions": 1, "total_pnl_rate": -12.5,
"max_weight": 1.0, "cash_ratio": 0.0},
}
txt = ts.format_holdings_brief(payload)
assert "🔴 매도" in txt
assert "-12.5%" in txt
def test_format_holdings_brief_issue_severity_icons():
"""이슈 심각도별 이모지 매핑 확인."""
payload = {
"date": "2026-05-29",
"holdings": [
{"ticker": "005930", "name": "삼성전자", "action": "hold", "pnl_rate": 2.0,
"reasons": "특이 신호 없음",
"issues": [
{"type": "news", "severity": "high", "summary": "심각 악재"},
{"type": "volume_surge", "severity": "med", "summary": "거래량 급증"},
{"type": "price_move", "severity": "low", "summary": "소폭 변동"},
]},
],
"portfolio_health": {},
}
txt = ts.format_holdings_brief(payload)
assert "🔴" in txt # high severity
assert "🟠" in txt # med severity
assert "🟡" in txt # low severity

View File

@@ -0,0 +1,38 @@
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from app.notifiers import telegram_lotto as tl
def test_format_sunday_review_text():
payload = {
"draw_no": 1170,
"winner_analysis": {"score_total": 0.41, "percentile": 0.33,
"score_frequency": 0.4, "score_fingerprint": 0.5, "score_gap": 0.3,
"score_cooccur": 0.45, "score_diversity": 0.6},
"forward": [
{"strategy": "engine_w", "label": "w1", "prizes": {"1st":0,"2nd":0,"3rd":0,"4th":1,"5th":12}, "best_match": 4, "avg_meta_score": 0.55},
{"strategy": "random_null", "label": "-", "prizes": {"1st":0,"2nd":0,"3rd":0,"4th":0,"5th":10}, "best_match": 3, "avg_meta_score": 0.33},
],
"track_record": {},
"calibration_trend": [{"draw_no":1170,"score_total":0.41,"percentile":0.33}],
}
txt = tl.format_sunday_review(payload)
assert "1170" in txt
assert "%" in txt # percentile 표기
assert "engine" in txt.lower() or "엔진" in txt
def test_format_sunday_review_no_calibration():
payload = {"draw_no": 1171, "winner_analysis": None, "forward": []}
txt = tl.format_sunday_review(payload)
assert "1171" in txt
assert "%" not in txt # no percentile section when calibration absent
assert "데이터 없음" in txt
def test_format_sunday_review_missing_prizes_no_crash():
payload = {"draw_no": 1171, "winner_analysis": None,
"forward": [{"strategy": "engine_w", "label": "w1", "best_match": 3}]} # no 'prizes'
txt = tl.format_sunday_review(payload) # must NOT raise
assert "1171" in txt

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,191 @@
# 로또 자가학습 백테스트 & 캘리브레이션 — 설계 Spec
- **작성일**: 2026-05-31
- **상태**: 설계 승인 (구현 plan 대기)
- **대상 서비스**: `lotto` (lotto-lab) + `agent-office` (LottoAgent) + `web-ui` (/lotto 자율학습 탭)
- **사이클**: 스마트 에이전트 고도화 3종(로또/주식/인스타) 중 **1번 로또**. 주식·인스타는 후속 사이클.
---
## 1. 배경 & 목표
사용자(CEO)는 로또 에이전트를 "분석 번호를 계속 가상구매해 시도횟수를 늘리고, 실제 당첨조합을 역분석해 스스로 학습·디벨롭하며 일요일에 회고 브리핑하는 스마트 에이전트"로 고도화하길 원한다. 명시 목표는 "로또 1등".
### ⚠️ 정직성 전제 (설계의 토대)
로또는 매 회차 균등·독립 추첨이다. C(45,6)=8,145,060 조합이 전부 동일 확률이며 회차 간 독립이다. 따라서:
- **과거 데이터(빈도·갭·공동출현)의 미래 예측력은 수학적으로 0.** 통계 분석으로 1등 확률을 올릴 수 없다.
- 고정 예산 N장으로 1등 확률을 최대화하는 유일한 방법은 **서로 다른(distinct) 조합 N개**를 사는 것이다.
이 사실을 부정하지 않고 **시스템에 내장**한다. 본 프로젝트의 가치는 "예측"이 아니라:
1. **정직한 측정** — "내 분석 엔진이 무작위를 이기는가?"를 null-model 대조군으로 매번 엄밀히 검정.
2. **자가학습 엔진 인프라** — 측정→학습→회고 루프 자체의 엔지니어링.
3. **커버리지 최적화** — 1등이 목표라면 distinct 조합 커버리지 최대화가 수학적 최적.
→ 사용자 결정(2026-05-31): **"정직한 측정 + 커버리지 최적"** 프레이밍 채택. 패턴 학습은 계속하되 모든 백테스트에 null-model 베이스라인을 내장한다.
### 기존 자산 (100% 재활용, 신규 ML 없음)
- `analyzer.build_analysis_cache(draws)` / `score_combination(numbers, cache, weights)` — 임의 조합의 5개 sub-score + 종합점수(0~1) = **"분석치"**.
- `analyzer.build_number_weights` + `utils.weighted_sample_6` — 가중 후보 생성.
- `generator.run_simulation` — 20k 후보를 `score_combination(·, active_weights)`로 랭킹→best_picks. **W가 선택을 바꾸는 경로가 이미 존재.**
- `weight_evolver` — 토 22:00 주간 6 가중치 후보 채점→base 갱신.
### 발견된 잠재 결함 (본 작업으로 수정)
`weight_evolver.apply_today_and_pick``recommend_numbers(draws)`(W 미사용)로 픽을 뽑은 뒤 W로 점수만 매긴다. 즉 **현재 daily 픽은 W와 무관**하고, evolver가 평가하는 매칭 결과도 W-독립이라 가중치 진화가 픽 품질에 연결돼 있지 않다. → forward 가상구매를 **시뮬레이션 선택 경로(풀 생성→W 랭킹→상위 K 구매)**로 구현하면 W가 결과를 실제로 바꿔 가중치 학습이 비로소 의미를 갖고 이 결함도 해소된다.
---
## 2. 핵심 개념 — Self-Learning Backtest Loop
세 축으로 구성:
### 축 A — Forward 가상구매 (매주, 회차당 수천 장)
매 회차 추첨 후, 각 전략별로 대량 후보를 생성·랭킹해 상위 K장을 "구매"로 간주 → 실제 당첨번호로 채점 → **회차별 집계 1행만 영구 저장**. 개별 티켓 미저장.
- 전략: `engine_w`(6개 trial 가중치 각각) / `random_null`(무작위 대조군) / `coverage`(distinct 최대화).
- 이 매칭 결과가 evolver의 학습 신호가 된다.
### 축 B — Winner 캘리브레이션 (역대 전체 백필 + 매주 증분)
각 회차의 **실제 당첨조합을 그 시점 이전 데이터로 만든 캐시(point-in-time)에 넣어** 5개 분석치 + 종합점수 + percentile을 기록.
- percentile = 당첨조합 score_total이 그 시점 무작위 M개 표본 분포에서 차지하는 위치.
- "내 엔진이 실제 당첨번호에 높은 점수를 주는가?"의 가장 정직한 신호. 당첨조합이 일관되게 낮은 percentile이면 엔진은 헛다리.
### 축 C — 일요일 회고 브리핑
토 추첨(20:45)→동기화(21:10)→기존 evolver 리포트(토 22:15) 이후, **일 09:00**에 차분히 회고. 이번 회차 forward 성적 + 당첨조합 역분석 + 내 추천과 비교 + 캘리브레이션 추세 + 가중치 진화를 텔레그램 1통 + UI.
---
## 3. 데이터 모델 (lotto.db 신규)
집계 전용 — row 수 ≈ 회차 × 전략 (수천 규모, 무시 가능).
### `backtest_runs` — forward 가상구매 집계
```
id INTEGER PK
draw_no INTEGER NOT NULL -- 채점 대상(당첨 확정된) 회차
strategy TEXT NOT NULL -- 'engine_w' | 'random_null' | 'coverage'
weight_label TEXT NOT NULL -- engine_w는 trial day_of_week('w0'..'w5'), 그 외 '-'
weight_json TEXT -- 사용한 W (random/coverage는 NULL)
trial_id INTEGER -- FK weight_trials (engine_w만, nullable)
n_tickets INTEGER NOT NULL -- 구매(채점) 장수
m3 INTEGER NOT NULL DEFAULT 0 -- 3개 일치 장수
m4 INTEGER NOT NULL DEFAULT 0
m5 INTEGER NOT NULL DEFAULT 0
m6 INTEGER NOT NULL DEFAULT 0
bonus_hits INTEGER NOT NULL DEFAULT 0 -- 5+보너스(2등) 장수
best_match INTEGER NOT NULL DEFAULT 0
avg_meta_score REAL -- 구매 티켓 평균 분석치
created_at TEXT NOT NULL
UNIQUE(draw_no, strategy, weight_label) -- 멱등
```
- 등수 매핑: 1등=m6, 2등=bonus_hits, 3등=m5bonus_hits, 4등=m4, 5등=m3.
### `winner_calibration` — 회차별 당첨조합 역분석
```
draw_no INTEGER PK -- 멱등
winning_json TEXT NOT NULL -- [n1..n6] (보너스 별도 보관 안 함)
score_total REAL NOT NULL
score_frequency REAL NOT NULL
score_fingerprint REAL NOT NULL
score_gap REAL NOT NULL
score_cooccur REAL NOT NULL
score_diversity REAL NOT NULL
percentile REAL -- 0~1, 무작위 M표본 대비 당첨조합 점수 위치
my_pick_avg REAL -- 그 회차 engine 추천 평균 분석치(있으면)
cache_draws INTEGER NOT NULL -- point-in-time 캐시에 쓰인 회차 수
created_at TEXT NOT NULL
```
> 누적 성적표(track record)는 `backtest_runs` SUM 집계로 on-the-fly 계산 — 별도 테이블 불필요.
---
## 4. 컴포넌트
### 4.1 lotto-lab `app/backtest.py` (순수 연산 — FastAPI 의존성 0, Windows 이전 대비)
- `generate_pool(cache, number_weights, n) -> list[tuple]``weighted_sample_6` 반복으로 distinct 후보 풀.
- `purchase_tickets(pool, cache, W, k) -> list[dict]` — 풀을 `score_combination(·, W)`로 랭킹→상위 k장 distinct.
- `coverage_select(pool, k) -> list` — distinct 보장 상위 커버리지(초기엔 단순 distinct, 휠링은 향후).
- `grade_tickets(tickets, winning6, bonus) -> dict` — 매칭 히스토그램 + 등수 카운트 + best_match + avg_meta. `bonus`는 draws 레코드에서 가져옴(2등=5일치+보너스 판정용).
- `run_forward_purchase(draw_no, k=5000, pool_n=20000) -> dict` — engine(6 W)+random_null+coverage 각각 **전략당 k=5000장(수천 장)** 구매·채점·`backtest_runs` 저장(멱등). 풀 pool_n=20000에서 랭킹.
- `calibrate_winner(draw_no, sample_m=2000) -> dict``draws[:idx]`(대상 회차 제외) 캐시로 당첨조합 채점 + 무작위 sample_m 표본 percentile → `winner_calibration` 저장(멱등).
- `backfill_calibration(batch=50) -> dict` — 미처리 회차만 청크 처리, 재개 가능.
- `build_review_payload(draw_no) -> dict` — 회고 브리핑용 조립(당첨조합 분해 + 내 추천 비교 + forward 성적 + 캘리브레이션 추세 + 진화 결과).
### 4.2 lotto-lab `app/routers/backtest.py`
| 메서드 | 경로 | 설명 |
|--------|------|------|
| GET | `/api/lotto/backtest/track-record` | 누적 성적표(전략별 등수 카운트, engine vs random) |
| GET | `/api/lotto/backtest/calibration?weeks=N` | 캘리브레이션 이력 + 추세 |
| GET | `/api/lotto/backtest/review/{draw_no}` | 회고 payload |
| POST | `/api/lotto/backtest/run-forward?draw_no=` | forward 수동 트리거 |
| POST | `/api/lotto/backtest/backfill` | 캘리브레이션 백필(백그라운드) |
### 4.3 weight_evolver 업그레이드
- `evaluate_weekly`: 학습 신호를 N=5(W-무관)에서 **forward 가상구매(engine_w 6전략) + null-model 대비 lift**로 승격.
- lift = engine_w 등수 점수 random_null 등수 점수(동일 회차).
- 승자 = lift 최대 trial. **모든 W의 lift가 노이즈 범위(±ε) 내면 base `unchanged`** → 노이즈 과적합 방지.
- `decide_base_update` 규칙은 유지하되 입력(winner)을 backtest 기반으로 교체.
- 기존 `auto_picks` 경로는 하위호환·일일 활동표시용으로 유지(evolver 결정에는 미사용).
---
## 5. 플로우
1. **캘리브레이션 백필 (1회)**: `POST /backtest/backfill` → 백그라운드 청크(50회차/배치, 멱등 재개). 이후 회차마다 증분.
2. **주간 forward**: 당첨번호 동기화 직후 `run_forward_purchase(latest)`. 참고: 6 W × 20k 풀은 기존 시뮬이 **하루 6회** 돌리는 부하보다 가벼움 → NAS 부담 작음.
3. **일 09:00 회고 (agent-office 신규 cron)**: `LottoAgent.run_sunday_review()` → forward+calibration 보장 → `GET /backtest/review/{latest}` → 텔레그램 1통.
4. **evolver (토 22:00, 기존 cron)**: backtest 집계를 학습 신호로 소비.
### Windows 이전 경로 (NAS 부하 측정 후 필요시)
`backtest.py`가 순수 함수라, lotto-lab은 system-of-record 유지 + 무거운 연산만 Windows WSL docker 워커에 위임(`/api/internal/lotto/*` webhook, 기존 music/video/image 워커 패턴 재활용) + agent 폴링. 코드 경계가 깨끗해 마이그레이션 비용 최소. **초기 구현은 NAS-first**, 측정 후 결정.
---
## 6. 출력
### 6.1 텔레그램 (일 09:00, `notifiers/telegram_lotto.py` 신규 섹션)
이번 당첨조합 5분석치 분해 + 내 추천 평균과 비교 + 이번주 forward 성적(등수 카운트, **무작위 대비 lift**) + 캘리브레이션 percentile 추세 + 가중치 진화 결과.
### 6.2 web-ui `/lotto` "자율 학습" 탭 확장 (`.lotto-evolver-*` 다크 네임스페이스 재활용)
- **TrackRecordCard**: 누적 "매주 전략당 5,000장 샀다면" 등수 — engine vs random_null 나란히 + 총지출 대비 당첨금(정직하게 적자 표시).
- **CalibrationChart**: 당첨조합 score_total 추세 + 내 추천 평균 오버레이 + percentile 밴드 → "우위 없음"을 시각화.
- **WinnerAnalysisCard**: 이번 회차 당첨조합 5분석치 레이더 + 내 추천 비교.
---
## 7. 에러·성능·멱등
- **멱등성**: `winner_calibration` UNIQUE(draw_no), `backtest_runs` UNIQUE(draw_no,strategy,weight_label) → 재실행 skip.
- **NAS 성능**: 주간 forward는 기존 시뮬보다 가벼움. 백필만 1회 무거움(≈1100 point-in-time 캐시 재구성) → 청크+백그라운드+멱등 재개. 야간/유휴 트리거 권장.
- **텔레그램 실패**: 로그만 남기고 job은 성공 처리(기존 패턴). 회고 데이터는 이미 DB에 있어 UI는 영향 없음.
## 8. 테스트 전략
- 등수 매핑(m3~m6/bonus → 1~5등) 단위 테스트.
- null-model 기대값 + lift 계산.
- percentile 계산 정확성.
- **point-in-time 캐시가 대상 회차를 제외하는지** (calibrate_winner 정직성 핵심).
- 멱등 백필(재실행 시 중복 row 없음, 중단 후 재개).
- evolver의 lift-over-random 승자 선택 + ε-게이팅(노이즈 시 unchanged).
- 기존 `count_match`/`calc_pick_score` 테스트 유지.
## 9. 리스크 & 완화
| 리스크 | 완화 |
|--------|------|
| 무작위성 → 실제 우위 없음 | null-model 정직 프레이밍, 우위 없음을 데이터로 보고하는 게 목표 |
| Celeron 백필 부하 | 청크+1회성+멱등 재개, 필요시 Windows 이전 |
| evolver 노이즈 추종 | lift-over-random + ε-게이팅으로 unchanged 처리 |
| DB 증가 | 집계 전용, row 수 무시 가능 |
| forward 풀 중복으로 커버리지 손실 | distinct 강제 + coverage 전략 별도 측정 |
## 10. 결정 로그 (2026-05-31 brainstorming)
1. 3종 중 **로또 먼저**, 주식·인스타는 후속 사이클.
2. 회고 브리핑 = **토 추첨 직후 일 09:00**.
3. 시도 규모 = **수천 장/회차 + 집계만 저장**.
4. 자율성 = **가중치 자동튜닝 강화**(산식 구조 고정).
5. 백테스트 범위 = **캘리브레이션 전체 백필 + 가상구매 forward**.
6. 출력 = **텔레그램 + 기존 자율학습 탭 확장**.
7. 프레이밍 = **정직한 측정(null-model) + 커버리지 최적**.
8. 연산 위치 = **NAS-first, 필요시 Windows WSL 이전**.
## 11. 스코프 밖 / 향후
- 주식 에이전트(보유종목 집중 분석+차트 매수/매도 시그널), 인스타 에이전트(자율 카드 발급) — 별도 사이클.
- 휠링/커버링 디자인(하위 등수 최소 보장) — coverage 전략 고도화로 향후.
- Windows WSL 워커 분리 — NAS 부하 측정 후.

View File

@@ -0,0 +1,122 @@
# 주식 보유종목 인텔리전스 — 설계 Spec
- **작성일**: 2026-05-31
- **상태**: 설계 승인 (구현 plan 대기)
- **대상 서비스**: `stock` + `agent-office`(StockAgent) + `web-ui`(stock/포트폴리오 페이지)
- **사이클**: 스마트 에이전트 고도화 3종 중 **2번 주식**. (1번 로또 완료, 3번 인스타 후속)
---
## 1. 배경 & 목표
현재 StockAgent는 아침 뉴스 요약(07:30) · KRX 강세주 스크리너(16:30) · AI 뉴스 sentiment(08:00)를 브리핑한다. CEO는 여기서 더 나아가 **내 보유종목을 집중 분석**해 ①종목별 매수/매도 자세 ②이슈 정리 ③포트폴리오 건강을 매일 advisory로 브리핑받길 원한다.
### 핵심 결정 (2026-05-31 brainstorming)
1. **실행 수준 = 브리핑 전용(advisory)**. `/api/trade/order`(KIS 실주문) 미사용. 매수/매도는 "제안"만, 실제 주문은 사용자 수동. (로또와 동일한 정직·관찰 철학)
2. **분석 주기 = 일봉 EOD + 장중 경량 가드**. 장마감 후 일봉으로 기술분석 → 다음날 아침 브리핑. 장중엔 현재가로 손절·급변(±N%)만 경도 알림. 인트라데이 분봉 파이프라인 신설 안 함.
3. **브리핑 범위 = 보유종목 + 포트 레벨**. 종목별 액션 + 포트폴리오 건강(집중도·비중·현금·손익).
4. **이슈 소스 = 기존 뉴스+감성+LLM 요약 + 급변·거래량·외인수급 이벤트**. 신규 스크래핑 0 (DART·실적 일정 제외).
### 기존 자산 (100% 재활용, 신규 ML/데이터소스 없음)
- `stock/app/screener/snapshot.py``krx_daily_prices`(일봉 OHLCV) + `krx_master`(listing) + naver 외인 flow. 스크리너 잡(평일 16:30)이 갱신.
- `stock/app/screener/engine.py` + `nodes/`(ma_alignment·momentum·rs_rating·vcp_lite·volume_surge·foreign_buy·high52w·hygiene). **`ScreenContext.restrict(tickers)`** + `latest_close()`/`latest_high()`로 보유종목 한정 분석 가능.
- `portfolio` 테이블(broker·ticker·name·quantity·avg_price·purchase_price) + `/api/portfolio`(현재가·손익 계산) + `broker_cash`(예수금).
- `price_fetcher`(현재가 3분 TTL) · `news_sentiment` 테이블(종목별 감성) · `ai_summarizer`(Claude Haiku).
### 알려진 제약 (설계 반영)
- **섹터 필드 없음**: `portfolio`·`krx_master`에 sector 없음 → 섹터 편중은 best-effort(FDR `StockListing`의 Sector/Industry가 있으면 사용, 없으면 생략)이고, **시장(KOSPI/KOSDAQ)·종목 비중 집중도**를 기본 지표로 사용.
- **KRX 외 종목**(미국주 등): krx_daily_prices 밖 → 기술분석 불가, **뉴스·현재가·손익만** graceful 처리.
- **snapshot 히스토리 의존**: MA200·52주 고점 노드는 ~1년 일봉 필요. 스크리너가 이미 이 노드들을 쓰므로 윈도우는 충족 가정(plan에서 lookback 확인 단계 포함).
---
## 2. 데이터 모델 & 컴포넌트
### 신규 테이블 `holdings_signals` (stock.db, 일별 종목 시그널 이력)
```
date TEXT NOT NULL -- KST 거래일
ticker TEXT NOT NULL
name TEXT
action TEXT NOT NULL -- 'add' | 'hold' | 'trim' | 'sell'
tech_score REAL -- 매수강도(score 노드 가중합, 0~1 정규화)
exit_flags TEXT NOT NULL DEFAULT '{}' -- JSON {stop_loss,ma50_break,ma200_break,momentum_loss,take_profit,climax}
issues TEXT NOT NULL DEFAULT '[]' -- JSON [{type, severity, summary}]
close INTEGER
pnl_rate REAL -- 평단 대비 % (스냅샷 시점)
reasons TEXT -- 액션 근거 텍스트
created_at TEXT NOT NULL DEFAULT (datetime('now'))
PRIMARY KEY(date, ticker) -- 멱등 upsert
```
> 추세/이력은 이 테이블에서 조회. 포트 레벨 요약은 on-the-fly 계산(별도 테이블 불필요).
### 신규 `stock/app/holdings_intel.py` (순수연산 중심, FastAPI 의존성 최소)
- `get_holdings() -> list[dict]``portfolio` 행 + 현재가(price_fetcher) + pnl_rate. KRX 여부 플래그(`is_krx`).
- `technical_posture(ctx_restricted, tickers) -> dict[ticker, score]``ScreenContext.restrict(tickers)`에 score 노드 실행 → 매수강도.
- `exit_rules(holding, prices_df, params) -> dict`**신규**: 손절·MA이탈·모멘텀소멸·익절·클라이맥스 flag 산출 (§3).
- `decide_action(tech_score, exit_flags, pnl) -> (action, reasons)`**신규**: 매수강도+exit 조합 → add/hold/trim/sell + 근거.
- `market_events(prices_df, flow, params) -> dict[ticker, list]` — 급변(±N%)·거래량 Z-score·외인 순매도.
- `news_issues(tickers) -> dict[ticker, list]` — news+news_sentiment 필터 → Claude Haiku 악재·심각도 요약(악재 있는 종목만).
- `portfolio_health(holdings, cash) -> dict` — 종목 비중 집중도(HHI/최대비중)·시장 mix·현금 비중·총 손익.
- `compute_and_store(asof) -> dict` — 위를 조합해 holdings_signals upsert (멱등).
- `build_holdings_brief(asof) -> dict` — 브리핑/UI payload 조립(종목별 action+issues + portfolio_health + 추세).
### API (stock)
| 메서드 | 경로 | 설명 |
|--------|------|------|
| GET | `/api/stock/holdings/intel` | 최신 브리핑 payload |
| GET | `/api/stock/holdings/intel/history?ticker=&days=` | 종목 시그널 추세 |
| POST | `/api/stock/holdings/intel/run` | 수동 계산 트리거(BackgroundTask) |
---
## 3. 매도/리스크 룰 & 이슈 (설정 가능 임계값 — 기본값 제시)
### exit_flags (각 boolean + 값)
- **stop_loss**: `current < avg_price × (1 STOP_PCT)` (기본 STOP_PCT=0.08, Minervini식)
- **ma50_break / ma200_break**: 종가 < MA50 / MA200
- **momentum_loss**: momentum/RS 노드 점수가 직전 대비 임계 하락 (or 음전환)
- **take_profit**: `pnl_rate ≥ TAKE_PCT` (기본 25%) — 부분 익절 후보
- **climax**: 거래량 급증(vol > avg×CLIMAX_VOL_X) + 종가 상단 꼬리 (분산 의심)
### decide_action 매트릭스
- tech_score 高 + exit_flags 無 → **add**(추가매수 후보)
- exit_flags 無 (강건) → **hold**
- ma50_break 또는 momentum_loss 또는 take_profit → **trim**(일부 축소)
- stop_loss 또는 ma200_break → **sell**(청산 후보)
- 각 결정에 trigger된 flag를 근거 텍스트로 동봉. (advisory — "제안")
### issues
- **시장이벤트** (기존 데이터): 일봉 ±EVENT_PCT% 급변 / 거래량 Z-score>임계 / naver flow 외인 순매도 N일 연속.
- **뉴스이슈**: 보유종목 최근 뉴스 + news_sentiment 음수 → Claude Haiku로 `{type, severity(low/med/high), summary}` 요약. 악재 있는 종목만 호출(비용 bounded).
---
## 4. 플로우 · 에이전트 · UI
1. **EOD 계산 (평일 16:40)**: 기존 스크리너/뉴스 잡과 동일하게 **agent-office cron이 orchestrate**`_run_stock_holdings_eod()``StockAgent.run_holdings_eod()` → stock `POST /api/stock/holdings/intel/run``holdings_intel.compute_and_store(today)` → holdings_signals upsert. 스크리너 snapshot 갱신(16:30) 직후라 일봉 준비됨.
2. **아침 브리핑 (평일 08:30, agent-office StockAgent.run_holdings_brief)**: 저장된 최신 시그널 + 야간 갭(현재가) → 텔레그램 1통(종목별 액션 + 포트 건강 + 상위 이슈). AI 뉴스(08:00) 다음 슬롯.
3. **장중 경량 가드 (평일 09:00~15:30, 30분 간격)**: 현재가로 손절선 이탈·급변(±N%)만 점검 → 발생 시 텔레그램 alert. throttle(종목·유형별 재발화 억제) + daily cap (로또 시그널 패턴 재활용).
4. **agent-office**: `service_proxy`에 holdings intel 호출 추가 + StockAgent 메서드(run_holdings_brief / intraday_guard) + scheduler cron.
5. **UI (web-ui)**: stock/포트폴리오 페이지에 **"보유종목 인텔리전스" 탭/섹션 통합** — 종목별 액션 카드(자세·exit flags·근거) + 포트 건강 위젯 + 이슈 피드 + 종목 시그널 추세(history).
---
## 5. 에러·성능·테스트·리스크
- **멱등성**: holdings_signals PRIMARY KEY(date,ticker) upsert → 재계산 안전.
- **성능 (NAS Celeron)**: 보유종목만 restrict(소수 종목)이라 전체 스크리너 대비 매우 가벼움. LLM 이슈 요약은 악재 종목만(bounded). EOD 1회 + 장중 가드는 현재가만(경량).
- **graceful degrade**: price_fetcher/KIS/news 실패 시 부분 데이터로 진행 + 경고 로그. KRX 외 종목은 기술분석 skip(뉴스·손익만). 텔레그램 실패는 로그만(job 성공 유지).
- **테스트**: exit_rules 각 flag, decide_action 매트릭스 전 분기, market_events 검출, portfolio_health 계산, holdings_signals 멱등, KRX 외 종목 graceful, 뉴스 0건 경로.
- **리스크**: ①기술적 시그널은 휴리스틱이지 보장 아님 → advisory 프레이밍·자동매매 없음 ②섹터 데이터 갭 → 시장·비중 집중도로 대체 ③snapshot 히스토리 의존 → plan에 lookback 확인 ④보유종목 출처는 portfolio 테이블(사용자/KIS 동기화) — 누락 시 빈 브리핑 graceful.
---
## 6. 결정 로그 (2026-05-31)
1. 실행 수준 = **advisory 전용** (KIS 실주문 미사용)
2. 주기 = **일봉 EOD + 장중 경량 가드**
3. 범위 = **보유종목 + 포트 레벨**
4. 이슈 소스 = **기존 뉴스+감성+LLM + 급변·거래량·외인 이벤트**
## 7. 스코프 밖 / 향후
- 자동매매(승인후/완전자동), 인트라데이 분봉, DART 공시·실적 일정, 신규 매수후보 발굴(기존 16:30 스크리너가 담당), 교체(rotation) 제안 — 향후 사이클.
- 인스타 에이전트(자율 카드 발급) — 다음 사이클.

272
lotto/app/backtest.py Normal file
View File

@@ -0,0 +1,272 @@
"""로또 자가학습 백테스트 — 순수 연산 (FastAPI 의존성 0, Windows 이전 대비)."""
import logging
import random
from typing import Any, Dict, List, Optional, Tuple
from .analyzer import build_analysis_cache, build_number_weights, score_combination
from .utils import weighted_sample_6
# engine_w trials 수와 동일하게 맞춰 selection bias를 상쇄한다.
N_NULL_TRIALS = 6
def grade_tickets(tickets: List[List[int]], winning6: List[int], bonus: int) -> Dict[str, Any]:
"""티켓 묶음을 당첨번호로 채점 → 매칭 히스토그램 + 보너스 + best_match.
2등 판정: 5일치 AND 보너스 번호를 티켓이 포함."""
win = set(winning6)
hist = {"m3": 0, "m4": 0, "m5": 0, "m6": 0, "bonus_hits": 0}
best = 0
for t in tickets:
c = len(set(t) & win)
if c > best:
best = c
if c == 6:
hist["m6"] += 1
elif c == 5:
hist["m5"] += 1
if bonus in t:
hist["bonus_hits"] += 1
elif c == 4:
hist["m4"] += 1
elif c == 3:
hist["m3"] += 1
return {**hist, "best_match": best}
def prize_counts(hist: Dict[str, Any]) -> Dict[str, int]:
"""매칭 히스토그램 → 등수 카운트.
1등=m6, 2등=bonus_hits, 3등=m5bonus_hits, 4등=m4, 5등=m3."""
return {
"1st": hist.get("m6", 0),
"2nd": hist.get("bonus_hits", 0),
"3rd": hist.get("m5", 0) - hist.get("bonus_hits", 0),
"4th": hist.get("m4", 0),
"5th": hist.get("m3", 0),
}
def generate_pool(cache, number_weights, n: int = 20000,
seed: Optional[int] = None) -> List[List[int]]:
"""가중 샘플링으로 distinct 후보 풀 생성."""
rng = random.Random(seed)
seen, pool = set(), []
attempts, cap = 0, n * 4
while len(pool) < n and attempts < cap:
attempts += 1
nums = tuple(sorted(weighted_sample_6(number_weights)))
if nums in seen:
continue
seen.add(nums)
pool.append(list(nums))
if len(pool) < n:
logging.getLogger(__name__).warning(
"generate_pool: requested %d, got %d", n, len(pool)
)
return pool
def purchase_tickets(pool, cache, W: List[float], k: int) -> List[List[int]]:
"""풀을 score_combination(·, W)로 랭킹 → 상위 k장 distinct."""
if k > len(pool):
raise ValueError(f"k={k} exceeds pool size {len(pool)}")
ranked = sorted(pool, key=lambda t: -score_combination(t, cache, W)["score_total"])
return ranked[:k]
def random_null_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]:
"""무작위 distinct 티켓 k장 (null-model 대조군)."""
rng = random.Random(seed)
seen, out = set(), []
guard = 0
while len(out) < k and guard < k * 200:
guard += 1
nums = tuple(sorted(rng.sample(range(1, 46), 6)))
if nums in seen:
continue
seen.add(nums)
out.append(list(nums))
return out
def point_in_time_draws(draws: List[Tuple[int, List[int]]],
target_draw_no: int) -> List[Tuple[int, List[int]]]:
"""target 회차 추첨 '직전' 시점의 데이터 — target_draw_no 미만만."""
return [(d, nums) for d, nums in draws if d < target_draw_no]
def calibrate_winner_compute(draws, target_draw_no, winning6,
sample_m: int = 2000, seed: Optional[int] = None) -> Dict[str, Any]:
"""순수 연산: point-in-time 캐시로 당첨조합 채점 + 무작위 M표본 percentile."""
pit = point_in_time_draws(draws, target_draw_no)
cache = build_analysis_cache(pit)
scores = score_combination(sorted(winning6), cache)
win_total = scores["score_total"]
samples = random_null_tickets(sample_m, seed=seed)
le = sum(1 for t in samples
if score_combination(t, cache)["score_total"] <= win_total)
percentile = le / max(len(samples), 1)
return {"scores": scores, "percentile": percentile, "cache_draws": len(pit)}
MIN_HISTORY = 30 # point-in-time 캐시 최소 회차 (이 미만은 캘리브레이션 skip)
def _db():
from . import db as _db_mod
return _db_mod
def calibrate_winner(draw_no: int, sample_m: int = 2000, draws=None) -> Dict[str, Any]:
"""DB 진입점: 회차 1개 캘리브레이션 후 저장 (멱등).
draws를 외부에서 전달하면 N+1 조회를 방지한다."""
db = _db()
if draws is None:
draws = db.get_all_draw_numbers()
row = db.get_draw(draw_no)
if row is None:
return {"ok": False, "reason": "no_draw"}
pit = point_in_time_draws(draws, draw_no)
if len(pit) < MIN_HISTORY:
return {"ok": False, "reason": "insufficient_history"}
winning6 = [row["n1"], row["n2"], row["n3"], row["n4"], row["n5"], row["n6"]]
res = calibrate_winner_compute(draws, draw_no, winning6, sample_m=sample_m)
db.save_winner_calibration(
draw_no=draw_no, winning=winning6, scores=res["scores"],
percentile=res["percentile"], my_pick_avg=None,
cache_draws=res["cache_draws"],
)
return {"ok": True, "draw_no": draw_no, **res}
def backfill_calibration(batch: int = 50, sample_m: int = 2000) -> Dict[str, Any]:
"""미처리 회차만 batch개 캘리브레이션 (멱등·재개 가능)."""
db = _db()
draws = db.get_all_draw_numbers()
done = db.get_calibrated_draw_nos()
todo = [d for d, _ in draws if d not in done and d > MIN_HISTORY]
todo.sort()
n = 0
for draw_no in todo[:batch]:
r = calibrate_winner(draw_no, sample_m=sample_m, draws=draws)
if r.get("ok"):
n += 1
return {"calibrated": n, "remaining": max(0, len(todo) - batch)}
def run_forward_purchase(draw_no: int, k: int = 5000, pool_n: int = 20000,
sample_seed: Optional[int] = None) -> Dict[str, Any]:
"""회차 추첨 '직전' 시점 데이터로 3전략 구매 → 당첨번호로 채점 → 저장(멱등).
engine_w: 그 주 weight_trials 6개(없으면 current_base 1개)로 각각 구매."""
db = _db()
draws = db.get_all_draw_numbers()
row = db.get_draw(draw_no)
if row is None:
return {"ok": False, "reason": "no_draw"}
pit = point_in_time_draws(draws, draw_no)
if len(pit) < MIN_HISTORY:
return {"ok": False, "reason": "insufficient_history"}
winning6 = [row["n1"], row["n2"], row["n3"], row["n4"], row["n5"], row["n6"]]
bonus = row["bonus"]
cache = build_analysis_cache(pit)
nw = build_number_weights(cache)
pool = generate_pool(cache, nw, n=pool_n, seed=sample_seed)
def _store(strategy, label, weight_json, trial_id, tickets):
graded = grade_tickets(tickets, winning6, bonus)
avg_meta = (sum(score_combination(t, cache)["score_total"] for t in tickets)
/ max(len(tickets), 1))
db.save_backtest_run(
draw_no=draw_no, strategy=strategy, weight_label=label,
weight_json=weight_json, trial_id=trial_id, n_tickets=len(tickets),
hist=graded, best_match=graded["best_match"], avg_meta_score=avg_meta,
)
# 1) engine_w — 그 주 trials(있으면) 아니면 uniform fallback (leak-free)
from datetime import date as _date
from . import weight_evolver as we
draw_date = _date.fromisoformat(row["drw_date"])
week_start = we.get_week_start(draw_date)
trials = db.get_weekly_trials(week_start)
if trials:
for t in trials:
bought = purchase_tickets(pool, cache, t["weight"], k)
_store("engine_w", f"w{t['day_of_week']}", t["weight"], t["id"], bought)
else:
base = [0.2] * 5
bought = purchase_tickets(pool, cache, base, k)
_store("engine_w", "base", base, None, bought)
# 2) random_null — N_NULL_TRIALS 개 (engine_w 수와 동일해 selection bias 상쇄)
for _i in range(N_NULL_TRIALS):
seed_i = None if sample_seed is None else sample_seed + 100 + _i
_store("random_null", f"r{_i}", None, None, random_null_tickets(k, seed=seed_i))
# 3) coverage
_store("coverage", "-", None, None, coverage_tickets(k, seed=sample_seed))
return {"ok": True, "draw_no": draw_no}
def track_record() -> Dict[str, Any]:
"""전략별 누적 등수 집계 (engine_w는 라벨 합산)."""
db = _db()
rows = db.get_backtest_runs()
agg: Dict[str, Dict[str, int]] = {}
draw_sets: Dict[str, set] = {}
for r in rows:
a = agg.setdefault(r["strategy"], {
"n_tickets": 0, "1st": 0, "2nd": 0, "3rd": 0, "4th": 0, "5th": 0, "draws": 0})
p = prize_counts(r)
a["n_tickets"] += r["n_tickets"]
for tier in ("1st", "2nd", "3rd", "4th", "5th"):
a[tier] += p[tier]
draw_sets.setdefault(r["strategy"], set()).add(r["draw_no"])
for strat, s in draw_sets.items():
agg[strat]["draws"] = len(s)
return {"by_strategy": agg}
def build_review_payload(draw_no: int) -> Dict[str, Any]:
"""일요 회고 브리핑용 조립."""
db = _db()
cal = db.get_winner_calibration(draw_no)
runs = db.get_backtest_runs(draw_no=draw_no)
hist = db.get_calibration_history(limit=12)
forward = []
for r in runs:
forward.append({"strategy": r["strategy"], "label": r["weight_label"],
"prizes": prize_counts(r), "best_match": r["best_match"],
"avg_meta_score": r["avg_meta_score"]})
return {
"draw_no": draw_no,
"winner_analysis": cal, # score_* + percentile
"forward": forward,
"track_record": track_record()["by_strategy"],
"calibration_trend": [
{"draw_no": h["draw_no"], "score_total": h["score_total"],
"percentile": h["percentile"]} for h in hist
],
}
def coverage_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]:
"""greedy 커버리지 — 아직 덜 쓰인 번호를 우선 배치해 번호를 넓게 분산.
(휠링/보장설계는 향후. 현재는 distinct + 번호 사용 균등화)"""
rng = random.Random(seed)
usage = {n: 0 for n in range(1, 46)}
seen, out = set(), []
guard = 0
while len(out) < k and guard < k * 50:
guard += 1
ranked = sorted(range(1, 46), key=lambda n: (usage[n], rng.random()))
nums = tuple(sorted(ranked[:6]))
if nums in seen:
# 동점 흔들기: top-6과 disjoint한 영역에서 샘플
nums = tuple(sorted(rng.sample(ranked[6:12], 6)))
if nums in seen:
continue
seen.add(nums)
out.append(list(nums))
for n in nums:
usage[n] += 1
return out

View File

@@ -125,6 +125,48 @@ def init_db() -> None:
"ON simulation_candidates(is_best, score_total DESC);"
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS backtest_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
draw_no INTEGER NOT NULL,
strategy TEXT NOT NULL,
weight_label TEXT NOT NULL DEFAULT '-',
weight_json TEXT,
trial_id INTEGER,
n_tickets INTEGER NOT NULL,
m3 INTEGER NOT NULL DEFAULT 0,
m4 INTEGER NOT NULL DEFAULT 0,
m5 INTEGER NOT NULL DEFAULT 0,
m6 INTEGER NOT NULL DEFAULT 0,
bonus_hits INTEGER NOT NULL DEFAULT 0,
best_match INTEGER NOT NULL DEFAULT 0,
avg_meta_score REAL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
"""
)
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS uq_backtest_run "
"ON backtest_runs(draw_no, strategy, weight_label);")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS winner_calibration (
draw_no INTEGER PRIMARY KEY,
winning_json TEXT NOT NULL,
score_total REAL NOT NULL,
score_frequency REAL NOT NULL,
score_fingerprint REAL NOT NULL,
score_gap REAL NOT NULL,
score_cooccur REAL NOT NULL,
score_diversity REAL NOT NULL,
percentile REAL,
my_pick_avg REAL,
cache_draws INTEGER NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS best_picks (
@@ -1443,3 +1485,79 @@ def get_base_history(limit: int = 12) -> List[Dict[str, Any]]:
out.append(d)
return out
# ── backtest_runs / winner_calibration CRUD ───────────────────────────────────
def save_backtest_run(draw_no, strategy, weight_label, weight_json, trial_id,
n_tickets, hist, best_match, avg_meta_score) -> None:
with _conn() as conn:
conn.execute(
"""
INSERT INTO backtest_runs
(draw_no, strategy, weight_label, weight_json, trial_id, n_tickets,
m3, m4, m5, m6, bonus_hits, best_match, avg_meta_score)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)
ON CONFLICT(draw_no, strategy, weight_label) DO UPDATE SET
weight_json=excluded.weight_json, trial_id=excluded.trial_id,
n_tickets=excluded.n_tickets, m3=excluded.m3, m4=excluded.m4,
m5=excluded.m5, m6=excluded.m6, bonus_hits=excluded.bonus_hits,
best_match=excluded.best_match, avg_meta_score=excluded.avg_meta_score
""",
(draw_no, strategy, weight_label,
# weight_json must be a dict/list (not a pre-serialized string) to avoid double-encoding
json.dumps(weight_json) if weight_json is not None else None,
trial_id, n_tickets,
hist.get("m3",0), hist.get("m4",0), hist.get("m5",0), hist.get("m6",0),
hist.get("bonus_hits",0), best_match, avg_meta_score),
)
def get_backtest_runs(draw_no=None, strategy=None) -> List[Dict[str, Any]]:
q = "SELECT * FROM backtest_runs WHERE 1=1"
args = []
if draw_no is not None:
q += " AND draw_no=?"; args.append(draw_no)
if strategy is not None:
q += " AND strategy=?"; args.append(strategy)
q += " ORDER BY draw_no DESC, strategy, weight_label"
with _conn() as conn:
return [dict(r) for r in conn.execute(q, args).fetchall()]
def save_winner_calibration(draw_no, winning, scores, percentile,
my_pick_avg, cache_draws) -> None:
with _conn() as conn:
conn.execute(
"""
INSERT INTO winner_calibration
(draw_no, winning_json, score_total, score_frequency, score_fingerprint,
score_gap, score_cooccur, score_diversity, percentile, my_pick_avg, cache_draws)
VALUES (?,?,?,?,?,?,?,?,?,?,?)
ON CONFLICT(draw_no) DO UPDATE SET
winning_json=excluded.winning_json, score_total=excluded.score_total,
score_frequency=excluded.score_frequency, score_fingerprint=excluded.score_fingerprint,
score_gap=excluded.score_gap, score_cooccur=excluded.score_cooccur,
score_diversity=excluded.score_diversity, percentile=excluded.percentile,
my_pick_avg=excluded.my_pick_avg, cache_draws=excluded.cache_draws
""",
(draw_no, json.dumps(winning), scores["score_total"], scores["score_frequency"],
scores["score_fingerprint"], scores["score_gap"], scores["score_cooccur"],
scores["score_diversity"], percentile, my_pick_avg, cache_draws),
)
def get_winner_calibration(draw_no: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
r = conn.execute("SELECT * FROM winner_calibration WHERE draw_no=?",
(draw_no,)).fetchone()
return dict(r) if r else None
def get_calibration_history(limit: int = 52) -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM winner_calibration ORDER BY draw_no DESC LIMIT ?",
(limit,)).fetchall()
return [dict(r) for r in rows]
def get_calibrated_draw_nos() -> set[int]:
with _conn() as conn:
return {r["draw_no"] for r in
conn.execute("SELECT draw_no FROM winner_calibration").fetchall()}

View File

@@ -47,13 +47,16 @@ from .weight_evolver import (
from .routers import curator as curator_router
from .routers import briefing as briefing_router
from .routers import review as review_router
from .routers import backtest as backtest_router
from .jobs.grade_weekly_review import run_for_latest as grade_run_for_latest
from . import backtest
app = FastAPI()
install_access_log(app)
app.include_router(curator_router.router)
app.include_router(briefing_router.router)
app.include_router(review_router.router)
app.include_router(backtest_router.router)
scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))
ALL_URL = os.getenv("LOTTO_ALL_URL", "https://smok95.github.io/lotto/results/all.json")
@@ -82,6 +85,12 @@ def on_startup():
if res["was_new"]:
check_results_for_draw(res["drawNo"])
_refresh_perf_cache() # 새 채점 결과 반영 → 즉시 갱신
# 자가학습 백테스트 — 새 회차 forward 구매 + 당첨조합 캘리브레이션
try:
backtest.run_forward_purchase(draw_no=res["drawNo"])
backtest.calibrate_winner(res["drawNo"])
except Exception as e:
logger.warning(f"backtest 갱신 실패: {e}")
scheduler.add_job(_sync_and_check, "cron", hour="9,21", minute=10)

View File

@@ -0,0 +1,39 @@
from fastapi import APIRouter, BackgroundTasks, Query
from .. import backtest, db
router = APIRouter(prefix="/api/lotto/backtest", tags=["backtest"])
@router.get("/track-record")
def track_record():
return backtest.track_record()
@router.get("/calibration")
def calibration(weeks: int = Query(52, ge=1, le=520)):
return {"history": db.get_calibration_history(limit=weeks)}
@router.get("/review/{draw_no}")
def review(draw_no: int):
if db.get_draw(draw_no) is None:
from fastapi import HTTPException
raise HTTPException(404, f"no draw {draw_no}")
return backtest.build_review_payload(draw_no)
@router.post("/run-forward")
def run_forward(
background_tasks: BackgroundTasks,
draw_no: int = Query(...),
k: int = Query(5000, ge=1, le=5000),
pool_n: int = Query(20000, ge=1000, le=20000),
):
background_tasks.add_task(backtest.run_forward_purchase, draw_no, k, pool_n)
return {"ok": True, "queued": True, "draw_no": draw_no}
@router.post("/backfill")
def backfill(background_tasks: BackgroundTasks, batch: int = 50, sample_m: int = 2000):
background_tasks.add_task(backtest.backfill_calibration, batch, sample_m)
return {"ok": True, "message": f"backfill 시작 (batch={batch})"}

View File

@@ -4,6 +4,7 @@
순수 함수 (clamp/perturb/Dirichlet/score/base-rule) + DB 진입점은 별도 섹션.
"""
from __future__ import annotations
import json
import math
import random
from datetime import datetime, timedelta, timezone
@@ -18,6 +19,34 @@ DEFAULT_UNIFORM = [0.2] * N_METRICS # cold start
RANK_BY_CORRECT = {6: 1, 5: 3, 4: 4, 3: 5}
RANK_BONUS = {1: 1.0, 2: 0.8, 3: 0.6, 4: 0.3, 5: 0.1}
LIFT_EPSILON = 10.0 # best-of-engine vs best-of-random margin;
# selection bias already cancelled by equal group sizes (N_NULL_TRIALS == engine trial count);
# tune as needed.
PRIZE_WEIGHTS = {"m6": 1000.0, "bonus_hits": 50.0, "m5": 30.0, "m4": 4.0, "m3": 1.0}
def select_winner_by_lift(per_w: List[Dict[str, Any]], random_score: float,
epsilon: float = LIFT_EPSILON) -> Dict[str, Any]:
"""engine_w 후보들 중 random 대비 lift 최대 선택.
최대 lift가 epsilon 미만이면 gated=True (노이즈 → base 유지 권고)."""
scored = [{**w, "lift": w["prize_score"] - random_score} for w in per_w]
best = max(scored, key=lambda w: w["lift"])
return {**best, "gated": best["lift"] < epsilon}
def prize_score_from_hist(hist: Dict[str, int]) -> float:
"""매칭 히스토그램 → 등수 가중 합산 점수.
1등=m6, 2등=bonus_hits, 3등=m5bonus_hits, 4등=m4, 5등=m3.
m3/m4/m5/m6/bonus_hits 키만 읽으며 나머지는 무시하므로
DB 전체 행(backtest_runs row)을 그대로 넘겨도 안전하다."""
third = max(0, hist.get("m5", 0) - hist.get("bonus_hits", 0))
return (hist.get("m6", 0) * PRIZE_WEIGHTS["m6"]
+ hist.get("bonus_hits", 0) * PRIZE_WEIGHTS["bonus_hits"]
+ third * PRIZE_WEIGHTS["m5"]
+ hist.get("m4", 0) * PRIZE_WEIGHTS["m4"]
+ hist.get("m3", 0) * PRIZE_WEIGHTS["m3"])
def clamp_and_normalize(W: List[float], min_w: float = MIN_WEIGHT) -> List[float]:
"""각 값 ≥ min_w + 합=1.0. 보장 안 되면 raise."""
@@ -269,6 +298,47 @@ def evaluate_weekly() -> Dict[str, Any]:
winner = max(per_day, key=lambda d: d["avg_score"])
# 자가학습 강화: backtest forward 등수점수 lift로 winner 재선정.
# best-of-engine vs best-of-random 비교 — 동등 그룹 크기로 selection bias 상쇄.
latest_no = latest["drw_no"]
runs = db.get_backtest_runs(draw_no=latest_no)
engine_runs = [r for r in runs if r["strategy"] == "engine_w"]
null_runs = [r for r in runs if r["strategy"] == "random_null"]
gated = False # 이후 decide_base_update override에 사용
if engine_runs and null_runs:
# base 단독 행이 있고 w* 행도 있으면 base 행 제외 (identity collision 방지)
has_w_trials = any(r["weight_label"].startswith("w") for r in engine_runs)
if has_w_trials:
engine_runs = [r for r in engine_runs if r["weight_label"] != "base"]
# best-of-random: 동등 그룹의 최댓값 (selection bias 상쇄)
random_best = max(prize_score_from_hist(r) for r in null_runs)
per_w = []
for r in engine_runs:
per_w.append({
"trial_id": r["trial_id"],
"weight_label": r["weight_label"],
"weight": json.loads(r["weight_json"]) if r["weight_json"] else DEFAULT_UNIFORM[:],
"prize_score": prize_score_from_hist(r),
"best_match": r["best_match"],
})
lift_winner = select_winner_by_lift(per_w, random_score=random_best)
if not lift_winner["gated"]:
# lift winner의 정체성과 채점값을 일관되게 사용
winner = {
"trial_id": lift_winner["trial_id"],
"weight": lift_winner["weight"],
"max_correct": lift_winner["best_match"], # 이 trial의 실제값
"avg_score": lift_winner["prize_score"], # lift winner의 prize score
"lift": lift_winner["lift"],
}
else:
# 노이즈 → gated 플래그 설정; decide_base_update 이후 명시적으로 override
gated = True
winner = {**winner, "lift": lift_winner["lift"]}
current_base = db.get_current_base()
new_base, reason = decide_base_update(
winner_max_correct=winner["max_correct"],
@@ -276,6 +346,11 @@ def evaluate_weekly() -> Dict[str, Any]:
current_base=current_base,
)
# gated path: decide_base_update 결과와 무관하게 base 유지 강제
if gated:
new_base = list(current_base) if current_base is not None else DEFAULT_UNIFORM[:]
reason = "unchanged_gated"
next_monday = today + timedelta(days=(7 - today.weekday()) % 7 or 7)
next_monday_iso = next_monday.isoformat()

View File

@@ -0,0 +1,100 @@
from app import backtest as bt
from app.analyzer import build_analysis_cache, build_number_weights, score_combination
def _toy_draws(n=120):
# 결정적 가짜 회차: 분석 캐시 구성용 (오름차순 (drw_no, [6 nums]))
import random as _r
_r.seed(1)
out = []
for i in range(1, n + 1):
nums = sorted(_r.sample(range(1, 46), 6))
out.append((i, nums))
return out
def test_grade_tickets_histogram_and_prizes():
winning6 = [1, 2, 3, 4, 5, 6]
bonus = 7
tickets = [
[1, 2, 3, 4, 5, 6], # 6일치 = 1등
[1, 2, 3, 4, 5, 7], # 5일치 + 보너스 = 2등
[1, 2, 3, 4, 5, 8], # 5일치 = 3등
[1, 2, 3, 4, 9, 10], # 4일치 = 4등
[1, 2, 3, 11, 12, 13], # 3일치 = 5등
[40, 41, 42, 43, 44, 45], # 0일치
]
r = bt.grade_tickets(tickets, winning6, bonus)
assert r["m6"] == 1
assert r["m5"] == 2 # 5일치 총 2장(보너스 포함)
assert r["bonus_hits"] == 1 # 그 중 보너스 1장
assert r["m4"] == 1
assert r["m3"] == 1
assert r["best_match"] == 6
# 등수 매핑 헬퍼
prizes = bt.prize_counts(r)
assert prizes == {"1st": 1, "2nd": 1, "3rd": 1, "4th": 1, "5th": 1}
def test_purchase_tickets_distinct_and_count():
draws = _toy_draws()
cache = build_analysis_cache(draws)
nw = build_number_weights(cache)
pool = bt.generate_pool(cache, nw, n=2000, seed=7)
W = [0.25, 0.30, 0.20, 0.15, 0.10]
bought = bt.purchase_tickets(pool, cache, W, k=50)
assert len(bought) == 50
assert len({tuple(t) for t in bought}) == 50 # distinct
# W로 랭킹된 상위 k → 평균 점수가 풀 전체 평균 이상이어야
avg_bought = sum(score_combination(t, cache, W)["score_total"] for t in bought) / 50
avg_pool = sum(score_combination(t, cache, W)["score_total"] for t in pool) / len(pool)
assert avg_bought >= avg_pool
def test_random_null_and_coverage_distinct():
rnd = bt.random_null_tickets(k=50, seed=3)
assert len(rnd) == 50 and len({tuple(t) for t in rnd}) == 50
cov = bt.coverage_tickets(k=9, seed=3) # 9장 = 54슬롯 ≥ 45번호 전수 커버 가능
flat = {n for t in cov for n in t}
assert len(cov) == 9 and len({tuple(t) for t in cov}) == 9
assert len(flat) >= 40 # 커버리지 전략은 번호를 넓게 퍼뜨림
def test_point_in_time_excludes_target_draw():
draws = _toy_draws(50) # drw_no 1..50
pit = bt.point_in_time_draws(draws, target_draw_no=30)
assert all(d < 30 for d, _ in pit) # 30 이상 제외
assert max(d for d, _ in pit) == 29
assert len(pit) == 29
def test_calibrate_winner_scores_and_percentile():
draws = _toy_draws(60)
winning6 = [3, 11, 19, 27, 35, 44]
res = bt.calibrate_winner_compute(draws, target_draw_no=60,
winning6=winning6, sample_m=500, seed=9)
assert set(res["scores"].keys()) >= {"score_total", "score_frequency",
"score_fingerprint", "score_gap", "score_cooccur", "score_diversity"}
assert 0.0 <= res["percentile"] <= 1.0
assert res["cache_draws"] == 59 # 1..59
def test_generate_pool_partial_fill(monkeypatch):
"""weighted_sample_6이 항상 같은 조합만 반환하도록 패치 → cap에 먼저 걸려 len < n — 예외 없이 반환."""
import random as _r
_r.seed(42)
tiny_draws = [(i, sorted(_r.sample(range(1, 46), 6))) for i in range(1, 10)]
cache = build_analysis_cache(tiny_draws)
nw = build_number_weights(cache)
# weighted_sample_6을 항상 동일한 하나의 조합만 반환하도록 패치
# → 두 번째 시도부터 seen에 막혀 n개를 채울 수 없고 cap=n*4 이후 종료
import app.backtest as _bt_mod
monkeypatch.setattr(_bt_mod, "weighted_sample_6", lambda _w: [1, 2, 3, 4, 5, 6])
n = 50
pool = bt.generate_pool(cache, nw, n=n, seed=0)
# 예외 없이 반환해야 하고, 결과는 n 미만이어야 하며 모두 distinct
assert isinstance(pool, list)
assert len(pool) < n
assert len({tuple(t) for t in pool}) == len(pool)

View File

@@ -0,0 +1,75 @@
import os, sys, tempfile, random as _r
# _shared lives in web-backend/_shared; add the parent dir so it can be found
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from fastapi.testclient import TestClient
def _client(monkeypatch):
tmp = tempfile.mkdtemp()
from app import db
monkeypatch.setattr(db, "DB_PATH", os.path.join(tmp, "lotto.db"))
db.init_db()
from app.main import app
return TestClient(app), db
def _seed_draws(db, n=40):
rows = []
_r.seed(2)
for i in range(1, n + 1):
s = sorted(_r.sample(range(1, 46), 6))
rows.append({"drw_no": i, "drw_date": f"2020-01-{(i % 28) + 1:02d}",
"n1": s[0], "n2": s[1], "n3": s[2], "n4": s[3],
"n5": s[4], "n6": s[5], "bonus": ((s[5] % 45) + 1)})
db.upsert_many_draws(rows)
def test_backtest_endpoints(monkeypatch):
client, db = _client(monkeypatch)
r = client.get("/api/lotto/backtest/track-record")
assert r.status_code == 200
assert "by_strategy" in r.json()
r2 = client.get("/api/lotto/backtest/calibration?weeks=4")
assert r2.status_code == 200
assert isinstance(r2.json().get("history"), list)
def test_track_record_with_data(monkeypatch):
"""seed 40 draws + forward run → track-record contains random_null."""
client, db_mod = _client(monkeypatch)
_seed_draws(db_mod, 40)
from app import backtest as bt
bt.run_forward_purchase(40, k=20, pool_n=500, sample_seed=5)
r = client.get("/api/lotto/backtest/track-record")
assert r.status_code == 200
body = r.json()
assert "by_strategy" in body
assert "random_null" in body["by_strategy"]
def test_review_known_and_unknown(monkeypatch):
"""Known draw with calibration → 200 + non-null winner_analysis; unknown → 404."""
client, db_mod = _client(monkeypatch)
_seed_draws(db_mod, 40)
from app import backtest as bt
bt.run_forward_purchase(40, k=20, pool_n=500, sample_seed=5)
bt.calibrate_winner(40, sample_m=200)
r = client.get("/api/lotto/backtest/review/40")
assert r.status_code == 200
body = r.json()
assert body["winner_analysis"] is not None
assert "score_total" in body["winner_analysis"]
r2 = client.get("/api/lotto/backtest/review/99999")
assert r2.status_code == 404
def test_calibration_weeks_bounds(monkeypatch):
"""weeks=0 and weeks=521 should be rejected with 422."""
client, _ = _client(monkeypatch)
r0 = client.get("/api/lotto/backtest/calibration?weeks=0")
assert r0.status_code == 422
r521 = client.get("/api/lotto/backtest/calibration?weeks=521")
assert r521.status_code == 422

View File

@@ -0,0 +1,320 @@
import os, tempfile
def _fresh_db(monkeypatch):
tmp = tempfile.mkdtemp()
path = os.path.join(tmp, "lotto.db")
from app import db
monkeypatch.setattr(db, "DB_PATH", path)
db.init_db()
return db
def test_backtest_tables_exist(monkeypatch):
db = _fresh_db(monkeypatch)
with db._conn() as conn:
tables = {r["name"] for r in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
assert "backtest_runs" in tables
assert "winner_calibration" in tables
def test_backtest_runs_unique(monkeypatch):
db = _fresh_db(monkeypatch)
db.save_backtest_run(draw_no=100, strategy="random_null", weight_label="-",
weight_json=None, trial_id=None, n_tickets=10,
hist={"m3":1,"m4":0,"m5":0,"m6":0,"bonus_hits":0},
best_match=3, avg_meta_score=0.5)
db.save_backtest_run(draw_no=100, strategy="random_null", weight_label="-",
weight_json=None, trial_id=None, n_tickets=10,
hist={"m3":2,"m4":0,"m5":0,"m6":0,"bonus_hits":0},
best_match=3, avg_meta_score=0.6) # 멱등 upsert
rows = db.get_backtest_runs(draw_no=100)
assert len(rows) == 1
assert rows[0]["m3"] == 2 # 마지막 값으로 갱신
_SCORES = {
"score_total": 1.23,
"score_frequency": 0.30,
"score_fingerprint": 0.25,
"score_gap": 0.20,
"score_cooccur": 0.28,
"score_diversity": 0.20,
}
def test_winner_calibration_upsert(monkeypatch):
"""save_winner_calibration 두 번 호출 시 upsert — 행 1개, 값은 마지막 것."""
db = _fresh_db(monkeypatch)
winning = [3, 7, 15, 22, 33, 41]
db.save_winner_calibration(draw_no=200, winning=winning,
scores=_SCORES, percentile=75.0,
my_pick_avg=0.9, cache_draws=100)
# 두 번째 저장 — percentile, my_pick_avg 업데이트
scores2 = {**_SCORES, "score_total": 2.00}
db.save_winner_calibration(draw_no=200, winning=winning,
scores=scores2, percentile=80.0,
my_pick_avg=1.1, cache_draws=110)
row = db.get_winner_calibration(200)
assert row is not None
# 행이 1개만 존재하는지 확인
with db._conn() as conn:
cnt = conn.execute(
"SELECT COUNT(*) AS c FROM winner_calibration WHERE draw_no=200"
).fetchone()["c"]
assert cnt == 1
assert row["percentile"] == 80.0
assert row["score_total"] == 2.00
def _seed_draws(db, n=40):
rows = []
import random as _r; _r.seed(2)
for i in range(1, n + 1):
s = sorted(_r.sample(range(1, 46), 6))
rows.append({"drw_no": i, "drw_date": f"2020-01-{(i%28)+1:02d}",
"n1": s[0], "n2": s[1], "n3": s[2], "n4": s[3],
"n5": s[4], "n6": s[5], "bonus": ((s[5] % 45) + 1)})
db.upsert_many_draws(rows)
def test_backfill_calibration_idempotent(monkeypatch):
db = _fresh_db(monkeypatch)
_seed_draws(db, 40)
from app import backtest as bt
r1 = bt.backfill_calibration(batch=15, sample_m=200)
# 첫 회차는 point-in-time 데이터가 빈약 → min_history 이후만 처리
done1 = len(db.get_calibrated_draw_nos())
assert done1 > 0
r2 = bt.backfill_calibration(batch=100, sample_m=200) # 나머지
done2 = len(db.get_calibrated_draw_nos())
assert done2 >= done1
r3 = bt.backfill_calibration(batch=100, sample_m=200) # 재실행 → 추가 0
assert r3["calibrated"] == 0
def test_run_forward_purchase_persists_all_strategies(monkeypatch):
db = _fresh_db(monkeypatch)
_seed_draws(db, 40)
from app import backtest as bt
# 작은 규모로 빠르게
res = bt.run_forward_purchase(draw_no=40, k=20, pool_n=500, sample_seed=5)
assert res["ok"] is True
rows = db.get_backtest_runs(draw_no=40)
strategies = {r["strategy"] for r in rows}
assert "random_null" in strategies
assert "coverage" in strategies
assert "engine_w" in strategies # base 가중치로 최소 1건
for r in rows:
assert r["n_tickets"] == 20
def test_calibrate_winner_no_draw(monkeypatch):
"""DB에 없는 회차 번호 → ok=False, reason='no_draw'."""
db = _fresh_db(monkeypatch)
_seed_draws(db, 40)
from app import backtest as bt
r = bt.calibrate_winner(99999)
assert r["ok"] is False
assert r["reason"] == "no_draw"
def test_calibrate_winner_insufficient_history(monkeypatch):
"""point-in-time 이력이 MIN_HISTORY(30) 미만인 회차 → reason='insufficient_history'.
draw_no=20이면 PIT 이력이 19개(draws 1~19)로 30 미만."""
db = _fresh_db(monkeypatch)
_seed_draws(db, 40)
from app import backtest as bt
r = bt.calibrate_winner(20)
assert r["ok"] is False
assert r["reason"] == "insufficient_history"
def test_run_forward_purchase_with_trials(monkeypatch):
"""그 주 weight_trials가 존재하면 engine_w 행의 weight_label이 'w0'..'w5' 형식이어야 한다."""
db = _fresh_db(monkeypatch)
_seed_draws(db, 40)
# draw 40: drw_date='2020-01-13' → week_start='2020-01-13'
from datetime import date, timedelta
draw_date = date.fromisoformat("2020-01-13")
ws = (draw_date - timedelta(days=draw_date.weekday())).isoformat()
# 해당 주에 trial 2개 심기 (day_of_week 0, 1)
db.save_weight_trial(ws, 0, [0.1, 0.3, 0.2, 0.2, 0.2], "perturb")
db.save_weight_trial(ws, 1, [0.25, 0.25, 0.25, 0.15, 0.1], "perturb")
from app import backtest as bt
res = bt.run_forward_purchase(draw_no=40, k=20, pool_n=500, sample_seed=5)
assert res["ok"] is True
rows = db.get_backtest_runs(draw_no=40)
engine_w_labels = {r["weight_label"] for r in rows if r["strategy"] == "engine_w"}
# trials가 있으므로 'base'가 아닌 'w0', 'w1' 형식이어야 한다
assert "base" not in engine_w_labels
assert any(lbl.startswith("w") for lbl in engine_w_labels)
def test_run_forward_purchase_idempotent(monkeypatch):
"""run_forward_purchase 두 번 호출 시 upsert — 행 수 변화 없음."""
db = _fresh_db(monkeypatch)
_seed_draws(db, 40)
from app import backtest as bt
bt.run_forward_purchase(draw_no=40, k=20, pool_n=500, sample_seed=5)
count_after_first = len(db.get_backtest_runs(draw_no=40))
bt.run_forward_purchase(draw_no=40, k=20, pool_n=500, sample_seed=5)
count_after_second = len(db.get_backtest_runs(draw_no=40))
assert count_after_second == count_after_first
def test_get_calibrated_draw_nos(monkeypatch):
"""저장된 draw_no 집합이 get_calibrated_draw_nos에 포함되어야 한다."""
db = _fresh_db(monkeypatch)
winning = [1, 2, 3, 4, 5, 6]
for draw_no in (301, 302, 303):
db.save_winner_calibration(draw_no=draw_no, winning=winning,
scores=_SCORES, percentile=50.0,
my_pick_avg=0.5, cache_draws=50)
nos = db.get_calibrated_draw_nos()
assert isinstance(nos, set)
assert {301, 302, 303}.issubset(nos)
def test_track_record_and_review_payload(monkeypatch):
db = _fresh_db(monkeypatch)
_seed_draws(db, 40)
from app import backtest as bt
bt.run_forward_purchase(draw_no=40, k=20, pool_n=500, sample_seed=5)
bt.calibrate_winner(40, sample_m=200)
tr = bt.track_record()
assert "random_null" in tr["by_strategy"]
# 이제 random_null은 N_NULL_TRIALS=6 행이므로 6*20=120장
assert tr["by_strategy"]["random_null"]["n_tickets"] >= 20
payload = bt.build_review_payload(40)
assert payload["draw_no"] == 40
assert "winner_analysis" in payload # 당첨조합 5분석치
assert "forward" in payload # 이번 회차 전략별 성적
assert "calibration_trend" in payload
assert payload["winner_analysis"] is not None
assert "score_total" in payload["winner_analysis"]
def test_run_forward_purchase_random_null_count(monkeypatch):
"""run_forward_purchase는 random_null을 N_NULL_TRIALS=6개 저장해야 한다."""
db = _fresh_db(monkeypatch)
_seed_draws(db, 40)
from app import backtest as bt
res = bt.run_forward_purchase(draw_no=40, k=20, pool_n=500, sample_seed=7)
assert res["ok"] is True
rows = db.get_backtest_runs(draw_no=40)
null_rows = [r for r in rows if r["strategy"] == "random_null"]
assert len(null_rows) == bt.N_NULL_TRIALS # 6개
null_labels = {r["weight_label"] for r in null_rows}
assert null_labels == {f"r{i}" for i in range(bt.N_NULL_TRIALS)}
for r in null_rows:
assert r["n_tickets"] == 20
def test_evaluate_weekly_gated_keeps_base_unchanged(monkeypatch):
"""Fix 5 통합 테스트 (end-to-end gated path).
접근: DB에 draws, weight_trials, auto_picks, backtest_runs, base_history를 직접 심어
evaluate_weekly()의 gated 분기가 base를 바꾸지 않음을 검증한다.
gated 조건: engine_w 최고 prize_score random_best < LIFT_EPSILON(10.0).
engine_best=5, random_best=20 → lift=-15 → gated.
evaluate_weekly 내부 흐름:
- get_weekly_trials(week_start) : _today_kst() 기준 week_start 사용
- get_latest_draw() : draws 테이블에서 max(drw_no) 반환
두 참조가 같은 날짜 기준이어야 하므로 _today_kst를 monkeypatch로 고정하고
draws의 최신 회차 날짜(drw_date)를 해당 주의 날짜로 맞춘다.
"""
import json as _json
from datetime import date, timedelta, datetime as _dt, timezone as _tz, timedelta as _td
db = _fresh_db(monkeypatch)
# KST 오늘 날짜 — evaluate_weekly가 이 날짜를 기준으로 week_start 계산
KST = _tz(_td(hours=9))
today_kst = _dt.now(KST).date()
from app import weight_evolver as we
week_start = we.get_week_start(today_kst)
# 1) draws 심기 — 최신 회차의 drw_date를 week_start 주 안의 날짜로 맞춤
import random as _r; _r.seed(99)
rows = []
for i in range(1, 41):
s = sorted(_r.sample(range(1, 46), 6))
# 마지막 회차(40)는 오늘 날짜 사용 (week_start 주 내)
if i == 40:
drw_date = today_kst.isoformat()
else:
drw_date = f"2020-01-{(i % 28) + 1:02d}"
rows.append({
"drw_no": i, "drw_date": drw_date,
"n1": s[0], "n2": s[1], "n3": s[2],
"n4": s[3], "n5": s[4], "n6": s[5],
"bonus": (s[5] % 45) + 1,
})
db.upsert_many_draws(rows)
latest = db.get_latest_draw()
assert latest is not None
assert latest["drw_date"] == today_kst.isoformat()
# 2) weight trial 1개 심기 (day_of_week=0, week_start=오늘 주)
trial_w = [0.2, 0.2, 0.2, 0.2, 0.2]
db.save_weight_trial(week_start, 0, trial_w, "perturb")
trial_rows = db.get_weekly_trials(week_start)
assert len(trial_rows) == 1
trial_id = trial_rows[0]["id"]
# 3) auto_picks 1개 심기 (winning 번호와 2개 일치 → max_correct=2)
winning6 = [latest["n1"], latest["n2"], latest["n3"],
latest["n4"], latest["n5"], latest["n6"]]
pick = winning6[:2] + [40, 41, 42, 43]
db.save_auto_pick(trial_id, 1, pick, meta_score=0.5)
# 4) backtest_runs: engine_w prize_score=5, random_null 6개 prize_score=20 (gated 확실)
LOW_HIST = {"m3": 5, "m4": 0, "m5": 0, "m6": 0, "bonus_hits": 0} # prize=5
HIGH_HIST = {"m3": 20, "m4": 0, "m5": 0, "m6": 0, "bonus_hits": 0} # prize=20
draw_no = latest["drw_no"]
db.save_backtest_run(
draw_no=draw_no, strategy="engine_w", weight_label="w0",
weight_json=_json.dumps(trial_w), trial_id=trial_id, n_tickets=20,
hist=LOW_HIST, best_match=2, avg_meta_score=0.5,
)
from app import backtest as bt
for i in range(bt.N_NULL_TRIALS):
db.save_backtest_run(
draw_no=draw_no, strategy="random_null", weight_label=f"r{i}",
weight_json=None, trial_id=None, n_tickets=20,
hist=HIGH_HIST, best_match=3, avg_meta_score=0.5,
)
# 5) current base 저장 (이전 주 월요일 effective_from)
base_w = [0.2, 0.2, 0.2, 0.2, 0.2]
prev_monday = (today_kst - timedelta(weeks=1, days=today_kst.weekday())).isoformat()
db.save_base_history(
effective_from=prev_monday,
weight=base_w,
source_trial_id=None,
update_reason="cold_start",
winner_score=None,
winner_max_correct=None,
)
assert db.get_current_base() == base_w
# 6) evaluate_weekly 호출 — _today_kst()를 monkeypatch로 오늘 날짜 고정
monkeypatch.setattr(we, "_today_kst", lambda: today_kst)
result = we.evaluate_weekly()
assert result.get("ok") is True, f"evaluate_weekly 실패: {result}"
# gated path 검증
update_reason = result.get("update_reason", "")
assert update_reason in ("unchanged_gated", "idempotent_skip"), (
f"gated여야 하는데 reason='{update_reason}' — 게이팅 로직 깨짐"
)
# base가 바뀌지 않았는지 검증
new_base = result.get("new_base")
assert new_base == base_w, (
f"gated인데 base가 변경됨: {new_base} != {base_w}"
)

View File

@@ -120,3 +120,79 @@ def test_decide_base_update_cold_start_returns_default():
)
assert new_base == winner_W
assert reason == "winner_4plus"
def test_select_winner_by_lift_gating():
# engine_w 3개 + random_null 기준. lift = engine 등수점수 random 등수점수
per_w = [
{"trial_id": 1, "day_of_week": 0, "weight": [0.2]*5, "prize_score": 5.0},
{"trial_id": 2, "day_of_week": 1, "weight": [0.3,0.2,0.2,0.2,0.1], "prize_score": 9.0},
{"trial_id": 3, "day_of_week": 2, "weight": [0.1,0.3,0.2,0.2,0.2], "prize_score": 4.0},
]
# random baseline이 8.0이면 lift는 -3, +1, -4 → 최대 lift(+1) < ε(2) → 게이팅
winner = we.select_winner_by_lift(per_w, random_score=8.0, epsilon=2.0)
assert winner["gated"] is True # 최대 lift(+1) < ε(2) → 게이팅
winner2 = we.select_winner_by_lift(per_w, random_score=3.0, epsilon=2.0)
assert winner2["gated"] is False
assert winner2["trial_id"] == 2 # prize 9 → lift +6
def test_prize_score_from_hist():
# 등수 가중치: 1등 매우 큼, 하위는 작게
s = we.prize_score_from_hist({"m3": 10, "m4": 2, "m5": 0, "m6": 0, "bonus_hits": 0})
s_big = we.prize_score_from_hist({"m3": 0, "m4": 0, "m5": 0, "m6": 1, "bonus_hits": 0})
assert s_big > s # 1등 1장이 5등 다수보다 큼
def test_select_winner_by_lift_preserves_all_keys():
"""select_winner_by_lift는 per_w 항목의 모든 키를 보존해야 한다.
best_match, weight_label 등 identity 필드가 누락되면 evaluate_weekly가 깨진다."""
per_w = [
{
"trial_id": 10,
"weight_label": "w0",
"weight": [0.2] * 5,
"prize_score": 3.0,
"best_match": 3,
},
{
"trial_id": 11,
"weight_label": "w1",
"weight": [0.3, 0.2, 0.2, 0.2, 0.1],
"prize_score": 20.0,
"best_match": 4,
},
]
result = we.select_winner_by_lift(per_w, random_score=5.0, epsilon=2.0)
assert result["gated"] is False
assert result["trial_id"] == 11
assert result["weight_label"] == "w1" # identity 키 보존
assert result["best_match"] == 4 # best_match 키 보존
assert "lift" in result # lift 추가됨
assert result["lift"] == pytest.approx(15.0)
def test_gated_path_keeps_base_via_select_winner():
"""gated=True일 때 select_winner_by_lift의 반환값 검증.
evaluate_weekly 내의 gated 분기가 올바른 값에 의존함을 확인한다."""
per_w = [
{"trial_id": 1, "weight_label": "w0", "weight": [0.2]*5,
"prize_score": 5.0, "best_match": 2},
{"trial_id": 2, "weight_label": "w1", "weight": [0.3,0.2,0.2,0.2,0.1],
"prize_score": 7.0, "best_match": 3},
]
# random_best=8.0 → 최대 engine lift=7-8=-1 → gated
result = we.select_winner_by_lift(per_w, random_score=8.0, epsilon=we.LIFT_EPSILON)
assert result["gated"] is True
assert result["lift"] < 0
# decide_base_update를 통해 gated가 unchanged를 유도하는지 확인
# (gated override가 없더라도, 현재 LIFT_EPSILON=10.0 하에서 lift<0이면 항상 gated)
current = [0.2, 0.2, 0.2, 0.2, 0.2]
# gated이면 evaluate_weekly가 current_base를 그대로 유지해야 함
# 여기서는 override 로직을 직접 재현해 검증한다
gated = result["gated"]
new_base_override = list(current) if gated else None
reason_override = "unchanged_gated" if gated else "should_not_reach"
assert new_base_override == current
assert reason_override == "unchanged_gated"

View File

@@ -1,6 +1,7 @@
import sqlite3
import os
import hashlib
import json
from typing import List, Dict, Any, Optional
from app.screener.schema import ensure_screener_schema
@@ -103,6 +104,27 @@ def init_db():
if "commission" not in sh_cols:
conn.execute("ALTER TABLE sell_history ADD COLUMN commission REAL NOT NULL DEFAULT 0")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS holdings_signals (
date TEXT NOT NULL,
ticker TEXT NOT NULL,
name TEXT,
action TEXT NOT NULL,
tech_score REAL,
exit_flags TEXT NOT NULL DEFAULT '{}',
issues TEXT NOT NULL DEFAULT '[]',
close INTEGER,
pnl_rate REAL,
reasons TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now','localtime')),
PRIMARY KEY (date, ticker)
);
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_holdings_sig_ticker "
"ON holdings_signals(ticker, date DESC);")
# Screener 스키마 부트스트랩 (7테이블 + 디폴트 설정 시드)
ensure_screener_schema(conn)
@@ -297,3 +319,63 @@ def get_asset_snapshots(days: int = 30) -> List[Dict[str, Any]]:
).fetchall()
rows = list(reversed(rows))
return [dict(r) for r in rows]
# --- KRX Master ---
def get_krx_tickers() -> set:
with _conn() as conn:
try:
rows = conn.execute("SELECT ticker FROM krx_master").fetchall()
except Exception:
return set()
return {r["ticker"] for r in rows}
# --- Holdings Signals CRUD ---
def upsert_holdings_signal(
date: str, ticker: str, name: Optional[str], action: str,
tech_score: Optional[float], exit_flags: dict, issues: list,
close: Optional[int], pnl_rate: Optional[float], reasons: Optional[str],
) -> None:
with _conn() as conn:
conn.execute(
"""
INSERT INTO holdings_signals
(date, ticker, name, action, tech_score, exit_flags, issues, close, pnl_rate, reasons)
VALUES (?,?,?,?,?,?,?,?,?,?)
ON CONFLICT(date, ticker) DO UPDATE SET
name=excluded.name, action=excluded.action, tech_score=excluded.tech_score,
exit_flags=excluded.exit_flags, issues=excluded.issues, close=excluded.close,
pnl_rate=excluded.pnl_rate, reasons=excluded.reasons
""",
(date, ticker, name, action, tech_score,
json.dumps(exit_flags, ensure_ascii=False),
json.dumps(issues, ensure_ascii=False), close, pnl_rate, reasons),
)
def _row_to_signal(r) -> dict:
d = dict(r)
d["exit_flags"] = json.loads(d.get("exit_flags") or "{}")
d["issues"] = json.loads(d.get("issues") or "[]")
return d
def get_holdings_signals(date: str) -> list:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM holdings_signals WHERE date=? ORDER BY ticker", (date,)).fetchall()
return [_row_to_signal(r) for r in rows]
def get_latest_holdings_date() -> str | None:
with _conn() as conn:
r = conn.execute("SELECT MAX(date) AS d FROM holdings_signals").fetchone()
return r["d"] if r and r["d"] else None
def get_holdings_signal_history(ticker: str, limit: int = 30) -> list:
"""최근 N개 시그널 행 (시그널은 거래일당 1행이라 ≈ N 거래일)."""
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM holdings_signals WHERE ticker=? ORDER BY date DESC LIMIT ?",
(ticker, limit)).fetchall()
return [_row_to_signal(r) for r in rows]

344
stock/app/holdings_intel.py Normal file
View File

@@ -0,0 +1,344 @@
"""보유종목 인텔리전스 — 순수연산 중심 (advisory). KIS 실주문 미사용."""
from __future__ import annotations
import datetime as dt
from typing import Any, Optional
import pandas as pd
from . import db
from . import price_fetcher
from .screener.engine import combine
def _krx_tickers() -> set:
"""krx_master ticker 집합 (KRX 판별용)."""
return db.get_krx_tickers()
def get_holdings() -> list[dict]:
"""portfolio + 현재가 + pnl_rate + is_krx."""
items = db.get_all_portfolio()
tickers = [it["ticker"] for it in items]
prices = price_fetcher.get_current_prices(tickers) if tickers else {}
krx = _krx_tickers()
out = []
for it in items:
cur = prices.get(it["ticker"])
avg = it["avg_price"]
pnl = ((cur - avg) / avg * 100.0) if (cur and avg) else None
out.append({
**it,
"current_price": cur,
"pnl_rate": pnl,
"is_krx": it["ticker"] in krx,
})
return out
# ---- Task 2.1: technical_posture ----
def _score_nodes_and_weights():
"""NODE_REGISTRY에서 보유종목 매수강도 계산용 노드 인스턴스화."""
from .screener.registry import NODE_REGISTRY
weights = {"ma_alignment": 0.4, "momentum": 0.3, "rs_rating": 0.3}
nodes = [NODE_REGISTRY[k]() for k in weights]
return nodes, weights
def technical_posture(ctx, tickers: list[str]) -> dict[str, float]:
"""보유종목 restrict 후 score 노드 → 매수강도(0~100)."""
scoped = ctx.restrict(tickers)
if scoped.prices.empty:
return {}
nodes, weights = _score_nodes_and_weights()
scores = {}
for n in nodes:
try:
scores[n.name] = n.compute(scoped, {})
except Exception:
scores[n.name] = pd.Series(0.0, index=scoped.master.index)
scores_ne = {k: s for k, s in scores.items() if not s.empty}
weights_ne = {k: w for k, w in weights.items() if k in scores_ne}
if not weights_ne:
return {}
total = combine(scores_ne, weights_ne)
return {t: float(total.get(t, 0.0)) for t in tickers if t in total.index}
# ---- Task 2.2: exit_rules ----
_DEFAULT_EXIT_PARAMS = {"stop_pct": 0.08, "take_pct": 0.25, "climax_vol_x": 3.0}
def _ma(closes: "pd.Series", window: int) -> Optional[float]:
if len(closes) < window:
return None
val = closes.rolling(window).mean().iloc[-1]
return float(val) if pd.notna(val) else None
def exit_rules(holding: dict, ticker_prices: "pd.DataFrame", params: dict) -> dict:
"""가격 기반 청산/리스크 flag (stop_loss/ma50_break/ma200_break/take_profit/climax).
Note: momentum_loss는 compute_and_store 단계에서 집계하므로 여기서 설정하지 않는다.
"""
p = {**_DEFAULT_EXIT_PARAMS, **(params or {})}
flags = {"stop_loss": False, "ma50_break": False, "ma200_break": False,
"take_profit": False, "climax": False}
avg = holding.get("avg_price")
cur = holding.get("current_price")
if ticker_prices is None or ticker_prices.empty:
closes = pd.Series(dtype=float)
else:
closes = ticker_prices.sort_values("date")["close"].astype(float).reset_index(drop=True)
last_close = float(closes.iloc[-1]) if len(closes) else cur
if cur is None:
cur = last_close
if cur is not None and avg:
if cur < avg * (1 - p["stop_pct"]):
flags["stop_loss"] = True
if avg > 0 and (cur - avg) / avg >= p["take_pct"]:
flags["take_profit"] = True
ma50 = _ma(closes, 50)
ma200 = _ma(closes, 200)
if ma50 is not None and last_close is not None and last_close < ma50:
flags["ma50_break"] = True
if ma200 is not None and last_close is not None and last_close < ma200:
flags["ma200_break"] = True
# climax: 최근 거래량이 20일 평균의 climax_vol_x배 이상 + 종가가 당일 고점 대비 하단(상단꼬리)
if ticker_prices is not None and not ticker_prices.empty and len(ticker_prices) >= 21:
tp = ticker_prices.sort_values("date")
vol = tp["volume"].astype(float).reset_index(drop=True)
avg_vol = vol.iloc[-21:-1].mean()
last_vol = vol.iloc[-1]
hi_ = float(tp["high"].astype(float).iloc[-1])
cl_ = float(tp["close"].astype(float).iloc[-1])
if avg_vol and last_vol >= avg_vol * p["climax_vol_x"] and hi_ > 0 and cl_ < hi_ * 0.97:
flags["climax"] = True
return flags
# ---- Task 2.3: decide_action ----
ADD_SCORE = 70.0 # 이 이상이면 추가매수 후보
# ---- Task 3.1: market_events ----
_DEFAULT_EVENT_PARAMS = {"move_pct": 7.0, "vol_z": 2.5}
def market_events(ticker: str, ticker_prices: "pd.DataFrame",
ticker_flow: "pd.DataFrame | None", params: dict) -> list[dict]:
"""일봉/flow 기반 시장 이벤트 (급변·거래량 Z·외인 순매도)."""
p = {**_DEFAULT_EVENT_PARAMS, **(params or {})}
events = []
if ticker_prices is None or ticker_prices.empty or len(ticker_prices) < 2:
return events
tp = ticker_prices.sort_values("date").reset_index(drop=True)
close = tp["close"].astype(float)
pct = (close.iloc[-1] - close.iloc[-2]) / close.iloc[-2] * 100.0 if close.iloc[-2] else 0.0
if abs(pct) >= p["move_pct"]:
events.append({
"type": "price_move",
"severity": "high" if abs(pct) >= p["move_pct"] * 1.5 else "med",
"summary": f"전일 대비 {pct:+.1f}%",
})
vol = tp["volume"].astype(float)
if len(vol) >= 21:
base = vol.iloc[-21:-1]
mu, sd = base.mean(), base.std(ddof=0)
last_vol = vol.iloc[-1]
if mu > 0 and (
(sd and (last_vol - mu) / sd >= p["vol_z"])
or (not sd and last_vol >= mu * p["vol_z"]) # sd=0 (평탄 기준선): vol_z를 Z-score가 아닌 단순 배수로 사용
):
z_txt = f"{(last_vol - mu) / sd:.1f}" if sd else f"ratio={last_vol / mu:.1f}x"
events.append({
"type": "volume_surge",
"severity": "med",
"summary": f"거래량 평소 대비 급증(Z={z_txt})",
})
if ticker_flow is not None and not ticker_flow.empty:
tf = ticker_flow.sort_values("date")
recent = tf["foreign_net"].astype(float).iloc[-3:]
if len(recent) >= 3 and (recent < 0).all():
events.append({
"type": "foreign_selling",
"severity": "med",
"summary": "외국인 3일 연속 순매도",
})
return events
# ---- Task 3.2: news_issues ----
NEG_SENTIMENT = -0.3 # 이하면 악재 후보
def _news_sentiment_map(date: str) -> dict:
"""date 기준 news_sentiment 테이블에서 ticker → {score_raw, news_count} 맵 반환."""
with db._conn() as conn:
try:
rows = conn.execute(
"SELECT ticker, score_raw, news_count FROM news_sentiment WHERE date=?",
(date,),
).fetchall()
except Exception:
return {}
return {r["ticker"]: {"score_raw": r["score_raw"], "news_count": r["news_count"]}
for r in rows}
def news_issues(tickers: list[str], date: str, use_llm: bool = True) -> dict[str, list]:
"""news_sentiment 음수 → 악재 flag. (LLM 요약은 best-effort; 단위 테스트는 use_llm=False로.)"""
senti = _news_sentiment_map(date)
out: dict[str, list] = {}
for t in tickers:
s = senti.get(t)
if not s or s["score_raw"] is None:
continue
if s["score_raw"] <= NEG_SENTIMENT:
sev = "high" if s["score_raw"] <= NEG_SENTIMENT * 2 else "med"
out.setdefault(t, []).append({
"type": "news",
"severity": sev,
"summary": f"부정 뉴스 감성({s['score_raw']:+.2f}, {s.get('news_count', 0)}건)",
})
return out
# ---- Task 3.3: portfolio_health ----
def portfolio_health(holdings: list[dict], total_cash: int = 0) -> dict:
"""비중 집중도(최대비중·HHI) + 현금비중 + 총손익 요약."""
evals, buys = [], []
for h in holdings:
cur = h.get("current_price") or h.get("avg_price") or 0
ev = cur * h.get("quantity", 0)
bu = (h.get("avg_price") or 0) * h.get("quantity", 0)
evals.append(ev)
buys.append(bu)
total_eval = sum(evals)
total_buy = sum(buys)
weights = [e / total_eval for e in evals] if total_eval else []
hhi = sum(w * w for w in weights)
total_assets = total_eval + (total_cash or 0)
return {
"positions": len(holdings),
"total_eval": total_eval,
"total_buy": total_buy,
"total_pnl": total_eval - total_buy,
"total_pnl_rate": ((total_eval - total_buy) / total_buy * 100.0) if total_buy else 0.0,
"max_weight": max(weights) if weights else 0.0,
"hhi": round(hhi, 4),
"cash_ratio": ((total_cash or 0) / total_assets) if total_assets else 0.0,
}
DEFAULT_PARAMS = {
"stop_pct": 0.08, "take_pct": 0.25, "climax_vol_x": 3.0,
"move_pct": 7.0, "vol_z": 2.5,
"momentum_drop": 15.0, "momentum_low": 35.0,
}
def _load_ctx(asof: dt.date):
"""ScreenContext.load를 감싸는 thin wrapper (테스트에서 monkeypatch 대상)."""
from .screener.engine import ScreenContext
with db._conn() as conn:
return ScreenContext.load(conn, asof)
def _today_kst() -> dt.date:
return (dt.datetime.utcnow() + dt.timedelta(hours=9)).date()
def compute_and_store(asof: Optional[dt.date] = None, use_llm: bool = True,
params: dict | None = None) -> dict:
"""보유종목 시그널 계산 → holdings_signals upsert (멱등).
Returns:
{"stored": N, "date": "YYYY-MM-DD"} or {"stored": 0, "reason": "..."}
"""
asof = asof or _today_kst()
p = {**DEFAULT_PARAMS, **(params or {})}
holdings = get_holdings()
if not holdings:
return {"stored": 0, "reason": "no_holdings"}
krx = [h for h in holdings if h.get("is_krx")]
ctx = _load_ctx(asof)
posture = technical_posture(ctx, [h["ticker"] for h in krx]) if krx else {}
date_iso = asof.isoformat()
issues_map = news_issues([h["ticker"] for h in holdings], date_iso, use_llm=use_llm)
stored = 0
for h in holdings:
t = h["ticker"]
tp = ctx.prices[ctx.prices["ticker"] == t] if h.get("is_krx") else None
tf = ctx.flow[ctx.flow["ticker"] == t] if h.get("is_krx") else None
flags = exit_rules(h, tp, p) if h.get("is_krx") else {}
tech = posture.get(t)
# momentum_loss: 직전 저장 시그널 대비 하락 or 낮은 강도
prev = db.get_holdings_signal_history(t, limit=2)
prev_score = next((r["tech_score"] for r in prev if r["date"] != date_iso), None)
if tech is not None and (
(prev_score is not None and tech < prev_score - p["momentum_drop"])
or tech < p["momentum_low"]
):
flags["momentum_loss"] = True
evts = market_events(t, tp, tf, p) if h.get("is_krx") else []
issues = list(issues_map.get(t, [])) + evts
action, reasons = decide_action(tech if tech is not None else 0.0, flags, h.get("pnl_rate"))
db.upsert_holdings_signal(
date=date_iso, ticker=t, name=h.get("name"), action=action,
tech_score=tech, exit_flags=flags, issues=issues,
close=h.get("current_price"), pnl_rate=h.get("pnl_rate"), reasons=reasons,
)
stored += 1
return {"stored": stored, "date": date_iso}
def build_holdings_brief(date: Optional[str] = None) -> dict:
"""최신 시그널 + 포트 건강 조립 (브리핑/UI payload)."""
date = date or db.get_latest_holdings_date()
if not date:
return {"date": None, "holdings": [], "portfolio_health": {}}
signals = db.get_holdings_signals(date)
holdings = get_holdings()
total_cash = sum(c.get("cash", 0) for c in db.get_all_broker_cash())
health = portfolio_health(holdings, total_cash=total_cash)
return {"date": date, "holdings": signals, "portfolio_health": health}
def decide_action(tech_score: float, exit_flags: dict, pnl: float | None,
add_score: float = ADD_SCORE) -> tuple[str, str]:
"""액션 결정 매트릭스: sell > trim > add > hold (우선순위 순).
Returns:
(action, reasons_text) action ∈ {"sell","trim","add","hold"}
"""
reasons = []
# 청산 (최우선)
if exit_flags.get("stop_loss"):
reasons.append("손절선 이탈")
if exit_flags.get("ma200_break"):
reasons.append("MA200 이탈")
if reasons:
return "sell", " · ".join(reasons)
# 축소
if exit_flags.get("ma50_break"):
reasons.append("MA50 이탈")
if exit_flags.get("momentum_loss"):
reasons.append("모멘텀 소멸")
if exit_flags.get("take_profit"):
reasons.append(f"목표 수익 도달(+{pnl:.0f}%)" if pnl is not None else "목표 수익 도달")
if exit_flags.get("climax"):
reasons.append("거래량 급증 분산 의심")
if reasons:
return "trim", " · ".join(reasons)
# 추가매수
if tech_score is not None and tech_score >= add_score:
return "add", f"기술적 강도 양호({tech_score:.0f})"
return "hold", "특이 신호 없음"

View File

@@ -3,7 +3,7 @@ import json
import logging
from datetime import date as date_type
from typing import Optional
from fastapi import FastAPI, Query, Header, Depends, HTTPException
from fastapi import FastAPI, Query, Header, Depends, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import requests
@@ -27,6 +27,7 @@ from .price_fetcher import get_current_prices, get_current_prices_detail
from .ai_summarizer import summarize_news, OllamaError
from .auth import verify_webai_key
from . import webai_cache
from . import holdings_intel
app = FastAPI()
install_access_log(app)
@@ -652,5 +653,25 @@ def remove_sell_history(record_id: int):
return {"ok": True}
# --- Holdings Intelligence API ---
@app.get("/api/stock/holdings/intel")
def holdings_intel_brief():
"""보유종목 인텔리전스 브리핑 (최신 시그널 + 포트 건강)"""
return holdings_intel.build_holdings_brief()
@app.get("/api/stock/holdings/intel/history")
def holdings_intel_history(ticker: str, days: int = 30):
"""종목별 시그널 이력 조회"""
from . import db
return {"ticker": ticker, "history": db.get_holdings_signal_history(ticker, days)}
@app.post("/api/stock/holdings/intel/run")
def holdings_intel_run(background_tasks: BackgroundTasks, use_llm: bool = True):
"""보유종목 시그널 계산 트리거 (BackgroundTask)"""
background_tasks.add_task(holdings_intel.compute_and_store, None, use_llm)
return {"ok": True, "queued": True}

View File

@@ -0,0 +1,42 @@
import os
import sys
import tempfile
from fastapi.testclient import TestClient
def _client(monkeypatch):
# Add web-backend root to sys.path so _shared can be imported by main.py
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from app import db
monkeypatch.setattr(db, "DB_PATH", os.path.join(tempfile.mkdtemp(), "stock.db"))
db.init_db()
from app.main import app
return TestClient(app)
def test_holdings_intel_endpoint(monkeypatch):
client = _client(monkeypatch)
r = client.get("/api/stock/holdings/intel")
assert r.status_code == 200
body = r.json()
assert "holdings" in body and "portfolio_health" in body
def test_holdings_intel_history_endpoint(monkeypatch):
client = _client(monkeypatch)
r = client.get("/api/stock/holdings/intel/history?ticker=005930")
assert r.status_code == 200
body = r.json()
assert body["ticker"] == "005930"
assert "history" in body
assert isinstance(body["history"], list)
def test_holdings_intel_run_endpoint(monkeypatch):
client = _client(monkeypatch)
r = client.post("/api/stock/holdings/intel/run")
assert r.status_code == 200
body = r.json()
assert body["ok"] is True
assert body["queued"] is True

View File

@@ -0,0 +1,37 @@
import os, tempfile
def _fresh_db(monkeypatch):
tmp = tempfile.mkdtemp()
from app import db
monkeypatch.setattr(db, "DB_PATH", os.path.join(tmp, "stock.db"))
db.init_db()
return db
def test_holdings_signals_table_and_upsert(monkeypatch):
db = _fresh_db(monkeypatch)
db.upsert_holdings_signal(date="2026-05-29", ticker="005930", name="삼성전자",
action="hold", tech_score=72.0, exit_flags={"stop_loss": False},
issues=[{"type": "news", "severity": "low", "summary": "x"}],
close=80000, pnl_rate=5.2, reasons="강건")
db.upsert_holdings_signal(date="2026-05-29", ticker="005930", name="삼성전자",
action="trim", tech_score=60.0, exit_flags={"ma50_break": True},
issues=[], close=79000, pnl_rate=3.0, reasons="MA50 이탈")
rows = db.get_holdings_signals(date="2026-05-29")
assert len(rows) == 1 # upsert 멱등
assert rows[0]["action"] == "trim"
assert rows[0]["exit_flags"]["ma50_break"] is True # JSON 역직렬화
hist = db.get_holdings_signal_history("005930", limit=30)
assert len(hist) == 1
def test_get_latest_holdings_date(monkeypatch):
db = _fresh_db(monkeypatch)
# empty table → None
assert db.get_latest_holdings_date() is None
# after an upsert → returns that date
db.upsert_holdings_signal(
date="2026-05-30", ticker="005930", name="삼성전자",
action="hold", tech_score=70.0, exit_flags={}, issues=[],
close=80000, pnl_rate=4.0, reasons="테스트",
)
assert db.get_latest_holdings_date() == "2026-05-30"

View File

@@ -0,0 +1,387 @@
import datetime as dt
import pandas as pd
from app import holdings_intel as hi
def test_get_holdings_merges_price_and_pnl(monkeypatch):
monkeypatch.setattr(hi.db, "get_all_portfolio", lambda: [
{"id": 1, "broker": "kis", "ticker": "005930", "name": "삼성전자",
"quantity": 10, "avg_price": 70000, "purchase_price": 70000},
{"id": 2, "broker": "kis", "ticker": "AAPL", "name": "Apple",
"quantity": 5, "avg_price": 200, "purchase_price": 200},
])
monkeypatch.setattr(hi.price_fetcher, "get_current_prices",
lambda tickers: {"005930": 77000}) # AAPL 미조회(비KRX)
monkeypatch.setattr(hi, "_krx_tickers", lambda: {"005930"})
hs = hi.get_holdings()
s = {h["ticker"]: h for h in hs}
assert s["005930"]["is_krx"] is True
assert round(s["005930"]["pnl_rate"], 1) == 10.0 # (77000-70000)/70000
assert s["AAPL"]["is_krx"] is False # KRX 외
def test_get_holdings_zero_avg_price(monkeypatch):
"""avg_price=0인 종목은 pnl_rate가 None이어야 한다 (ZeroDivisionError 없음)."""
monkeypatch.setattr(hi.db, "get_all_portfolio", lambda: [
{"id": 1, "broker": "kis", "ticker": "005930", "name": "삼성전자",
"quantity": 10, "avg_price": 0, "purchase_price": 0},
])
monkeypatch.setattr(hi.price_fetcher, "get_current_prices",
lambda tickers: {"005930": 80000})
monkeypatch.setattr(hi, "_krx_tickers", lambda: {"005930"})
hs = hi.get_holdings()
assert hs[0]["pnl_rate"] is None
def test_get_holdings_empty_portfolio(monkeypatch):
"""포트폴리오가 비어있으면 빈 리스트를 반환하고 가격 조회를 호출하지 않는다."""
monkeypatch.setattr(hi.db, "get_all_portfolio", lambda: [])
called = []
monkeypatch.setattr(hi.price_fetcher, "get_current_prices",
lambda tickers: called.append(tickers) or {})
monkeypatch.setattr(hi, "_krx_tickers", lambda: set())
result = hi.get_holdings()
assert result == []
assert called == [] # get_current_prices must NOT have been called
def test_get_holdings_price_missing(monkeypatch):
"""prices dict에 ticker가 없으면 current_price와 pnl_rate는 None이다."""
monkeypatch.setattr(hi.db, "get_all_portfolio", lambda: [
{"id": 1, "broker": "kis", "ticker": "000660", "name": "SK하이닉스",
"quantity": 5, "avg_price": 150000, "purchase_price": 150000},
])
monkeypatch.setattr(hi.price_fetcher, "get_current_prices",
lambda tickers: {}) # 가격 없음
monkeypatch.setattr(hi, "_krx_tickers", lambda: {"000660"})
hs = hi.get_holdings()
assert hs[0]["current_price"] is None
assert hs[0]["pnl_rate"] is None
# ---- Phase 2 tests ----
def _toy_ctx(tickers=("005930",), n=300):
"""결정적 일봉으로 ScreenContext 유사 객체 구성."""
from app.screener.engine import ScreenContext
rows = []
base = dt.date(2025, 1, 1)
for t in tickers:
price = 1000
for i in range(n):
price = int(price * 1.002) # 완만한 상승 → 정배열
d = (base + dt.timedelta(days=i)).isoformat()
rows.append({"ticker": t, "date": d, "open": price, "high": price,
"low": price, "close": price, "volume": 1000, "value": price*1000})
prices = pd.DataFrame(rows)
master = pd.DataFrame({"name": [f"n{t}" for t in tickers],
"market": ["KOSPI"]*len(tickers),
"market_cap": [1e12]*len(tickers)},
index=pd.Index(tickers, name="ticker"))
flow = pd.DataFrame(columns=["ticker","date","foreign_net","institution_net"])
return ScreenContext(master=master, prices=prices, flow=flow,
kospi=pd.Series(dtype=float), asof=base+dt.timedelta(days=n-1))
def test_technical_posture_returns_scores():
ctx = _toy_ctx(("005930",))
scores = hi.technical_posture(ctx, ["005930"])
assert "005930" in scores
assert 0.0 <= scores["005930"] <= 100.0 # 상승추세 → 양수 점수
# ---- Task 2.2 tests ----
def _ticker_prices(closes, vols=None):
n = len(closes)
base = dt.date(2025, 1, 1)
vols = vols or [1000]*n
return pd.DataFrame({
"ticker": ["005930"]*n,
"date": [(base+dt.timedelta(days=i)).isoformat() for i in range(n)],
"open": closes, "high": closes, "low": closes, "close": closes, "volume": vols,
})
DEFAULT_EXIT = {"stop_pct": 0.08, "take_pct": 0.25, "climax_vol_x": 3.0}
def test_exit_rules_stop_and_ma():
closes = [1000]*60 + [1100]*200 # 충분한 길이, 최근 평탄
df = _ticker_prices(closes)
# 현재가가 평단(2000) 대비 -45% → stop_loss
flags = hi.exit_rules({"avg_price": 2000, "current_price": 1100}, df, DEFAULT_EXIT)
assert flags["stop_loss"] is True
# 종가 1100 > MA50≈1100, MA200은 더 낮음 → ma 이탈 아님
assert flags["ma200_break"] is False
def test_exit_rules_take_profit():
df = _ticker_prices([1000]*260)
flags = hi.exit_rules({"avg_price": 1000, "current_price": 1300}, df, DEFAULT_EXIT)
assert flags["take_profit"] is True # +30% ≥ 25%
# ---- Task 2.3 tests ----
def test_decide_action_matrix():
# 강건 + 이탈 없음 + 높은 강도 → add
a, r = hi.decide_action(tech_score=80, exit_flags={}, pnl=5)
assert a == "add"
# ma200 이탈 → sell
a, r = hi.decide_action(70, {"ma200_break": True}, 2)
assert a == "sell"
# stop_loss → sell
a, _ = hi.decide_action(70, {"stop_loss": True}, -10)
assert a == "sell"
# ma50 이탈만 → trim
a, _ = hi.decide_action(60, {"ma50_break": True}, 3)
assert a == "trim"
# 이탈 없음 보통 강도 → hold
a, _ = hi.decide_action(50, {}, 1)
assert a == "hold"
# ---- Phase 2 hardening tests (m3) ----
def _ticker_prices_hl(closes, highs, vols):
n = len(closes)
base = dt.date(2025, 1, 1)
return pd.DataFrame({
"ticker": ["005930"] * n,
"date": [(base + dt.timedelta(days=i)).isoformat() for i in range(n)],
"open": closes,
"high": highs,
"low": closes,
"close": closes,
"volume": vols,
})
def test_exit_rules_climax():
closes = [1000] * 30
highs = [1000] * 29 + [1100] # 마지막날 상단꼬리(종가1000 < 고가1100*0.97)
vols = [1000] * 29 + [5000] # 거래량 5x
flags = hi.exit_rules({"avg_price": 900, "current_price": 1000},
_ticker_prices_hl(closes, highs, vols), {})
assert flags["climax"] is True
def test_exit_rules_ma200_break():
closes = list(range(1000, 1000 + 260))[::-1] # 하락 추세 → 종가 < MA200
df = _ticker_prices(closes)
flags = hi.exit_rules({"avg_price": 2000, "current_price": closes[-1]}, df, {})
assert flags["ma200_break"] is True
def test_technical_posture_short_history_returns_low_not_crash():
ctx = _toy_ctx(("005930",), n=100) # <252 → MA 노드 NaN→0, but no crash
scores = hi.technical_posture(ctx, ["005930"])
assert "005930" in scores
assert 0.0 <= scores["005930"] <= 100.0
def test_technical_posture_empty_kospi_not_penalized():
# rs_rating는 빈 kospi에서 빈 Series → combine에서 제외되어야 (C1)
ctx = _toy_ctx(("005930",), n=300) # kospi 빈 fixture
scores = hi.technical_posture(ctx, ["005930"])
# ma_alignment+momentum만으로 정규화 → 상승추세면 충분히 높은 점수
assert scores["005930"] > 50.0
# ---- Phase 3 tests ----
DEFAULT_EVENT = {"move_pct": 7.0, "vol_z": 2.5}
def test_market_events_detects_move_and_volume():
closes = [1000]*30 + [1100] # 마지막날 +10%
vols = [1000]*30 + [10000] # 거래량 급증
df = _ticker_prices(closes, vols)
evts = hi.market_events("005930", df, None, DEFAULT_EVENT)
types = {e["type"] for e in evts}
assert "price_move" in types
assert "volume_surge" in types
def test_news_issues_flags_negative_sentiment(monkeypatch):
# news_sentiment: 005930 음수 점수 → 악재 flag
monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: {
"005930": {"score_raw": -0.6, "news_count": 8}})
issues = hi.news_issues(["005930"], date="2026-05-29", use_llm=False)
assert "005930" in issues
assert issues["005930"][0]["type"] == "news"
assert issues["005930"][0]["severity"] in ("med", "high")
def test_portfolio_health():
holdings = [
{"ticker": "005930", "quantity": 10, "avg_price": 70000, "current_price": 77000,
"is_krx": True},
{"ticker": "000660", "quantity": 5, "avg_price": 100000, "current_price": 90000,
"is_krx": True},
]
h = hi.portfolio_health(holdings, total_cash=1000000)
assert h["positions"] == 2
assert 0 <= h["max_weight"] <= 1.0
assert "total_eval" in h and "total_pnl" in h and "cash_ratio" in h
def test_market_events_volume_surge_zscore_path():
# 변동 있는 기준선 → Z-score 경로(sd>0) 검증 (sd=0 fallback 아님)
import random as _r
_r.seed(1)
base_vols = [1000 + _r.randint(-50, 50) for _ in range(30)]
closes = [1000] * 30 + [1010]
vols = base_vols + [max(base_vols) * 10] # 마지막날 큰 급증
df = _ticker_prices(closes, vols)
evts = hi.market_events("005930", df, None, DEFAULT_EVENT)
assert any(e["type"] == "volume_surge" for e in evts)
def test_market_events_foreign_selling():
closes = [1000] * 5
df = _ticker_prices(closes)
import datetime as _dt
base = _dt.date(2025, 1, 1)
flow = pd.DataFrame({
"ticker": ["005930"] * 5,
"date": [(base + _dt.timedelta(days=i)).isoformat() for i in range(5)],
"foreign_net": [100, 50, -10, -20, -30], # 최근 3일 연속 순매도
"institution_net": [0] * 5,
})
evts = hi.market_events("005930", df, flow, DEFAULT_EVENT)
assert any(e["type"] == "foreign_selling" for e in evts)
def test_news_issues_severity_high_boundary(monkeypatch):
monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: {
"005930": {"score_raw": -0.6, "news_count": 5}}) # 정확히 high 경계
issues = hi.news_issues(["005930"], date="2026-05-29", use_llm=False)
assert issues["005930"][0]["severity"] == "high"
def test_portfolio_health_empty_and_zero():
# 빈 포트 → 0/빈값, 크래시 없음
h0 = hi.portfolio_health([], total_cash=0)
assert h0["positions"] == 0
assert h0["max_weight"] == 0.0
assert h0["total_pnl_rate"] == 0.0
assert h0["cash_ratio"] == 0.0
# total_buy=0 (avg_price 0) → div-by-zero 없이 0.0
h1 = hi.portfolio_health([{"ticker": "X", "quantity": 1, "avg_price": 0,
"current_price": 0, "is_krx": True}], total_cash=0)
assert h1["total_pnl_rate"] == 0.0
# ---- Phase 4 tests ----
def test_compute_and_store_and_brief(monkeypatch):
import os, tempfile
from app import db
monkeypatch.setattr(db, "DB_PATH", os.path.join(tempfile.mkdtemp(), "stock.db"))
db.init_db()
monkeypatch.setattr(hi, "get_holdings", lambda: [
{"ticker": "005930", "name": "삼성전자", "quantity": 10, "avg_price": 1000,
"current_price": 1100, "pnl_rate": 10.0, "is_krx": True}])
ctx = _toy_ctx(("005930",))
monkeypatch.setattr(hi, "_load_ctx", lambda asof: ctx)
monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: {})
monkeypatch.setattr(hi.db, "get_all_broker_cash", lambda: [{"broker": "kis", "cash": 500000}])
res = hi.compute_and_store(asof=ctx.asof, use_llm=False)
assert res["stored"] == 1
brief = hi.build_holdings_brief()
assert brief["holdings"][0]["ticker"] == "005930"
assert "portfolio_health" in brief
assert brief["holdings"][0]["action"] in ("add", "hold", "trim", "sell")
def test_compute_momentum_loss_flag(monkeypatch):
"""직전 시그널 tech_score HIGH → 오늘 LOW → momentum_loss=True."""
import os, tempfile
from app import db
monkeypatch.setattr(db, "DB_PATH", os.path.join(tempfile.mkdtemp(), "stock.db"))
db.init_db()
yesterday = (dt.date.today() - dt.timedelta(days=1)).isoformat()
today = dt.date.today()
today_iso = today.isoformat()
# 어제 시그널 삽입: tech_score=90 (HIGH)
db.upsert_holdings_signal(
date=yesterday, ticker="005930", name="삼성전자",
action="hold", tech_score=90.0, exit_flags={}, issues=[],
close=1000, pnl_rate=0.0, reasons="x",
)
monkeypatch.setattr(hi, "get_holdings", lambda: [
{"ticker": "005930", "name": "삼성전자", "quantity": 10, "avg_price": 1000,
"current_price": 1000, "pnl_rate": 0.0, "is_krx": True}
])
ctx = _toy_ctx(("005930",))
monkeypatch.setattr(hi, "_load_ctx", lambda asof: ctx)
monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: {})
monkeypatch.setattr(hi.db, "get_all_broker_cash", lambda: [])
# tech_score=30 → 낮음 (momentum_low=35 미만, 또한 90-30=60 > momentum_drop=15)
monkeypatch.setattr(hi, "technical_posture", lambda ctx, tickers: {"005930": 30.0})
res = hi.compute_and_store(asof=today, use_llm=False)
assert res["stored"] == 1
signals = db.get_holdings_signals(today_iso)
assert len(signals) == 1
assert signals[0]["exit_flags"]["momentum_loss"] is True
def test_compute_idempotent(monkeypatch):
"""동일 입력으로 compute_and_store 두 번 실행 → upsert로 1건만 저장."""
import os, tempfile
from app import db
monkeypatch.setattr(db, "DB_PATH", os.path.join(tempfile.mkdtemp(), "stock.db"))
db.init_db()
monkeypatch.setattr(hi, "get_holdings", lambda: [
{"ticker": "005930", "name": "삼성전자", "quantity": 10, "avg_price": 1000,
"current_price": 1100, "pnl_rate": 10.0, "is_krx": True}
])
ctx = _toy_ctx(("005930",))
monkeypatch.setattr(hi, "_load_ctx", lambda asof: ctx)
monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: {})
monkeypatch.setattr(hi.db, "get_all_broker_cash", lambda: [])
hi.compute_and_store(asof=ctx.asof, use_llm=False)
hi.compute_and_store(asof=ctx.asof, use_llm=False)
signals = db.get_holdings_signals(ctx.asof.isoformat())
assert len(signals) == 1, f"upsert 실패: {len(signals)}건 저장됨"
def test_compute_non_krx_holding(monkeypatch):
"""is_krx=False 종목은 tech_score=None·action='hold'로 저장된다."""
import os, tempfile
from app import db
monkeypatch.setattr(db, "DB_PATH", os.path.join(tempfile.mkdtemp(), "stock.db"))
db.init_db()
monkeypatch.setattr(hi, "get_holdings", lambda: [
{"ticker": "AAPL", "name": "Apple", "quantity": 5, "avg_price": 200,
"current_price": 220, "pnl_rate": 10.0, "is_krx": False}
])
ctx = _toy_ctx(()) # ticker 없는 빈 ctx
monkeypatch.setattr(hi, "_load_ctx", lambda asof: ctx)
monkeypatch.setattr(hi, "_news_sentiment_map", lambda date: {})
monkeypatch.setattr(hi.db, "get_all_broker_cash", lambda: [])
res = hi.compute_and_store(asof=ctx.asof, use_llm=False)
assert res["stored"] == 1
signals = db.get_holdings_signals(ctx.asof.isoformat())
assert len(signals) == 1
sig = signals[0]
assert sig["ticker"] == "AAPL"
assert sig["tech_score"] is None
assert sig["action"] == "hold"