28 Commits

Author SHA1 Message Date
1e4c1b42b7 fix(insta-lab): 프롬프트 템플릿 GET이 미저장 시 코드 기본값 반환
slate_writer/category_seeds가 DB에 없으면 404 대신 생성 파이프라인이
실제 폴백하는 코드 기본값(card_writer.DEFAULT_PROMPT,
DEFAULT_CATEGORY_SEEDS)을 is_default=true로 반환. 편집 UI가 마스터
프롬프트를 표시·수정 가능. 미지정 이름은 여전히 404. 테스트 4건.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 02:50:33 +09:00
0190a6c206 feat(agent-office): 인스타 큐레이터 후보를 중복 제거 + 신뢰도 0.7+ 필터
_dedup_and_filter_keywords: score>=0.7만 남기고 동일 keyword 중복 제거
(최고 score 유지) 후 내림차순. _push_keyword_candidates가 이 필터를 거쳐
"확실한 것만" 전송, 후보 없으면 안내 메시지. 헬퍼 테스트 5건.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 02:50:33 +09:00
6ef4160da2 fix(stock): AI 뉴스 호재/악재 명확히 구분
(1) 부호 게이트: top_pos는 score>0, top_neg는 score<0만 분류해 양수(호재)
종목이 악재란에 채워지는 문제 제거. 중립(0)은 양쪽 모두 제외.
(2) 프롬프트: reason을 score 부호와 같은 방향 근거만 쓰도록 명시 —
호재 평가에 악재 내용, 악재 평가에 호재 내용 혼입 금지.
부호 게이트 회귀 테스트 2건 추가.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 02:50:18 +09:00
078c9f008a fix(agent-office): /agents/{id}/tasks response에 tasks/items 양쪽 키 유지 (backward compat) 2026-05-23 02:12:50 +09:00
918151bda8 feat(agent-office): GET /agents/{id}/tasks에 task_type/days 필터 추가 2026-05-23 02:11:28 +09:00
2ce6721c35 fix(tests): fresh_db fixture가 매 test마다 db.DB_PATH 재패치 (cross-file isolation)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 02:08:01 +09:00
c5303151c0 feat(lotto-agent): sync_evolver_activity 매일 09:30 cron + 멱등 가드 + 3 테스트
- LottoAgent.sync_evolver_activity(): lotto-lab evolver status polling → agent_office.db task+log 미러링
- UTC 날짜 기준 멱등 guard (get_tasks_by_agent_date_kind 활용)
- 일요일(dow=6) → 5 clamp (lotto-lab trials는 0~5)
- 월요일 6-trial 완성 시 evolver_generate task 추가 생성
- scheduler.py: lotto_evolver_activity_sync cron 09:30 등록
- tests: creates_apply_task / idempotent / no_picks_no_task 3종

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 02:06:30 +09:00
ee61405ff1 feat(lotto-agent): run_weekly_evolution_report task_id wrap
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 01:59:56 +09:00
fef5f7a835 feat(lotto-agent): run_daily_digest task_id wrap
daily_digest에 create_task/update_task_status/add_log task_id wrap 적용.
test_run_daily_digest_creates_task 추가 (75 passed).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 01:57:40 +09:00
e47ccdb762 feat(lotto-agent): run_signal_check task_id wrap + 단위 테스트
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 01:55:20 +09:00
4b6996b0f7 feat(lotto-agent): get_agent_tasks 필터 + get_tasks_by_agent_date_kind 멱등 guard
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 01:52:05 +09:00
0f65aa53e4 docs(plan): Lotto Evolver UI + 활동 가시화 구현 plan (12 tasks)
Why: spec (2026-05-23-lotto-evolver-ui-design.md)을 12개 atomic task로
분해. Phase 1-2 web-backend (task_id wrap + sync cron + API 확장),
Phase 3 web-ui (Evolver 페이지 + 5 컴포넌트 + 라우터), Phase 4 배포 검증.
TDD red→green→commit + 멱등 guard 패턴.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 01:45:47 +09:00
ea3485cde6 docs(spec): Lotto Evolver UI + 에이전트 활동 가시화 (v2.1)
Why: v2 텔레그램 메시지의 /lotto/evolver 링크가 404 → 페이지 신설.
+ LottoAgent 활동(signal/digest/evolution/curate)이 agent_tasks에
누락된 거 보강. 모든 활동을 한 timeline에서 추적 가능.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 01:31:56 +09:00
d6366a38f3 fix(stock): 원달러 환율 등락 방향 판별 수정
네이버 환율 HTML에 .blind span이 "미국 USD"/"원"/"상승" 3개라
select_one(".blind")이 첫 번째 "미국 USD"를 잡아 방향 추출 실패 →
direction="" + 부호 없는 change_value → 프론트가 항상 상승으로 표시.
해외 지수와 동일하게 .head_info의 point_up/point_dn 클래스로 판별,
직속 .blind 텍스트(상승/하락)를 fallback으로 사용.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 01:17:12 +09:00
0f8c71c552 fix(lotto-evolver): previous_base diff + 일요일 cron skip + idempotent evaluate
- weight_evolver.evaluate_weekly: save_base_history 직전에 current_base를
  previous_base로 캡처해 return dict에 포함 → formatter가 진짜 diff 표시 가능
- evaluate_weekly: same effective_from row 이미 존재 시 save skip + idempotent
  return (토 22:00 lotto cron과 agent-office 22:15 재호출 중복 row 방지)
- main._run_weight_evolver_daily: 일요일(weekday=6) 도 skip — 토요일 trial을
  INSERT OR REPLACE로 덮어쓰는 문제 방지
- telegram_lotto._format_evolution_report: eval_result.previous_base 우선
  사용 (없으면 current_base 폴백) → diff 자기 자신 비교 버그 수정
- test_lotto_evolution_format: previous_base 키 추가 + 새 diff 검증 테스트

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 03:35:20 +09:00
1401c5703d docs(CLAUDE): lotto-lab weight_evolver API/스케줄러/테이블 추가
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 03:27:41 +09:00
92329f6fd5 feat(lotto-evolver): LottoAgent.run_weekly_evolution_report + 토 22:15 cron
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 03:24:18 +09:00
d0047c2b9d feat(lotto-evolver): 텔레그램 주간 evolution report 포맷 + 발송
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 03:21:23 +09:00
088944499c feat(lotto-evolver): service_proxy.lotto_evolver_status/evaluate helpers 2026-05-22 03:17:50 +09:00
a9fdbf8a93 feat(weight-evolver): evolver API 5종 (status/history/trials/generate-now/evaluate-now)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 03:15:57 +09:00
f46851d481 feat(weight-evolver): cron 3종 등록 (월 generate+apply / 일 apply / 토 evaluate)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 03:14:23 +09:00
11b3700959 feat(weight-evolver): run_simulation이 active W를 score_combination에 전달 2026-05-22 03:12:24 +09:00
1db8a0063d fix(weight-evolver): draws 테이블 컬럼명 n1..n6 사용 (drw_num1..6 X) + datetime import 정렬
evaluate_weekly()에서 당첨번호 참조 시 존재하지 않는 drw_num1..6 컬럼을
실제 테이블 컬럼명 n1..n6으로 수정. datetime/timedelta/timezone import를
파일 중간(line 128)에서 상단 stdlib imports 섹션으로 이동.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 03:11:37 +09:00
f017a61c79 feat(weight-evolver): DB 통합 진입점 (generate_weekly/apply_today/evaluate_weekly)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 03:08:56 +09:00
1694823129 feat(analyzer): score_combination에 weights 파라미터 추가 (None=기존 fixed)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 03:06:26 +09:00
a4614ebeae feat(weight-evolver): lotto.db에 weight_trials/auto_picks/weight_base_history + CRUD 2026-05-22 03:03:51 +09:00
875e750f77 feat(weight-evolver): 순수 함수 (clamp/perturb/Dirichlet/score/base-rule)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 02:59:38 +09:00
9cb40fb4e5 test(weight-evolver): 순수 함수 + base update rule 단위 테스트 2026-05-22 02:56:06 +09:00
28 changed files with 3724 additions and 97 deletions

View File

@@ -164,10 +164,16 @@ docker compose up -d
| `lotto_briefings` | AI 큐레이터 주간 브리핑 (5세트 + 내러티브 + 토큰·비용 집계) |
| `todos` | 투두리스트 (UUID PK) — personal 서비스로 이전됨, 레거시 테이블 유지 |
| `blog_posts` | 블로그 글 (tags: JSON 배열) — personal 서비스로 이전됨, 레거시 테이블 유지 |
| `weight_trials` | 주별 6일치 후보 가중치 (4 perturb + 2 dirichlet) |
| `auto_picks` | 매일 N=5 시도 번호 + 채점 결과 |
| `weight_base_history` | base 갱신 이력 (winner_4plus / ema_blend / unchanged / cold_start) |
**스케줄러 job**
- 09:10 / 21:10 매일 — 당첨번호 동기화 + 채점 (`sync_latest``check_results_for_draw`)
- 00:05, 04:05, 08:05, 12:05, 16:05, 20:05 — 몬테카를로 시뮬레이션 (20,000후보 → 상위100 → best_picks 20개 교체)
- 월요일 09:00 — weight_evolver_weekly (6개 후보 생성 + 그날 N=5 추출)
- 매일 09:00 — weight_evolver_daily (월요일 제외, 오늘 W로 N=5 추출)
- 토요일 22:00 — weight_evolver_eval (회고 + 다음주 base 갱신)
**lotto-lab API 목록**
@@ -204,6 +210,11 @@ docker compose up -d
| GET | `/api/lotto/briefing/latest` | 최신 브리핑 |
| GET | `/api/lotto/briefing/{draw_no}` | 특정 회차 브리핑 |
| GET | `/api/lotto/briefing` | 브리핑 이력 |
| GET | `/api/lotto/evolver/status` | weight_evolver 이번주 trials + current_base + 진행 상황 |
| GET | `/api/lotto/evolver/history?weeks=12` | base 변경 이력 |
| GET | `/api/lotto/evolver/trials/{week_start}` | 특정 주 6 trials + 채점 결과 |
| POST | `/api/lotto/evolver/generate-now` | 수동 트리거 — 이번주 후보 생성 |
| POST | `/api/lotto/evolver/evaluate-now` | 수동 회고 + 다음주 base 갱신 |
### stock (stock/)
- Windows AI 서버 연동: `WINDOWS_AI_SERVER_URL=http://192.168.45.59:8000`
@@ -586,6 +597,7 @@ docker compose up -d
- 매 4시간 :15 — 로또 sim_check (00/04/08/12/16/20시)
- 일/수 21:15 — 로또 deep_check (큐레이션 후 confidence 포함 평가)
- 09:25 매일 — 로또 daily_digest (지난 24h 발화 텔레그램 1통)
- 토요일 22:15 — 로또 weight_evolver 주간 텔레그램 리포트
**RealestateAgent (`agents/realestate.py`)**
- 진입점: `on_new_matches(matches: list[dict]) -> {sent, sent_ids, message_id}`

View File

