5 Commits

Author SHA1 Message Date
36e8d11060 fix(stock): AI 뉴스 리포트 하루 밀림 해소 — asof를 KST로 보정 + LLM에 현재 일자 주입
근본원인: stock 컨테이너는 python:3.12-alpine + tzdata 미설치라 TZ=Asia/Seoul이
무효 → date.today()가 UTC를 반환. AI 뉴스 리포트 cron은 08:00 KST(=전날 23:00 UTC)라
asof가 어제로 계산돼 라벨·기사 윈도우·news_sentiment 저장이 전부 하루 밀렸음
(월요일은 일요일 UTC로 계산돼 skip_weekend까지).

- screener/router.py: _today_kst()(=utcnow+9h, holdings_intel 관용) 추가.
  /snapshot/refresh · /snapshot/refresh-news-sentiment의 asof 기본값을 KST로.
- ai_news/analyzer.py: score_sentiment(asof=...) → 프롬프트 앞에 "오늘 날짜" 명시,
  LLM이 현재 일자 기준으로 뉴스 평가(사용자 요청).
- ai_news/pipeline.py: refresh_daily가 asof를 score_sentiment까지 스레딩.
- 테스트: _today_kst KST 보정 + analyzer asof 주입 2종 TDD Red→Green.
  기존 pipeline 목 시그니처에 asof 반영. stock 전체 149 passed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-02 14:38:51 +09:00
db6fed72b3 feat(music-lab): 파이프라인 하드 삭제 엔드포인트 DELETE /api/music/pipeline/{id}
cancel(state→cancelled, active/failed 뷰에서만 제거)만으론 status=all 뷰에
행이 남아 옛 dead 파이프라인을 완전히 치울 수 없었음. DELETE로 하드 삭제 추가.

- db.delete_pipeline(pid)→bool: 자식행(pipeline_feedback, pipeline_jobs) 먼저
  삭제 후 video_pipelines 삭제(SQLite FK 미강제라 명시적 cascade). 존재 여부 bool.
- DELETE /api/music/pipeline/{id}: 없으면 404, 있으면 {"ok":true,"deleted":id}.
  상태 가드 없음(관리자 정리 용도, cancel과 동일한 단순 정책).
