"""YouTube OAuth flow + resumable 업로드.""" import os import logging from urllib.parse import urlencode import httpx from google.oauth2.credentials import Credentials from googleapiclient.discovery import build from googleapiclient.http import MediaFileUpload from googleapiclient.errors import HttpError from app import db logger = logging.getLogger("music-lab.youtube") SCOPES = ["https://www.googleapis.com/auth/youtube.upload", "https://www.googleapis.com/auth/youtube.readonly"] class NotAuthenticatedError(Exception): pass class QuotaExceededError(Exception): pass def _client_id() -> str: return os.getenv("YOUTUBE_OAUTH_CLIENT_ID", "") def _client_secret() -> str: return os.getenv("YOUTUBE_OAUTH_CLIENT_SECRET", "") def _redirect_uri() -> str: return os.getenv("YOUTUBE_OAUTH_REDIRECT_URI", "") def get_auth_url() -> str: cid = _client_id() redirect = _redirect_uri() if not cid or not redirect: raise RuntimeError("OAuth 환경변수 미설정") params = { "client_id": cid, "redirect_uri": redirect, "response_type": "code", "scope": " ".join(SCOPES), "access_type": "offline", "prompt": "consent", } return "https://accounts.google.com/o/oauth2/v2/auth?" + urlencode(params) async def exchange_code(code: str) -> dict: """code → refresh_token + access_token + 채널 정보 → DB 저장.""" async with httpx.AsyncClient(timeout=30) as client: token_resp = await client.post( "https://oauth2.googleapis.com/token", data={ "code": code, "client_id": _client_id(), "client_secret": _client_secret(), "redirect_uri": _redirect_uri(), "grant_type": "authorization_code", }, ) token_resp.raise_for_status() tok = token_resp.json() access = tok["access_token"] refresh = tok["refresh_token"] expires_at = _expiry_from_seconds(tok["expires_in"]) creds = _creds(access=access, refresh=refresh) yt = _build_youtube_client(creds) ch = yt.channels().list(part="snippet", mine=True).execute() item = ch["items"][0] db.upsert_oauth_token( channel_id=item["id"], channel_title=item["snippet"]["title"], avatar_url=item["snippet"]["thumbnails"]["default"]["url"], refresh_token=refresh, access_token=access, expires_at=expires_at, ) return {"channel_id": item["id"], "channel_title": item["snippet"]["title"]} def get_status() -> dict | None: tok = db.get_oauth_token() if not tok: return None return { "channel_id": tok["channel_id"], "channel_title": tok["channel_title"], "avatar_url": tok["avatar_url"], } def disconnect() -> None: db.delete_oauth_token() def upload_video(*, video_path: str, thumbnail_path: str | None, metadata: dict, privacy: str) -> dict: tok = db.get_oauth_token() if not tok: raise NotAuthenticatedError("YouTube 인증 없음") creds = _creds(access=tok["access_token"], refresh=tok["refresh_token"]) yt = _build_youtube_client(creds) body = { "snippet": { "title": metadata["title"], "description": metadata["description"], "tags": metadata.get("tags", []), "categoryId": str(metadata.get("category_id", 10)), }, "status": {"privacyStatus": privacy, "selfDeclaredMadeForKids": False}, } media = MediaFileUpload(video_path, chunksize=4 * 1024 * 1024, resumable=True, mimetype="video/mp4") req = yt.videos().insert(part="snippet,status", body=body, media_body=media) try: response = None while response is None: status, response = req.next_chunk() video_id = response["id"] except HttpError as e: if b"quotaExceeded" in (e.content or b""): raise QuotaExceededError(str(e)) raise if thumbnail_path: try: yt.thumbnails().set(videoId=video_id, media_body=thumbnail_path).execute() except HttpError as e: logger.warning("썸네일 업로드 실패: %s", e) return {"video_id": video_id} def _build_youtube_client(creds): # patch 포인트 return build("youtube", "v3", credentials=creds, cache_discovery=False) def _creds(access: str, refresh: str) -> Credentials: return Credentials( token=access, refresh_token=refresh, token_uri="https://oauth2.googleapis.com/token", client_id=_client_id(), client_secret=_client_secret(), scopes=SCOPES, ) def _expiry_from_seconds(secs: int) -> str: from datetime import datetime, timedelta return (datetime.utcnow() + timedelta(seconds=secs)).isoformat(timespec="seconds")