All checks were successful
Build And Test / publish (push) Successful in 48s
1083 lines
38 KiB
Python
1083 lines
38 KiB
Python
import email
|
|
from email.header import decode_header
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from email.utils import formataddr, parseaddr, formatdate, make_msgid
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
import re
|
|
|
|
from imapclient import IMAPClient
|
|
import aiosmtplib
|
|
|
|
from models.email_models import (
|
|
Mailbox,
|
|
EmailAddress,
|
|
Attachment,
|
|
EmailSummary,
|
|
Email,
|
|
EmailList,
|
|
UnsubscribeInfo,
|
|
)
|
|
from models.common import OperationResult
|
|
from config import Settings
|
|
from services.unsubscribe_service import parse_unsubscribe_header, execute_unsubscribe
|
|
|
|
|
|
def decode_mime_header(header) -> str:
    """Decode an RFC 2047 encoded header value into a plain string.

    Args:
        header: Raw header value as ``str`` or ``bytes``; ``None`` and empty
            values are accepted.

    Returns:
        The decoded text, or ``""`` when ``header`` is empty. Undecodable
        bytes are replaced rather than raising.
    """
    # Imported locally: the module-level imports do not include this name.
    from email.errors import HeaderParseError

    if not header:
        return ""
    # Handle bytes input from IMAP
    if isinstance(header, bytes):
        header = header.decode("utf-8", errors="replace")

    try:
        chunks = decode_header(header)
    except HeaderParseError:
        # Malformed encoded-word: show the raw value instead of failing.
        return str(header)

    decoded_parts = []
    for part, encoding in chunks:
        if not isinstance(part, bytes):
            decoded_parts.append(part)
            continue
        try:
            decoded_parts.append(part.decode(encoding or "utf-8", errors="replace"))
        except LookupError:
            # The charset label is not a codec Python knows (for example
            # "unknown-8bit"); fall back to UTF-8 with replacement.
            decoded_parts.append(part.decode("utf-8", errors="replace"))
    return "".join(decoded_parts)
|
|
|
|
|
|
def parse_email_address(addr: str) -> EmailAddress:
    """Turn one address string into an ``EmailAddress``.

    The string is split with ``email.utils.parseaddr``; the display name is
    MIME-decoded and stored as ``None`` when it is empty.
    """
    display_name, address = parseaddr(addr)
    decoded_name = decode_mime_header(display_name)
    return EmailAddress(name=decoded_name or None, email=address)
|
|
|
|
|
|
def parse_email_addresses(addrs: Optional[str]) -> list[EmailAddress]:
    """Parse a header value holding zero or more addresses.

    The value is parsed with ``email.utils.getaddresses`` rather than split
    on commas, so a quoted display name that itself contains a comma
    (``"Doe, John" <john@example.com>``) stays one address.

    Args:
        addrs: Raw header value, or ``None``/empty.

    Returns:
        One ``EmailAddress`` per parsed entry; entries that yield neither a
        name nor an address are skipped.
    """
    if not addrs:
        return []

    # Imported locally: the module-level imports do not include getaddresses.
    from email.utils import getaddresses

    addresses = []
    for display_name, address in getaddresses([addrs]):
        if not display_name and not address:
            # Blank segment or unparsable input.
            continue
        addresses.append(
            EmailAddress(name=decode_mime_header(display_name) or None, email=address)
        )
    return addresses
|
|
|
|
|
|
class EmailService:
|
|
def __init__(self, settings: Settings):
    """Store the settings; no network connection is opened here.

    Args:
        settings: Configuration object the other methods read IMAP and SMTP
            fields from.
    """
    # Filled in on first use by _get_imap_client().
    self._imap_client: Optional[IMAPClient] = None
    self.settings = settings
|
|
|
|
def _get_imap_client(self) -> IMAPClient:
    """Return the cached IMAP client, connecting and logging in on first use.

    The client is cached only after a successful login, so a failed login
    cannot leave an unauthenticated connection behind for later calls.

    Raises:
        Whatever ``IMAPClient`` raises while connecting or logging in.
    """
    if self._imap_client is not None:
        return self._imap_client

    client = IMAPClient(
        host=self.settings.imap_host,
        port=self.settings.imap_port,
        ssl=self.settings.imap_use_ssl,
    )
    try:
        client.login(
            self.settings.imap_username,
            self.settings.imap_password.get_secret_value(),
        )
    except Exception:
        # Release the half-open connection (best effort), then let the
        # caller see the original login error.
        try:
            client.logout()
        except Exception:
            pass
        raise

    self._imap_client = client
    return client
|
|
|
|
def _close_imap_client(self):
|
|
if self._imap_client:
|
|
try:
|
|
self._imap_client.logout()
|
|
except Exception:
|
|
pass
|
|
self._imap_client = None
|
|
|
|
def list_mailboxes(self) -> list[Mailbox]:
    """List every folder on the server with its message and unread counts.

    Returns:
        One ``Mailbox`` per folder. ``name`` is the last path component
        (split on the folder's delimiter, ``"/"`` when none is reported) and
        ``path`` is the full folder name. Counts are 0 when the status
        request for a folder fails.
    """
    client = self._get_imap_client()
    result = []

    for flags, delimiter, name in client.list_folders():
        # Get folder status; folders that cannot be queried report zeros.
        try:
            status = client.folder_status(name, ["MESSAGES", "UNSEEN"])
            message_count = status.get(b"MESSAGES", 0)
            unread_count = status.get(b"UNSEEN", 0)
        except Exception:
            message_count = unread_count = 0

        separator = delimiter.decode() if delimiter else "/"
        result.append(
            Mailbox(
                name=name.split(separator)[-1],
                path=name,
                message_count=message_count,
                unread_count=unread_count,
                has_children=b"\\HasChildren" in flags,
            )
        )

    return result