- 테스트 3종(삭제+404+자식행 cascade) TDD Red→Green. music-lab 152 passed.
- CLAUDE.md 엔드포인트 카탈로그 갱신.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-02 13:52:11 +09:00
7cce5c422f fix(agent-office): 파이프라인 실패 알림 dedup을 DB 영속화 (재시작 재알림 스팸 해소)
youtube_publisher._notified_failed(인메모리 set)가 컨테이너 재시작 시 소실되어
기존 failed 파이프라인(예: video 인코딩 구버전 실패 #3)을 매 재시작마다 "신규"로
재알림하던 스팸 버그를 notified_failed_pipelines 테이블로 영속화해 해결.

부수 버그 fix: failed 폴링이 예외를 던지면 failed=[]로 오해해 원장을 통째로
비우던 코드 → 예외 시 early-return(원장 보존).

진행 중 *_pending 승인 dedup(_notified_state_per_pipeline)은 의도적으로 인메모리
유지(재시작 시 살아있는 파이프라인 승인 재알림은 유용한 리마인더).

테스트: 재시작 지속성 + 일시적 폴링 예외 재현 테스트 2종 추가(TDD Red→Green).
DB_PATH 첫 import 고정으로 인한 테스트 간 영속 테이블 누수를 monkeypatch로 격리.
agent-office 전체 140개 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-01 15:20:07 +09:00
94beecbfaf docs(CLAUDE.md): agent-office 카탈로그에 /nodes 엔드포인트 + node_monitor.py 등재
분산 워커 관측 시스템 — GET /api/agent-office/nodes(heartbeat 생사+큐깊이+
dead-letter 집계, web-ui /infra 소비) 엔드포인트 표 추가 + 핵심파일에
node_monitor.py 추가. 상세는 infra_distributed_workers.md 메모리.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-07-01 02:56:37 +09:00
98b17f3a3a fix(redis): bgsave fork 실패로 인한 쓰기 차단 해소 (--save "" + stop-writes off)
근본원인: NAS vm.overcommit_memory=0 + Committed_AS≈CommitLimit(98%)로 redis
bgsave fork()가 거부되어 stop-writes-on-bgsave-error(기본 yes)가 모든 쓰기를
차단(6/29 20:36 이후). AOF가 durability를 담당하므로 실패하는 RDB 스냅샷을
비활성화(--save "")하고 stop-writes-on-bgsave-error no로 안전망 추가.
호스트 vm.overcommit_memory=1(sudo)은 별도 권장.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019LV86jBozkNhSFXJA412fq
2026-06-30 14:21:09 +09:00
14 changed files with 246 additions and 26 deletions

View File

@@ -268,6 +268,7 @@ docker compose up -d
| POST/GET | `/api/music/compile` (+ `/compiles/{id}/export`) | 컴파일 | | POST/GET | `/api/music/compile` (+ `/compiles/{id}/export`) | 컴파일 |
| POST/GET/DELETE | `/api/music/video-project` (+ `/{id}/render`, `/export`) | 영상 프로젝트 | | POST/GET/DELETE | `/api/music/video-project` (+ `/{id}/render`, `/export`) | 영상 프로젝트 |
| ALL | `/api/music/pipeline` (생성/start/feedback/cancel/publish/retry/telegram-msg/lookup) | YouTube 자동화 파이프라인. `POST /{id}/retry`=실패 step 재개(publish+업로드완료 시 409) | | ALL | `/api/music/pipeline` (생성/start/feedback/cancel/publish/retry/telegram-msg/lookup) | YouTube 자동화 파이프라인. `POST /{id}/retry`=실패 step 재개(publish+업로드완료 시 409) |
| DELETE | `/api/music/pipeline/{id}` | 파이프라인 행 하드 삭제(자식 jobs/feedback 포함, 전체 목록에서 제거). 없으면 404 |
| GET/PUT | `/api/music/setup` | 파이프라인 설정 | | GET/PUT | `/api/music/setup` | 파이프라인 설정 |
| GET | `/api/music/youtube/auth-url`, `/callback`, `/status`; POST `/disconnect` | YouTube OAuth | | GET | `/api/music/youtube/auth-url`, `/callback`, `/status`; POST `/disconnect` | YouTube OAuth |
| GET/POST/PUT/DELETE | `/api/music/revenue` (+ `/dashboard`) | 수익 기록 | | GET/POST/PUT/DELETE | `/api/music/revenue` (+ `/dashboard`) | 수익 기록 |
@@ -347,7 +348,7 @@ docker compose up -d
### agent-office (agent-office/) ### agent-office (agent-office/)
AI 에이전트 가상 오피스 — 기존 서비스 API를 프록시로 호출, 실시간 WebSocket + 텔레그램 봇. AI 에이전트 가상 오피스 — 기존 서비스 API를 프록시로 호출, 실시간 WebSocket + 텔레그램 봇.
- 핵심 파일: `main.py`, `db.py`, `config.py`, `websocket_manager.py`, `service_proxy.py`, `telegram_bot.py`, `scheduler.py`, `agents/`(stock/music/realestate/youtube/youtube_publisher/lotto/base) - 핵심 파일: `main.py`, `db.py`, `config.py`, `websocket_manager.py`, `service_proxy.py`, `telegram_bot.py`, `scheduler.py`, `node_monitor.py`(분산 워커 관측 집계+경보), `agents/`(stock/music/realestate/youtube/youtube_publisher/lotto/base)
- 에이전트 7종 레지스트리. 명령 API body 필드명 → `reference_agent_office_command_api.md` - 에이전트 7종 레지스트리. 명령 API body 필드명 → `reference_agent_office_command_api.md`
- 📌 상세(DB 9테이블·FSM·전체 cron 목록·AGENT_CONTAINER_MAP·텔레그램 캐싱·env): **`service_agent_office.md`** - 📌 상세(DB 9테이블·FSM·전체 cron 목록·AGENT_CONTAINER_MAP·텔레그램 캐싱·env): **`service_agent_office.md`**
@@ -362,6 +363,7 @@ AI 에이전트 가상 오피스 — 기존 서비스 API를 프록시로 호출
| POST | `/api/agent-office/telegram/webhook` | 텔레그램 Webhook (realestate_bookmark_* 콜백 포함) | | POST | `/api/agent-office/telegram/webhook` | 텔레그램 Webhook (realestate_bookmark_* 콜백 포함) |
| POST | `/api/agent-office/realestate/notify` | realestate-lab 전용 push 수신 → 텔레그램 | | POST | `/api/agent-office/realestate/notify` | realestate-lab 전용 push 수신 → 텔레그램 |
| GET | `/api/agent-office/states` | 전체 에이전트 상태 | | GET | `/api/agent-office/states` | 전체 에이전트 상태 |
| GET | `/api/agent-office/nodes` | 분산 워커(NAS↔Windows) 관측 — heartbeat 생사+큐깊이+dead-letter 집계 (web-ui `/infra` Three.js 시각화 소비). 상세 → `infra_distributed_workers.md` |
| GET | `/api/agent-office/activity` | 전 에이전트 통합 활동 피드 (tasks+logs UNION). 필터 `agent_id`/`type`(task\|log)/`status`/`days` + `limit`/`offset` | | GET | `/api/agent-office/activity` | 전 에이전트 통합 활동 피드 (tasks+logs UNION). 필터 `agent_id`/`type`(task\|log)/`status`/`days` + `limit`/`offset` |
| GET | `/api/agent-office/conversation/stats` | 텔레그램 대화 토큰·캐시 통계 (`days`) | | GET | `/api/agent-office/conversation/stats` | 텔레그램 대화 토큰·캐시 통계 (`days`) |
| POST/GET | `/api/agent-office/youtube/research` (+ `/status`) | YouTube 트렌드 수집 트리거/상태 | | POST/GET | `/api/agent-office/youtube/research` (+ `/status`) | YouTube 트렌드 수집 트리거/상태 |

View File

@@ -4,7 +4,12 @@ import logging
from .base import BaseAgent from .base import BaseAgent
from . import classify_intent from . import classify_intent
from .. import service_proxy from .. import service_proxy
from ..db import add_log from ..db import (
add_log,
get_notified_failed_pipelines,
add_notified_failed_pipeline,
prune_notified_failed_pipelines,
)
from ..telegram.messaging import send_raw from ..telegram.messaging import send_raw
logger = logging.getLogger("agent-office.youtube_publisher") logger = logging.getLogger("agent-office.youtube_publisher")
@@ -25,8 +30,9 @@ class YoutubePublisherAgent(BaseAgent):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
# 진행 중(*_pending) 승인 요청 dedup — 인메모리 유지(의도적).
# 재시작 시 살아있는 파이프라인 승인 재알림은 유용한 리마인더라 스팸 아님.
self._notified_state_per_pipeline: dict[int, tuple] = {} self._notified_state_per_pipeline: dict[int, tuple] = {}
self._notified_failed: set[int] = set()
async def poll_state_changes(self) -> None: async def poll_state_changes(self) -> None:
"""주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송.""" """주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송."""
@@ -52,18 +58,21 @@ class YoutubePublisherAgent(BaseAgent):
try: try:
failed = await service_proxy.list_failed_pipelines() failed = await service_proxy.list_failed_pipelines()
except Exception as e: except Exception as e:
# 일시적 폴링 실패를 "failed 없음"으로 오해하면 원장을 비워 재알림 스팸이 남.
# → 원장을 건드리지 않고 조용히 종료(다음 폴링에서 재시도).
logger.warning("failed 폴링 실패: %s", e) logger.warning("failed 폴링 실패: %s", e)
failed = [] return
notified = get_notified_failed_pipelines()
for p in failed: for p in failed:
pid = p.get("id") pid = p.get("id")
if pid is None: if pid is None:
continue continue
if pid not in self._notified_failed: if pid not in notified:
await self._notify_failed(p) await self._notify_failed(p)
self._notified_failed.add(pid) add_notified_failed_pipeline(pid)
# 재개되어 failed에서 벗어난 파이프라인은 재알림 가능하도록 해제 # 재개되어 failed에서 벗어난 파이프라인은 재알림 가능하도록 원장에서 제거
failed_ids = {p.get("id") for p in failed} failed_ids = {p.get("id") for p in failed if p.get("id") is not None}
self._notified_failed &= failed_ids prune_notified_failed_pipelines(failed_ids)
async def _notify_failed(self, p: dict) -> None: async def _notify_failed(self, p: dict) -> None:
reason = p.get("failed_reason") or "?" reason = p.get("failed_reason") or "?"

View File

@@ -158,6 +158,12 @@ def init_db() -> None:
CREATE INDEX IF NOT EXISTS idx_tarot_favorite CREATE INDEX IF NOT EXISTS idx_tarot_favorite
ON tarot_readings(favorite, created_at DESC) ON tarot_readings(favorite, created_at DESC)
""") """)
conn.execute("""
CREATE TABLE IF NOT EXISTS notified_failed_pipelines (
pipeline_id INTEGER PRIMARY KEY,
notified_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
# Seed default agent configs # Seed default agent configs
for agent_id, name in [ for agent_id, name in [
("stock", "주식 트레이더"), ("stock", "주식 트레이더"),
@@ -826,6 +832,47 @@ def get_all_baselines() -> List[Dict[str, Any]]:
return out return out
# --- notified_failed_pipelines (파이프라인 실패 알림 dedup 원장, 재시작 지속) ---
def get_notified_failed_pipelines() -> set:
"""이미 실패 알림을 발송한 pipeline_id 집합."""
with _conn() as conn:
rows = conn.execute(
"SELECT pipeline_id FROM notified_failed_pipelines"
).fetchall()
return {r["pipeline_id"] for r in rows}
def add_notified_failed_pipeline(pipeline_id: int) -> None:
with _conn() as conn:
conn.execute(
"INSERT OR IGNORE INTO notified_failed_pipelines(pipeline_id) VALUES(?)",
(pipeline_id,),
)
def prune_notified_failed_pipelines(active_failed_ids) -> None:
"""현재 failed 목록에 없는 pipeline_id를 원장에서 제거.
재개되어 failed에서 벗어난 파이프라인이 다시 실패하면 재알림 가능하도록 함.
(기존 인메모리 `_notified_failed &= failed_ids`의 영속 버전)
"""
keep = set(active_failed_ids)
with _conn() as conn:
existing = {
r["pipeline_id"]
for r in conn.execute(
"SELECT pipeline_id FROM notified_failed_pipelines"
).fetchall()
}
stale = existing - keep
for pid in stale:
conn.execute(
"DELETE FROM notified_failed_pipelines WHERE pipeline_id=?",
(pid,),
)
def get_tasks_by_agent_date_kind(agent_id: str, date_iso: str, task_type: str) -> List[Dict[str, Any]]: def get_tasks_by_agent_date_kind(agent_id: str, date_iso: str, task_type: str) -> List[Dict[str, Any]]:
"""같은 (agent, date, task_type)으로 이미 생성된 task 조회. 멱등 guard.""" """같은 (agent, date, task_type)으로 이미 생성된 task 조회. 멱등 guard."""
with _conn() as conn: with _conn() as conn:

View File

@@ -14,13 +14,20 @@ from unittest.mock import AsyncMock, patch
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def _init_db(): def _init_db(monkeypatch):
import gc import gc
gc.collect() gc.collect()
if os.path.exists(_TMP): # config.DB_PATH는 첫 import 시 1회 고정되므로, 다른 테스트 파일과 조합 실행 시
os.remove(_TMP) # db가 이 파일의 _TMP가 아닌 다른 경로를 쓸 수 있다. db.DB_PATH를 이 파일 전용으로
from app.db import init_db # 강제해 영속 테이블(notified_failed_pipelines 등)의 테스트 간 누수를 결정적으로 차단.
init_db() import app.db as _db
monkeypatch.setattr(_db, "DB_PATH", _TMP)
# WAL 사이드카(-wal/-shm)까지 지워야 영속 상태가 남지 않음
for suffix in ("", "-wal", "-shm"):
p = _TMP + suffix
if os.path.exists(p):
os.remove(p)
_db.init_db()
yield yield
gc.collect() gc.collect()
@@ -211,3 +218,70 @@ async def test_failed_poll_exception_is_silent():
# active 알림은 정상 발송 # active 알림은 정상 발송
assert sent.await_count == 1 assert sent.await_count == 1
@pytest.mark.asyncio
async def test_failed_notification_persists_across_restart():
"""컨테이너 재시작(새 에이전트 인스턴스)해도 이미 알린 failed는 재알림하지 않음."""
from app.agents.youtube_publisher import YoutubePublisherAgent
failed_pipeline = {
"id": 3,
"state": "failed",
"failed_reason": "video: timeout",
"track_title": "beat music v2",
}
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
with patch(
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
new=AsyncMock(return_value=[]),
), patch(
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
new=AsyncMock(return_value=[failed_pipeline]),
), patch(
"app.agents.youtube_publisher.send_raw",
new=sent,
):
agent1 = YoutubePublisherAgent()
await agent1.poll_state_changes()
# 컨테이너 재시작 시뮬레이션: 완전히 새로운 인스턴스(인메모리 상태 소실)
agent2 = YoutubePublisherAgent()
await agent2.poll_state_changes()
# 재시작해도 DB 원장으로 중복 방지 → 1회만 알림
assert sent.await_count == 1
@pytest.mark.asyncio
async def test_transient_failed_poll_keeps_ledger():
"""failed 폴링이 일시적으로 예외를 던져도 원장을 비우지 않아 다음 폴링에서 재알림하지 않음."""
from app.agents.youtube_publisher import YoutubePublisherAgent
failed_pipeline = {
"id": 3,
"state": "failed",
"failed_reason": "video: timeout",
"track_title": "beat music v2",
}
list_failed = AsyncMock(
side_effect=[[failed_pipeline], Exception("boom"), [failed_pipeline]]
)
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
with patch(
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
new=AsyncMock(return_value=[]),
), patch(
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
new=list_failed,
), patch(
"app.agents.youtube_publisher.send_raw",
new=sent,
):
agent = YoutubePublisherAgent()
await agent.poll_state_changes() # #3 최초 알림
await agent.poll_state_changes() # 예외 → 원장 유지되어야 (섣부른 정리 금지)
await agent.poll_state_changes() # #3 여전히 failed → 재알림 없어야
assert sent.await_count == 1

View File

@@ -464,7 +464,7 @@ services:
- "6379:6379" - "6379:6379"
volumes: volumes:
- ${RUNTIME_PATH}/redis-data:/data - ${RUNTIME_PATH}/redis-data:/data
command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru command: redis-server --appendonly yes --save "" --stop-writes-on-bgsave-error no --maxmemory 256mb --maxmemory-policy allkeys-lru
healthcheck: healthcheck:
test: ["CMD", "redis-cli", "ping"] test: ["CMD", "redis-cli", "ping"]
interval: 60s interval: 60s

View File

@@ -1100,6 +1100,19 @@ def get_pipeline(pid: int) -> Optional[Dict[str, Any]]:
return _parse_pipeline_row(row) return _parse_pipeline_row(row)
def delete_pipeline(pid: int) -> bool:
"""파이프라인과 자식행(pipeline_feedback, pipeline_jobs)을 하드 삭제.
SQLite FK를 강제하지 않으므로 자식행을 명시적으로 먼저 삭제한다.
파이프라인이 존재했으면 True, 없었으면 False.
"""
with _conn() as conn:
conn.execute("DELETE FROM pipeline_feedback WHERE pipeline_id = ?", (pid,))
conn.execute("DELETE FROM pipeline_jobs WHERE pipeline_id = ?", (pid,))
cur = conn.execute("DELETE FROM video_pipelines WHERE id = ?", (pid,))
return cur.rowcount > 0
def update_pipeline_state(pid: int, state: str, **fields) -> None: def update_pipeline_state(pid: int, state: str, **fields) -> None:
"""파이프라인 state를 갱신하고 옵션 컬럼을 함께 업데이트한다. """파이프라인 state를 갱신하고 옵션 컬럼을 함께 업데이트한다.

View File

@@ -1133,6 +1133,14 @@ def cancel_pipeline(pid: int):
return {"ok": True} return {"ok": True}
@app.delete("/api/music/pipeline/{pid}")
def delete_pipeline_endpoint(pid: int):
"""파이프라인 행을 하드 삭제(전체 목록에서 완전 제거). 없으면 404."""
if not _db_module.delete_pipeline(pid):
raise HTTPException(404)
return {"ok": True, "deleted": pid}
@app.post("/api/music/pipeline/{pid}/retry", status_code=202) @app.post("/api/music/pipeline/{pid}/retry", status_code=202)
async def retry_pipeline(pid: int, bg: BackgroundTasks): async def retry_pipeline(pid: int, bg: BackgroundTasks):
from .pipeline.state_machine import STEPS from .pipeline.state_machine import STEPS

View File

@@ -105,6 +105,29 @@ def test_cancel_pipeline(client):
assert db.get_pipeline(pid)["state"] == "cancelled" assert db.get_pipeline(pid)["state"] == "cancelled"
def test_delete_pipeline_removes_from_db(client):
pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
r = client.request("DELETE", f"/api/music/pipeline/{pid}")
assert r.status_code == 200
assert r.json()["ok"] is True
assert db.get_pipeline(pid) is None
all_ids = [p["id"] for p in client.get("/api/music/pipeline?status=all").json()["pipelines"]]
assert pid not in all_ids
def test_delete_pipeline_not_found_returns_404(client):
r = client.request("DELETE", "/api/music/pipeline/99999")
assert r.status_code == 404
def test_delete_pipeline_removes_child_jobs(client):
pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
db.create_pipeline_job(pid, "cover")
assert len(db.list_pipeline_jobs(pid)) == 1
client.request("DELETE", f"/api/music/pipeline/{pid}")
assert db.list_pipeline_jobs(pid) == []
def test_setup_get_returns_defaults(client): def test_setup_get_returns_defaults(client):
r = client.get("/api/music/setup") r = client.get("/api/music/setup")
assert r.status_code == 200 assert r.status_code == 200

View File

@@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import datetime as dt
import json import json
import logging import logging
import os import os
@@ -59,13 +60,19 @@ async def score_sentiment(
*, *,
name: str | None = None, name: str | None = None,
model: str = DEFAULT_MODEL, model: str = DEFAULT_MODEL,
asof: dt.date | None = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Returns {ticker, score_raw, reason, news_count, tokens_input, tokens_output, model}.""" """Returns {ticker, score_raw, reason, news_count, tokens_input, tokens_output, model}.
asof(현재 KST 일자)를 주면 prompt 맨 앞에 오늘 날짜를 명시해 LLM이 현재 시점 기준으로 판단한다.
"""
news_block = _format_news_block(news) news_block = _format_news_block(news)
prompt = PROMPT_TEMPLATE.format( prompt = PROMPT_TEMPLATE.format(
name=name or ticker, ticker=ticker, name=name or ticker, ticker=ticker,
n=len(news), news_block=news_block, n=len(news), news_block=news_block,
) )
if asof is not None:
prompt = f"오늘 날짜: {asof.isoformat()} (이 시점 기준으로 뉴스를 평가하세요)\n\n" + prompt
resp = await llm.messages.create( resp = await llm.messages.create(
model=model, model=model,
max_tokens=200, max_tokens=200,

View File

@@ -39,11 +39,11 @@ def _make_llm():
async def _process_one( async def _process_one(
ticker: str, name: str, articles: List[Dict[str, Any]], ticker: str, name: str, articles: List[Dict[str, Any]],
sem: asyncio.Semaphore, llm, model: str, sem: asyncio.Semaphore, llm, model: str, asof: dt.date,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
async with sem: async with sem:
return await _analyzer.score_sentiment( return await _analyzer.score_sentiment(
llm, ticker, articles, name=name, model=model, llm, ticker, articles, name=name, model=model, asof=asof,
) )
@@ -110,7 +110,7 @@ async def refresh_daily(
arts = articles_by_ticker.get(t, []) arts = articles_by_ticker.get(t, [])
if not arts: if not arts:
continue # 매핑 0 — score 미생성 continue # 매핑 0 — score 미생성
tasks.append(_process_one(t, name_map.get(t, t), arts, sem, llm, model)) tasks.append(_process_one(t, name_map.get(t, t), arts, sem, llm, model, asof))
raw_results = await asyncio.gather(*tasks, return_exceptions=True) raw_results = await asyncio.gather(*tasks, return_exceptions=True)
successes: List[Dict[str, Any]] = [] successes: List[Dict[str, Any]] = []

View File

@@ -125,6 +125,16 @@ from . import telegram as _tg
from .engine import Screener, ScreenContext from .engine import Screener, ScreenContext
def _today_kst() -> dt.date:
"""KST 오늘 날짜.
stock 컨테이너는 python:3.12-alpine + tzdata 미설치라 TZ=Asia/Seoul이 무효 →
date.today()가 UTC를 반환한다. 08시대(KST) 리포트가 하루 밀리는 것을 막기 위해
UTC+9로 명시 보정한다(holdings_intel._today_kst와 동일한 관용).
"""
return (dt.datetime.utcnow() + dt.timedelta(hours=9)).date()
def _resolve_asof(asof_str, conn: sqlite3.Connection) -> dt.date: def _resolve_asof(asof_str, conn: sqlite3.Connection) -> dt.date:
if asof_str: if asof_str:
return dt.date.fromisoformat(asof_str) return dt.date.fromisoformat(asof_str)
@@ -263,7 +273,7 @@ from . import snapshot as _snap
@router.post("/snapshot/refresh") @router.post("/snapshot/refresh")
def post_snapshot_refresh(asof: Optional[str] = None): def post_snapshot_refresh(asof: Optional[str] = None):
asof_date = dt.date.fromisoformat(asof) if asof else dt.date.today() asof_date = dt.date.fromisoformat(asof) if asof else _today_kst()
if asof_date.weekday() >= 5: if asof_date.weekday() >= 5:
return {"asof": asof_date.isoformat(), "status": "skipped_weekend"} return {"asof": asof_date.isoformat(), "status": "skipped_weekend"}
with _conn() as c: with _conn() as c:
@@ -300,7 +310,7 @@ from .ai_news import validation as _ai_validation
@router.post("/snapshot/refresh-news-sentiment") @router.post("/snapshot/refresh-news-sentiment")
async def post_refresh_news_sentiment(asof: Optional[str] = None): async def post_refresh_news_sentiment(asof: Optional[str] = None):
asof_date = dt.date.fromisoformat(asof) if asof else dt.date.today() asof_date = dt.date.fromisoformat(asof) if asof else _today_kst()
if asof_date.weekday() >= 5: if asof_date.weekday() >= 5:
return {"asof": asof_date.isoformat(), "status": "skipped_weekend"} return {"asof": asof_date.isoformat(), "status": "skipped_weekend"}
if _is_holiday(asof_date): if _is_holiday(asof_date):

View File

@@ -58,6 +58,18 @@ async def test_score_sentiment_clamps_negative_out_of_range():
assert out["score_raw"] == -10.0 assert out["score_raw"] == -10.0
@pytest.mark.asyncio
async def test_score_sentiment_includes_asof_date_in_prompt():
"""asof(현재 KST 일자)를 넘기면 prompt에 오늘 날짜가 포함되어 LLM이 현재 일자 기준으로 판단."""
import datetime as _dt
llm = _mk_llm(json.dumps({"score": 5.0, "reason": "ok"}))
await analyzer.score_sentiment(
llm, "005930", NEWS, name="삼성전자", asof=_dt.date(2026, 7, 2),
)
user_msg = llm.messages.create.call_args.kwargs["messages"][0]["content"]
assert "2026-07-02" in user_msg
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_score_sentiment_includes_summary_in_prompt(): async def test_score_sentiment_includes_summary_in_prompt():
"""summary 가 있으면 prompt 에 포함, 없으면 title 만.""" """summary 가 있으면 prompt 에 포함, 없으면 title 만."""

View File

@@ -39,7 +39,7 @@ async def test_refresh_daily_happy_path(conn):
scores_by_ticker = { scores_by_ticker = {
"005930": 7.5, "000660": 4.0, "373220": -6.0, "005930": 7.5, "000660": 4.0, "373220": -6.0,
} }
async def fake_score(llm, ticker, news, *, name=None, model="m"): async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
return { return {
"ticker": ticker, "score_raw": scores_by_ticker[ticker], "ticker": ticker, "score_raw": scores_by_ticker[ticker],
"reason": f"r{ticker}", "news_count": 1, "reason": f"r{ticker}", "news_count": 1,
@@ -81,7 +81,7 @@ async def test_refresh_daily_failures_isolated(conn):
} }
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3} fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
async def fake_score(llm, ticker, news, *, name=None, model="m"): async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
if ticker == "000660": if ticker == "000660":
raise RuntimeError("llm exploded") raise RuntimeError("llm exploded")
return { return {
@@ -116,7 +116,7 @@ async def test_refresh_daily_no_match_ticker_skipped(conn):
} }
fake_stats = {"total_articles": 1, "matched_pairs": 1, "hit_tickers": 1} fake_stats = {"total_articles": 1, "matched_pairs": 1, "hit_tickers": 1}
async def fake_score(llm, ticker, news, *, name=None, model="m"): async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
return { return {
"ticker": ticker, "score_raw": 5.0, "reason": "r", "ticker": ticker, "score_raw": 5.0, "reason": "r",
"news_count": 1, "tokens_input": 100, "tokens_output": 20, "news_count": 1, "tokens_input": 100, "tokens_output": 20,
@@ -152,7 +152,7 @@ async def test_refresh_daily_sign_gate_no_positive_in_neg(conn):
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3} fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
scores = {"005930": 6.0, "000660": 2.0, "373220": 0.5} # 모두 양수 scores = {"005930": 6.0, "000660": 2.0, "373220": 0.5} # 모두 양수
async def fake_score(llm, ticker, news, *, name=None, model="m"): async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
return { return {
"ticker": ticker, "score_raw": scores[ticker], "reason": "r", "ticker": ticker, "score_raw": scores[ticker], "reason": "r",
"news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model, "news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model,
@@ -183,7 +183,7 @@ async def test_refresh_daily_sign_gate_excludes_neutral(conn):
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3} fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
scores = {"005930": 3.0, "000660": 0.0, "373220": -3.0} scores = {"005930": 3.0, "000660": 0.0, "373220": -3.0}
async def fake_score(llm, ticker, news, *, name=None, model="m"): async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
return { return {
"ticker": ticker, "score_raw": scores[ticker], "reason": "r", "ticker": ticker, "score_raw": scores[ticker], "reason": "r",
"news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model, "news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model,

View File

@@ -5,6 +5,21 @@ from fastapi.testclient import TestClient
from app.main import app from app.main import app
def test_today_kst_uses_kst_offset_not_utc(monkeypatch):
"""컨테이너가 UTC(Alpine, tzdata 미설치)라 date.today()는 08시 KST에 어제를 준다.
_today_kst()는 UTC+9로 보정해 오늘(KST)을 반환해야 한다."""
from app.screener import router
class _FrozenDT(dt.datetime):
@classmethod
def utcnow(cls):
# 2026-07-01 23:30 UTC == 2026-07-02 08:30 KST (AI 뉴스 리포트 시각대)
return dt.datetime(2026, 7, 1, 23, 30, 0)
monkeypatch.setattr(router.dt, "datetime", _FrozenDT)
assert router._today_kst() == dt.date(2026, 7, 2)
def test_refresh_news_sentiment_weekend_skip(): def test_refresh_news_sentiment_weekend_skip():
# 2026-05-16 = Saturday # 2026-05-16 = Saturday
client = TestClient(app) client = TestClient(app)