""" Gmail 자동화 RPA 프로그램 Gmail Automation RPA Gmail API를 사용하여 이메일을 자동으로 분류하고 답장하는 프로그램입니다. """ from gmail_auth import GmailAuth from email_classifier import EmailClassifier from auto_reply import AutoReply from datetime import datetime import json import os class GmailAutomation: """Gmail 자동화 클래스""" def __init__(self, config_file="config.json"): """ 초기화 Args: config_file: 설정 파일 경로 """ self.config = self.load_config(config_file) self.auth = GmailAuth() self.service = self.auth.get_service() self.classifier = EmailClassifier(self.config) self.auto_reply = AutoReply(self.service, self.config) def load_config(self, config_file): """설정 파일 로드""" if os.path.exists(config_file): with open(config_file, 'r', encoding='utf-8') as f: return json.load(f) else: # 기본 설정 생성 default_config = { "classification_rules": [ { "name": "업무", "keywords": ["회의", "프로젝트", "업무", "문의"], "from_domains": ["company.com"], "label": "업무/중요" }, { "name": "마케팅", "keywords": ["광고", "프로모션", "할인"], "label": "마케팅" }, { "name": "개인", "keywords": ["안녕", "친구"], "from_domains": ["gmail.com", "naver.com"], "label": "개인" } ], "auto_reply_rules": [ { "condition": "업무 시간 외", "hours": {"start": 18, "end": 9}, "template": "업무 시간 외 자동 답장입니다. 다음 영업일에 확인하겠습니다." } ], "max_emails": 50 } with open(config_file, 'w', encoding='utf-8') as f: json.dump(default_config, f, ensure_ascii=False, indent=2) return default_config def get_unread_emails(self, max_results=None): """읽지 않은 이메일 가져오기""" if max_results is None: max_results = self.config.get("max_emails", 50) try: results = self.service.users().messages().list( userId='me', q='is:unread', maxResults=max_results ).execute() messages = results.get('messages', []) return messages except Exception as e: print(f"❌ 이메일 가져오기 실패: {e}") return [] def get_email_details(self, msg_id): """이메일 상세 정보 가져오기""" try: message = self.service.users().messages().get( userId='me', id=msg_id, format='full' ).execute() headers = message['payload']['headers'] subject = next((h['value'] for h in headers if h['name'] == 'Subject'), '제목 없음') sender = next((h['value'] for h in headers if h['name'] == 'From'), '발신자 미상') date = next((h['value'] for h in headers if h['name'] == 'Date'), '') # 본문 추출 body = self.extract_body(message) return { 'id': msg_id, 'subject': subject, 'sender': sender, 'date': date, 'body': body, 'snippet': message.get('snippet', '') } except Exception as e: print(f"❌ 이메일 상세 정보 가져오기 실패: {e}") return None def extract_body(self, message): """이메일 본문 추출""" try: if 'parts' in message['payload']: parts = message['payload']['parts'] for part in parts: if part['mimeType'] == 'text/plain': import base64 data = part['body'].get('data', '') return base64.urlsafe_b64decode(data).decode('utf-8') else: import base64 data = message['payload']['body'].get('data', '') if data: return base64.urlsafe_b64decode(data).decode('utf-8') except Exception as e: print(f"⚠️ 본문 추출 실패: {e}") return message.get('snippet', '') def apply_label(self, msg_id, label_name): """이메일에 라벨 적용""" try: # 라벨 가져오기 또는 생성 label_id = self.get_or_create_label(label_name) if label_id: self.service.users().messages().modify( userId='me', id=msg_id, body={'addLabelIds': [label_id]} ).execute() return True except Exception as e: print(f"❌ 라벨 적용 실패: {e}") return False def get_or_create_label(self, label_name): """라벨 가져오기 또는 생성""" try: # 기존 라벨 확인 results = self.service.users().labels().list(userId='me').execute() labels = results.get('labels', []) for label in labels: if label['name'] == label_name: return label['id'] # 라벨 생성 label_object = { 'name': label_name, 'labelListVisibility': 'labelShow', 'messageListVisibility': 'show' } created_label = self.service.users().labels().create( userId='me', body=label_object ).execute() return created_label['id'] except Exception as e: print(f"❌ 라벨 생성 실패: {e}") return None def process_emails(self): """이메일 자동 처리""" print("=" * 60) print("📧 Gmail 자동화 RPA 실행") print("=" * 60) print() # 읽지 않은 이메일 가져오기 print("📥 읽지 않은 이메일 확인 중...") messages = self.get_unread_emails() if not messages: print("✅ 처리할 이메일이 없습니다.") return print(f"📨 총 {len(messages)}개의 읽지 않은 이메일 발견") print("-" * 60) processed_count = 0 classified_count = 0 replied_count = 0 for i, msg in enumerate(messages, 1): try: # 이메일 상세 정보 가져오기 email_data = self.get_email_details(msg['id']) if not email_data: continue print(f"\n{i}. 처리 중...") print(f" 제목: {email_data['subject'][:50]}...") print(f" 발신자: {email_data['sender']}") # 이메일 분류 category = self.classifier.classify(email_data) if category: print(f" 📁 분류: {category['label']}") if self.apply_label(msg['id'], category['label']): classified_count += 1 # 자동 답장 (조건 확인) if self.auto_reply.should_reply(email_data): if self.auto_reply.send_reply(msg['id'], email_data): print(f" ✉️ 자동 답장 전송됨") replied_count += 1 processed_count += 1 except Exception as e: print(f" ❌ 처리 실패: {e}") continue print("\n" + "=" * 60) print("✅ 처리 완료!") print(f"📊 처리: {processed_count}건") print(f"📁 분류: {classified_count}건") print(f"✉️ 답장: {replied_count}건") print("=" * 60) def generate_report(self): """통계 리포트 생성""" print("\n📊 이메일 통계 리포트") print("-" * 60) try: # 라벨별 이메일 개수 results = self.service.users().labels().list(userId='me').execute() labels = results.get('labels', []) print("\n라벨별 이메일 개수:") for label in labels: if not label['name'].startswith('CATEGORY_'): print(f" • {label['name']}: {label.get('messagesTotal', 0)}개") except Exception as e: print(f"❌ 리포트 생성 실패: {e}") def main(): """메인 실행 함수""" print("🚀 Gmail 자동화 RPA 시작\n") try: automation = GmailAutomation() print("메뉴를 선택하세요:") print("1. 이메일 자동 분류 및 답장") print("2. 통계 리포트 보기") print("3. 종료") choice = input("\n선택 (1-3): ").strip() if choice == '1': automation.process_emails() elif choice == '2': automation.generate_report() elif choice == '3': print("👋 프로그램을 종료합니다.") else: print("❌ 잘못된 선택입니다.") except Exception as e: print(f"❌ 오류 발생: {e}") if __name__ == "__main__": main()