|
|
|
|
def list_emails(
    self,
    mailbox: str = "INBOX",
    limit: int = 50,
    offset: int = 0,
    include_body: bool = False,
) -> EmailList:
    """Return one page of message summaries from a folder, newest first.

    The folder is selected read-only. Ids are ordered descending before the
    page is cut, and the resulting summaries are then sorted by date.

    Args:
        mailbox: Folder to list.
        limit: Page size.
        offset: Number of newest messages to skip.
        include_body: When true the full message is fetched (without marking
            it read) so a 200-character snippet can be produced.

    Returns:
        An ``EmailList`` whose ``total`` is the number of messages in the
        folder, not the page size.
    """
    client = self._get_imap_client()
    client.select_folder(mailbox, readonly=True)

    # Search for all messages
    all_ids = client.search(["ALL"])
    total = len(all_ids)

    # Highest id first, then cut out the requested page.
    page_ids = sorted(all_ids, reverse=True)[offset : offset + limit]
    if not page_ids:
        return EmailList(
            emails=[], total=total, mailbox=mailbox, limit=limit, offset=offset
        )

    fetch_items = ["ENVELOPE", "FLAGS", "BODYSTRUCTURE", "RFC822.SIZE"]
    if include_body:
        fetch_items.append("BODY.PEEK[]")

    summaries = []
    for uid, data in client.fetch(page_ids, fetch_items).items():
        envelope = data[b"ENVELOPE"]
        flags = data[b"FLAGS"]

        # Only the first From entry is used; a placeholder stands in when
        # the envelope has none.
        senders = (
            self._parse_envelope_addresses(envelope.from_[:1])
            if envelope.from_
            else []
        )
        from_addr = (
            senders[0]
            if senders
            else EmailAddress(name=None, email="unknown@unknown.com")
        )

        # Get snippet if body was fetched
        snippet = None
        if include_body and b"BODY[]" in data:
            parsed = email.message_from_bytes(data[b"BODY[]"])
            snippet = self._get_text_snippet(parsed, 200)

        summaries.append(
            EmailSummary(
                id=str(uid),
                mailbox=mailbox,
                subject=decode_mime_header(envelope.subject)
                if envelope.subject
                else "(No Subject)",
                from_address=from_addr,
                to_addresses=self._parse_envelope_addresses(envelope.to),
                date=envelope.date or datetime.now(),
                is_read=b"\\Seen" in flags,
                is_flagged=b"\\Flagged" in flags,
                has_attachments=self._has_attachments(data.get(b"BODYSTRUCTURE")),
                snippet=snippet,
            )
        )

    # Sort by date descending
    summaries.sort(key=lambda item: item.date, reverse=True)

    return EmailList(
        emails=summaries, total=total, mailbox=mailbox, limit=limit, offset=offset
    )
|
|
|
|
def list_drafts(
    self,
    mailbox: Optional[str] = None,
    limit: int = 50,
    offset: int = 0,
    include_body: bool = False,
) -> EmailList:
    """List messages in the drafts folder.

    The folder is ``mailbox`` when given, otherwise the one found by
    ``_find_drafts_folder()``, otherwise ``"Drafts"``. Any error raised
    while listing yields an empty ``EmailList`` instead of propagating.
    """
    draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
    try:
        listing = self.list_emails(draft_mailbox, limit, offset, include_body)
    except Exception:
        listing = EmailList(
            emails=[],
            total=0,
            mailbox=draft_mailbox,
            limit=limit,
            offset=offset,
        )
    return listing
|
|
|
|
def read_email(
    self, mailbox: str, email_id: str, format: str = "text"
) -> Optional[Email]:
    """Fetch a single message and return it fully parsed.

    The folder is selected read-only before fetching.

    Args:
        mailbox: Folder to select.
        email_id: Message id as a decimal string; converted with ``int``.
        format: ``"text"``, ``"html"`` or ``"both"``; selects which body
            fields are populated on the result.

    Returns:
        The parsed ``Email``, or ``None`` when the fetch response contains
        no entry for the id.

    Raises:
        ValueError: if ``email_id`` is not an integer string.
    """
    client = self._get_imap_client()
    client.select_folder(mailbox, readonly=True)

    uid = int(email_id)
    fetched = client.fetch([uid], ["ENVELOPE", "FLAGS", "BODY[]", "BODYSTRUCTURE"])
    if uid not in fetched:
        return None

    data = fetched[uid]
    envelope = data[b"ENVELOPE"]
    flags = data[b"FLAGS"]
    msg = email.message_from_bytes(data[b"BODY[]"])

    # Only the first From entry is used; a placeholder stands in when the
    # envelope has none.
    senders = (
        self._parse_envelope_addresses(envelope.from_[:1]) if envelope.from_ else []
    )
    from_addr = (
        senders[0] if senders else EmailAddress(name=None, email="unknown@unknown.com")
    )

    body_text, body_html = self._get_body(msg)
    attachments = self._get_attachments(msg)

    # Headers exposed to callers; absent or empty ones are left out.
    header_names = (
        "Message-ID",
        "In-Reply-To",
        "References",
        "Reply-To",
        "X-Priority",
        "List-Unsubscribe",
        "List-Unsubscribe-Post",
    )
    headers = {}
    for key in header_names:
        value = msg.get(key)
        if value:
            headers[key] = decode_mime_header(value)

    unsubscribe_info = parse_unsubscribe_header(msg)
    references_header = headers.get("References")

    return Email(
        id=str(uid),
        mailbox=mailbox,
        subject=decode_mime_header(envelope.subject)
        if envelope.subject
        else "(No Subject)",
        from_address=from_addr,
        to_addresses=self._parse_envelope_addresses(envelope.to),
        cc_addresses=self._parse_envelope_addresses(envelope.cc),
        bcc_addresses=self._parse_envelope_addresses(envelope.bcc),
        date=envelope.date or datetime.now(),
        is_read=b"\\Seen" in flags,
        is_flagged=b"\\Flagged" in flags,
        has_attachments=len(attachments) > 0,
        body_text=body_text if format in ["text", "both"] else None,
        body_html=body_html if format in ["html", "both"] else None,
        attachments=attachments,
        headers=headers,
        in_reply_to=headers.get("In-Reply-To"),
        references=references_header.split() if references_header else [],
        unsubscribe=unsubscribe_info if unsubscribe_info.available else None,
    )
