Gmail API를 활용한 이메일 자동화 RPA 프로그램 Features: - Gmail API 인증 및 연동 - 키워드/발신자 기반 자동 분류 - 조건별 자동 답장 기능 - 통계 리포트 생성 - 커스터마이징 가능한 JSON 설정 Modules: - gmail_automation.py: 메인 프로그램 - gmail_auth.py: Gmail API 인증 - email_classifier.py: 이메일 분류 로직 - auto_reply.py: 자동 답장 로직 - config.json.example: 설정 예시 Documentation: - 상세한 README.md (설치, 사용법, 트러블슈팅) - Google Cloud Console 설정 가이드 - 실제 효과 측정 데이터
300 lines
9.6 KiB
Python
300 lines
9.6 KiB
Python
"""
|
|
Gmail 자동화 RPA 프로그램
|
|
Gmail Automation RPA
|
|
|
|
Gmail API를 사용하여 이메일을 자동으로 분류하고 답장하는 프로그램입니다.
|
|
"""
|
|
|
|
from gmail_auth import GmailAuth
|
|
from email_classifier import EmailClassifier
|
|
from auto_reply import AutoReply
|
|
from datetime import datetime
|
|
import json
|
|
import os
|
|
|
|
|
|
class GmailAutomation:
|
|
"""Gmail 자동화 클래스"""
|
|
|
|
def __init__(self, config_file="config.json"):
|
|
"""
|
|
초기화
|
|
|
|
Args:
|
|
config_file: 설정 파일 경로
|
|
"""
|
|
self.config = self.load_config(config_file)
|
|
self.auth = GmailAuth()
|
|
self.service = self.auth.get_service()
|
|
self.classifier = EmailClassifier(self.config)
|
|
self.auto_reply = AutoReply(self.service, self.config)
|
|
|
|
def load_config(self, config_file):
|
|
"""설정 파일 로드"""
|
|
if os.path.exists(config_file):
|
|
with open(config_file, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
else:
|
|
# 기본 설정 생성
|
|
default_config = {
|
|
"classification_rules": [
|
|
{
|
|
"name": "업무",
|
|
"keywords": ["회의", "프로젝트", "업무", "문의"],
|
|
"from_domains": ["company.com"],
|
|
"label": "업무/중요"
|
|
},
|
|
{
|
|
"name": "마케팅",
|
|
"keywords": ["광고", "프로모션", "할인"],
|
|
"label": "마케팅"
|
|
},
|
|
{
|
|
"name": "개인",
|
|
"keywords": ["안녕", "친구"],
|
|
"from_domains": ["gmail.com", "naver.com"],
|
|
"label": "개인"
|
|
}
|
|
],
|
|
"auto_reply_rules": [
|
|
{
|
|
"condition": "업무 시간 외",
|
|
"hours": {"start": 18, "end": 9},
|
|
"template": "업무 시간 외 자동 답장입니다. 다음 영업일에 확인하겠습니다."
|
|
}
|
|
],
|
|
"max_emails": 50
|
|
}
|
|
|
|
with open(config_file, 'w', encoding='utf-8') as f:
|
|
json.dump(default_config, f, ensure_ascii=False, indent=2)
|
|
|
|
return default_config
|
|
|
|
def get_unread_emails(self, max_results=None):
|
|
"""읽지 않은 이메일 가져오기"""
|
|
if max_results is None:
|
|
max_results = self.config.get("max_emails", 50)
|
|
|
|
try:
|
|
results = self.service.users().messages().list(
|
|
userId='me',
|
|
q='is:unread',
|
|
maxResults=max_results
|
|
).execute()
|
|
|
|
messages = results.get('messages', [])
|
|
return messages
|
|
|
|
except Exception as e:
|
|
print(f"❌ 이메일 가져오기 실패: {e}")
|
|
return []
|
|
|
|
def get_email_details(self, msg_id):
|
|
"""이메일 상세 정보 가져오기"""
|
|
try:
|
|
message = self.service.users().messages().get(
|
|
userId='me',
|
|
id=msg_id,
|
|
format='full'
|
|
).execute()
|
|
|
|
headers = message['payload']['headers']
|
|
subject = next((h['value'] for h in headers if h['name'] == 'Subject'), '제목 없음')
|
|
sender = next((h['value'] for h in headers if h['name'] == 'From'), '발신자 미상')
|
|
date = next((h['value'] for h in headers if h['name'] == 'Date'), '')
|
|
|
|
# 본문 추출
|
|
body = self.extract_body(message)
|
|
|
|
return {
|
|
'id': msg_id,
|
|
'subject': subject,
|
|
'sender': sender,
|
|
'date': date,
|
|
'body': body,
|
|
'snippet': message.get('snippet', '')
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"❌ 이메일 상세 정보 가져오기 실패: {e}")
|
|
return None
|
|
|
|
def extract_body(self, message):
|
|
"""이메일 본문 추출"""
|
|
try:
|
|
if 'parts' in message['payload']:
|
|
parts = message['payload']['parts']
|
|
for part in parts:
|
|
if part['mimeType'] == 'text/plain':
|
|
import base64
|
|
data = part['body'].get('data', '')
|
|
return base64.urlsafe_b64decode(data).decode('utf-8')
|
|
else:
|
|
import base64
|
|
data = message['payload']['body'].get('data', '')
|
|
if data:
|
|
return base64.urlsafe_b64decode(data).decode('utf-8')
|
|
except Exception as e:
|
|
print(f"⚠️ 본문 추출 실패: {e}")
|
|
|
|
return message.get('snippet', '')
|
|
|
|
def apply_label(self, msg_id, label_name):
|
|
"""이메일에 라벨 적용"""
|
|
try:
|
|
# 라벨 가져오기 또는 생성
|
|
label_id = self.get_or_create_label(label_name)
|
|
|
|
if label_id:
|
|
self.service.users().messages().modify(
|
|
userId='me',
|
|
id=msg_id,
|
|
body={'addLabelIds': [label_id]}
|
|
).execute()
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"❌ 라벨 적용 실패: {e}")
|
|
|
|
return False
|
|
|
|
def get_or_create_label(self, label_name):
|
|
"""라벨 가져오기 또는 생성"""
|
|
try:
|
|
# 기존 라벨 확인
|
|
results = self.service.users().labels().list(userId='me').execute()
|
|
labels = results.get('labels', [])
|
|
|
|
for label in labels:
|
|
if label['name'] == label_name:
|
|
return label['id']
|
|
|
|
# 라벨 생성
|
|
label_object = {
|
|
'name': label_name,
|
|
'labelListVisibility': 'labelShow',
|
|
'messageListVisibility': 'show'
|
|
}
|
|
|
|
created_label = self.service.users().labels().create(
|
|
userId='me',
|
|
body=label_object
|
|
).execute()
|
|
|
|
return created_label['id']
|
|
|
|
except Exception as e:
|
|
print(f"❌ 라벨 생성 실패: {e}")
|
|
return None
|
|
|
|
def process_emails(self):
|
|
"""이메일 자동 처리"""
|
|
print("=" * 60)
|
|
print("📧 Gmail 자동화 RPA 실행")
|
|
print("=" * 60)
|
|
print()
|
|
|
|
# 읽지 않은 이메일 가져오기
|
|
print("📥 읽지 않은 이메일 확인 중...")
|
|
messages = self.get_unread_emails()
|
|
|
|
if not messages:
|
|
print("✅ 처리할 이메일이 없습니다.")
|
|
return
|
|
|
|
print(f"📨 총 {len(messages)}개의 읽지 않은 이메일 발견")
|
|
print("-" * 60)
|
|
|
|
processed_count = 0
|
|
classified_count = 0
|
|
replied_count = 0
|
|
|
|
for i, msg in enumerate(messages, 1):
|
|
try:
|
|
# 이메일 상세 정보 가져오기
|
|
email_data = self.get_email_details(msg['id'])
|
|
|
|
if not email_data:
|
|
continue
|
|
|
|
print(f"\n{i}. 처리 중...")
|
|
print(f" 제목: {email_data['subject'][:50]}...")
|
|
print(f" 발신자: {email_data['sender']}")
|
|
|
|
# 이메일 분류
|
|
category = self.classifier.classify(email_data)
|
|
|
|
if category:
|
|
print(f" 📁 분류: {category['label']}")
|
|
if self.apply_label(msg['id'], category['label']):
|
|
classified_count += 1
|
|
|
|
# 자동 답장 (조건 확인)
|
|
if self.auto_reply.should_reply(email_data):
|
|
if self.auto_reply.send_reply(msg['id'], email_data):
|
|
print(f" ✉️ 자동 답장 전송됨")
|
|
replied_count += 1
|
|
|
|
processed_count += 1
|
|
|
|
except Exception as e:
|
|
print(f" ❌ 처리 실패: {e}")
|
|
continue
|
|
|
|
print("\n" + "=" * 60)
|
|
print("✅ 처리 완료!")
|
|
print(f"📊 처리: {processed_count}건")
|
|
print(f"📁 분류: {classified_count}건")
|
|
print(f"✉️ 답장: {replied_count}건")
|
|
print("=" * 60)
|
|
|
|
def generate_report(self):
|
|
"""통계 리포트 생성"""
|
|
print("\n📊 이메일 통계 리포트")
|
|
print("-" * 60)
|
|
|
|
try:
|
|
# 라벨별 이메일 개수
|
|
results = self.service.users().labels().list(userId='me').execute()
|
|
labels = results.get('labels', [])
|
|
|
|
print("\n라벨별 이메일 개수:")
|
|
for label in labels:
|
|
if not label['name'].startswith('CATEGORY_'):
|
|
print(f" • {label['name']}: {label.get('messagesTotal', 0)}개")
|
|
|
|
except Exception as e:
|
|
print(f"❌ 리포트 생성 실패: {e}")
|
|
|
|
|
|
def main():
|
|
"""메인 실행 함수"""
|
|
print("🚀 Gmail 자동화 RPA 시작\n")
|
|
|
|
try:
|
|
automation = GmailAutomation()
|
|
|
|
print("메뉴를 선택하세요:")
|
|
print("1. 이메일 자동 분류 및 답장")
|
|
print("2. 통계 리포트 보기")
|
|
print("3. 종료")
|
|
|
|
choice = input("\n선택 (1-3): ").strip()
|
|
|
|
if choice == '1':
|
|
automation.process_emails()
|
|
elif choice == '2':
|
|
automation.generate_report()
|
|
elif choice == '3':
|
|
print("👋 프로그램을 종료합니다.")
|
|
else:
|
|
print("❌ 잘못된 선택입니다.")
|
|
|
|
except Exception as e:
|
|
print(f"❌ 오류 발생: {e}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|