Files
dav-imap-mcp/src/services/email_service.py
Yigit Colakoglu 767f076048
All checks were successful
Build And Test / publish (push) Successful in 49s
Add tool call logging and reply fields
2026-01-01 15:24:06 -08:00

1039 lines
36 KiB
Python

import email
from email.header import decode_header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr, parseaddr, formatdate, make_msgid
from datetime import datetime
from typing import Optional
import re
from imapclient import IMAPClient
import aiosmtplib
from models.email_models import (
Mailbox,
EmailAddress,
Attachment,
EmailSummary,
Email,
EmailList,
UnsubscribeInfo,
)
from models.common import OperationResult
from config import Settings
from services.unsubscribe_service import parse_unsubscribe_header, execute_unsubscribe
def decode_mime_header(header) -> str:
    """Decode an RFC 2047 encoded header value into a plain string.

    Args:
        header: The raw header value. ``bytes`` (as found in IMAP envelopes)
            are first decoded as UTF-8 with replacement characters; falsy
            values yield an empty string.

    Returns:
        The decoded text. Malformed encoded words fall back to the raw
        header text, and encoded words that declare a charset Python does not
        know are decoded as UTF-8 with replacement characters, so a single
        bad header never raises.
    """
    # Local import: keeps the module's import block untouched.
    from email.errors import HeaderParseError

    if not header:
        return ""
    # Handle bytes input from IMAP
    if isinstance(header, bytes):
        header = header.decode("utf-8", errors="replace")
    try:
        chunks = decode_header(header)
    except HeaderParseError:
        # e.g. broken base64 inside an encoded word: keep the raw text.
        return str(header)
    decoded_parts = []
    for part, encoding in chunks:
        if not isinstance(part, bytes):
            decoded_parts.append(part)
            continue
        try:
            decoded_parts.append(part.decode(encoding or "utf-8", errors="replace"))
        except LookupError:
            # The charset label comes from the sender and may be unknown.
            decoded_parts.append(part.decode("utf-8", errors="replace"))
    return "".join(decoded_parts)
def parse_email_address(addr: str) -> EmailAddress:
    """Turn a single ``Name <user@host>`` style string into an EmailAddress.

    The display name is MIME-decoded; an empty display name is stored as
    ``None``.
    """
    display_name, address = parseaddr(addr)
    decoded_name = decode_mime_header(display_name)
    if not decoded_name:
        decoded_name = None
    return EmailAddress(name=decoded_name, email=address)
def parse_email_addresses(addrs: Optional[str]) -> list[EmailAddress]:
    """Parse a comma-separated address header into EmailAddress objects.

    Uses ``email.utils.getaddresses`` so that commas inside quoted display
    names (``"Doe, John" <john@example.com>``) do not split one address into
    two. If that parser yields nothing for a non-empty input, the function
    falls back to parsing each comma-separated chunk on its own.

    Args:
        addrs: The raw header value, or ``None``/empty.

    Returns:
        One EmailAddress per parsed address; an empty list for empty input.
    """
    # Local import: keeps the module's import block untouched.
    from email.utils import getaddresses

    if not addrs:
        return []
    pairs = [(name, addr) for name, addr in getaddresses([addrs]) if name or addr]
    if not pairs:
        # Best-effort fallback: parse every comma-separated chunk separately.
        pairs = [
            parseaddr(chunk.strip()) for chunk in addrs.split(",") if chunk.strip()
        ]
    return [
        EmailAddress(name=decode_mime_header(name) or None, email=addr)
        for name, addr in pairs
    ]