|
|
|
|
def search_emails(
    self,
    query: str,
    mailbox: str = "INBOX",
    search_in: list[str] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    limit: int = 50,
) -> EmailList:
    """Search a folder for messages matching ``query``.

    A message matches when ``query`` occurs in ANY of the requested fields
    and, when given, it also satisfies both date bounds.

    Args:
        query: Text to look for.
        mailbox: Folder to search (selected read-only).
        search_in: Any of ``"subject"``, ``"from"``, ``"body"``. Defaults to
            all three. When none of these is present the IMAP ``TEXT`` key
            is used instead.
        date_from: Lower date bound (IMAP ``SINCE``). ISO strings such as
            ``2024-01-31`` are converted to dates; anything else is passed
            to the server unchanged.
        date_to: Upper date bound (IMAP ``BEFORE``); same formats.
        limit: Maximum number of summaries returned.

    Returns:
        An ``EmailList`` with up to ``limit`` summaries, newest first;
        ``total`` is the number of matches before the limit is applied.
    """
    if search_in is None:
        search_in = ["subject", "from", "body"]

    def _as_imap_date(value):
        # Date objects are formatted by the IMAP client; strings that are
        # not ISO dates are forwarded untouched for backward compatibility.
        try:
            return datetime.fromisoformat(value).date()
        except (TypeError, ValueError):
            return value

    client = self._get_imap_client()
    client.select_folder(mailbox, readonly=True)

    # One search key per requested field.
    field_keys = {"subject": "SUBJECT", "from": "FROM", "body": "BODY"}
    field_criteria = [
        [imap_key, query] for field, imap_key in field_keys.items() if field in search_in
    ]
    if not field_criteria:
        field_criteria = [["TEXT", query]]

    # IMAP's OR takes exactly two operands, so chain it:
    # OR a (OR b c). Nested lists are parenthesised by the client.
    text_criteria = field_criteria[-1]
    for criterion in reversed(field_criteria[:-1]):
        text_criteria = ["OR", criterion, text_criteria]

    # Keys listed side by side are ANDed by the server.
    search_criteria = list(text_criteria)
    if date_from:
        search_criteria += ["SINCE", _as_imap_date(date_from)]
    if date_to:
        search_criteria += ["BEFORE", _as_imap_date(date_to)]

    # The default charset is US-ASCII, which cannot carry other text.
    charset = None if query.isascii() else "UTF-8"
    message_ids = client.search(search_criteria, charset=charset)
    total = len(message_ids)

    # Sort and limit
    message_ids = sorted(message_ids, reverse=True)[:limit]
    if not message_ids:
        return EmailList(emails=[], total=0, mailbox=mailbox, limit=limit, offset=0)

    # Fetch and parse messages
    messages = client.fetch(message_ids, ["ENVELOPE", "FLAGS", "BODYSTRUCTURE"])
    emails = []

    for uid, data in messages.items():
        envelope = data[b"ENVELOPE"]
        flags = data[b"FLAGS"]

        # Only the first From entry is used; a placeholder stands in when
        # the envelope has none.
        senders = (
            self._parse_envelope_addresses(envelope.from_[:1])
            if envelope.from_
            else []
        )
        from_addr = (
            senders[0]
            if senders
            else EmailAddress(name=None, email="unknown@unknown.com")
        )

        emails.append(
            EmailSummary(
                id=str(uid),
                mailbox=mailbox,
                subject=decode_mime_header(envelope.subject)
                if envelope.subject
                else "(No Subject)",
                from_address=from_addr,
                to_addresses=self._parse_envelope_addresses(envelope.to),
                date=envelope.date or datetime.now(),
                is_read=b"\\Seen" in flags,
                is_flagged=b"\\Flagged" in flags,
                has_attachments=self._has_attachments(data.get(b"BODYSTRUCTURE")),
            )
        )

    emails.sort(key=lambda e: e.date, reverse=True)

    return EmailList(emails=emails, total=total, mailbox=mailbox, limit=limit, offset=0)
|
|
|
|
def move_email(
    self, email_id: str, source_mailbox: str, destination_mailbox: str
) -> OperationResult:
    """Move one message from one folder to another.

    Args:
        email_id: Message id as a decimal string.
        source_mailbox: Folder currently holding the message.
        destination_mailbox: Folder to move it into.

    Returns:
        A successful ``OperationResult`` carrying ``email_id``; on any
        error, a failed result whose message is the error text.
    """
    try:
        imap = self._get_imap_client()
        imap.select_folder(source_mailbox)
        imap.move([int(email_id)], destination_mailbox)
        summary = f"Email moved from {source_mailbox} to {destination_mailbox}"
        return OperationResult(success=True, message=summary, id=email_id)
    except Exception as e:
        return OperationResult(success=False, message=str(e))
