42 lines
1.4 KiB
Python
42 lines
1.4 KiB
Python
"""파이프라인 상태 머신 — 전이 규칙 단일 소스."""
|
|
|
|
STEPS = ["cover", "video", "thumb", "meta", "review", "publish"]
|
|
USER_GATES = ["cover", "video", "thumb", "meta", "publish"] # review는 자동
|
|
|
|
_APPROVE_NEXT = {
|
|
"cover_pending": "video_pending",
|
|
"video_pending": "thumb_pending",
|
|
"thumb_pending": "meta_pending",
|
|
"meta_pending": "ai_review", # 자동 검토 단계로
|
|
"publish_pending": "publishing",
|
|
}
|
|
|
|
TERMINAL_STATES = {"published", "cancelled", "failed", "awaiting_manual"}
|
|
|
|
|
|
def next_state_on_approve(state: str) -> str:
|
|
if state not in _APPROVE_NEXT:
|
|
raise ValueError(f"승인 불가 상태: {state}")
|
|
return _APPROVE_NEXT[state]
|
|
|
|
|
|
def next_state_on_reject(state: str) -> str:
|
|
if not state.endswith("_pending"):
|
|
raise ValueError(f"반려 불가 상태: {state}")
|
|
return state # 같은 상태 유지 (재생성 후 다시 _pending)
|
|
|
|
|
|
def can_transition(from_state: str, to_state: str) -> bool:
|
|
if from_state in TERMINAL_STATES:
|
|
return False
|
|
if to_state in {"cancelled", "failed", "awaiting_manual"}:
|
|
return True
|
|
if to_state == _APPROVE_NEXT.get(from_state):
|
|
return True
|
|
# 자동 전이 (ai_review → publish_pending, publishing → published)
|
|
auto_transitions = {
|
|
("ai_review", "publish_pending"),
|
|
("publishing", "published"),
|
|
}
|
|
return (from_state, to_state) in auto_transitions
|