@@ -18,6 +18,26 @@ from ..telegram import messaging
logger = logging.getLogger(__name__)
# 텔레그램 후보 푸시 시 "확실한 것만" 보내기 위한 최소 신뢰도 (키워드 score 0~1)
KEYWORD_MIN_SCORE = 0.7
def _dedup_and_filter_keywords(
keywords: List[Dict[str, Any]], min_score: float = KEYWORD_MIN_SCORE,
) -> List[Dict[str, Any]]:
"""score >= min_score 인 키워드만 남기고, 동일 keyword 중복 제거(최고 score 유지).
결과는 score 내림차순. 텔레그램 후보 푸시 전 정리용."""
best: Dict[str, Dict[str, Any]] = {}
for k in keywords:
if float(k.get("score", 0)) < min_score:
continue
name = str(k.get("keyword", "")).strip()
if not name:
continue
if name not in best or k["score"] > best[name]["score"]:
best[name] = k
return sorted(best.values(), key=lambda k: -k["score"])
async def _send_media_group(media: List[Dict[str, Any]], caption: str = "") -> Dict[str, Any]:
"""텔레그램 sendMediaGroup. media는 InputMediaPhoto dicts.
@@ -89,14 +109,18 @@ class InstaAgent(BaseAgent):
raise TimeoutError(f"{step} timeout {timeout_sec}s")
async def _push_keyword_candidates(self, keywords: List[Dict[str, Any]]) -> None:
by_cat: Dict[str, List[Dict[str, Any]]] = {}
for k in keywords:
by_cat.setdefault(k["category"], []).append(k)
if not by_cat:
await messaging.send_raw("📰 [인스타 큐레이터] 오늘은 추천 키워드가 없습니다.")
# 중복 제거 + 신뢰도(score) 임계값 이상만 — "확실한 것만" 정리해서 전송
filtered = _dedup_and_filter_keywords(keywords)
if not filtered:
await messaging.send_raw(
f"📰 [인스타 큐레이터] 오늘은 확실한 추천 키워드가 없습니다 (신뢰도 {KEYWORD_MIN_SCORE:.1f}+ 기준)."
)
return
by_cat: Dict[str, List[Dict[str, Any]]] = {}
for k in filtered:
by_cat.setdefault(k["category"], []).append(k)
rows: List[List[Dict[str, Any]]] = []
text_lines = ["📰 <b>[인스타 큐레이터]</b> 오늘의 키워드 후보"]
text_lines = [f"📰 <b>[인스타 큐레이터]</b> 오늘의 키워드 후보 (신뢰도 {KEYWORD_MIN_SCORE:.1f}+)"]
for cat, items in by_cat.items():
text_lines.append(f"\n<b>{cat}</b>")
for k in items[:5]:

View File

@@ -28,30 +28,32 @@ class LottoAgent(BaseAgent):
pass
async def run_signal_check(self, source: str = "light") -> dict:
"""비-LLM 시그널 평가 (light/sim) 또는 deep_check (LLM 호출 후).
Phase 3 (Task 9): urgent 시그널 텔레그램 발송 + throttle/daily-cap 추가.
"""
"""비-LLM 시그널 평가. task_id wrap 적용."""
from ..curator.signal_runner import run_signal_check
from ..config import LOTTO_Z_NORMAL, LOTTO_Z_URGENT
from ..db import add_log
from ..config import (
LOTTO_Z_NORMAL, LOTTO_Z_URGENT,
LOTTO_THROTTLE_HOURS, LOTTO_URGENT_DAILY_MAX,
)
from ..db import (
create_task, update_task_status, add_log,
get_last_signal_notification, get_recent_urgent_count,
mark_signal_notified,
)
from ..notifiers.telegram_lotto import send_urgent_signal
from ..service_proxy import lotto_latest_draw
if self.state not in ("idle", "reporting"):
return {"ok": False, "message": f"busy ({self.state})"}
task_id = create_task("lotto", "signal_check", {"source": source})
try:
curate_result = None
# 회차 단위 메트릭(drift/confidence) 가드를 위해 항상 최신 회차 가져옴
from ..service_proxy import lotto_latest_draw
current_draw_no = await lotto_latest_draw()
if source == "deep":
from ..curator.pipeline import curate_weekly
cw = await curate_weekly(source="signal_deep")
# curate_weekly returns {"ok", "draw_no", "confidence", "tokens", "payload"}
curate_result = {"confidence": cw.get("confidence")}
# deep_check 시 curate_weekly가 반환하는 draw_no를 우선 사용 (직접 수집)
if cw.get("draw_no"):
current_draw_no = cw.get("draw_no")
@@ -62,35 +64,19 @@ class LottoAgent(BaseAgent):
curate_result=curate_result,
current_draw_no=current_draw_no,
)
add_log(
self.agent_id,
f"signal_check({source}) → overall={outcome['overall_fire']} results={len(outcome['results'])}",
)
# --- Throttle + 텔레그램 urgent 발송 ---
from ..config import LOTTO_THROTTLE_HOURS, LOTTO_URGENT_DAILY_MAX
from ..db import (
get_last_signal_notification, get_recent_urgent_count,
mark_signal_notified,
)
from ..notifiers.telegram_lotto import send_urgent_signal
# urgent 텔레그램 + throttle (기존 동작 유지)
if outcome["overall_fire"] == "urgent":
if get_recent_urgent_count(hours=24) >= LOTTO_URGENT_DAILY_MAX:
add_log(
self.agent_id,
"urgent daily cap 도달 → normal로 강등 (digest 합류)",
level="warning",
)
add_log("lotto", "urgent daily cap 도달 → normal로 강등", level="warning", task_id=task_id)
else:
blocked = False
for r in outcome["results"]:
if r["fire_level"] in ("normal", "urgent"):
last = get_last_signal_notification(
if get_last_signal_notification(
metric=r["metric"], fire_level=r["fire_level"],
hours=LOTTO_THROTTLE_HOURS,
)
if last:
):
blocked = True
break
if not blocked:
@@ -104,48 +90,150 @@ class LottoAgent(BaseAgent):
for r in outcome["results"]:
if r["fire_level"] in ("normal", "urgent"):
mark_signal_notified(r["signal_id"])
add_log(self.agent_id, f"urgent 텔레그램 발송 완료 (시그널 {len(outcome['results'])}마킹)")
add_log("lotto", f"urgent 텔레그램 발송 ({len(outcome['results'])}시그널)", task_id=task_id)
fired_metrics = [
r["metric"] for r in outcome["results"]
if r["fire_level"] not in ("noop", "warmup")
]
update_task_status(task_id, "succeeded", result_data={
"source": source,
"overall_fire": outcome["overall_fire"],
"n_results": len(outcome["results"]),
"fired_metrics": fired_metrics,
})
add_log("lotto", f"signal_check({source}) → {outcome['overall_fire']} results={len(outcome['results'])}", task_id=task_id)
return {"ok": True, **outcome}
except Exception as e:
add_log(self.agent_id, f"signal_check 예외: {e}", level="error")
update_task_status(task_id, "failed", result_data={"error": str(e)})
add_log("lotto", f"signal_check 예외: {e}", level="error", task_id=task_id)
return {"ok": False, "message": f"{type(e).__name__}: {e}"}
async def run_daily_digest(self) -> dict:
"""일일 요약 — 지난 24h normal/urgent 발화를 묶어 텔레그램 1통."""
"""일일 요약 — 지난 24h normal/urgent 발화 텔레그램 1통. task_id wrap."""
from ..db import (
get_recent_lotto_signals, get_signals_history, add_log,
get_baseline,
create_task, update_task_status, add_log,
get_recent_lotto_signals, get_signals_history, get_baseline,
)
from ..notifiers.telegram_lotto import send_signal_summary
sigs = get_recent_lotto_signals(hours=24, min_fire="normal")
total_24h = get_signals_history(days=1)
evaluated = len(total_24h)
# weights_trend: drift_weights_cache의 prev/curr 차이
trend = {}
task_id = create_task("lotto", "daily_digest", {})
try:
cache = get_baseline("drift_weights_cache")
if cache and isinstance(cache["window_values"], list) and len(cache["window_values"]) >= 2:
prev_w = cache["window_values"][-2]
curr_w = cache["window_values"][-1]
trend = {
k: curr_w.get(k, 0.0) - prev_w.get(k, 0.0)
for k in (set(prev_w) | set(curr_w))
}
except Exception as e:
add_log(self.agent_id, f"weights_trend 계산 실패: {e}", level="warning")
sigs = get_recent_lotto_signals(hours=24, min_fire="normal")
total_24h = get_signals_history(days=1)
evaluated = len(total_24h)
digest = {
"evaluated": evaluated,
"fired": len(sigs),
"signals": sigs,
"weights_trend": trend,
}
await send_signal_summary(digest)
add_log(self.agent_id, f"daily_digest 발송: 평가 {evaluated} / 발화 {len(sigs)}")
return {"ok": True, **digest}
trend = {}
try:
cache = get_baseline("drift_weights_cache")
if cache and isinstance(cache["window_values"], list) and len(cache["window_values"]) >= 2:
prev_w = cache["window_values"][-2]
curr_w = cache["window_values"][-1]
trend = {
k: curr_w.get(k, 0.0) - prev_w.get(k, 0.0)
for k in (set(prev_w) | set(curr_w))
}
except Exception as e:
add_log("lotto", f"weights_trend 계산 실패: {e}", level="warning", task_id=task_id)
digest = {
"evaluated": evaluated,
"fired": len(sigs),
"signals": sigs,
"weights_trend": trend,
}
await send_signal_summary(digest)
update_task_status(task_id, "succeeded", result_data={
"evaluated": evaluated,
"fired": len(sigs),
"signals_count": len(sigs),
})
add_log("lotto", f"daily_digest 발송: 평가 {evaluated} / 발화 {len(sigs)}", task_id=task_id)
return {"ok": True, **digest}
except Exception as e:
update_task_status(task_id, "failed", result_data={"error": str(e)})
add_log("lotto", f"daily_digest 예외: {e}", level="error", task_id=task_id)
return {"ok": False, "message": f"{type(e).__name__}: {e}"}
async def run_weekly_evolution_report(self) -> dict:
"""토 22:15 — lotto-lab evaluate-now 트리거 후 텔레그램 리포트. task_id wrap."""
from ..service_proxy import lotto_evolver_evaluate, lotto_evolver_status
from ..notifiers.telegram_lotto import send_evolution_report
from ..db import create_task, update_task_status, add_log
task_id = create_task("lotto", "weekly_evolution_report", {})
try:
eval_result = await lotto_evolver_evaluate()
status = await lotto_evolver_status()
current_base = status.get("current_base") or [0.2] * 5
await send_evolution_report(eval_result, current_base)
winner = eval_result.get("winner") or {}
update_task_status(task_id, "succeeded", result_data={
"draw_no": eval_result.get("draw_no"),
"update_reason": eval_result.get("update_reason"),
"winner_day_of_week": winner.get("day_of_week"),
"winner_max_correct": winner.get("max_correct"),
})
add_log("lotto", f"weekly_evolution_report 발송: draw={eval_result.get('draw_no')} reason={eval_result.get('update_reason')}", task_id=task_id)
return {"ok": True, **eval_result}
except Exception as e:
update_task_status(task_id, "failed", result_data={"error": str(e)})
add_log("lotto", f"weekly_evolution_report 예외: {e}", level="error", task_id=task_id)
return {"ok": False, "message": f"{type(e).__name__}: {e}"}
async def sync_evolver_activity(self) -> dict:
"""매일 09:30 — lotto-lab evolver 상태 polling → agent_office.db에 task+log 거울. 멱등."""
from datetime import datetime, timezone, timedelta
from ..service_proxy import lotto_evolver_status
from ..db import (
create_task, update_task_status, add_log,
get_tasks_by_agent_date_kind,
)
KST = timezone(timedelta(hours=9))
today_kst = datetime.now(KST).date()
# created_at은 UTC로 저장되므로 idempotency guard는 UTC 날짜 기준
today_utc_iso = datetime.now(timezone.utc).date().isoformat()
dow = today_kst.weekday()
if dow == 6:
dow = 5
try:
status = await lotto_evolver_status()
except Exception as e:
add_log("lotto", f"sync_evolver_activity: lotto-lab status fetch 실패: {e}", level="warning")
return {"ok": False, "reason": "status_fetch_failed", "error": str(e)}
results = {"created": []}
today_trial = next((t for t in status.get("trials", []) if t.get("day_of_week") == dow), None)
if today_trial and today_trial.get("picks"):
if not get_tasks_by_agent_date_kind("lotto", today_utc_iso, "evolver_apply"):
tid = create_task("lotto", "evolver_apply", {
"date": today_utc_iso,
"trial_id": today_trial["id"],
"day_of_week": dow,
"weight": today_trial["weight"],
})
update_task_status(tid, "succeeded", result_data={
"n_picks": len(today_trial["picks"]),
"meta_scores": [p.get("meta_score") for p in today_trial["picks"]],
})
add_log("lotto", f"evolver_apply: 오늘({dow}) W로 {len(today_trial['picks'])}세트 추출", task_id=tid)
results["created"].append("evolver_apply")
if today_kst.weekday() == 0 and len(status.get("trials", [])) == 6:
if not get_tasks_by_agent_date_kind("lotto", today_utc_iso, "evolver_generate"):
tid = create_task("lotto", "evolver_generate", {"week_start": status.get("week_start")})
update_task_status(tid, "succeeded", result_data={
"trials_count": 6,
"candidates_per_source": {"perturb": 4, "dirichlet": 2},
})
add_log("lotto", f"evolver_generate: {status.get('week_start')} 주의 6 trials 생성", task_id=tid)
results["created"].append("evolver_generate")
return {"ok": True, **results}
async def _run(self, source: str) -> dict:
task_id = create_task(self.agent_id, "curate_weekly", {"source": source})

View File

@@ -236,12 +236,24 @@ def get_task(task_id: str) -> Optional[Dict[str, Any]]:
return _task_to_dict(r) if r else None
def get_agent_tasks(agent_id: str, limit: int = 20) -> List[Dict[str, Any]]:
def get_agent_tasks(
agent_id: str,
limit: int = 20,
task_type: Optional[str] = None,
days: Optional[int] = None,
) -> List[Dict[str, Any]]:
sql = "SELECT * FROM agent_tasks WHERE agent_id=?"
params: List[Any] = [agent_id]
if task_type is not None:
sql += " AND task_type=?"
params.append(task_type)
if days is not None and days > 0:
sql += " AND created_at >= datetime('now', ?)"
params.append(f"-{int(days)} days")
sql += " ORDER BY created_at DESC LIMIT ?"
params.append(limit)
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM agent_tasks WHERE agent_id=? ORDER BY created_at DESC LIMIT ?",
(agent_id, limit),
).fetchall()
rows = conn.execute(sql, params).fetchall()
return [_task_to_dict(r) for r in rows]
@@ -739,3 +751,18 @@ def get_all_baselines() -> List[Dict[str, Any]]:
d["window_values"] = json.loads(d["window_values"])
out.append(d)
return out
def get_tasks_by_agent_date_kind(agent_id: str, date_iso: str, task_type: str) -> List[Dict[str, Any]]:
"""같은 (agent, date, task_type)으로 이미 생성된 task 조회. 멱등 guard."""
with _conn() as conn:
rows = conn.execute(
"""
SELECT * FROM agent_tasks
WHERE agent_id = ? AND task_type = ?
AND substr(created_at, 1, 10) = ?
ORDER BY created_at DESC
""",
(agent_id, task_type, date_iso),
).fetchall()
return [_task_to_dict(r) for r in rows]

View File

@@ -1,5 +1,6 @@
import os
import json
from typing import Optional
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
@@ -104,8 +105,15 @@ def update_agent(agent_id: str, body: AgentConfigUpdate):
return {"ok": True}
@app.get("/api/agent-office/agents/{agent_id}/tasks")
def agent_tasks(agent_id: str, limit: int = 20):
return {"tasks": get_agent_tasks(agent_id, limit)}
def agent_tasks(
agent_id: str,
limit: int = 20,
task_type: Optional[str] = None,
days: Optional[int] = None,
):
tasks_list = get_agent_tasks(agent_id, limit=limit, task_type=task_type, days=days)
# Backward compat: 기존 client는 'tasks', 신규 client는 'items' 사용
return {"tasks": tasks_list, "items": tasks_list}
@app.get("/api/agent-office/agents/{agent_id}/logs")
def agent_logs(agent_id: str, limit: int = 50):

View File

@@ -1,6 +1,6 @@
"""로또 큐레이션·당첨 알림 — 텔레그램 푸시."""
import logging
from typing import Dict, Any
from typing import Dict, Any, List
# 기존 에이전트들과 동일한 패턴: send_raw(text, reply_markup=None, chat_id=None)
# chat_id 생략 시 기본 TELEGRAM_CHAT_ID로 자동 발송.
@@ -159,3 +159,69 @@ async def send_signal_summary(digest: Dict[str, Any]) -> None:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] digest send failed: {e}")
# ---------- Weight Evolver 주간 리포트 ----------
_DAY_NAMES = ["", "", "", "", "", ""]
_METRIC_NAMES = ["freq", "finger", "gap", "cooccur", "divers"]
_REASON_LABEL = {
"winner_4plus": "4개 이상 일치 → base 교체",
"ema_blend": "3개 일치 → EMA blend (0.3)",
"unchanged": "유효 성과 없음 → base 유지",
"cold_start": "초기 균등 적용",
}
def _format_evolution_report(eval_result: Dict[str, Any], current_base: List[float]) -> str:
"""주간 weight evolution 텔레그램 메시지. ok=False 또는 winner 없으면 빈 문자열."""
if not eval_result or "winner" not in eval_result:
return ""
draw_no = eval_result.get("draw_no", "?")
winner = eval_result["winner"]
new_base = eval_result.get("new_base") or [0.0] * 5
reason = eval_result.get("update_reason", "")
dow = winner.get("day_of_week", 0)
day_name = _DAY_NAMES[dow] if 0 <= dow < len(_DAY_NAMES) else "?"
lines = [
f"🧬 로또 학습 주간 리포트 ({draw_no}회차)",
"",
f"이번주 시도: 6일 × {winner.get('n_picks', 5)}세트",
"",
f"🏆 Winner: {day_name}요일",
f" W = [" + ", ".join(
f"{name} {w:.2f}" for name, w in zip(_METRIC_NAMES, winner["weight"])
) + "]",
f" 최고 적중: {winner.get('max_correct', 0)}개 일치 (max={winner.get('max_correct', 0)})",
f" 평균 점수: {winner.get('avg_score', 0):.2f}",
"",
f"📊 다음주 base 변경 ({reason}):",
]
# 우선순위: eval_result.previous_base > current_base (eval 직후 stale) > 균등 fallback
base_now = eval_result.get("previous_base") or current_base or [0.2] * 5
for i, (cur, new) in enumerate(zip(base_now, new_base)):
diff = new - cur
if abs(diff) < 0.005:
marker = "="
elif diff > 0:
marker = "+" if diff < 0.05 else "++"
else:
marker = "-" if diff > -0.05 else "--"
lines.append(f" {_METRIC_NAMES[i]:8s} {cur:.2f}{new:.2f} ({marker})")
lines.append("")
lines.append(f"{_REASON_LABEL.get(reason, reason)}")
lines.append("")
lines.append(f"[웹에서 차트 보기] ({LOTTO_URL}/evolver)")
return "\n".join(lines)
async def send_evolution_report(eval_result: Dict[str, Any], current_base: List[float]) -> None:
text = _format_evolution_report(eval_result, current_base)
if not text:
return
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] evolution report send failed: {e}")

View File

@@ -56,6 +56,16 @@ async def _run_lotto_daily_digest():
if agent:
await agent.run_daily_digest()
async def _run_lotto_weekly_evolution_report():
agent = AGENT_REGISTRY.get("lotto")
if agent:
await agent.run_weekly_evolution_report()
async def _run_lotto_sync_evolver_activity():
agent = AGENT_REGISTRY.get("lotto")
if agent:
await agent.sync_evolver_activity()
async def _run_youtube_research():
agent = AGENT_REGISTRY.get("youtube")
if agent:
@@ -90,13 +100,20 @@ def init_scheduler():
id="stock_ai_news_sentiment",
)
scheduler.add_job(_run_insta_schedule, "cron", hour=9, minute=30, id="insta_pipeline")
# 09:00 cron 스태거링 — Celeron 2C/2.0GHz에서 동시 실행 시 CPU 폭주 (CHECK_POINT FU-A)
scheduler.add_job(_run_insta_trends_collect, "cron", hour=9, minute=0, id="insta_trends_collect")
# 외부 트렌드 수집은 장 마감 후 16:40 — 9시 주식 활발 시간대 NAS 자원 회피.
# screener(16:30)와 10분 스태거: Celeron 2C/2.0GHz 동시 실행 시 CPU 폭주 방지 (CHECK_POINT FU-A)
scheduler.add_job(_run_insta_trends_collect, "cron", hour=16, minute=40, id="insta_trends_collect")
scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=9, minute=5, id="lotto_curate")
scheduler.add_job(_run_lotto_light_check, "cron", hour=9, minute=15, id="lotto_light_check")
scheduler.add_job(_run_lotto_sim_check, "cron", minute=15, hour="0,4,8,12,16,20", id="lotto_sim_check")
scheduler.add_job(_run_lotto_deep_check, "cron", day_of_week="sun,wed", hour=21, minute=15, id="lotto_deep_check")
scheduler.add_job(_run_lotto_daily_digest, "cron", hour=9, minute=25, id="lotto_digest")
scheduler.add_job(_run_lotto_weekly_evolution_report, "cron", day_of_week="sat", hour=22, minute=15, id="lotto_evolution_weekly")
scheduler.add_job(
_run_lotto_sync_evolver_activity,
"cron", hour=9, minute=30,
id="lotto_evolver_activity_sync",
)
scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=10, id="youtube_research")
scheduler.add_job(_send_youtube_weekly_report, "cron", day_of_week="mon", hour=8, minute=0, id="youtube_weekly_report")
scheduler.add_job(_poll_pipelines, "interval", seconds=30, id="pipeline_poll")

View File

@@ -377,3 +377,20 @@ async def lotto_latest_draw() -> Optional[int]:
return None
except Exception:
return None
async def lotto_evolver_status() -> Dict[str, Any]:
"""GET /api/lotto/evolver/status — 이번주 trials + 다음주 base 정보."""
from .config import LOTTO_BACKEND_URL
resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/evolver/status")
resp.raise_for_status()
return resp.json()
async def lotto_evolver_evaluate() -> Dict[str, Any]:
"""POST /api/lotto/evolver/evaluate-now — 회고 트리거 (텔레그램 리포트용)."""
from .config import LOTTO_BACKEND_URL
async with httpx.AsyncClient(timeout=60.0) as client:
resp = await client.post(f"{LOTTO_BACKEND_URL}/api/lotto/evolver/evaluate-now")
resp.raise_for_status()
return resp.json()

View File

@@ -0,0 +1,55 @@
import os
import sys
import tempfile
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from app.agents.insta import _dedup_and_filter_keywords, KEYWORD_MIN_SCORE
def test_filters_below_threshold():
"""score < 임계값(0.7) 키워드는 제외."""
kws = [
{"id": 1, "keyword": "금리인하", "category": "경제", "score": 0.9},
{"id": 2, "keyword": "환율", "category": "경제", "score": 0.6}, # 컷
{"id": 3, "keyword": "반도체", "category": "경제", "score": 0.71},
]
out = _dedup_and_filter_keywords(kws, min_score=0.7)
kept = {k["keyword"] for k in out}
assert kept == {"금리인하", "반도체"}
def test_dedup_keeps_highest_score():
"""동일 keyword 중복 시 최고 score 1개만 유지."""
kws = [
{"id": 1, "keyword": "AI", "category": "경제", "score": 0.75},
{"id": 2, "keyword": "AI", "category": "기술", "score": 0.92}, # 같은 키워드, 더 높음
]
out = _dedup_and_filter_keywords(kws, min_score=0.7)
assert len(out) == 1
assert out[0]["id"] == 2
assert out[0]["score"] == 0.92
def test_sorted_by_score_desc():
kws = [
{"id": 1, "keyword": "a", "category": "c", "score": 0.72},
{"id": 2, "keyword": "b", "category": "c", "score": 0.95},
{"id": 3, "keyword": "c", "category": "c", "score": 0.80},
]
out = _dedup_and_filter_keywords(kws, min_score=0.7)
assert [k["keyword"] for k in out] == ["b", "c", "a"]
def test_empty_when_all_below_threshold():
kws = [{"id": 1, "keyword": "x", "category": "c", "score": 0.4}]
assert _dedup_and_filter_keywords(kws, min_score=0.7) == []
def test_default_threshold_is_0_7():
assert KEYWORD_MIN_SCORE == 0.7

View File

@@ -0,0 +1,87 @@
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from app.notifiers.telegram_lotto import _format_evolution_report
def test_evolution_report_winner_4plus():
eval_result = {
"ok": True,
"draw_no": 1225,
"week_start": "2026-05-18",
"winner": {
"day_of_week": 3,
"weight": [0.18, 0.32, 0.20, 0.22, 0.08],
"avg_score": 0.42,
"max_correct": 4,
"n_picks": 5,
},
"new_base": [0.18, 0.32, 0.20, 0.22, 0.08],
"previous_base": [0.20, 0.20, 0.20, 0.20, 0.20],
"update_reason": "winner_4plus",
"per_day": [
{"day_of_week": 0, "avg_score": 0.20, "max_correct": 2},
{"day_of_week": 3, "avg_score": 0.42, "max_correct": 4},
],
}
current_base = [0.20, 0.20, 0.20, 0.20, 0.20]
text = _format_evolution_report(eval_result, current_base)
assert "🧬" in text
assert "1225" in text
assert "목요일" in text or "Winner" in text
assert "4개 일치" in text or "max=4" in text
assert "winner_4plus" in text
def test_evolution_report_unchanged():
eval_result = {
"ok": True,
"draw_no": 1226,
"week_start": "2026-05-25",
"winner": {
"day_of_week": 1,
"weight": [0.21, 0.19, 0.20, 0.20, 0.20],
"avg_score": 0.10,
"max_correct": 2,
"n_picks": 5,
},
"new_base": [0.20, 0.20, 0.20, 0.20, 0.20],
"update_reason": "unchanged",
"per_day": [],
}
current_base = [0.20, 0.20, 0.20, 0.20, 0.20]
text = _format_evolution_report(eval_result, current_base)
assert "unchanged" in text or "유지" in text
assert "2개 일치" in text or "max=2" in text
def test_evolution_report_empty_returns_empty():
"""evaluate가 ok=False면 빈 문자열 (발송 skip)."""
text = _format_evolution_report({"ok": False, "reason": "no_trials"}, [0.2]*5)
assert text == ""
def test_evolution_report_uses_previous_base_for_diff():
"""previous_base와 new_base 차이가 메시지 diff에 정확히 반영됨."""
eval_result = {
"ok": True,
"draw_no": 1227,
"winner": {
"day_of_week": 0,
"weight": [0.30, 0.20, 0.20, 0.20, 0.10],
"avg_score": 0.50,
"max_correct": 4,
"n_picks": 5,
},
"new_base": [0.30, 0.20, 0.20, 0.20, 0.10],
"previous_base": [0.20, 0.20, 0.20, 0.20, 0.20],
"update_reason": "winner_4plus",
}
# current_base는 stale (post-update 값) — previous_base가 우선 적용되어야 함
text = _format_evolution_report(eval_result, [0.30, 0.20, 0.20, 0.20, 0.10])
# freq: 0.20 → 0.30 (+0.10 = "++")
# divers: 0.20 → 0.10 (-0.10 = "--")
assert "0.20 → 0.30" in text # freq 증가
assert "0.20 → 0.10" in text # divers 감소
assert "(++)" in text or "(+)" in text # freq marker
assert "(--)" in text or "(-)" in text # divers marker

View File

@@ -0,0 +1,154 @@
# agent-office/tests/test_lotto_task_wrap.py
import os
import sys
import tempfile
import gc
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import pytest
from app import db
db.DB_PATH = _TMP
@pytest.fixture(autouse=True)
def fresh_db():
# Re-patch DB_PATH at the start of every test (cross-file isolation)
db.DB_PATH = _TMP
gc.collect()
if os.path.exists(_TMP):
os.remove(_TMP)
db.init_db()
yield
gc.collect()
if os.path.exists(_TMP):
try:
os.remove(_TMP)
except PermissionError:
pass
@pytest.mark.asyncio
async def test_run_signal_check_creates_task_row(monkeypatch):
"""run_signal_check이 agent_tasks에 row를 만들고 result_data를 저장."""
from app.agents.lotto import LottoAgent
from app.curator import signal_runner
async def fake_run_signal_check(**kwargs):
return {
"overall_fire": "normal",
"results": [
{"signal_id": 1, "metric": "sim_signal",
"value": 0.6, "z_score": 1.7, "fire_level": "normal",
"baseline_mu": 0.5, "baseline_sigma": 0.05, "payload": {}},
],
}
monkeypatch.setattr(signal_runner, "run_signal_check", fake_run_signal_check)
from app import service_proxy
async def fake_latest():
return 1226
monkeypatch.setattr(service_proxy, "lotto_latest_draw", fake_latest)
from app.notifiers import telegram_lotto
async def fake_send(_event): pass
monkeypatch.setattr(telegram_lotto, "send_urgent_signal", fake_send)
agent = LottoAgent()
result = await agent.run_signal_check(source="light")
assert result["ok"] is True
tasks = db.get_agent_tasks("lotto", task_type="signal_check", days=1)
assert len(tasks) == 1
t = tasks[0]
assert t["status"] == "succeeded"
assert t["result_data"]["source"] == "light"
assert t["result_data"]["overall_fire"] == "normal"
assert "sim_signal" in t["result_data"]["fired_metrics"]
@pytest.mark.asyncio
async def test_run_signal_check_failure_marks_task_failed(monkeypatch):
from app.agents.lotto import LottoAgent
from app.curator import signal_runner
from app import service_proxy
async def boom(**kwargs):
raise RuntimeError("boom")
monkeypatch.setattr(signal_runner, "run_signal_check", boom)
async def fake_latest():
return 1226
monkeypatch.setattr(service_proxy, "lotto_latest_draw", fake_latest)
agent = LottoAgent()
result = await agent.run_signal_check(source="sim")
assert result["ok"] is False
tasks = db.get_agent_tasks("lotto", task_type="signal_check", days=1)
assert len(tasks) == 1
assert tasks[0]["status"] == "failed"
assert "boom" in tasks[0]["result_data"]["error"]
@pytest.mark.asyncio
async def test_run_daily_digest_creates_task(monkeypatch):
"""run_daily_digest이 agent_tasks에 task 생성 + result_data 저장."""
from app.agents.lotto import LottoAgent
from app.notifiers import telegram_lotto
async def fake_send(_d): pass
monkeypatch.setattr(telegram_lotto, "send_signal_summary", fake_send)
agent = LottoAgent()
result = await agent.run_daily_digest()
assert result["ok"] is True
tasks = db.get_agent_tasks("lotto", task_type="daily_digest", days=1)
assert len(tasks) == 1
assert tasks[0]["status"] == "succeeded"
assert "fired" in tasks[0]["result_data"]
assert "evaluated" in tasks[0]["result_data"]
@pytest.mark.asyncio
async def test_run_weekly_evolution_report_creates_task(monkeypatch):
"""run_weekly_evolution_report이 task 생성 + result_data 저장."""
from app.agents.lotto import LottoAgent
from app import service_proxy
from app.notifiers import telegram_lotto
async def fake_eval():
return {
"ok": True, "draw_no": 1225,
"winner": {"day_of_week": 3, "weight": [0.18, 0.32, 0.20, 0.22, 0.08],
"avg_score": 0.42, "max_correct": 4, "n_picks": 5},
"new_base": [0.18, 0.32, 0.20, 0.22, 0.08],
"previous_base": [0.2] * 5,
"update_reason": "winner_4plus",
}
async def fake_status():
return {"current_base": [0.2] * 5}
async def fake_send(_e, _b): pass
monkeypatch.setattr(service_proxy, "lotto_evolver_evaluate", fake_eval)
monkeypatch.setattr(service_proxy, "lotto_evolver_status", fake_status)
monkeypatch.setattr(telegram_lotto, "send_evolution_report", fake_send)
agent = LottoAgent()
result = await agent.run_weekly_evolution_report()
assert result["ok"] is True
tasks = db.get_agent_tasks("lotto", task_type="weekly_evolution_report", days=1)
assert len(tasks) == 1
r = tasks[0]["result_data"]
assert tasks[0]["status"] == "succeeded"
assert r["draw_no"] == 1225
assert r["update_reason"] == "winner_4plus"
assert r["winner_day_of_week"] == 3
assert r["winner_max_correct"] == 4

View File

@@ -0,0 +1,123 @@
# agent-office/tests/test_sync_evolver_activity.py
import os
import sys
import tempfile
import gc
from datetime import datetime, timezone, timedelta
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import pytest
from app import db
db.DB_PATH = _TMP
@pytest.fixture(autouse=True)
def fresh_db():
# Re-patch DB_PATH at the start of every test (cross-file isolation)
db.DB_PATH = _TMP
gc.collect()
if os.path.exists(_TMP):
os.remove(_TMP)
db.init_db()
yield
gc.collect()
if os.path.exists(_TMP):
try:
os.remove(_TMP)
except PermissionError:
pass
def _today_dow_clamped():
"""오늘의 weekday() (일요일=6은 5로 clamp)."""
KST = timezone(timedelta(hours=9))
dow = datetime.now(KST).weekday()
return 5 if dow == 6 else dow
def _fake_status_with_picks(dow_with_picks):
async def fake():
return {
"week_start": "2026-05-18",
"current_base": [0.2] * 5,
"trials": [
{
"id": 100 + i,
"day_of_week": i,
"weight": [0.2] * 5,
"source": "perturb",
"picks": ([
{"id": j, "numbers": [1,2,3,4,5,6], "meta_score": 0.5}
for j in range(5)
] if i == dow_with_picks else []),
}
for i in range(6)
],
}
return fake
@pytest.mark.asyncio
async def test_sync_evolver_activity_creates_apply_task(monkeypatch):
"""오늘 trial에 picks가 있으면 evolver_apply task 1개 생성."""
from app.agents.lotto import LottoAgent
from app import service_proxy
dow = _today_dow_clamped()
monkeypatch.setattr(service_proxy, "lotto_evolver_status", _fake_status_with_picks(dow))
agent = LottoAgent()
await agent.sync_evolver_activity()
apply_tasks = db.get_agent_tasks("lotto", task_type="evolver_apply", days=1)
assert len(apply_tasks) == 1
assert apply_tasks[0]["result_data"]["n_picks"] == 5
assert apply_tasks[0]["input_data"]["day_of_week"] == dow
@pytest.mark.asyncio
async def test_sync_evolver_activity_idempotent(monkeypatch):
"""같은 날 두 번 호출해도 task는 1개만 (멱등)."""
from app.agents.lotto import LottoAgent
from app import service_proxy
dow = _today_dow_clamped()
monkeypatch.setattr(service_proxy, "lotto_evolver_status", _fake_status_with_picks(dow))
agent = LottoAgent()
await agent.sync_evolver_activity()
await agent.sync_evolver_activity()
apply_tasks = db.get_agent_tasks("lotto", task_type="evolver_apply", days=1)
assert len(apply_tasks) == 1
@pytest.mark.asyncio
async def test_sync_evolver_activity_no_picks_no_task(monkeypatch):
"""오늘 trial에 picks가 없으면 task 생성하지 않음."""
from app.agents.lotto import LottoAgent
from app import service_proxy
async def fake_status():
return {
"week_start": "2026-05-18",
"current_base": [0.2] * 5,
"trials": [
{"id": 100 + i, "day_of_week": i, "weight": [0.2]*5,
"source": "perturb", "picks": []}
for i in range(6)
],
}
monkeypatch.setattr(service_proxy, "lotto_evolver_status", fake_status)
agent = LottoAgent()
await agent.sync_evolver_activity()
apply_tasks = db.get_agent_tasks("lotto", task_type="evolver_apply", days=1)
assert len(apply_tasks) == 0

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,368 @@
# Lotto Evolver UI + 에이전트 활동 가시화 설계 (v2.1)
- **상태**: Draft (사용자 리뷰 대기)
- **작성일**: 2026-05-23
- **대상 저장소**:
- `web-ui` (프론트엔드) — `/lotto/evolver` 페이지 신설 + 공용 활동 컴포넌트
- `web-backend` agent-office — LottoAgent task_id 도입 + sync_evolver_activity cron
- **선행 작업**: v2 Lotto Weight Evolver (2026-05-22 배포, 운영 중)
- **목표**: 토요일 22:15 텔레그램 리포트의 "[웹에서 차트 보기]" 링크가 가리키는 페이지 구축 + 로또 에이전트의 모든 활동(시그널·digest·큐레이션·evolver)을 한 곳에서 추적 가능하게.
---
## 1. 문제 정의
v2 텔레그램 메시지가 `https://gahusb.synology.me/lotto/evolver` 링크를 포함하지만 web-ui repo에 해당 라우트가 없음 → React Router catch-all 404. spec section 13에서 "프론트 UI는 별도 PR"로 명시했지만 링크는 미리 박혀있음 → UX 깨짐.
또한 LottoAgent의 활동(signals / digest / weekly_evolution_report / curate)이 agent_office.db의 `agent_logs`에는 기록되지만 `agent_tasks` 테이블에는 **`curate_weekly`만** 들어감 → agent-office UI에서 "Tasks" 섹션 봤을 때 활동 이력이 누락. lotto-lab의 weight_evolver cron(매일 apply / 월 generate / 토 evaluate)은 lotto.db에만 기록 → agent_office에서 완전히 안 보임.
사용자 의도: "로또 에이전트가 무엇을 했는지" 한 곳에서 확인 가능하게.
## 2. 의사결정 요약
| 결정 사항 | 선택 | 비고 |
|---|---|---|
| 라우트 위치 | 별도 `/lotto/evolver` (텔레그램 링크와 일치) | `/stock/trade`, `/stock/screener` 패턴 따름 |
| 사용 시나리오 | 토 22:15 텔레그램 직후 주간 요약 대시보드 | 평일 운영·장기 분석은 부차 |
| 페이지 구조 | 단일 스크롤, 5개 카드 (Header / Winner / TrialsGrid / BaseDiff / BaseHistory / Actions) | sub-tab 불필요 |
| 차트 | Recharts (이미 dep) — Radar / Bar / Line + 인라인 metric-card | small multiples 대신 텍스트 강조 |
| 활동 노출 위치 | `/lotto/evolver` + `/agent-office` 양쪽 (공용 컴포넌트) | DRY |
| 백엔드 보강 | 기존 add_log만 있던 LottoAgent 메서드에 task_id 도입 + 신규 sync_evolver_activity cron | 멱등 guard 포함 |
## 3. 아키텍처
### 3.1 컴포넌트 다이어그램
```
┌─────────────────────────────────────────────────────────────┐
│ web-ui (신규 컴포넌트) │
│ │
│ src/pages/lotto/ │
│ Evolver.jsx ← /lotto/evolver 진입점 │
│ Evolver.css │
│ evolver/ │
│ WinnerCard.jsx ← Radar (5축) + 메타 │
│ TrialsGrid.jsx ← 6일 Bar 비교 + 펼치기 │
│ BaseDiff.jsx ← 5 metric-card (텍스트+arrow)│
│ BaseHistory.jsx ← LineChart 12주 시계열 │
│ EvolverActions.jsx ← 수동 트리거 (dev) │
│ useEvolverApi.js ← status+history+activity hook│
│ │
│ src/components/lotto/ │
│ LottoActivityTimeline.jsx ← 공용 활동 timeline │
│ /lotto/evolver + /agent-office│
└─────────────────────────────────────────────────────────────┘
↓ (HTTP)
┌─────────────────────────────────────────────────────────────┐
│ web-backend (보강) │
│ │
│ agent-office/app/agents/lotto.py │
│ • run_signal_check → task_id 도입 (신규) │
│ • run_daily_digest → task_id 도입 (신규) │
│ • run_weekly_evolution_report → task_id 도입 (신규) │
│ • sync_evolver_activity → 신규 메서드 │
│ │
│ agent-office/app/scheduler.py │
│ • lotto_evolver_activity_sync — 매일 09:30 cron 신규 │
│ │
│ agent-office/app/db.py │
│ • get_tasks_by_agent_date_kind — 멱등 guard helper 신규 │
│ │
│ agent-office/app/main.py │
│ • GET /agents/{id}/tasks에 task_type 필터 추가 (확장) │
│ │
│ lotto-lab: 변경 없음 (web-ui가 evolver API 직접 소비) │
└─────────────────────────────────────────────────────────────┘
```
### 3.2 책임 경계
- **web-ui Evolver 페이지**: 데이터 시각화 전담. 비즈니스 로직 없음. fetch는 useEvolverApi에 집중.
- **LottoActivityTimeline**: 시간순 timeline 표현만. logs/tasks/evolverEvents 3종 입력 받아 merge sort + 렌더.
- **LottoAgent**: 모든 자율 작업 시 task row 생성 (다른 에이전트와 동일 패턴).
- **sync_evolver_activity**: lotto-lab의 결과를 agent_office.db에 거울 비추기. 백엔드 polling 패턴. 멱등.
- **lotto-lab**: 변경 없음. 모든 evolver API는 web-ui가 직접 호출.
## 4. 페이지 정보 layout
```
┌─────────────────────────────────────────────────────────────┐
│ HEADER │
│ Lotto · Weight Evolver │
│ "스스로 가중치를 조절하는 자율 학습 루프" │
│ 마지막 회고: 1225회 (2026-05-21 22:00) │
├─────────────────────────────────────────────────────────────┤
│ ① WinnerCard (대형, 메인) │
│ 🏆 목요일 · W_4 · max=4개 일치 │
│ ┌─ Radar Chart (5축) ──┐ │
│ │ freq, finger, gap, │ │
│ │ cooccur, divers │ │
│ └──────────────────────┘ │
│ avg_score · n_picks graded · update reason │
├─────────────────────────────────────────────────────────────┤
│ ② TrialsGrid │
│ 월 화 수 목⭐ 금 토 (가로 6개 Bar) │
│ ░░ ▓▓ ░░ ██ ▒▒ ░░ │
│ max=2 1 3 4 2 1 │
│ 클릭 → 그날 5세트 numbers + scores 펼침 │
├─────────────────────────────────────────────────────────────┤
│ ③ BaseDiff │
│ 5개 metric-card 가로 정렬 │
│ freq 0.20 → 0.18 ↓ -10% │
│ finger 0.20 → 0.32 ↑↑ +60% │
│ gap 0.20 → 0.20 = (변화 없음) │
│ cooccur 0.20 → 0.22 ↑ +10% │
│ divers 0.20 → 0.08 ↓↓ -60% │
│ → reason: winner_4plus │
├─────────────────────────────────────────────────────────────┤
│ ④ BaseHistory (12주) │
│ LineChart 5 라인 (freq/finger/gap/cooccur/divers) │
│ X축: effective_from, Y축: weight 0~1 │
│ dot click → reason tooltip + 회차 표시 │
├─────────────────────────────────────────────────────────────┤
│ ⑤ LottoActivityTimeline (compact=false) │
│ 최근 7일 — task + log + lotto-lab evolver 이벤트 merge │
│ 2026-05-23 22:15 🧬 weekly_evolution_report succeeded │
│ 2026-05-23 22:00 ⚖️ weight_evolver_eval (lotto-lab) │
│ 2026-05-23 21:15 🔍 deep_check succeeded │
│ ... │
├─────────────────────────────────────────────────────────────┤
│ ⑥ EvolverActions (개발자 모드) │
│ [수동 generate-now] [수동 evaluate-now] │
│ 응답 JSON 콘솔에 표시 │
└─────────────────────────────────────────────────────────────┘
```
### 4.1 모바일 반응형
- ≤640px: 1 컬럼, 차트는 가로폭 100%
- 641-1024px: WinnerCard·TrialsGrid 가로 분할 (50/50)
- ≥1025px: 위 layout 그대로
## 5. 데이터 흐름
### 5.1 useEvolverApi hook
```js
function useEvolverApi({ days = 7, weeks = 12 } = {}) {
// 4개 fetch 동시 — Promise.all
// 1. GET /api/lotto/evolver/status → status
// 2. GET /api/lotto/evolver/history?weeks=12 → history
// 3. GET /api/agent-office/agents/lotto/logs?days=7 → logs
// 4. GET /api/agent-office/agents/lotto/tasks?days=7 → tasks
//
// activity = merge(logs, tasks, evolverEventsFromHistory) sorted by timestamp DESC
return { status, history, activity, loading, error, refetch };
}
```
`activity` 합성 규칙:
- agent_logs의 created_at + level + message + task_id
- agent_tasks의 created_at + task_type + status + result_data
- history.items의 created_at + update_reason + weight (evolver eval 자체 이벤트로 별도 표시)
- 클라이언트에서 timestamp DESC sort → React에서 렌더링
### 5.2 Recharts 매핑
| 컴포넌트 | 차트 | data prop |
|---|---|---|
| WinnerCard | `RadarChart` | `[{metric, value, previous}]` 5점 (overlay: previous_base) |
| TrialsGrid | `BarChart` 수평 6개 | `[{day_name, avg_score, max_correct, is_winner}]` |
| BaseHistory | `LineChart` | `[{effective_from, freq, finger, gap, cooccur, divers}, ...]` |
### 5.3 LottoActivityTimeline
```jsx
<LottoActivityTimeline
logs={agentLogs}
tasks={agentTasks}
evolverEvents={evolverEventsFromHistory}
days={7}
compact={false}
/>
```
merge & sort:
```js
const stream = [
...logs.map(l => ({ ts: l.created_at, kind: 'log', payload: l })),
...tasks.map(t => ({ ts: t.created_at, kind: 'task', payload: t })),
...evolverEvents.map(e => ({ ts: e.created_at, kind: 'evolver', payload: e })),
].sort((a, b) => b.ts.localeCompare(a.ts));
```
각 stream item:
- kind='task': 아이콘 + task_type label + status badge + (completed_at - created_at) 소요시간
- kind='log': 아이콘(level) + message
- kind='evolver': ⚖️ + update_reason + winner_score
icon · color mapping (task_type 기준):
```
curate_weekly 📋 blue
signal_check 🔍 green / fired면 amber
daily_digest 📊 cyan
weekly_evolution_report 🧬 purple
evolver_generate 🌱 teal
evolver_apply 🎲 gray
```
### 5.4 cold start / empty state
- `weight_base_history` empty → 큰 빈 카드: "아직 학습 시작 전. 다음 월요일 09:00 자동 시작" + `[수동 generate-now 트리거]` 버튼
- `trials` empty (월 09:00 전) → 안내 카드
- `activity` empty → 회색 "최근 활동 없음"
## 6. 백엔드 보강
### 6.1 LottoAgent 메서드 — task_id 도입
3개 메서드에 `_run` 패턴(`create_task` + try/except + `update_task_status` + `add_log(..., task_id=...)`) 적용:
| 메서드 | 새 task_type | result_data 핵심 |
|---|---|---|
| `run_signal_check(source)` | `signal_check` | source, overall_fire, n_results, fired_metrics |
| `run_daily_digest()` | `daily_digest` | evaluated, fired, signals_count |
| `run_weekly_evolution_report()` | `weekly_evolution_report` | draw_no, update_reason, winner_day |
기존 `_run`(`curate_weekly`)은 그대로.
### 6.2 sync_evolver_activity — 신규 메서드
매일 09:30 cron. lotto-lab의 today_trial 가져와 agent_office.db에 task+log 기록. 멱등 guard.
```python
async def sync_evolver_activity(self):
"""lotto-lab evolver 상태 polling → agent_office.db에 거울. 멱등."""
today_iso = _today_kst_iso()
dow = _today_dow()
status = await service_proxy.lotto_evolver_status()
# 오늘 trial + picks → evolver_apply task
today_trial = next((t for t in status["trials"] if t["day_of_week"] == dow), None)
if today_trial and today_trial.get("picks") and not db.get_tasks_by_agent_date_kind("lotto", today_iso, "evolver_apply"):
tid = db.create_task("lotto", "evolver_apply", {
"date": today_iso, "trial_id": today_trial["id"],
"day_of_week": dow, "weight": today_trial["weight"],
})
db.update_task_status(tid, "succeeded", result_data={
"n_picks": len(today_trial["picks"]),
"meta_scores": [p["meta_score"] for p in today_trial["picks"]],
})
db.add_log("lotto", f"evolver_apply: 오늘 W로 {len(today_trial['picks'])}세트 추출", task_id=tid)
# 월요일 + 6 trials 완성 → evolver_generate task
if dow == 0 and len(status["trials"]) == 6 and not db.get_tasks_by_agent_date_kind("lotto", today_iso, "evolver_generate"):
tid = db.create_task("lotto", "evolver_generate", {"week_start": status["week_start"]})
db.update_task_status(tid, "succeeded", result_data={"trials_count": 6})
db.add_log("lotto", f"evolver_generate: {status['week_start']} 주의 6 trials 생성", task_id=tid)
```
토요일 22:15 evaluate는 `run_weekly_evolution_report`가 이미 task 기록 → sync 불필요.
### 6.3 db.py — 신규 helper
```python
def get_tasks_by_agent_date_kind(agent_id: str, date_iso: str, task_type: str) -> List[Dict[str, Any]]:
"""같은 (agent, date, task_type)으로 이미 생성된 task 조회 — 멱등 guard."""
with _conn() as conn:
rows = conn.execute(
"""
SELECT * FROM agent_tasks
WHERE agent_id = ? AND task_type = ?
AND substr(created_at, 1, 10) = ?
ORDER BY created_at DESC
""",
(agent_id, task_type, date_iso),
).fetchall()
return [dict(r) for r in rows]
```
### 6.4 scheduler.py — cron 추가
```python
async def _run_lotto_sync_evolver_activity():
agent = AGENT_REGISTRY.get("lotto")
if agent:
await agent.sync_evolver_activity()
scheduler.add_job(
_run_lotto_sync_evolver_activity,
"cron", hour=9, minute=30,
id="lotto_evolver_activity_sync",
)
```
### 6.5 main.py — API 확장
`GET /api/agent-office/agents/{id}/tasks`에 query param 추가:
```python
@app.get("/api/agent-office/agents/{agent_id}/tasks")
async def get_agent_tasks(agent_id: str, days: int = 7, task_type: Optional[str] = None):
return {"items": db.get_agent_tasks(agent_id, days=days, task_type=task_type)}
```
`db.get_agent_tasks`도 task_type 필터 추가 (기존 함수 보강).
### 6.6 task_type 명세 (참조)
| task_type | 트리거 | 어디서 생성 |
|---|---|---|
| `curate_weekly` | 월 09:05 또는 deep_check | LottoAgent._run (기존) |
| `signal_check` | light / sim / deep cron | LottoAgent.run_signal_check (신규 wrap) |
| `daily_digest` | 매일 09:25 | LottoAgent.run_daily_digest (신규 wrap) |
| `weekly_evolution_report` | 토 22:15 | LottoAgent.run_weekly_evolution_report (신규 wrap) |
| `evolver_generate` | 월 09:30 sync | LottoAgent.sync_evolver_activity (신규) |
| `evolver_apply` | 매일 09:30 sync | LottoAgent.sync_evolver_activity (신규) |
## 7. 라우터 등록
`web-ui/src/routes.jsx`에 추가:
```jsx
const Evolver = lazy(() => import('./pages/lotto/Evolver'));
// appRoutes 배열에 추가:
{
path: 'lotto/evolver',
element: <Evolver />,
},
```
## 8. 구현 Phase
| Phase | 범위 | 검증 |
|---|---|---|
| 1 | agent-office 백엔드 보강 (LottoAgent task_id wrap + sync cron + db helper) + 단위 테스트 | task row 생성 확인, 멱등 가드 동작 |
| 2 | agent-office API 확장 (task_type 필터) | curl로 필터링 동작 확인 |
| 3 | web-ui Evolver 페이지 — useEvolverApi + WinnerCard + TrialsGrid + BaseDiff + BaseHistory + EvolverActions | 로컬 dev 브라우저에서 모든 카드 정상 렌더, 모바일 반응형 |
| 4 | LottoActivityTimeline 공용 컴포넌트 — /lotto/evolver에 통합 + /agent-office LottoAgent 카드에 compact 모드 통합 | 두 페이지에서 동일 데이터 보임 |
| 5 | 라우터 등록 + 텔레그램 링크 404 해결 확인 | `release:nas` → 텔레그램 [차트 보기] 클릭 → 정상 페이지 |
Phase 1-2: web-backend repo, Phase 3-5: web-ui repo. 각 repo는 별도 git, 별도 배포 (web-backend git push → Gitea webhook auto, web-ui `npm run release:nas`).
## 9. 비기능 요구
- **백워드 호환**: 기존 LottoAgent 호출자 (cron 등) 시그니처 변경 없음. 내부 task_id wrap만 추가.
- **장애 격리**: sync_evolver_activity 실패해도 lotto-lab 영향 없음. task_id wrap 실패 시 try/except로 메서드 자체는 계속 동작.
- **멱등성**: sync_evolver_activity는 멱등 guard로 cron 재실행·재시작 안전.
- **테스트**:
- LottoAgent task_id wrap — mock task_id 받아 update 호출 확인
- sync_evolver_activity 멱등 — 같은 날 2번 호출 시 1 row만
- LottoActivityTimeline merge sort — unit test로 stream 순서·아이콘 매핑
- **관측**: 모든 LottoAgent 메서드의 result_data 표준화 (Section 6.1 표 참조)
## 10. 비목표 (Out of scope)
- TrialsGrid에서 과거 주 deep dive 조회 (`GET /trials/{week_start}` 사용) — v2.2 후속, 별도 UI
- 차트 export / CSV 다운로드
- 가중치 수동 편집 UI — v3에서 사용자 개입 모드 도입 검토
- 다른 에이전트(stock / music / realestate)의 활동 통합 timeline — 현재 spec은 lotto만
- 실시간 WebSocket 푸시 (agent-office에 ws 있지만 evolver 활동은 polling으로 충분)
## 11. v3 후속 검토
- 다른 에이전트 활동도 같은 패턴(LottoActivityTimeline 제너릭화 → AgentActivityTimeline)으로 노출
- /lotto/evolver 페이지에 사용자 의견 입력 (이번 winner가 마음에 듦/싫음) → 학습 시그널로 활용
- BaseHistory에 brush 도입 (긴 history 시계열 zoom)
- TrialsGrid에 picks 채점 결과 통계 (몇 개 trial에서 4개 일치 났는지 등)

View File

@@ -271,12 +271,40 @@ class TemplateBody(BaseModel):
description: str = ""
def _default_prompt_templates() -> dict:
"""DB에 저장된 override가 없을 때 노출할 코드 기본값.
생성 파이프라인이 실제로 폴백하는 값과 동일한 단일 소스를 사용."""
return {
"slate_writer": {
"template": card_writer.DEFAULT_PROMPT,
"description": "카드 10페이지 카피 생성 마스터 프롬프트 (Claude Sonnet). "
"{category}/{keyword}/{articles} 치환자 필수.",
},
"category_seeds": {
"template": json.dumps(DEFAULT_CATEGORY_SEEDS, ensure_ascii=False, indent=2),
"description": "트렌드 수집·분류용 카테고리별 시드 키워드 (JSON). "
"최상위 키가 분류 라벨로도 쓰임.",
},
}
@app.get("/api/insta/templates/prompts/{name}")
def get_prompt(name: str):
pt = db.get_prompt_template(name)
if not pt:
raise HTTPException(404)
return pt
if pt:
return pt
# DB override 없음 → 코드 기본값 노출 (편집 UI가 마스터 프롬프트를 보고 수정 가능)
defaults = _default_prompt_templates()
if name in defaults:
d = defaults[name]
return {
"name": name,
"template": d["template"],
"description": d["description"],
"updated_at": None,
"is_default": True,
}
raise HTTPException(404)
@app.put("/api/insta/templates/prompts/{name}")

View File

@@ -0,0 +1,63 @@
import os
import gc
import json
import tempfile
import pytest
from fastapi.testclient import TestClient
from app import db as db_module
@pytest.fixture
def client(monkeypatch):
fd, path = tempfile.mkstemp(suffix=".db")
os.close(fd)
monkeypatch.setattr(db_module, "DB_PATH", path)
db_module.init_db()
from app import main
monkeypatch.setattr(main, "DB_PATH", path)
with TestClient(main.app) as c:
yield c
gc.collect()
for ext in ("", "-wal", "-shm"):
try:
os.remove(path + ext)
except OSError:
pass
def test_get_slate_writer_returns_default_when_unset(client):
"""DB에 없으면 코드 기본 마스터 프롬프트를 200으로 반환 (404 아님)."""
resp = client.get("/api/insta/templates/prompts/slate_writer")
assert resp.status_code == 200
body = resp.json()
assert body["is_default"] is True
assert "{keyword}" in body["template"]
assert "{category}" in body["template"]
def test_get_category_seeds_returns_default_when_unset(client):
"""category_seeds 기본값은 유효한 JSON (카테고리→시드 배열)."""
resp = client.get("/api/insta/templates/prompts/category_seeds")
assert resp.status_code == 200
body = resp.json()
assert body["is_default"] is True
seeds = json.loads(body["template"])
assert "economy" in seeds and isinstance(seeds["economy"], list)
def test_get_unknown_prompt_still_404(client):
resp = client.get("/api/insta/templates/prompts/does_not_exist")
assert resp.status_code == 404
def test_saved_template_overrides_default(client):
"""PUT로 저장하면 이후 GET은 저장본(is_default 없음)을 반환."""
client.put("/api/insta/templates/prompts/slate_writer",
json={"template": "내 커스텀 프롬프트", "description": "custom"})
resp = client.get("/api/insta/templates/prompts/slate_writer")
assert resp.status_code == 200
body = resp.json()
assert body["template"] == "내 커스텀 프롬프트"
assert not body.get("is_default")

View File

@@ -170,7 +170,11 @@ def build_number_weights(cache: Dict[str, Any]) -> Dict[int, float]:
return weights
def score_combination(numbers: List[int], cache: Dict[str, Any]) -> Dict[str, float]:
def score_combination(
numbers: List[int],
cache: Dict[str, Any],
weights: Optional[List[float]] = None,
) -> Dict[str, float]:
"""
6개 번호 조합의 통계적 품질 점수 계산 (0~1 범위 정규화).
@@ -181,6 +185,13 @@ def score_combination(numbers: List[int], cache: Dict[str, Any]) -> Dict[str, fl
- score_cooccur (15%): 공동 출현 기댓값 대비
- score_diversity (10%): 연속번호, 범위, 구간 다양성
Args:
numbers: 6개 번호 리스트
cache: build_analysis_cache() 반환 딕셔너리
weights: 5가지 기법별 가중치 리스트 [frequency, fingerprint, gap, cooccur, diversity].
None이면 기본값 [0.25, 0.30, 0.20, 0.15, 0.10] 사용.
길이가 5가 아니면 ValueError 발생.
Returns:
{"score_total": ..., "score_frequency": ..., ...}
"""
@@ -282,12 +293,16 @@ def score_combination(numbers: List[int], cache: Dict[str, Any]) -> Dict[str, fl
)
# ── 최종 가중 합산 ────────────────────────────────────────────────────────
if weights is None:
weights = [0.25, 0.30, 0.20, 0.15, 0.10]
if len(weights) != 5:
raise ValueError("weights must have 5 elements")
score_total = (
score_frequency * 0.25
+ score_fingerprint * 0.30
+ score_gap * 0.20
+ score_cooccur * 0.15
+ score_diversity * 0.10
score_frequency * weights[0]
+ score_fingerprint * weights[1]
+ score_gap * weights[2]
+ score_cooccur * weights[3]
+ score_diversity * weights[4]
)
return {

View File

@@ -300,7 +300,51 @@ def init_db() -> None:
_ensure_column(conn, "lotto_briefings", "tier_rationale",
"ALTER TABLE lotto_briefings ADD COLUMN tier_rationale TEXT NOT NULL DEFAULT '{}'")
# ── weight_trials / auto_picks / weight_base_history 테이블 ──────────
conn.execute("""
CREATE TABLE IF NOT EXISTS weight_trials (
id INTEGER PRIMARY KEY AUTOINCREMENT,
week_start TEXT NOT NULL,
day_of_week INTEGER NOT NULL,
weight_json TEXT NOT NULL,
source TEXT NOT NULL,
base_at_gen TEXT,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
UNIQUE(week_start, day_of_week)
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_wt_week
ON weight_trials(week_start, day_of_week)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS auto_picks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
trial_id INTEGER NOT NULL REFERENCES weight_trials(id) ON DELETE CASCADE,
pick_no INTEGER NOT NULL,
numbers TEXT NOT NULL,
meta_score REAL,
correct INTEGER,
rank INTEGER,
graded_at TEXT,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
UNIQUE(trial_id, pick_no)
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_ap_trial ON auto_picks(trial_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_ap_graded ON auto_picks(graded_at)")
conn.execute("""
CREATE TABLE IF NOT EXISTS weight_base_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
effective_from TEXT NOT NULL,
weight_json TEXT NOT NULL,
source_trial_id INTEGER REFERENCES weight_trials(id),
update_reason TEXT,
winner_score REAL,
winner_max_correct INTEGER,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
def upsert_draw(row: Dict[str, Any]) -> None:
@@ -1247,3 +1291,155 @@ def list_reviews(limit: int = 10) -> List[Dict[str, Any]]:
).fetchall()
return [_review_row(r) for r in rows]
# --- weight_trials / auto_picks / weight_base_history CRUD ---
def save_weight_trial(
week_start: str,
day_of_week: int,
weight: List[float],
source: str,
base_at_gen: Optional[List[float]] = None,
) -> int:
with _conn() as conn:
cur = conn.execute(
"""
INSERT INTO weight_trials (week_start, day_of_week, weight_json, source, base_at_gen)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(week_start, day_of_week) DO UPDATE SET
weight_json = excluded.weight_json,
source = excluded.source,
base_at_gen = excluded.base_at_gen
""",
(week_start, day_of_week, json.dumps(weight),
source, json.dumps(base_at_gen) if base_at_gen else None),
)
if cur.lastrowid:
return cur.lastrowid
row = conn.execute(
"SELECT id FROM weight_trials WHERE week_start=? AND day_of_week=?",
(week_start, day_of_week),
).fetchone()
return int(row["id"])
def get_weight_trial(week_start: str, day_of_week: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute(
"SELECT * FROM weight_trials WHERE week_start=? AND day_of_week=?",
(week_start, day_of_week),
).fetchone()
if not row:
return None
d = dict(row)
d["weight"] = json.loads(d.pop("weight_json"))
if d.get("base_at_gen"):
d["base_at_gen"] = json.loads(d["base_at_gen"])
return d
def get_weekly_trials(week_start: str) -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM weight_trials WHERE week_start=? ORDER BY day_of_week",
(week_start,),
).fetchall()
out = []
for r in rows:
d = dict(r)
d["weight"] = json.loads(d.pop("weight_json"))
if d.get("base_at_gen"):
d["base_at_gen"] = json.loads(d["base_at_gen"])
out.append(d)
return out
def save_auto_pick(
trial_id: int,
pick_no: int,
numbers: List[int],
meta_score: Optional[float] = None,
) -> int:
with _conn() as conn:
cur = conn.execute(
"""
INSERT OR REPLACE INTO auto_picks (trial_id, pick_no, numbers, meta_score)
VALUES (?, ?, ?, ?)
""",
(trial_id, pick_no, json.dumps(sorted(numbers)), meta_score),
)
return cur.lastrowid
def get_auto_picks(trial_id: int) -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM auto_picks WHERE trial_id=? ORDER BY pick_no",
(trial_id,),
).fetchall()
out = []
for r in rows:
d = dict(r)
d["numbers"] = json.loads(d["numbers"])
out.append(d)
return out
def update_auto_pick_grade(pick_id: int, correct: int, rank: Optional[int]) -> None:
with _conn() as conn:
conn.execute(
"""
UPDATE auto_picks
SET correct=?, rank=?, graded_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')
WHERE id=?
""",
(correct, rank, pick_id),
)
def get_current_base() -> Optional[List[float]]:
"""weight_base_history 최신 row의 weight. 없으면 None (cold start)."""
with _conn() as conn:
row = conn.execute(
"SELECT weight_json FROM weight_base_history ORDER BY id DESC LIMIT 1",
).fetchone()
if not row:
return None
return json.loads(row["weight_json"])
def save_base_history(
effective_from: str,
weight: List[float],
source_trial_id: Optional[int],
update_reason: str,
winner_score: Optional[float],
winner_max_correct: Optional[int],
) -> int:
with _conn() as conn:
cur = conn.execute(
"""
INSERT INTO weight_base_history
(effective_from, weight_json, source_trial_id, update_reason,
winner_score, winner_max_correct)
VALUES (?, ?, ?, ?, ?, ?)
""",
(effective_from, json.dumps(weight), source_trial_id,
update_reason, winner_score, winner_max_correct),
)
return cur.lastrowid
def get_base_history(limit: int = 12) -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM weight_base_history ORDER BY id DESC LIMIT ?",
(limit,),
).fetchall()
out = []
for r in rows:
d = dict(r)
d["weight"] = json.loads(d.pop("weight_json"))
out.append(d)
return out

View File

@@ -25,6 +25,7 @@ from .db import (
)
from .analyzer import build_analysis_cache, build_number_weights, score_combination
from .utils import weighted_sample_6
from .weight_evolver import get_active_weight
def run_simulation(
@@ -54,6 +55,7 @@ def run_simulation(
# ── 1. 통계 캐시 및 가중치 구성 (시뮬레이션 전체에서 재사용) ────────────
cache = build_analysis_cache(draws)
weights = build_number_weights(cache)
active_weights = get_active_weight() # None → analyzer uses fixed default
# ── 2. 후보 생성 및 스코어링 ──────────────────────────────────────────────
candidates: List[Dict[str, Any]] = []
@@ -69,7 +71,7 @@ def run_simulation(
continue
seen_keys.add(key)
scores = score_combination(nums, cache)
scores = score_combination(nums, cache, weights=active_weights)
candidates.append({
"numbers": sorted(nums),
**scores,

View File

@@ -38,6 +38,11 @@ from .strategy_evolver import (
get_weights_with_trend, recalculate_weights,
generate_smart_recommendation,
)
from .weight_evolver import (
generate_weekly_candidates_and_save,
apply_today_and_pick,
evaluate_weekly,
)
from .routers import curator as curator_router
from .routers import briefing as briefing_router
from .routers import review as review_router
@@ -111,9 +116,42 @@ def on_startup():
id="grade_weekly_review",
)
scheduler.add_job(_run_weight_evolver_weekly, "cron", day_of_week="mon", hour=9, minute=0, id="weight_evolver_weekly")
scheduler.add_job(_run_weight_evolver_daily, "cron", hour=9, minute=0, id="weight_evolver_daily")
scheduler.add_job(_run_weight_evolver_eval, "cron", day_of_week="sat", hour=22, minute=0, id="weight_evolver_eval")
scheduler.start()
async def _run_weight_evolver_weekly():
"""월 09:00 — 6개 후보 생성 후 inline으로 apply_today도 호출."""
try:
generate_weekly_candidates_and_save()
apply_today_and_pick(n=5)
except Exception as e:
logger.error(f"[weight_evolver_weekly] {e}")
async def _run_weight_evolver_daily():
"""매일 09:00 (월/일 제외 — 월=weekly inline, 일=토 trial 보호)."""
try:
from datetime import datetime, timezone, timedelta
KST = timezone(timedelta(hours=9))
if datetime.now(KST).weekday() in (0, 6):
return
apply_today_and_pick(n=5)
except Exception as e:
logger.error(f"[weight_evolver_daily] {e}")
async def _run_weight_evolver_eval():
"""토 22:00 — 회고 + 다음주 base 갱신."""
try:
evaluate_weekly()
except Exception as e:
logger.error(f"[weight_evolver_eval] {e}")
@app.get("/health")
def health():
return {"ok": True}
@@ -383,6 +421,62 @@ def api_strategy_evolve():
return {"ok": True, "weights": new_weights}
# ── weight-evolver API ───────────────────────────────────────────────────────
@app.get("/api/lotto/evolver/status")
async def evolver_status():
"""현재 base + 이번주 trials + auto_picks 진행 상황."""
from .weight_evolver import get_week_start
from .db import get_current_base, get_weekly_trials, get_auto_picks, get_latest_draw
ws = get_week_start()
trials = get_weekly_trials(ws)
trials_with_picks = []
for t in trials:
picks = get_auto_picks(t["id"])
trials_with_picks.append({**t, "picks": picks})
latest = get_latest_draw()
return {
"week_start": ws,
"current_base": get_current_base(),
"trials": trials_with_picks,
"latest_draw": latest["drw_no"] if latest else None,
}
@app.get("/api/lotto/evolver/history")
async def evolver_history(weeks: int = 12):
"""weight_base_history 최근 N개."""
from .db import get_base_history
return {"items": get_base_history(limit=weeks)}
@app.get("/api/lotto/evolver/trials/{week_start}")
async def evolver_trials(week_start: str):
"""특정 주 6 trials + 채점 결과."""
from .db import get_weekly_trials, get_auto_picks
trials = get_weekly_trials(week_start)
out = []
for t in trials:
picks = get_auto_picks(t["id"])
out.append({**t, "picks": picks})
return {"week_start": week_start, "trials": out}
@app.post("/api/lotto/evolver/generate-now")
async def evolver_generate_now():
"""수동 트리거 — 이번주 후보 생성."""
from .weight_evolver import generate_weekly_candidates_and_save
candidates = generate_weekly_candidates_and_save()
return {"ok": True, "candidates_count": len(candidates), "candidates": candidates}
@app.post("/api/lotto/evolver/evaluate-now")
async def evolver_evaluate_now():
"""수동 회고 + 다음주 base 갱신."""
from .weight_evolver import evaluate_weekly
return evaluate_weekly()
# ── 스마트 추천 API ────────────────────────────────────────────────────────
@app.get("/api/lotto/recommend/smart")

View File

@@ -4,3 +4,4 @@ requests==2.32.3
httpx==0.27.2
beautifulsoup4==4.12.3
APScheduler==3.10.4
numpy>=1.26

314
lotto/app/weight_evolver.py Normal file
View File

@@ -0,0 +1,314 @@
# lotto/app/weight_evolver.py
"""5종 시뮬 점수 가중치 자율 학습 루프.
순수 함수 (clamp/perturb/Dirichlet/score/base-rule) + DB 진입점은 별도 섹션.
"""
from __future__ import annotations
import math
import random
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
MIN_WEIGHT = 0.05
N_METRICS = 5
DEFAULT_UNIFORM = [0.2] * N_METRICS # cold start
RANK_BY_CORRECT = {6: 1, 5: 3, 4: 4, 3: 5}
RANK_BONUS = {1: 1.0, 2: 0.8, 3: 0.6, 4: 0.3, 5: 0.1}
def clamp_and_normalize(W: List[float], min_w: float = MIN_WEIGHT) -> List[float]:
"""각 값 ≥ min_w + 합=1.0. 보장 안 되면 raise."""
if len(W) != N_METRICS:
raise ValueError(f"W must have {N_METRICS} elements")
# Iteratively clamp then normalize until all values satisfy min_w floor.
# (Normalizing after clamping can reduce some already-floored values below
# min_w when the denominator is large — iterate to convergence.)
vals = [float(w) for w in W]
for _ in range(100): # converges in a few iterations in practice
clamped = [max(min_w, v) for v in vals]
total = sum(clamped)
vals = [v / total for v in clamped]
if all(v >= min_w - 1e-12 for v in vals):
break
return vals
def perturb_weights(
base: List[float],
sigma: float = 0.05,
seed: Optional[int] = None,
) -> List[float]:
"""base에 정규분포 noise(σ) 추가 → clamp+normalize."""
if seed is not None:
np.random.seed(seed)
noise = np.random.normal(0, sigma, size=N_METRICS)
perturbed = [b + n for b, n in zip(base, noise)]
return clamp_and_normalize(perturbed)
def dirichlet_weights(
alpha: float = 2.0,
seed: Optional[int] = None,
) -> List[float]:
"""Dirichlet(α, α, α, α, α) 샘플 → clamp+normalize."""
if seed is not None:
np.random.seed(seed)
sample = np.random.dirichlet([alpha] * N_METRICS).tolist()
return clamp_and_normalize(sample)
def generate_weekly_candidates(
base: Optional[List[float]] = None,
seed: Optional[int] = None,
) -> List[Dict[str, Any]]:
"""6개 후보 — 4 perturb + 2 dirichlet. day_of_week 0..5 매핑.
Returns:
[{"day_of_week": 0, "weight": [...], "source": "perturb"}, ...]
"""
if base is None:
base = DEFAULT_UNIFORM[:]
if seed is not None:
np.random.seed(seed)
trials = []
for i in range(4):
trials.append({
"day_of_week": i,
"weight": perturb_weights(base, sigma=0.05),
"source": "perturb",
})
for i in range(4, 6):
trials.append({
"day_of_week": i,
"weight": dirichlet_weights(alpha=2.0),
"source": "dirichlet",
})
return trials
def count_match(pick: List[int], winning: List[int]) -> int:
"""본번호 6개 일치 개수. 보너스 제외."""
return len(set(pick) & set(winning[:6]))
def calc_pick_score(pick_numbers: List[int], winning_numbers: List[int]) -> float:
"""correct/6 + RANK_BONUS. 보너스 번호 미고려."""
correct = count_match(pick_numbers, winning_numbers)
base = correct / 6.0
rank = RANK_BY_CORRECT.get(correct)
bonus = RANK_BONUS.get(rank, 0) if rank else 0
return base + bonus
def decide_base_update(
winner_max_correct: int,
winner_W: List[float],
current_base: Optional[List[float]],
) -> Tuple[List[float], str]:
"""Hybrid base update rule.
Returns:
(new_base, reason) — reason ∈ {'winner_4plus','ema_blend','unchanged','cold_start'}
"""
if winner_max_correct >= 4:
return list(winner_W), "winner_4plus"
if winner_max_correct == 3 and current_base is not None:
blended = [0.3 * w + 0.7 * c for w, c in zip(winner_W, current_base)]
return clamp_and_normalize(blended), "ema_blend"
if current_base is None:
return DEFAULT_UNIFORM[:], "cold_start"
return list(current_base), "unchanged"
# ---------- DB-touching entry points ----------
KST = timezone(timedelta(hours=9))
def _db():
from . import db as _db_mod
return _db_mod
def _today_kst():
return datetime.now(KST).date()
def get_week_start(d=None) -> str:
"""주어진 날짜의 월요일 ISO 'YYYY-MM-DD'."""
if d is None:
d = _today_kst()
ws = d - timedelta(days=d.weekday())
return ws.isoformat()
def get_active_weight() -> Optional[List[float]]:
"""오늘 적용 중인 W. 없으면 None (균등 폴백)."""
today = _today_kst()
week_start = get_week_start(today)
dow = today.weekday()
if dow == 6:
dow = 5 # 일요일은 토요일 W 유지
trial = _db().get_weight_trial(week_start, dow)
if trial:
return trial["weight"]
return None
def generate_weekly_candidates_and_save(seed: Optional[int] = None) -> List[Dict[str, Any]]:
"""월요일 09:00 cron 진입점. 6 trials 생성 후 DB 저장."""
db = _db()
base = db.get_current_base()
if base is None:
base = DEFAULT_UNIFORM[:]
db.save_base_history(
effective_from=get_week_start(),
weight=base,
source_trial_id=None,
update_reason="cold_start",
winner_score=None,
winner_max_correct=None,
)
candidates = generate_weekly_candidates(base, seed=seed)
week_start = get_week_start()
for c in candidates:
db.save_weight_trial(
week_start=week_start,
day_of_week=c["day_of_week"],
weight=c["weight"],
source=c["source"],
base_at_gen=base,
)
return candidates
def apply_today_and_pick(n: int = 5) -> Dict[str, Any]:
"""매일 09:00 cron 진입점. 오늘 W로 N=5 세트 추출 후 auto_picks 저장."""
db = _db()
from . import analyzer, recommender
today = _today_kst()
week_start = get_week_start(today)
dow = min(today.weekday(), 5)
trial = db.get_weight_trial(week_start, dow)
if trial is None:
return {"ok": False, "reason": "no_trial_for_today"}
W = trial["weight"]
draws = db.get_all_draw_numbers()
cache = analyzer.build_analysis_cache(draws)
picks_saved = []
for i in range(1, n + 1):
try:
r = recommender.recommend_numbers(draws)
nums = r["numbers"]
s = analyzer.score_combination(nums, cache, weights=W)
pid = db.save_auto_pick(trial["id"], i, nums, meta_score=s["score_total"])
picks_saved.append({"id": pid, "numbers": nums, "score": s["score_total"]})
except Exception:
continue
return {
"ok": True,
"trial_id": trial["id"],
"weight": W,
"picks": picks_saved,
}
def evaluate_weekly() -> Dict[str, Any]:
"""토 22:00 cron 진입점. 6일 trials × N picks 채점 + base 갱신."""
db = _db()
today = _today_kst()
week_start = get_week_start(today)
trials = db.get_weekly_trials(week_start)
if not trials:
return {"ok": False, "reason": "no_trials"}
latest = db.get_latest_draw()
if latest is None:
return {"ok": False, "reason": "no_latest_draw"}
winning = [
latest["n1"], latest["n2"], latest["n3"],
latest["n4"], latest["n5"], latest["n6"],
]
per_day = []
for trial in trials:
picks = db.get_auto_picks(trial["id"])
if not picks:
continue
day_scores = []
max_c = 0
for p in picks:
correct = count_match(p["numbers"], winning)
rank = RANK_BY_CORRECT.get(correct)
db.update_auto_pick_grade(p["id"], correct, rank)
day_scores.append(calc_pick_score(p["numbers"], winning))
if correct > max_c:
max_c = correct
avg_score = sum(day_scores) / len(day_scores)
per_day.append({
"trial_id": trial["id"],
"day_of_week": trial["day_of_week"],
"weight": trial["weight"],
"avg_score": avg_score,
"max_correct": max_c,
"n_picks": len(picks),
})
if not per_day:
return {"ok": False, "reason": "no_picks_graded"}
winner = max(per_day, key=lambda d: d["avg_score"])
current_base = db.get_current_base()
new_base, reason = decide_base_update(
winner_max_correct=winner["max_correct"],
winner_W=winner["weight"],
current_base=current_base,
)
next_monday = today + timedelta(days=(7 - today.weekday()) % 7 or 7)
next_monday_iso = next_monday.isoformat()
# Idempotent guard: 같은 effective_from으로 이미 저장된 row가 있으면 skip
existing = db.get_base_history(limit=1)
if existing and existing[0]["effective_from"] == next_monday_iso:
return {
"ok": True,
"draw_no": latest["drw_no"],
"week_start": week_start,
"previous_base": existing[0].get("weight"),
"winner": winner,
"new_base": existing[0]["weight"], # 이미 저장된 값
"update_reason": existing[0].get("update_reason", "idempotent_skip"),
"per_day": per_day,
}
db.save_base_history(
effective_from=next_monday_iso,
weight=new_base,
source_trial_id=winner["trial_id"],
update_reason=reason,
winner_score=winner["avg_score"],
winner_max_correct=winner["max_correct"],
)
return {
"ok": True,
"draw_no": latest["drw_no"],
"week_start": week_start,
"previous_base": current_base, # save 이전에 캡처한 값 — diff 계산용
"winner": winner,
"new_base": new_base,
"update_reason": reason,
"per_day": per_day,
}

View File

@@ -0,0 +1,45 @@
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
import pytest
from analyzer import score_combination, build_analysis_cache
@pytest.fixture
def cache():
# build_analysis_cache expects [(drw_no, [n1,n2,n3,n4,n5,n6]), ...] tuples
fake_draws = [
(1, [1, 2, 3, 4, 5, 6]),
(2, [7, 8, 9, 10, 11, 12]),
]
return build_analysis_cache(fake_draws)
def test_score_default_uses_fixed_weights(cache):
"""weights=None은 기존 fixed [0.25, 0.30, 0.20, 0.15, 0.10]과 동등."""
s = score_combination([1, 2, 3, 4, 5, 6], cache)
assert "score_total" in s
assert 0.0 <= s["score_total"] <= 2.0
for k in ("score_frequency", "score_fingerprint", "score_gap",
"score_cooccur", "score_diversity"):
assert k in s
def test_score_with_custom_weights_sums_correctly(cache):
"""weights=[1,0,0,0,0]은 score_total == score_frequency."""
s = score_combination([1, 2, 3, 4, 5, 6], cache, weights=[1.0, 0.0, 0.0, 0.0, 0.0])
assert s["score_total"] == pytest.approx(s["score_frequency"], rel=1e-3)
def test_score_with_uniform_weights(cache):
"""weights=[0.2]*5는 단순 평균."""
s = score_combination([1, 2, 3, 4, 5, 6], cache, weights=[0.2] * 5)
expected = 0.2 * (s["score_frequency"] + s["score_fingerprint"]
+ s["score_gap"] + s["score_cooccur"] + s["score_diversity"])
assert s["score_total"] == pytest.approx(expected, rel=1e-3)
def test_score_weights_wrong_length_raises(cache):
with pytest.raises((ValueError, AssertionError)):
score_combination([1, 2, 3, 4, 5, 6], cache, weights=[0.5, 0.5])

View File

@@ -0,0 +1,122 @@
# lotto/tests/test_weight_evolver.py
import json
import math
import pytest
from app import weight_evolver as we
def test_clamp_and_normalize_min_floor():
"""모든 값이 0.05 이상이 되도록 보장 + 합=1.0."""
W = we.clamp_and_normalize([0.01, 0.6, 0.2, 0.1, 0.09])
assert all(w >= 0.05 - 1e-9 for w in W)
assert abs(sum(W) - 1.0) < 1e-9
def test_clamp_and_normalize_negative_becomes_floor():
W = we.clamp_and_normalize([-0.1, 0.5, 0.3, 0.2, 0.1])
assert W[0] >= 0.05 - 1e-9
assert abs(sum(W) - 1.0) < 1e-9
def test_perturbation_changes_around_base():
"""σ=0.05 정규분포 perturbation 후 정규화 — 각 값이 합리적 범위 안."""
base = [0.2, 0.2, 0.2, 0.2, 0.2]
W = we.perturb_weights(base, sigma=0.05, seed=42)
assert abs(sum(W) - 1.0) < 1e-9
assert all(w >= 0.05 - 1e-9 for w in W)
def test_dirichlet_random_distribution():
"""Dirichlet α=2 — 5종 비음수 합=1."""
W = we.dirichlet_weights(alpha=2.0, seed=42)
assert abs(sum(W) - 1.0) < 1e-9
assert all(0.05 - 1e-9 <= w <= 1.0 for w in W)
def test_generate_weekly_candidates_count():
"""6개 후보 생성 — 4 perturb + 2 dirichlet."""
base = [0.2, 0.2, 0.2, 0.2, 0.2]
trials = we.generate_weekly_candidates(base, seed=42)
assert len(trials) == 6
sources = [t["source"] for t in trials]
assert sources.count("perturb") == 4
assert sources.count("dirichlet") == 2
days = sorted(t["day_of_week"] for t in trials)
assert days == [0, 1, 2, 3, 4, 5]
def test_calc_pick_score_six_match():
"""6개 모두 일치 → 1등 → base=1.0 + bonus 1.0 = 2.0."""
score = we.calc_pick_score([1, 2, 3, 4, 5, 6], [1, 2, 3, 4, 5, 6])
assert score == pytest.approx(2.0)
def test_calc_pick_score_four_match():
"""4개 일치 → 4등 → base=4/6 + bonus 0.3."""
score = we.calc_pick_score([1, 2, 3, 4, 7, 8], [1, 2, 3, 4, 5, 6])
assert score == pytest.approx(4/6 + 0.3)
def test_calc_pick_score_three_match():
"""3개 일치 → 5등 → base=3/6 + bonus 0.1."""
score = we.calc_pick_score([1, 2, 3, 7, 8, 9], [1, 2, 3, 4, 5, 6])
assert score == pytest.approx(3/6 + 0.1)
def test_calc_pick_score_two_match_no_bonus():
"""2개 일치 → 미당첨 → base=2/6 + bonus 0."""
score = we.calc_pick_score([1, 2, 7, 8, 9, 10], [1, 2, 3, 4, 5, 6])
assert score == pytest.approx(2/6)
def test_decide_base_update_winner_4plus_replaces():
"""winner_max_correct ≥ 4 → 교체."""
current = [0.2, 0.2, 0.2, 0.2, 0.2]
winner_W = [0.1, 0.3, 0.2, 0.3, 0.1]
new_base, reason = we.decide_base_update(
winner_max_correct=4,
winner_W=winner_W,
current_base=current,
)
assert new_base == winner_W
assert reason == "winner_4plus"
def test_decide_base_update_winner_3_ema_blend():
"""winner_max_correct = 3 → 0.3*winner + 0.7*current."""
current = [0.2, 0.2, 0.2, 0.2, 0.2]
winner_W = [0.1, 0.3, 0.2, 0.3, 0.1]
new_base, reason = we.decide_base_update(
winner_max_correct=3,
winner_W=winner_W,
current_base=current,
)
expected = [0.3 * w + 0.7 * c for w, c in zip(winner_W, current)]
assert all(abs(a - b) < 1e-9 for a, b in zip(new_base, expected))
assert reason == "ema_blend"
def test_decide_base_update_winner_lt3_unchanged():
"""winner_max_correct ≤ 2 → 직전 base 유지."""
current = [0.2, 0.2, 0.2, 0.2, 0.2]
winner_W = [0.1, 0.3, 0.2, 0.3, 0.1]
new_base, reason = we.decide_base_update(
winner_max_correct=2,
winner_W=winner_W,
current_base=current,
)
assert new_base == current
assert reason == "unchanged"
def test_decide_base_update_cold_start_returns_default():
"""current_base=None (첫 회) → 균등 default 반환."""
winner_W = [0.1, 0.3, 0.2, 0.3, 0.1]
new_base, reason = we.decide_base_update(
winner_max_correct=4,
winner_W=winner_W,
current_base=None,
)
assert new_base == winner_W
assert reason == "winner_4plus"

View File

@@ -199,11 +199,21 @@ def fetch_major_indices() -> Dict[str, Any]:
value = usd_item.select_one(".value").get_text(strip=True)
change_val = usd_item.select_one(".change").get_text(strip=True)
# 방향 (blind 텍스트: 상승, 하락)
# 방향: .head_info의 point_up/point_dn 클래스로 판별 (해외 지수와 동일 패턴).
# .blind span이 "미국 USD"/"원"/"상승" 3개라 select_one(".blind")은 첫 번째 "미국 USD"를
# 잡아 방향 추출에 실패함 → head_info 클래스를 1순위로, 직속 .blind 텍스트를 fallback으로 사용.
direction = ""
blind_txt = usd_item.select_one(".blind").get_text(strip=True)
if "상승" in blind_txt: direction = "red"
elif "하락" in blind_txt: direction = "blue"
head_info = usd_item.select_one(".head_info")
hi_classes = head_info.get("class", []) if head_info else []
if "point_up" in hi_classes:
direction = "red"
elif "point_dn" in hi_classes:
direction = "blue"
else:
dir_blind = usd_item.select_one(".head_info > span.blind")
blind_txt = dir_blind.get_text(strip=True) if dir_blind else ""
if "상승" in blind_txt: direction = "red"
elif "하락" in blind_txt: direction = "blue"
# change_val은 네이버 HTML에서 부호 없이 숫자만 옴 → direction 기반으로 부호 붙여줌
# (프론트 getDirection()이 부호로 색/화살표를 판별하므로)

View File

@@ -15,9 +15,15 @@ PROMPT_TEMPLATE = """다음은 종목 {name}({ticker})에 대한 최근 뉴스 {
{news_block}
이 뉴스들이 종목에 호재인지 악재인지 평가하세요.
score: -10(매우 강한 악재) ~ +10(매우 강한 호재) 사이의 실수. 0은 중립.
reason: 30자 이내 한 줄 근거.
이 뉴스들이 종목 주가에 호재인지 악재인지 종합 평가하세요.
규칙:
- score: -10(매우 강한 악재) ~ +10(매우 강한 호재) 사이의 실수. 명확한 방향성이 없으면 0(중립).
- 뉴스가 호재·악재로 섞여 있으면 주가에 더 우세한 쪽을 기준으로 부호를 정하세요.
- reason은 반드시 score 부호와 같은 방향의 근거만 쓰세요.
· score가 양수(호재)면 호재 근거만, 음수(악재)면 악재 근거만 적습니다.
· 호재 평가에 악재 내용을, 악재 평가에 호재 내용을 섞지 마세요.
- reason: 30자 이내 한 줄.
JSON으로만 응답하세요. 다른 텍스트 금지:
{{"score": <float>, "reason": "<string>"}}"""

View File

@@ -124,8 +124,10 @@ async def refresh_daily(
if successes:
_upsert_news_sentiment(conn, asof, successes, source="articles")
top_pos = sorted(successes, key=lambda r: -r["score_raw"])[:5]
top_neg = sorted(successes, key=lambda r: r["score_raw"])[:5]
# 부호 게이트: 호재(score>0)·악재(score<0)만 분류. score 미만 종목이 5개 미만이어도
# 반대 부호 종목으로 채우지 않음 (양수 종목이 악재란에 섞이는 문제 방지). 중립(0)은 제외.
top_pos = sorted([r for r in successes if r["score_raw"] > 0], key=lambda r: -r["score_raw"])[:5]
top_neg = sorted([r for r in successes if r["score_raw"] < 0], key=lambda r: r["score_raw"])[:5]
return {
"asof": asof.isoformat(),

View File

@@ -140,6 +140,71 @@ async def test_refresh_daily_no_match_ticker_skipped(conn):
assert {r["ticker"] for r in rows} == {"005930"}
@pytest.mark.asyncio
async def test_refresh_daily_sign_gate_no_positive_in_neg(conn):
"""전 종목 양수 점수면 top_neg는 비어야 함 (호재 종목이 악재란에 채워지면 안 됨)."""
asof = dt.date(2026, 5, 13)
fake_articles_by_ticker = {
"005930": [{"title": "h", "summary": "", "press": "", "pub_date": ""}],
"000660": [{"title": "h", "summary": "", "press": "", "pub_date": ""}],
"373220": [{"title": "h", "summary": "", "press": "", "pub_date": ""}],
}
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
scores = {"005930": 6.0, "000660": 2.0, "373220": 0.5} # 모두 양수
async def fake_score(llm, ticker, news, *, name=None, model="m"):
return {
"ticker": ticker, "score_raw": scores[ticker], "reason": "r",
"news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model,
}
with patch.object(pipeline, "articles_source") as mas, \
patch.object(pipeline, "_analyzer") as ma, \
patch.object(pipeline, "_make_llm") as ml:
mas.gather_articles_for_tickers = MagicMock(return_value=(fake_articles_by_ticker, fake_stats))
ma.score_sentiment = fake_score
ml.return_value.__aenter__.return_value = AsyncMock()
ml.return_value.__aexit__.return_value = None
result = await pipeline.refresh_daily(conn, asof, concurrency=3)
assert len(result["top_pos"]) == 3
assert result["top_neg"] == [] # 양수 종목이 악재란에 들어가면 안 됨
@pytest.mark.asyncio
async def test_refresh_daily_sign_gate_excludes_neutral(conn):
"""score=0(중립)은 호재·악재 어디에도 포함되지 않음."""
asof = dt.date(2026, 5, 13)
fake_articles_by_ticker = {
"005930": [{"title": "h", "summary": "", "press": "", "pub_date": ""}],
"000660": [{"title": "h", "summary": "", "press": "", "pub_date": ""}],
"373220": [{"title": "h", "summary": "", "press": "", "pub_date": ""}],
}
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
scores = {"005930": 3.0, "000660": 0.0, "373220": -3.0}
async def fake_score(llm, ticker, news, *, name=None, model="m"):
return {
"ticker": ticker, "score_raw": scores[ticker], "reason": "r",
"news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model,
}
with patch.object(pipeline, "articles_source") as mas, \
patch.object(pipeline, "_analyzer") as ma, \
patch.object(pipeline, "_make_llm") as ml:
mas.gather_articles_for_tickers = MagicMock(return_value=(fake_articles_by_ticker, fake_stats))
ma.score_sentiment = fake_score
ml.return_value.__aenter__.return_value = AsyncMock()
ml.return_value.__aexit__.return_value = None
result = await pipeline.refresh_daily(conn, asof, concurrency=3)
pos_tickers = {r["ticker"] for r in result["top_pos"]}
neg_tickers = {r["ticker"] for r in result["top_neg"]}
assert pos_tickers == {"005930"}
assert neg_tickers == {"373220"}
assert "000660" not in pos_tickers and "000660" not in neg_tickers
def test_top_market_cap_tickers(conn):
out = pipeline._top_market_cap_tickers(conn, n=2)
assert out == ["005930", "000660"]