|
|
|
|
def delete_email(
    self, email_id: str, mailbox: str, permanent: bool = False
) -> OperationResult:
    """Delete one message, via the trash folder unless ``permanent``.

    Args:
        email_id: Message id as a decimal string.
        mailbox: Folder holding the message.
        permanent: When true the message is flagged deleted and expunged.
            When false it is moved to the folder found by
            ``_find_trash_folder()``; if no such folder exists it is
            expunged as well.

    Returns:
        An ``OperationResult`` describing what happened; on any error, a
        failed result whose message is the error text.
    """
    try:
        client = self._get_imap_client()
        client.select_folder(mailbox)
        uid = int(email_id)

        trash_folder = None if permanent else self._find_trash_folder()
        if trash_folder:
            # Move to Trash
            client.move([uid], trash_folder)
            return OperationResult(
                success=True, message="Email moved to trash", id=email_id
            )

        client.delete_messages([uid])
        # A bare EXPUNGE removes every message flagged \Deleted in the
        # folder. Restrict it to this message when the server supports
        # UID EXPUNGE (UIDPLUS); otherwise fall back to the plain form.
        if client.has_capability("UIDPLUS"):
            client.expunge([uid])
        else:
            client.expunge()

        message = (
            "Email permanently deleted"
            if permanent
            else "Email deleted (no trash folder found)"
        )
        return OperationResult(success=True, message=message, id=email_id)
    except Exception as e:
        return OperationResult(success=False, message=str(e))
|
|
|
|
def delete_draft(
    self, email_id: str, mailbox: Optional[str] = None, permanent: bool = False
) -> OperationResult:
    """Delete a draft by delegating to ``delete_email``.

    The folder is ``mailbox`` when given, otherwise the one found by
    ``_find_drafts_folder()``, otherwise ``"Drafts"``.
    """
    target_folder = mailbox or self._find_drafts_folder() or "Drafts"
    return self.delete_email(email_id, target_folder, permanent)
|
|
|
|
def save_draft(
    self,
    to: Optional[list[str]] = None,
    subject: Optional[str] = None,
    body: Optional[str] = None,
    cc: Optional[list[str]] = None,
    bcc: Optional[list[str]] = None,
    html_body: Optional[str] = None,
    mailbox: Optional[str] = None,
    in_reply_to_email_id: Optional[str] = None,
    in_reply_to_mailbox: Optional[str] = None,
    reply_all: bool = False,
) -> OperationResult:
    """Build a message and append it to the drafts folder.

    When ``in_reply_to_email_id`` is given, missing ``to``/``subject``/``cc``
    values and the threading headers are taken from
    ``_get_reply_context()`` (the original is looked up in
    ``in_reply_to_mailbox``, default ``"INBOX"``).

    Returns:
        A successful ``OperationResult`` whose ``id`` is the new draft's UID
        when the server reports one (``APPENDUID``), else ``None``. A failed
        result is returned when ``to``, ``subject`` or ``body`` is missing
        or on any error.
    """
    try:
        draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
        in_reply_to = None
        references = None

        if in_reply_to_email_id:
            context, error = self._get_reply_context(
                in_reply_to_mailbox or "INBOX",
                in_reply_to_email_id,
                reply_all,
                cc,
                self.settings.smtp_from_email,
            )
            if error:
                return error
            if not to:
                to = context["to"]
            if subject is None:
                subject = context["subject"]
            if cc is None:
                cc = context["cc"]
            in_reply_to = context["in_reply_to"]
            references = context["references"]

        if not to:
            return OperationResult(success=False, message="'to' is required for drafts")
        if subject is None:
            return OperationResult(success=False, message="'subject' is required for drafts")
        if body is None:
            return OperationResult(success=False, message="'body' is required for drafts")

        msg = self._build_draft_message(
            to=to,
            subject=subject,
            body=body,
            cc=cc,
            bcc=bcc,
            html_body=html_body,
            in_reply_to=in_reply_to,
            references=references,
        )
        client = self._get_imap_client()
        append_result = client.append(
            draft_mailbox,
            msg.as_bytes(),
            flags=["\\Draft"],
        )

        # The append response is the server's status text; servers with
        # UIDPLUS include "[APPENDUID <uidvalidity> <uid>]" in it. The tuple
        # form is still accepted in case a client returns one.
        draft_id = None
        if isinstance(append_result, tuple) and len(append_result) > 1:
            draft_id = str(append_result[1])
        elif isinstance(append_result, (bytes, str)):
            response_text = (
                append_result.decode("ascii", errors="replace")
                if isinstance(append_result, bytes)
                else append_result
            )
            uid_match = re.search(r"APPENDUID\s+\d+\s+(\d+)", response_text, re.IGNORECASE)
            if uid_match:
                draft_id = uid_match.group(1)

        return OperationResult(
            success=True,
            message=f"Draft saved to {draft_mailbox}",
            id=draft_id,
        )
    except Exception as e:
        return OperationResult(success=False, message=str(e))