class EmailService:
    """IMAP reading/organising and SMTP sending for a single account.

    One IMAP connection is opened lazily by ``_get_imap_client`` and reused
    by every method. Mutating operations report their outcome through
    ``OperationResult`` instead of raising.
    """

    # Headers copied into ``Email.headers`` by ``read_email``.
    _TRACKED_HEADERS = (
        "Message-ID",
        "In-Reply-To",
        "References",
        "Reply-To",
        "X-Priority",
        "List-Unsubscribe",
        "List-Unsubscribe-Post",
    )
    # ``search_in`` field name -> IMAP SEARCH key, in evaluation order.
    _SEARCH_FIELDS = (("subject", "SUBJECT"), ("from", "FROM"), ("body", "BODY"))
    _TRASH_NAMES = ("Trash", "Deleted", "Deleted Items", "Deleted Messages", "[Gmail]/Trash")
    _DRAFT_NAMES = ("Drafts", "Draft", "INBOX.Drafts", "[Gmail]/Drafts")
    # Matches the UID inside an "[APPENDUID <uidvalidity> <uid>]" response code.
    _APPENDUID_RE = re.compile(r"APPENDUID\s+\d+\s+(\d+)")

    def __init__(self, settings: Settings):
        """Store the settings; no connection is opened until first use."""
        self.settings = settings
        self._imap_client: Optional[IMAPClient] = None

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------
    def _get_imap_client(self) -> IMAPClient:
        """Return a logged-in IMAP client, (re)connecting when needed.

        A cached client is probed with NOOP first; if the probe fails the
        connection is dropped and a new one is opened. A client is cached
        only after login succeeded, so a failed login can never leave an
        unauthenticated client behind.
        """
        if self._imap_client is not None:
            try:
                self._imap_client.noop()
            except Exception:
                # Connection is stale or broken: discard it and reconnect.
                self._close_imap_client()
        if self._imap_client is None:
            client = IMAPClient(
                host=self.settings.imap_host,
                port=self.settings.imap_port,
                ssl=self.settings.imap_use_ssl,
            )
            try:
                client.login(
                    self.settings.imap_username,
                    self.settings.imap_password.get_secret_value(),
                )
            except Exception:
                # Do not leak the socket of a connection we will not keep.
                try:
                    client.logout()
                except Exception:
                    pass
                raise
            self._imap_client = client
        return self._imap_client

    def _close_imap_client(self):
        """Log out (best effort) and forget the cached IMAP client."""
        if self._imap_client:
            try:
                self._imap_client.logout()
            except Exception:
                # Best effort: the connection may already be gone.
                pass
            self._imap_client = None

    # ------------------------------------------------------------------
    # Reading
    # ------------------------------------------------------------------
    def list_mailboxes(self) -> list[Mailbox]:
        """List all folders with their message and unread counts.

        Folders whose status cannot be queried are reported with zero
        counts rather than aborting the listing.
        """
        client = self._get_imap_client()
        mailboxes = []
        for flags, delimiter, name in client.list_folders():
            try:
                status = client.folder_status(name, ["MESSAGES", "UNSEEN"])
                message_count = status.get(b"MESSAGES", 0)
                unread_count = status.get(b"UNSEEN", 0)
            except Exception:
                message_count = 0
                unread_count = 0
            separator = delimiter.decode() if delimiter else "/"
            mailboxes.append(
                Mailbox(
                    name=name.split(separator)[-1],
                    path=name,
                    message_count=message_count,
                    unread_count=unread_count,
                    has_children=b"\\HasChildren" in flags,
                )
            )
        return mailboxes

    def list_emails(
        self,
        mailbox: str = "INBOX",
        limit: int = 50,
        offset: int = 0,
        include_body: bool = False,
    ) -> EmailList:
        """Return one page of message summaries, newest first.

        Args:
            mailbox: Folder to list.
            limit: Page size; negative values are treated as 0.
            offset: Number of newest messages to skip; negative values are
                treated as 0.
            include_body: When true, the full message is fetched (without
                marking it read) to produce a text snippet.

        Returns:
            An EmailList whose ``total`` is the number of messages in the
            folder, not the page size.
        """
        client = self._get_imap_client()
        client.select_folder(mailbox, readonly=True)
        message_ids = client.search(["ALL"])
        total = len(message_ids)
        # Highest UID first, then cut the requested page. Clamping keeps a
        # negative offset/limit from silently slicing from the wrong end.
        start = max(offset, 0)
        page_ids = sorted(message_ids, reverse=True)[start : start + max(limit, 0)]
        if not page_ids:
            return EmailList(
                emails=[], total=total, mailbox=mailbox, limit=limit, offset=offset
            )
        fetch_items = ["ENVELOPE", "FLAGS", "BODYSTRUCTURE", "RFC822.SIZE"]
        if include_body:
            fetch_items.append("BODY.PEEK[]")
        messages = client.fetch(page_ids, fetch_items)
        emails = []
        for uid, data in messages.items():
            snippet = None
            if include_body and b"BODY[]" in data:
                parsed = email.message_from_bytes(data[b"BODY[]"])
                snippet = self._get_text_snippet(parsed, 200)
            emails.append(self._build_summary(uid, mailbox, data, snippet))
        emails.sort(key=lambda e: e.date, reverse=True)
        return EmailList(
            emails=emails, total=total, mailbox=mailbox, limit=limit, offset=offset
        )

    def list_drafts(
        self,
        mailbox: Optional[str] = None,
        limit: int = 50,
        offset: int = 0,
        include_body: bool = False,
    ) -> EmailList:
        """List drafts from ``mailbox`` or the detected drafts folder.

        Any failure while listing (for example a missing folder) yields an
        empty list instead of an exception.
        """
        draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
        try:
            return self.list_emails(draft_mailbox, limit, offset, include_body)
        except Exception:
            # Deliberate best effort: callers get an empty page.
            return EmailList(
                emails=[],
                total=0,
                mailbox=draft_mailbox,
                limit=limit,
                offset=offset,
            )

    def read_email(
        self, mailbox: str, email_id: str, format: str = "text"
    ) -> Optional[Email]:
        """Fetch and parse one message by UID.

        Args:
            mailbox: Folder containing the message.
            email_id: UID of the message, as a decimal string.
            format: ``"text"``, ``"html"`` or ``"both"``; selects which body
                variants are populated on the result.

        Returns:
            The parsed Email, or ``None`` when the UID is not in the folder.

        Raises:
            ValueError: If ``email_id`` is not an integer string.
        """
        client = self._get_imap_client()
        # Read-only selection, so fetching BODY[] does not set \Seen.
        client.select_folder(mailbox, readonly=True)
        uid = int(email_id)
        messages = client.fetch([uid], ["ENVELOPE", "FLAGS", "BODY[]", "BODYSTRUCTURE"])
        if uid not in messages:
            return None
        data = messages[uid]
        envelope = data[b"ENVELOPE"]
        flags = data[b"FLAGS"]
        msg = email.message_from_bytes(data[b"BODY[]"])

        body_text, body_html = self._get_body(msg)
        attachments = self._get_attachments(msg)

        headers = {}
        for key in self._TRACKED_HEADERS:
            value = msg.get(key)
            if value:
                headers[key] = decode_mime_header(value)

        unsubscribe_info = parse_unsubscribe_header(msg)
        return Email(
            id=str(uid),
            mailbox=mailbox,
            subject=decode_mime_header(envelope.subject) if envelope.subject else "(No Subject)",
            from_address=self._parse_envelope_sender(envelope),
            to_addresses=self._parse_envelope_addresses(envelope.to),
            cc_addresses=self._parse_envelope_addresses(envelope.cc),
            bcc_addresses=self._parse_envelope_addresses(envelope.bcc),
            date=envelope.date or datetime.now(),
            is_read=b"\\Seen" in flags,
            is_flagged=b"\\Flagged" in flags,
            has_attachments=len(attachments) > 0,
            body_text=body_text if format in ["text", "both"] else None,
            body_html=body_html if format in ["html", "both"] else None,
            attachments=attachments,
            headers=headers,
            in_reply_to=headers.get("In-Reply-To"),
            references=headers.get("References", "").split() if headers.get("References") else [],
            unsubscribe=unsubscribe_info if unsubscribe_info.available else None,
        )

    def search_emails(
        self,
        query: str,
        mailbox: str = "INBOX",
        search_in: list[str] = None,
        date_from: Optional[str] = None,
        date_to: Optional[str] = None,
        limit: int = 50,
    ) -> EmailList:
        """Search a folder for ``query`` and return matching summaries.

        Args:
            query: Text to look for.
            mailbox: Folder to search.
            search_in: Any of ``"subject"``, ``"from"``, ``"body"``; a
                message matches when ANY listed field contains the query.
                Defaults to all three. If none of the names is recognised
                the whole message text is searched.
            date_from: Lower date bound (IMAP ``SINCE``). ISO dates such as
                ``2026-01-31`` are converted; other strings are sent as is.
            date_to: Upper date bound (IMAP ``BEFORE``, i.e. exclusive).
            limit: Maximum number of summaries returned.

        Returns:
            An EmailList whose ``total`` counts all matches, not just the
            returned ones.
        """
        if search_in is None:
            search_in = ["subject", "from", "body"]
        client = self._get_imap_client()
        client.select_folder(mailbox, readonly=True)
        criteria = self._build_search_criteria(query, search_in, date_from, date_to)
        # US-ASCII is the default search charset; anything else must be declared.
        charset = None if query.isascii() else "UTF-8"
        message_ids = client.search(criteria, charset)
        total = len(message_ids)
        message_ids = sorted(message_ids, reverse=True)[:limit]
        if not message_ids:
            return EmailList(
                emails=[], total=0, mailbox=mailbox, limit=limit, offset=0
            )
        messages = client.fetch(message_ids, ["ENVELOPE", "FLAGS", "BODYSTRUCTURE"])
        emails = [
            self._build_summary(uid, mailbox, data) for uid, data in messages.items()
        ]
        emails.sort(key=lambda e: e.date, reverse=True)
        return EmailList(
            emails=emails, total=total, mailbox=mailbox, limit=limit, offset=0
        )

    def _build_search_criteria(
        self,
        query: str,
        search_in: list[str],
        date_from: Optional[str],
        date_to: Optional[str],
    ) -> list:
        """Build a nested IMAPClient criteria list: OR over fields, AND dates."""
        terms = [[key, query] for field, key in self._SEARCH_FIELDS if field in search_in]
        if not terms:
            terms = [["TEXT", query]]
        # IMAP's OR is binary, so fold the terms from the right:
        # OR a (OR b c)
        combined = terms[-1]
        for term in reversed(terms[:-1]):
            combined = ["OR", term, combined]
        criteria = list(combined)
        if date_from:
            criteria += ["SINCE", self._parse_search_date(date_from)]
        if date_to:
            criteria += ["BEFORE", self._parse_search_date(date_to)]
        return criteria

    @staticmethod
    def _parse_search_date(value: str):
        """Convert an ISO date string to a ``date``; pass anything else through."""
        try:
            return datetime.fromisoformat(value).date()
        except (TypeError, ValueError):
            # Possibly already in IMAP's dd-Mon-yyyy form; let the server judge.
            return value

    # ------------------------------------------------------------------
    # Organising
    # ------------------------------------------------------------------
    def move_email(
        self, email_id: str, source_mailbox: str, destination_mailbox: str
    ) -> OperationResult:
        """Move one message between folders; failures are reported, not raised."""
        try:
            client = self._get_imap_client()
            client.select_folder(source_mailbox)
            client.move([int(email_id)], destination_mailbox)
            return OperationResult(
                success=True,
                message=f"Email moved from {source_mailbox} to {destination_mailbox}",
                id=email_id,
            )
        except Exception as e:
            return OperationResult(success=False, message=str(e))

    def delete_email(
        self, email_id: str, mailbox: str, permanent: bool = False
    ) -> OperationResult:
        """Delete one message.

        Args:
            email_id: UID of the message.
            mailbox: Folder containing it.
            permanent: When false the message is moved to the trash folder
                if one can be found; otherwise (or when no trash folder
                exists) it is flagged deleted and expunged.
        """
        try:
            client = self._get_imap_client()
            client.select_folder(mailbox)
            uid = int(email_id)
            if permanent:
                self._delete_and_expunge(client, uid)
                return OperationResult(
                    success=True, message="Email permanently deleted", id=email_id
                )
            trash_folder = self._find_trash_folder()
            if trash_folder:
                client.move([uid], trash_folder)
                return OperationResult(
                    success=True, message="Email moved to trash", id=email_id
                )
            self._delete_and_expunge(client, uid)
            return OperationResult(
                success=True, message="Email deleted (no trash folder found)", id=email_id
            )
        except Exception as e:
            return OperationResult(success=False, message=str(e))

    @staticmethod
    def _delete_and_expunge(client, uid: int) -> None:
        """Flag ``uid`` as deleted and expunge it from the selected folder.

        With UIDPLUS only this message is expunged. Without it a plain
        EXPUNGE is the only option, which also removes every other message
        already flagged \\Deleted in the folder.
        """
        client.delete_messages([uid])
        if client.has_capability("UIDPLUS"):
            client.expunge([uid])
        else:
            client.expunge()

    def delete_draft(
        self, email_id: str, mailbox: Optional[str] = None, permanent: bool = False
    ) -> OperationResult:
        """Delete a draft from ``mailbox`` or the detected drafts folder."""
        draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
        return self.delete_email(email_id, draft_mailbox, permanent)

    def set_flags(
        self,
        email_id: str,
        mailbox: str,
        add_flags: Optional[list[str]] = None,
        remove_flags: Optional[list[str]] = None,
    ) -> OperationResult:
        """
        Set or remove IMAP flags on an email.
        Standard flags: \\Seen, \\Answered, \\Flagged, \\Deleted, \\Draft
        Custom keywords are also supported (server-dependent).
        Args:
            email_id: The unique ID of the email
            mailbox: The mailbox containing the email
            add_flags: List of flags to add
            remove_flags: List of flags to remove
        """
        try:
            client = self._get_imap_client()
            client.select_folder(mailbox)
            uid = int(email_id)
            if add_flags:
                client.add_flags([uid], add_flags)
            if remove_flags:
                client.remove_flags([uid], remove_flags)
            return OperationResult(
                success=True,
                message=f"Flags updated for email {email_id}",
                id=email_id,
                data={
                    "added": add_flags or [],
                    "removed": remove_flags or [],
                },
            )
        except Exception as e:
            return OperationResult(success=False, message=str(e))

    # ------------------------------------------------------------------
    # Drafts
    # ------------------------------------------------------------------
    def save_draft(
        self,
        to: Optional[list[str]] = None,
        subject: Optional[str] = None,
        body: Optional[str] = None,
        cc: Optional[list[str]] = None,
        bcc: Optional[list[str]] = None,
        reply_to: Optional[str] = None,
        html_body: Optional[str] = None,
        mailbox: Optional[str] = None,
        reply_to_email_id: Optional[str] = None,
        reply_mailbox: Optional[str] = None,
        reply_all: bool = False,
    ) -> OperationResult:
        """Append a new draft to the drafts folder.

        When ``reply_to_email_id`` is given, missing ``to``/``subject`` are
        derived from that message (looked up in ``reply_mailbox`` or INBOX),
        the CC list becomes the reply CC list merged with ``cc``, and
        threading headers are set.

        Returns:
            An OperationResult; ``id`` is the new draft's UID when the
            server reports one.
        """
        try:
            draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
            in_reply_to = None
            references = None
            if reply_to_email_id:
                context, error = self._get_reply_context(
                    reply_mailbox or "INBOX",
                    reply_to_email_id,
                    reply_all,
                    cc,
                    self.settings.smtp_from_email,
                )
                if error:
                    return error
                if not to:
                    to = context["to"]
                if subject is None:
                    subject = context["subject"]
                # The context CC list already contains the caller's ``cc``.
                cc = context["cc"]
                in_reply_to = context["in_reply_to"]
                references = context["references"]
            if not to:
                return OperationResult(success=False, message="'to' is required for drafts")
            if subject is None:
                return OperationResult(success=False, message="'subject' is required for drafts")
            if body is None:
                return OperationResult(success=False, message="'body' is required for drafts")
            msg = self._build_draft_message(
                to=to,
                subject=subject,
                body=body,
                cc=cc,
                bcc=bcc,
                reply_to=reply_to,
                html_body=html_body,
                in_reply_to=in_reply_to,
                references=references,
            )
            client = self._get_imap_client()
            append_result = client.append(
                draft_mailbox,
                msg.as_bytes(),
                flags=["\\Draft"],
            )
            return OperationResult(
                success=True,
                message=f"Draft saved to {draft_mailbox}",
                id=self._extract_append_uid(append_result),
            )
        except Exception as e:
            return OperationResult(success=False, message=str(e))

    def update_draft(
        self,
        email_id: str,
        mailbox: Optional[str] = None,
        to: Optional[list[str]] = None,
        subject: Optional[str] = None,
        body: Optional[str] = None,
        cc: Optional[list[str]] = None,
        bcc: Optional[list[str]] = None,
        reply_to: Optional[str] = None,
        html_body: Optional[str] = None,
        reply_to_email_id: Optional[str] = None,
        reply_mailbox: Optional[str] = None,
        reply_all: bool = False,
    ) -> OperationResult:
        """Replace an existing draft with an updated copy.

        Fields passed as ``None`` keep the value stored in the existing
        draft (including Reply-To and threading headers). The new version
        is appended BEFORE the old one is removed, so a failure can leave a
        duplicate but never loses the draft.

        Returns:
            An OperationResult; ``id`` is the new draft's UID when the
            server reports one, otherwise the old ``email_id``.
        """
        draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
        try:
            existing = self.read_email(draft_mailbox, email_id, format="both")
        except Exception as e:
            return OperationResult(success=False, message=str(e), id=email_id)
        if not existing:
            return OperationResult(
                success=False,
                message=f"Draft {email_id} not found in {draft_mailbox}",
                id=email_id,
            )
        resolved_to = to if to is not None else [addr.email for addr in existing.to_addresses]
        resolved_cc = cc if cc is not None else [addr.email for addr in existing.cc_addresses]
        resolved_bcc = bcc if bcc is not None else [addr.email for addr in existing.bcc_addresses]
        resolved_subject = subject if subject is not None else existing.subject
        resolved_body = body if body is not None else (existing.body_text or "")
        resolved_html = html_body if html_body is not None else existing.body_html
        resolved_reply_to = reply_to if reply_to is not None else existing.headers.get("Reply-To")
        if reply_to_email_id:
            try:
                context, error = self._get_reply_context(
                    reply_mailbox or "INBOX",
                    reply_to_email_id,
                    reply_all,
                    cc,
                    self.settings.smtp_from_email,
                )
            except Exception as e:
                return OperationResult(success=False, message=str(e), id=email_id)
            if error:
                return error
            if to is None:
                resolved_to = context["to"]
            if subject is None:
                resolved_subject = context["subject"]
            # The context CC list already contains the caller's ``cc``.
            resolved_cc = context["cc"]
            in_reply_to = context["in_reply_to"]
            references = context["references"]
        else:
            # Keep the threading of the draft being edited.
            in_reply_to = existing.in_reply_to
            references = list(existing.references or []) or None
        try:
            msg = self._build_draft_message(
                to=resolved_to,
                subject=resolved_subject,
                body=resolved_body,
                cc=resolved_cc,
                bcc=resolved_bcc,
                reply_to=resolved_reply_to,
                html_body=resolved_html,
                in_reply_to=in_reply_to,
                references=references,
            )
            client = self._get_imap_client()
            append_result = client.append(
                draft_mailbox,
                msg.as_bytes(),
                flags=["\\Draft"],
            )
            draft_id = self._extract_append_uid(append_result)
        except Exception as e:
            return OperationResult(success=False, message=str(e), id=email_id)
        try:
            client.select_folder(draft_mailbox)
            self._delete_and_expunge(client, int(email_id))
        except Exception as e:
            return OperationResult(
                success=False,
                message=(
                    f"Updated draft saved to {draft_mailbox}, but removing the "
                    f"old draft {email_id} failed: {e}"
                ),
                id=draft_id or email_id,
            )
        return OperationResult(
            success=True,
            message=f"Draft {email_id} updated in {draft_mailbox}",
            id=draft_id or email_id,
        )

    @classmethod
    def _extract_append_uid(cls, append_result) -> Optional[str]:
        """Pull the new message UID out of an APPEND result, if present.

        Accepts the raw server response (``bytes``/``str`` containing an
        ``APPENDUID`` response code) as well as a tuple whose second item
        is the UID.
        """
        if not append_result:
            return None
        if isinstance(append_result, tuple):
            return str(append_result[1]) if len(append_result) > 1 else None
        if isinstance(append_result, bytes):
            append_result = append_result.decode("ascii", errors="replace")
        match = cls._APPENDUID_RE.search(str(append_result))
        return match.group(1) if match else None

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------
    async def send_email(
        self,
        to: Optional[list[str]] = None,
        subject: Optional[str] = None,
        body: Optional[str] = None,
        cc: Optional[list[str]] = None,
        bcc: Optional[list[str]] = None,
        reply_to: Optional[str] = None,
        html_body: Optional[str] = None,
        sender_email: Optional[str] = None,
        sender_name: Optional[str] = None,
        in_reply_to: Optional[str] = None,
        references: Optional[list[str]] = None,
        reply_to_email_id: Optional[str] = None,
        reply_mailbox: Optional[str] = None,
        reply_all: bool = False,
    ) -> OperationResult:
        """Send a message over SMTP.

        When ``reply_to_email_id`` is given, missing ``to``/``subject`` are
        derived from that message, the CC list becomes the reply CC list
        merged with ``cc``, and threading headers are taken from it.
        BCC recipients are passed to the SMTP envelope only; no ``Bcc``
        header is written.

        Returns:
            An OperationResult; on success ``id`` is the generated
            Message-ID.
        """
        try:
            resolved_name, resolved_email = self._resolve_sender(sender_email, sender_name)
            if reply_to_email_id:
                context, error = self._get_reply_context(
                    reply_mailbox or "INBOX",
                    reply_to_email_id,
                    reply_all,
                    cc,
                    resolved_email,
                )
                if error:
                    return error
                if not to:
                    to = context["to"]
                if subject is None:
                    subject = context["subject"]
                # The context CC list already contains the caller's ``cc``.
                cc = context["cc"]
                in_reply_to = context["in_reply_to"]
                references = context["references"]
            if not to:
                return OperationResult(success=False, message="'to' is required to send email")
            if subject is None:
                return OperationResult(success=False, message="'subject' is required to send email")
            if body is None:
                return OperationResult(success=False, message="'body' is required to send email")
            msg = self._compose_message(
                from_name=resolved_name,
                from_email=resolved_email,
                to=to,
                subject=subject,
                body=body,
                cc=cc,
                bcc=None,  # never expose BCC recipients in the headers
                reply_to=reply_to,
                html_body=html_body,
                in_reply_to=in_reply_to,
                references=references,
            )
            # Envelope recipients: BCC addresses exist only here.
            recipients = [*to, *(cc or []), *(bcc or [])]
            await aiosmtplib.send(
                msg,
                recipients=recipients,
                hostname=self.settings.smtp_host,
                port=self.settings.smtp_port,
                username=self.settings.smtp_username,
                password=self.settings.smtp_password.get_secret_value(),
                start_tls=self.settings.smtp_use_tls,
            )
            return OperationResult(
                success=True,
                message=f"Email sent successfully to {', '.join(to)}",
                id=msg.get("Message-ID"),
            )
        except Exception as e:
            return OperationResult(success=False, message=str(e))

    async def reply_email(
        self,
        mailbox: str,
        email_id: str,
        body: str,
        reply_all: bool = False,
        cc: Optional[list[str]] = None,
        bcc: Optional[list[str]] = None,
        reply_to: Optional[str] = None,
        html_body: Optional[str] = None,
        sender_email: Optional[str] = None,
        sender_name: Optional[str] = None,
    ) -> OperationResult:
        """Reply to the message ``email_id`` in ``mailbox`` via ``send_email``."""
        return await self.send_email(
            body=body,
            bcc=bcc,
            reply_to=reply_to,
            html_body=html_body,
            sender_email=sender_email,
            sender_name=sender_name,
            reply_to_email_id=email_id,
            reply_mailbox=mailbox,
            reply_all=reply_all,
            cc=cc,
        )

    # ------------------------------------------------------------------
    # Envelope / structure helpers
    # ------------------------------------------------------------------
    def _build_summary(self, uid, mailbox: str, data, snippet=None) -> EmailSummary:
        """Build an EmailSummary from one FETCH result (ENVELOPE, FLAGS, ...)."""
        envelope = data[b"ENVELOPE"]
        flags = data[b"FLAGS"]
        return EmailSummary(
            id=str(uid),
            mailbox=mailbox,
            subject=decode_mime_header(envelope.subject) if envelope.subject else "(No Subject)",
            from_address=self._parse_envelope_sender(envelope),
            to_addresses=self._parse_envelope_addresses(envelope.to),
            date=envelope.date or datetime.now(),
            is_read=b"\\Seen" in flags,
            is_flagged=b"\\Flagged" in flags,
            has_attachments=self._has_attachments(data.get(b"BODYSTRUCTURE")),
            snippet=snippet,
        )

    def _parse_envelope_address(self, addr) -> EmailAddress:
        """Convert one envelope address; missing parts become placeholders."""
        local = addr.mailbox.decode("utf-8", errors="replace") if addr.mailbox else "unknown"
        host = addr.host.decode("utf-8", errors="replace") if addr.host else "unknown.com"
        return EmailAddress(
            name=decode_mime_header(addr.name) if addr.name else None,
            email=f"{local}@{host}",
        )

    def _parse_envelope_sender(self, envelope) -> EmailAddress:
        """Return the first From address, or a placeholder when there is none."""
        if envelope.from_ and len(envelope.from_) > 0:
            return self._parse_envelope_address(envelope.from_[0])
        return EmailAddress(name=None, email="unknown@unknown.com")

    def _parse_envelope_addresses(self, addresses) -> list[EmailAddress]:
        """Convert a sequence of envelope addresses (or ``None``) to a list."""
        if not addresses:
            return []
        return [self._parse_envelope_address(addr) for addr in addresses]

    def _has_attachments(self, bodystructure) -> bool:
        """Heuristically decide from a BODYSTRUCTURE whether attachments exist.

        A leaf part counts as an attachment when its main type is neither
        ``text`` nor ``multipart``, or when it carries an ``attachment``
        disposition. Multipart containers are searched recursively. Both
        shapes are accepted: a list of parts, or a tuple whose first item
        is the list (or leading run) of child parts.
        """
        if bodystructure is None:
            return False
        if isinstance(bodystructure, list):
            return any(self._has_attachments(part) for part in bodystructure)
        if not isinstance(bodystructure, tuple) or len(bodystructure) == 0:
            return False
        head = bodystructure[0]
        if isinstance(head, list):
            return any(self._has_attachments(part) for part in head)
        if isinstance(head, tuple):
            # Raw multipart form: child tuples come first, then the subtype
            # and extension data (which may itself contain tuples).
            for part in bodystructure:
                if not isinstance(part, tuple):
                    break
                if self._has_attachments(part):
                    return True
            return False
        main_type = head.decode("ascii", errors="replace") if isinstance(head, bytes) else str(head)
        if main_type.lower() not in ("text", "multipart"):
            return True
        # A text part is still an attachment when its disposition says so.
        for field in bodystructure[1:]:
            if (
                isinstance(field, tuple)
                and field
                and isinstance(field[0], bytes)
                and field[0].lower() == b"attachment"
            ):
                return True
        return False

    # ------------------------------------------------------------------
    # MIME helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _decode_text_part(part) -> Optional[str]:
        """Decode a part's payload to text, tolerating unknown charsets."""
        payload = part.get_payload(decode=True)
        if not payload:
            return None
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            # The charset label comes from the sender and may be unknown.
            return payload.decode("utf-8", errors="replace")

    def _get_body(self, msg) -> tuple[Optional[str], Optional[str]]:
        """Extract the first plain-text and first HTML body of a message.

        Parts with an ``attachment`` disposition are ignored. A
        non-multipart message is returned as HTML when its type is
        ``text/html`` and as plain text otherwise.

        Returns:
            ``(body_text, body_html)``; either may be ``None``.
        """
        body_text = None
        body_html = None
        if not msg.is_multipart():
            decoded = self._decode_text_part(msg)
            if decoded is not None:
                if msg.get_content_type() == "text/html":
                    body_html = decoded
                else:
                    body_text = decoded
            return body_text, body_html
        for part in msg.walk():
            if part.get_content_disposition() == "attachment":
                continue
            content_type = part.get_content_type()
            if content_type == "text/plain" and body_text is None:
                body_text = self._decode_text_part(part)
            elif content_type == "text/html" and body_html is None:
                body_html = self._decode_text_part(part)
        return body_text, body_html

    def _get_text_snippet(self, msg, max_length: int = 200) -> Optional[str]:
        """Return a short single-line preview of the message body.

        Plain text is preferred; otherwise the HTML body with tags removed.
        Text longer than ``max_length`` is cut and suffixed with ``...``.
        """
        body_text, body_html = self._get_body(msg)
        text = body_text or ""
        if not text and body_html:
            # Strip HTML tags for snippet
            text = re.sub(r"<[^>]+>", "", body_html)
        text = re.sub(r"\s+", " ", text).strip()
        if not text:
            return None
        if len(text) > max_length:
            return text[:max_length] + "..."
        return text

    def _get_attachments(self, msg) -> list[Attachment]:
        """List the parts of a multipart message marked as attachments."""
        attachments = []
        if not msg.is_multipart():
            return attachments
        for part in msg.walk():
            if part.get_content_disposition() != "attachment":
                continue
            filename = part.get_filename()
            filename = decode_mime_header(filename) if filename else "unnamed"
            attachments.append(
                Attachment(
                    filename=filename,
                    content_type=part.get_content_type(),
                    size=len(part.get_payload(decode=True) or b""),
                    content_id=part.get("Content-ID"),
                )
            )
        return attachments

    # ------------------------------------------------------------------
    # Folder discovery
    # ------------------------------------------------------------------
    def _find_special_folder(self, special_use_flag: bytes, fallback_names) -> Optional[str]:
        """Find a folder by special-use flag, falling back to well-known names."""
        folders = self._get_imap_client().list_folders()
        for flags, _delimiter, name in folders:
            if special_use_flag in flags:
                return name
        for _flags, _delimiter, name in folders:
            if name in fallback_names:
                return name
        return None

    def _find_trash_folder(self) -> Optional[str]:
        """Return the trash folder's name, or ``None`` when none is found."""
        return self._find_special_folder(b"\\Trash", self._TRASH_NAMES)

    def _find_drafts_folder(self) -> Optional[str]:
        """Return the drafts folder's name, or ``None`` when none is found."""
        return self._find_special_folder(b"\\Drafts", self._DRAFT_NAMES)

    # ------------------------------------------------------------------
    # Message construction
    # ------------------------------------------------------------------
    def _compose_message(
        self,
        *,
        from_name: Optional[str],
        from_email: str,
        to: list[str],
        subject: str,
        body: str,
        cc: Optional[list[str]] = None,
        bcc: Optional[list[str]] = None,
        reply_to: Optional[str] = None,
        html_body: Optional[str] = None,
        in_reply_to: Optional[str] = None,
        references: Optional[list[str]] = None,
    ) -> MIMEMultipart:
        """Build a multipart/alternative message with Date and Message-ID set."""
        msg = MIMEMultipart("alternative")
        msg["Subject"] = subject
        msg["From"] = formataddr((from_name or "", from_email))
        msg["To"] = ", ".join(to)
        msg["Date"] = formatdate(localtime=True)
        msg["Message-ID"] = make_msgid()
        if cc:
            msg["Cc"] = ", ".join(cc)
        if bcc:
            msg["Bcc"] = ", ".join(bcc)
        if reply_to:
            msg["Reply-To"] = reply_to
        if in_reply_to:
            msg["In-Reply-To"] = in_reply_to
        if references:
            msg["References"] = " ".join(references)
        msg.attach(MIMEText(body or "", "plain", "utf-8"))
        if html_body:
            msg.attach(MIMEText(html_body, "html", "utf-8"))
        return msg

    def _build_draft_message(
        self,
        to: list[str],
        subject: str,
        body: str,
        cc: Optional[list[str]] = None,
        bcc: Optional[list[str]] = None,
        reply_to: Optional[str] = None,
        html_body: Optional[str] = None,
        in_reply_to: Optional[str] = None,
        references: Optional[list[str]] = None,
    ) -> MIMEMultipart:
        """Build a draft message sent from the configured SMTP identity.

        Unlike outgoing mail, a draft keeps its ``Bcc`` header so the
        recipients survive until the draft is sent.
        """
        return self._compose_message(
            from_name=self.settings.smtp_from_name,
            from_email=self.settings.smtp_from_email,
            to=to,
            subject=subject,
            body=body,
            cc=cc,
            bcc=bcc,
            reply_to=reply_to,
            html_body=html_body,
            in_reply_to=in_reply_to,
            references=references,
        )

    def _get_reply_context(
        self,
        mailbox: str,
        email_id: str,
        reply_all: bool,
        cc: Optional[list[str]],
        sender_email: Optional[str],
    ) -> tuple[dict, Optional[OperationResult]]:
        """Derive recipients, subject and threading headers for a reply.

        Args:
            mailbox: Folder holding the message being replied to.
            email_id: UID of that message.
            reply_all: Also CC the original To/Cc recipients.
            cc: Extra CC addresses supplied by the caller; merged into the
                resulting CC list.
            sender_email: The replying address; removed from all recipient
                lists.

        Returns:
            ``(context, None)`` on success, where context has the keys
            ``to``, ``cc``, ``subject``, ``in_reply_to`` and ``references``;
            ``({}, error_result)`` when the message is missing or no
            recipient remains.
        """
        original = self.read_email(mailbox, email_id, format="both")
        if not original:
            return {}, OperationResult(
                success=False,
                message=f"Email {email_id} not found in {mailbox}",
                id=email_id,
            )
        # Reply-To wins over From when present and parseable.
        reply_to_email = None
        reply_to_header = original.headers.get("Reply-To")
        if reply_to_header:
            _, reply_to_email = parseaddr(reply_to_header)
        if not reply_to_email:
            reply_to_email = original.from_address.email
        to = [reply_to_email] if reply_to_email else []
        reply_cc: list[str] = []
        if reply_all:
            for addr in original.to_addresses + original.cc_addresses:
                if addr.email and addr.email not in to:
                    reply_cc.append(addr.email)
        if cc:
            reply_cc.extend(cc)
        to = self._dedupe_emails(to, sender_email)
        reply_cc = self._dedupe_emails(reply_cc, sender_email)
        if not to and reply_cc:
            # Replying to one's own message: promote the first CC to To.
            to = [reply_cc.pop(0)]
        if not to:
            return {}, OperationResult(
                success=False,
                message="No valid recipients found for reply",
                id=email_id,
            )
        subject = original.subject or "(No Subject)"
        if not subject.lower().startswith("re:"):
            subject = f"Re: {subject}"
        in_reply_to = original.headers.get("Message-ID") or original.in_reply_to
        references = list(original.references)
        if in_reply_to and in_reply_to not in references:
            references.append(in_reply_to)
        return {
            "to": to,
            "cc": reply_cc or None,
            "subject": subject,
            "in_reply_to": in_reply_to,
            "references": references or None,
        }, None

    def _resolve_sender(
        self, sender_email: Optional[str], sender_name: Optional[str]
    ) -> tuple[Optional[str], str]:
        """Work out the From name and address for an outgoing message.

        An explicit ``sender_email`` wins. Otherwise, if ``sender_name``
        itself parses as ``Name <addr>``, that address is used. Failing
        both, the configured SMTP identity is returned.
        """
        if sender_email:
            return sender_name, sender_email
        if sender_name:
            name, email_addr = parseaddr(sender_name)
            if email_addr:
                return name or None, email_addr
        return self.settings.smtp_from_name, self.settings.smtp_from_email

    def _dedupe_emails(self, emails: list[str], self_email: Optional[str]) -> list[str]:
        """Drop empty, duplicate (case-insensitive) and own addresses, keeping order."""
        own = self_email.lower() if self_email else None
        seen = set()
        cleaned = []
        for addr in emails:
            if not addr:
                continue
            key = addr.lower()
            if key == own or key in seen:
                continue
            seen.add(key)
            cleaned.append(addr)
        return cleaned

    # ------------------------------------------------------------------
    # Mailing lists
    # ------------------------------------------------------------------
    async def unsubscribe(
        self,
        mailbox: str,
        email_id: str,
    ) -> OperationResult:
        """
        Attempt to unsubscribe from a mailing list based on email headers.
        Args:
            mailbox: The mailbox containing the email
            email_id: The unique ID of the email
        Returns:
            OperationResult with unsubscribe status
        """
        # First, read the email to get unsubscribe info
        try:
            email_data = self.read_email(mailbox, email_id)
        except Exception as e:
            return OperationResult(success=False, message=str(e))
        if not email_data:
            return OperationResult(
                success=False,
                message=f"Email {email_id} not found in {mailbox}",
            )
        if not email_data.unsubscribe:
            return OperationResult(
                success=False,
                message="This email does not have unsubscribe information",
            )
        # Execute unsubscribe
        return await execute_unsubscribe(email_data.unsubscribe)