|
|
|
|
def update_draft(
    self,
    email_id: str,
    mailbox: Optional[str] = None,
    to: Optional[list[str]] = None,
    subject: Optional[str] = None,
    body: Optional[str] = None,
    cc: Optional[list[str]] = None,
    bcc: Optional[list[str]] = None,
    html_body: Optional[str] = None,
    in_reply_to_email_id: Optional[str] = None,
    in_reply_to_mailbox: Optional[str] = None,
    reply_all: bool = False,
) -> OperationResult:
    """Replace an existing draft with an updated copy.

    Fields passed as ``None`` keep the value stored on the existing draft.
    The replacement is appended BEFORE the old draft is removed, so a failed
    append never loses the draft. Threading headers come from
    ``_get_reply_context()`` when ``in_reply_to_email_id`` is given,
    otherwise from the existing draft.

    Returns:
        A successful ``OperationResult`` whose ``id`` is the new draft's UID
        when the server reports one, else ``email_id``. A failed result is
        returned when the draft is not found or on any error.
    """
    draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
    try:
        existing = self.read_email(draft_mailbox, email_id, format="both")
    except Exception as e:
        return OperationResult(success=False, message=str(e), id=email_id)

    if not existing:
        return OperationResult(
            success=False,
            message=f"Draft {email_id} not found in {draft_mailbox}",
            id=email_id,
        )

    resolved_to = to if to is not None else [addr.email for addr in existing.to_addresses]
    resolved_cc = cc if cc is not None else [addr.email for addr in existing.cc_addresses]
    resolved_bcc = bcc if bcc is not None else [addr.email for addr in existing.bcc_addresses]
    resolved_subject = subject if subject is not None else existing.subject
    resolved_body = body if body is not None else (existing.body_text or "")
    resolved_html = html_body if html_body is not None else existing.body_html

    try:
        if in_reply_to_email_id:
            context, error = self._get_reply_context(
                in_reply_to_mailbox or "INBOX",
                in_reply_to_email_id,
                reply_all,
                cc,
                self.settings.smtp_from_email,
            )
            if error:
                return error
            if to is None:
                resolved_to = context["to"]
            if subject is None:
                resolved_subject = context["subject"]
            if cc is None:
                resolved_cc = context["cc"]
            in_reply_to = context["in_reply_to"]
            references = context["references"]
        else:
            # Keep the draft threaded if it already was a reply.
            in_reply_to = existing.in_reply_to
            references = existing.references or None

        msg = self._build_draft_message(
            to=resolved_to,
            subject=resolved_subject,
            body=resolved_body,
            cc=resolved_cc,
            bcc=resolved_bcc,
            html_body=resolved_html,
            in_reply_to=in_reply_to,
            references=references,
        )

        client = self._get_imap_client()
        # Store the replacement first; only then remove the old copy.
        append_result = client.append(
            draft_mailbox,
            msg.as_bytes(),
            flags=["\\Draft"],
        )

        uid = int(email_id)
        # Re-select: the reply-context lookup may have selected another folder.
        client.select_folder(draft_mailbox)
        client.delete_messages([uid])
        # Expunge only this message when the server supports UID EXPUNGE;
        # a bare EXPUNGE also removes other messages flagged \Deleted.
        if client.has_capability("UIDPLUS"):
            client.expunge([uid])
        else:
            client.expunge()

        # Servers with UIDPLUS report the new UID as
        # "[APPENDUID <uidvalidity> <uid>]" in the append response text.
        draft_id = None
        if isinstance(append_result, tuple) and len(append_result) > 1:
            draft_id = str(append_result[1])
        elif isinstance(append_result, (bytes, str)):
            response_text = (
                append_result.decode("ascii", errors="replace")
                if isinstance(append_result, bytes)
                else append_result
            )
            uid_match = re.search(r"APPENDUID\s+\d+\s+(\d+)", response_text, re.IGNORECASE)
            if uid_match:
                draft_id = uid_match.group(1)

        return OperationResult(
            success=True,
            message=f"Draft {email_id} updated in {draft_mailbox}",
            id=draft_id or email_id,
        )
    except Exception as e:
        return OperationResult(success=False, message=str(e), id=email_id)
|
|
|
|
async def send_email(
    self,
    to: Optional[list[str]] = None,
    subject: Optional[str] = None,
    body: Optional[str] = None,
    cc: Optional[list[str]] = None,
    bcc: Optional[list[str]] = None,
    html_body: Optional[str] = None,
    sender_email: Optional[str] = None,
    sender_name: Optional[str] = None,
    in_reply_to: Optional[str] = None,
    references: Optional[list[str]] = None,
    in_reply_to_email_id: Optional[str] = None,
    in_reply_to_mailbox: Optional[str] = None,
    reply_all: bool = False,
) -> OperationResult:
    """Compose a message and send it over SMTP.

    When ``in_reply_to_email_id`` is given, missing ``to``/``subject``/``cc``
    values and the threading headers are taken from
    ``_get_reply_context()`` (the original is looked up in
    ``in_reply_to_mailbox``, default ``"INBOX"``).

    Bcc addresses are passed to the SMTP call as envelope recipients only;
    they are never written into a header.

    Returns:
        A successful ``OperationResult`` whose ``id`` is the generated
        Message-ID. A failed result is returned when ``to``, ``subject`` or
        ``body`` is missing or on any error.
    """
    try:
        resolved_name, resolved_email = self._resolve_sender(sender_email, sender_name)

        if in_reply_to_email_id:
            context, error = self._get_reply_context(
                in_reply_to_mailbox or "INBOX",
                in_reply_to_email_id,
                reply_all,
                cc,
                resolved_email,
            )
            if error:
                return error
            if not to:
                to = context["to"]
            if subject is None:
                subject = context["subject"]
            if cc is None:
                cc = context["cc"]
            in_reply_to = context["in_reply_to"]
            references = context["references"]

        if not to:
            return OperationResult(success=False, message="'to' is required to send email")
        if subject is None:
            return OperationResult(success=False, message="'subject' is required to send email")
        if body is None:
            return OperationResult(success=False, message="'body' is required to send email")

        msg = MIMEMultipart("alternative")
        msg["Subject"] = subject
        msg["From"] = formataddr((resolved_name or "", resolved_email))
        msg["To"] = ", ".join(to)
        # Set these explicitly: without them the message goes out with no
        # Date or Message-ID, and the id returned below would be None.
        msg["Date"] = formatdate(localtime=True)
        msg["Message-ID"] = make_msgid()

        if cc:
            msg["Cc"] = ", ".join(cc)
        if in_reply_to:
            msg["In-Reply-To"] = in_reply_to
        if references:
            msg["References"] = " ".join(references)

        # Add plain text body
        msg.attach(MIMEText(body, "plain", "utf-8"))

        # Add HTML body if provided
        if html_body:
            msg.attach(MIMEText(html_body, "html", "utf-8"))

        # Build the envelope recipient list; Bcc exists only here.
        recipients = list(to)
        if cc:
            recipients.extend(cc)
        if bcc:
            recipients.extend(bcc)

        # Send via SMTP. The recipient list must be passed explicitly,
        # otherwise recipients are derived from the headers and Bcc
        # addresses are dropped.
        await aiosmtplib.send(
            msg,
            recipients=recipients,
            hostname=self.settings.smtp_host,
            port=self.settings.smtp_port,
            username=self.settings.smtp_username,
            password=self.settings.smtp_password.get_secret_value(),
            start_tls=self.settings.smtp_use_tls,
        )

        return OperationResult(
            success=True,
            message=f"Email sent successfully to {', '.join(to)}",
            id=msg.get("Message-ID"),
        )
    except Exception as e:
        return OperationResult(success=False, message=str(e))
|
|
|
|
async def send_draft(
    self,
    email_id: str,
    mailbox: Optional[str] = None,
) -> OperationResult:
    """Send a stored draft and then remove it from the drafts folder.

    The draft is read from ``mailbox`` (or the folder found by
    ``_find_drafts_folder()``, or ``"Drafts"``) and passed to
    ``send_email``. A draft with no text body is sent with an empty one.

    Returns:
        The result of ``send_email``. If sending worked but the draft could
        not be removed afterwards, the result is still successful and its
        message carries a warning. A failed result is returned when the
        draft cannot be read, is missing, or has no recipients.
    """
    draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
    try:
        draft = self.read_email(draft_mailbox, email_id, format="both")
    except Exception as e:
        return OperationResult(success=False, message=str(e), id=email_id)

    if not draft:
        return OperationResult(
            success=False,
            message=f"Draft {email_id} not found in {draft_mailbox}",
            id=email_id,
        )

    to = [addr.email for addr in draft.to_addresses]
    cc = [addr.email for addr in draft.cc_addresses]
    bcc = [addr.email for addr in draft.bcc_addresses]

    if not (to or cc or bcc):
        return OperationResult(
            success=False,
            message="Draft has no recipients",
            id=email_id,
        )

    result = await self.send_email(
        to=to or None,
        subject=draft.subject or "(No Subject)",
        body=draft.body_text or "",
        cc=cc or None,
        bcc=bcc or None,
        html_body=draft.body_html,
        in_reply_to=draft.in_reply_to,
        references=draft.references or None,
    )
    if not result.success:
        return result

    try:
        client = self._get_imap_client()
        client.select_folder(draft_mailbox)
        uid = int(email_id)
        client.delete_messages([uid])
        # Expunge only this message when the server supports UID EXPUNGE;
        # a bare EXPUNGE also removes other messages flagged \Deleted.
        if client.has_capability("UIDPLUS"):
            client.expunge([uid])
        else:
            client.expunge()
    except Exception as e:
        # The mail is already sent, so this stays a success, but the
        # caller is told that the draft copy is still there.
        return OperationResult(
            success=True,
            message=(
                f"{result.message} (warning: draft {email_id} could not be "
                f"removed from {draft_mailbox}: {e})"
            ),
            id=result.id,
        )

    return result
|
|
|
|
async def reply_email(
    self,
    mailbox: str,
    email_id: str,
    body: str,
    reply_all: bool = False,
    cc: Optional[list[str]] = None,
    bcc: Optional[list[str]] = None,
    html_body: Optional[str] = None,
    sender_email: Optional[str] = None,
    sender_name: Optional[str] = None,
) -> OperationResult:
    """Reply to a stored message.

    Thin wrapper over ``send_email``: the original is identified by
    ``mailbox`` and ``email_id``, and recipients, subject and threading
    headers are left for ``send_email`` to derive from it.
    """
    reply_options = {
        "body": body,
        "bcc": bcc,
        "html_body": html_body,
        "sender_email": sender_email,
        "sender_name": sender_name,
        "in_reply_to_email_id": email_id,
        "in_reply_to_mailbox": mailbox,
        "reply_all": reply_all,
        "cc": cc,
    }
    return await self.send_email(**reply_options)
|
|
|
|
def _parse_envelope_addresses(self, addresses) -> list[EmailAddress]:
    """Convert envelope address entries into ``EmailAddress`` objects.

    Args:
        addresses: Iterable of entries exposing ``name``, ``mailbox`` and
            ``host`` (bytes or ``None``), or a falsy value.

    Returns:
        One ``EmailAddress`` per entry. A missing mailbox becomes
        ``unknown`` and a missing host ``unknown.com``.
    """
    if not addresses:
        return []

    def _convert(entry) -> EmailAddress:
        local_part = entry.mailbox.decode() if entry.mailbox else "unknown"
        domain = entry.host.decode() if entry.host else "unknown.com"
        display = decode_mime_header(entry.name) if entry.name else None
        return EmailAddress(name=display, email=f"{local_part}@{domain}")

    return [_convert(entry) for entry in addresses]
|
|
|
|
def _has_attachments(self, bodystructure) -> bool:
|
|
if bodystructure is None:
|
|
return False
|
|
# Simple heuristic: check if multipart with non-text parts
|
|
if isinstance(bodystructure, list):
|
|
for part in bodystructure:
|
|
if isinstance(part, tuple) and len(part) > 0:
|
|
content_type = part[0].decode() if isinstance(part[0], bytes) else str(part[0])
|
|
if content_type.lower() not in ["text", "multipart"]:
|
|
return True
|
|
return False
|
|
|
|
def _get_body(self, msg) -> tuple[Optional[str], Optional[str]]:
|
|
body_text = None
|
|
body_html = None
|
|
|
|
if msg.is_multipart():
|
|
for part in msg.walk():
|
|
content_type = part.get_content_type()
|
|
content_disposition = str(part.get("Content-Disposition", ""))
|
|
|
|
if "attachment" in content_disposition:
|
|
continue
|
|
|
|
if content_type == "text/plain" and body_text is None:
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
charset = part.get_content_charset() or "utf-8"
|
|
body_text = payload.decode(charset, errors="replace")
|
|
elif content_type == "text/html" and body_html is None:
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
charset = part.get_content_charset() or "utf-8"
|
|
body_html = payload.decode(charset, errors="replace")
|
|
else:
|
|
content_type = msg.get_content_type()
|
|
payload = msg.get_payload(decode=True)
|
|
if payload:
|
|
charset = msg.get_content_charset() or "utf-8"
|
|
decoded = payload.decode(charset, errors="replace")
|
|
if content_type == "text/html":
|
|
body_html = decoded
|
|
else:
|
|
body_text = decoded
|
|
|
|
return body_text, body_html
|
|
|
|
def _get_text_snippet(self, msg, max_length: int = 200) -> Optional[str]:
|
|
body_text, body_html = self._get_body(msg)
|
|
text = body_text or ""
|
|
|
|
# Ensure text is a string
|
|
if isinstance(text, bytes):
|
|
text = text.decode("utf-8", errors="replace")
|
|
|
|
if not text and body_html:
|
|
# Ensure body_html is a string before regex
|
|
if isinstance(body_html, bytes):
|
|
body_html = body_html.decode("utf-8", errors="replace")
|
|
# Strip HTML tags for snippet
|
|
text = re.sub(r"<[^>]+>", "", body_html)
|
|
text = re.sub(r"\s+", " ", text).strip()
|
|
|
|
if text:
|
|
return text[:max_length] + "..." if len(text) > max_length else text
|
|
return None
|
|
|
|
def _get_attachments(self, msg) -> list[Attachment]:
    """Describe every attachment part of a parsed message.

    A part counts as an attachment when its Content-Disposition header
    contains ``attachment``. Non-multipart messages yield an empty list.

    Returns:
        One ``Attachment`` per such part, with the decoded filename
        (``"unnamed"`` when absent), content type, decoded payload size in
        bytes, and the Content-ID header value.
    """
    if not msg.is_multipart():
        return []

    found = []
    for part in msg.walk():
        disposition = str(part.get("Content-Disposition", ""))
        if "attachment" not in disposition:
            continue

        raw_name = part.get_filename()
        filename = decode_mime_header(raw_name) if raw_name else "unnamed"
        payload = part.get_payload(decode=True) or b""
        found.append(
            Attachment(
                filename=filename,
                content_type=part.get_content_type(),
                size=len(payload),
                content_id=part.get("Content-ID"),
            )
        )
    return found
|
|
|
|
def _find_trash_folder(self) -> Optional[str]:
    """Return the name of the server's trash folder, if one can be found.

    The first listed folder that either has a well-known trash name or
    carries the ``\\Trash`` flag is returned; ``None`` when there is none.
    """
    known_names = ["Trash", "Deleted", "Deleted Items", "Deleted Messages", "[Gmail]/Trash"]
    listing = self._get_imap_client().list_folders()
    for flags, _delimiter, name in listing:
        if b"\\Trash" in flags or name in known_names:
            return name
    return None
|
|
|
|
def _find_drafts_folder(self) -> Optional[str]:
    """Return the name of the server's drafts folder, if one can be found.

    The first listed folder that either has a well-known drafts name or
    carries the ``\\Drafts`` flag is returned; ``None`` when there is none.
    """
    known_names = ["Drafts", "Draft", "INBOX.Drafts", "[Gmail]/Drafts"]
    listing = self._get_imap_client().list_folders()
    for flags, _delimiter, name in listing:
        if b"\\Drafts" in flags or name in known_names:
            return name
    return None
|
|
|
|
def _build_draft_message(
|
|
self,
|
|
to: list[str],
|
|
subject: str,
|
|
body: str,
|
|
cc: Optional[list[str]] = None,
|
|
bcc: Optional[list[str]] = None,
|
|
html_body: Optional[str] = None,
|
|
in_reply_to: Optional[str] = None,
|
|
references: Optional[list[str]] = None,
|
|
) -> MIMEMultipart:
|
|
msg = MIMEMultipart("alternative")
|
|
msg["Subject"] = subject
|
|
msg["From"] = formataddr(
|
|
(self.settings.smtp_from_name or "", self.settings.smtp_from_email)
|
|
)
|
|
msg["To"] = ", ".join(to)
|
|
msg["Date"] = formatdate(localtime=True)
|
|
msg["Message-ID"] = make_msgid()
|
|
|
|
if cc:
|
|
msg["Cc"] = ", ".join(cc)
|
|
if bcc:
|
|
msg["Bcc"] = ", ".join(bcc)
|
|
if in_reply_to:
|
|
msg["In-Reply-To"] = in_reply_to
|
|
if references:
|
|
msg["References"] = " ".join(references)
|
|
|
|
msg.attach(MIMEText(body or "", "plain", "utf-8"))
|
|
if html_body:
|
|
msg.attach(MIMEText(html_body, "html", "utf-8"))
|
|
return msg
|
|
|
|
def _get_reply_context(
    self,
    mailbox: str,
    email_id: str,
    reply_all: bool,
    cc: Optional[list[str]],
    sender_email: Optional[str],
) -> tuple[dict, Optional[OperationResult]]:
    """Work out recipients, subject and threading headers for a reply.

    Args:
        mailbox: Folder holding the message being replied to.
        email_id: Id of that message.
        reply_all: Also copy the original To and Cc recipients.
        cc: Extra Cc addresses requested by the caller.
        sender_email: The replying address; removed from all recipients.

    Returns:
        ``(context, None)`` on success, where ``context`` has the keys
        ``to``, ``cc``, ``subject``, ``in_reply_to`` and ``references``;
        ``({}, error)`` when the original is missing or no recipient is
        left after de-duplication.
    """
    original = self.read_email(mailbox, email_id, format="both")
    if not original:
        return {}, OperationResult(
            success=False,
            message=f"Email {email_id} not found in {mailbox}",
            id=email_id,
        )

    # Reply-To wins over From when it holds a usable address.
    reply_to_email = None
    reply_to_header = original.headers.get("Reply-To")
    if reply_to_header:
        reply_to_email = parseaddr(reply_to_header)[1]
    reply_to_email = reply_to_email or original.from_address.email

    to = [reply_to_email] if reply_to_email else []

    reply_cc: list[str] = []
    if reply_all:
        reply_cc.extend(
            addr.email
            for addr in original.to_addresses + original.cc_addresses
            if addr.email and addr.email not in to
        )
    if cc:
        reply_cc.extend(cc)

    to = self._dedupe_emails(to, sender_email)
    reply_cc = self._dedupe_emails(reply_cc, sender_email)

    # If the only direct recipient was ourselves, promote the first Cc.
    if not to and reply_cc:
        to = [reply_cc.pop(0)]

    if not to:
        return {}, OperationResult(
            success=False,
            message="No valid recipients found for reply",
            id=email_id,
        )

    subject = original.subject or "(No Subject)"
    if not subject.lower().startswith("re:"):
        subject = f"Re: {subject}"

    in_reply_to = original.headers.get("Message-ID") or original.in_reply_to
    references = list(original.references)
    if in_reply_to and in_reply_to not in references:
        references.append(in_reply_to)

    context = {
        "to": to,
        "cc": reply_cc or None,
        "subject": subject,
        "in_reply_to": in_reply_to,
        "references": references or None,
    }
    return context, None
|
|
|
|
def _resolve_sender(
|
|
self, sender_email: Optional[str], sender_name: Optional[str]
|
|
) -> tuple[Optional[str], str]:
|
|
if sender_email:
|
|
return sender_name, sender_email
|
|
if sender_name:
|
|
name, email_addr = parseaddr(sender_name)
|
|
if email_addr:
|
|
return name or None, email_addr
|
|
return self.settings.smtp_from_name, self.settings.smtp_from_email
|
|
|
|
def _dedupe_emails(self, emails: list[str], self_email: Optional[str]) -> list[str]:
|
|
seen = set()
|
|
cleaned = []
|
|
for addr in emails:
|
|
if not addr:
|
|
continue
|
|
addr_lower = addr.lower()
|
|
if self_email and addr_lower == self_email.lower():
|
|
continue
|
|
if addr_lower in seen:
|
|
continue
|
|
seen.add(addr_lower)
|
|
cleaned.append(addr)
|
|
return cleaned
|
|
|
|
def set_flags(
    self,
    email_id: str,
    mailbox: str,
    add_flags: Optional[list[str]] = None,
    remove_flags: Optional[list[str]] = None,
) -> OperationResult:
    """
    Add and/or remove IMAP flags on one message.

    Standard flags: \\Seen, \\Answered, \\Flagged, \\Deleted, \\Draft
    Custom keywords are also supported (server-dependent).

    Args:
        email_id: The unique ID of the email
        mailbox: The mailbox containing the email
        add_flags: List of flags to add (applied first)
        remove_flags: List of flags to remove

    Returns:
        A successful OperationResult whose data lists the added and removed
        flags; on any error, a failed result whose message is the error text.
    """
    try:
        imap = self._get_imap_client()
        imap.select_folder(mailbox)
        target = [int(email_id)]

        if add_flags:
            imap.add_flags(target, add_flags)
        if remove_flags:
            imap.remove_flags(target, remove_flags)

        changes = {
            "added": add_flags or [],
            "removed": remove_flags or [],
        }
        return OperationResult(
            success=True,
            message=f"Flags updated for email {email_id}",
            id=email_id,
            data=changes,
        )
    except Exception as e:
        return OperationResult(success=False, message=str(e))
|
|
|
|
async def unsubscribe(
    self,
    mailbox: str,
    email_id: str,
) -> OperationResult:
    """
    Try to unsubscribe from the mailing list that sent a message.

    The message is read first; its parsed unsubscribe information is then
    handed to execute_unsubscribe.

    Args:
        mailbox: The mailbox containing the email
        email_id: The unique ID of the email

    Returns:
        The result of execute_unsubscribe, or a failed OperationResult when
        the message is missing or carries no unsubscribe information.
    """
    message = self.read_email(mailbox, email_id)

    if not message:
        failure_reason = f"Email {email_id} not found in {mailbox}"
    elif not message.unsubscribe:
        failure_reason = "This email does not have unsubscribe information"
    else:
        failure_reason = None

    if failure_reason is not None:
        return OperationResult(success=False, message=failure_reason)

    # Execute unsubscribe
    return await execute_unsubscribe(message.unsubscribe)
|