This commit is contained in:
2025-12-31 13:42:30 -08:00
parent afe35c51bf
commit 3b687c1a4c
2 changed files with 273 additions and 2 deletions

View File

@@ -2,7 +2,7 @@ import email
from email.header import decode_header from email.header import decode_header
from email.mime.multipart import MIMEMultipart from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText from email.mime.text import MIMEText
from email.utils import formataddr, parseaddr from email.utils import formataddr, parseaddr, formatdate, make_msgid
from datetime import datetime from datetime import datetime
from typing import Optional from typing import Optional
import re import re
@@ -200,6 +200,25 @@ class EmailService:
emails=emails, total=total, mailbox=mailbox, limit=limit, offset=offset emails=emails, total=total, mailbox=mailbox, limit=limit, offset=offset
) )
def list_drafts(
self,
mailbox: Optional[str] = None,
limit: int = 50,
offset: int = 0,
include_body: bool = False,
) -> EmailList:
draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
try:
return self.list_emails(draft_mailbox, limit, offset, include_body)
except Exception:
return EmailList(
emails=[],
total=0,
mailbox=draft_mailbox,
limit=limit,
offset=offset,
)
def read_email( def read_email(
self, mailbox: str, email_id: str, format: str = "text" self, mailbox: str, email_id: str, format: str = "text"
) -> Optional[Email]: ) -> Optional[Email]:
@@ -241,7 +260,7 @@ class EmailService:
# Get headers # Get headers
headers = {} headers = {}
for key in ["Message-ID", "In-Reply-To", "References", "X-Priority", "List-Unsubscribe", "List-Unsubscribe-Post"]: for key in ["Message-ID", "In-Reply-To", "References", "Reply-To", "X-Priority", "List-Unsubscribe", "List-Unsubscribe-Post"]:
value = msg.get(key) value = msg.get(key)
if value: if value:
headers[key] = decode_mime_header(value) headers[key] = decode_mime_header(value)
@@ -406,6 +425,115 @@ class EmailService:
except Exception as e: except Exception as e:
return OperationResult(success=False, message=str(e)) return OperationResult(success=False, message=str(e))
def delete_draft(
self, email_id: str, mailbox: Optional[str] = None, permanent: bool = False
) -> OperationResult:
draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
return self.delete_email(email_id, draft_mailbox, permanent)
def save_draft(
self,
to: list[str],
subject: str,
body: str,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
reply_to: Optional[str] = None,
html_body: Optional[str] = None,
mailbox: Optional[str] = None,
) -> OperationResult:
try:
draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
msg = self._build_draft_message(
to=to,
subject=subject,
body=body,
cc=cc,
bcc=bcc,
reply_to=reply_to,
html_body=html_body,
)
client = self._get_imap_client()
append_result = client.append(
draft_mailbox,
msg.as_bytes(),
flags=["\\Draft"],
)
draft_id = None
if append_result and isinstance(append_result, tuple) and len(append_result) > 1:
draft_id = str(append_result[1])
return OperationResult(
success=True,
message=f"Draft saved to {draft_mailbox}",
id=draft_id,
)
except Exception as e:
return OperationResult(success=False, message=str(e))
def update_draft(
self,
email_id: str,
mailbox: Optional[str] = None,
to: Optional[list[str]] = None,
subject: Optional[str] = None,
body: Optional[str] = None,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
reply_to: Optional[str] = None,
html_body: Optional[str] = None,
) -> OperationResult:
draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
try:
existing = self.read_email(draft_mailbox, email_id, format="both")
except Exception as e:
return OperationResult(success=False, message=str(e), id=email_id)
if not existing:
return OperationResult(
success=False,
message=f"Draft {email_id} not found in {draft_mailbox}",
id=email_id,
)
resolved_to = to if to is not None else [addr.email for addr in existing.to_addresses]
resolved_cc = cc if cc is not None else [addr.email for addr in existing.cc_addresses]
resolved_bcc = bcc if bcc is not None else [addr.email for addr in existing.bcc_addresses]
resolved_subject = subject if subject is not None else existing.subject
resolved_body = body if body is not None else (existing.body_text or "")
resolved_html = html_body if html_body is not None else existing.body_html
try:
client = self._get_imap_client()
client.select_folder(draft_mailbox)
uid = int(email_id)
client.delete_messages([uid])
client.expunge()
msg = self._build_draft_message(
to=resolved_to,
subject=resolved_subject,
body=resolved_body,
cc=resolved_cc,
bcc=resolved_bcc,
reply_to=reply_to,
html_body=resolved_html,
)
append_result = client.append(
draft_mailbox,
msg.as_bytes(),
flags=["\\Draft"],
)
draft_id = None
if append_result and isinstance(append_result, tuple) and len(append_result) > 1:
draft_id = str(append_result[1])
return OperationResult(
success=True,
message=f"Draft {email_id} updated in {draft_mailbox}",
id=draft_id or email_id,
)
except Exception as e:
return OperationResult(success=False, message=str(e), id=email_id)
async def send_email( async def send_email(
self, self,
to: list[str], to: list[str],
@@ -575,6 +703,47 @@ class EmailService:
return name return name
return None return None
def _find_drafts_folder(self) -> Optional[str]:
client = self._get_imap_client()
folders = client.list_folders()
draft_names = ["Drafts", "Draft", "INBOX.Drafts", "[Gmail]/Drafts"]
for flags, delimiter, name in folders:
if name in draft_names or b"\\Drafts" in flags:
return name
return None
def _build_draft_message(
self,
to: list[str],
subject: str,
body: str,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
reply_to: Optional[str] = None,
html_body: Optional[str] = None,
) -> MIMEMultipart:
msg = MIMEMultipart("alternative")
msg["Subject"] = subject
msg["From"] = formataddr(
(self.settings.smtp_from_name or "", self.settings.smtp_from_email)
)
msg["To"] = ", ".join(to)
msg["Date"] = formatdate(localtime=True)
msg["Message-ID"] = make_msgid()
if cc:
msg["Cc"] = ", ".join(cc)
if bcc:
msg["Bcc"] = ", ".join(bcc)
if reply_to:
msg["Reply-To"] = reply_to
msg.attach(MIMEText(body or "", "plain", "utf-8"))
if html_body:
msg.attach(MIMEText(html_body, "html", "utf-8"))
return msg
def set_flags( def set_flags(
self, self,
email_id: str, email_id: str,

View File

@@ -32,6 +32,25 @@ def register_email_tools(mcp: FastMCP, service: EmailService):
result = service.list_emails(mailbox, limit, offset, include_body) result = service.list_emails(mailbox, limit, offset, include_body)
return result.model_dump() return result.model_dump()
@mcp.tool(description="List draft emails in the Drafts mailbox with pagination.")
def list_drafts(
mailbox: Optional[str] = None,
limit: int = 50,
offset: int = 0,
include_body: bool = False,
) -> dict:
"""
List draft emails.
Args:
mailbox: Drafts mailbox/folder override (default: auto-detect)
limit: Maximum number of drafts to return (default: 50)
offset: Number of drafts to skip for pagination (default: 0)
include_body: Whether to include body snippets (default: False)
"""
result = service.list_drafts(mailbox, limit, offset, include_body)
return result.model_dump()
@mcp.tool(description="Read a specific email by ID with full body content and attachment information.") @mcp.tool(description="Read a specific email by ID with full body content and attachment information.")
def read_email( def read_email(
mailbox: str, mailbox: str,
@@ -108,6 +127,89 @@ def register_email_tools(mcp: FastMCP, service: EmailService):
result = service.delete_email(email_id, mailbox, permanent) result = service.delete_email(email_id, mailbox, permanent)
return result.model_dump() return result.model_dump()
@mcp.tool(description="Delete a drafted email by ID, optionally permanently.")
def delete_draft(
email_id: str,
mailbox: Optional[str] = None,
permanent: bool = False,
) -> dict:
"""
Delete a draft email.
Args:
email_id: The unique ID of the draft
mailbox: Drafts mailbox/folder override (default: auto-detect)
permanent: If True, permanently delete; if False, move to Trash (default: False)
"""
result = service.delete_draft(email_id, mailbox, permanent)
return result.model_dump()
@mcp.tool(description="Save a new draft email to the Drafts mailbox.")
def save_draft(
to: list[str],
subject: str,
body: str,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
reply_to: Optional[str] = None,
html_body: Optional[str] = None,
mailbox: Optional[str] = None,
) -> dict:
"""
Save a new email draft.
Args:
to: List of recipient email addresses
subject: Email subject line
body: Plain text email body
cc: List of CC recipients (optional)
bcc: List of BCC recipients (optional)
reply_to: Reply-to address (optional)
html_body: HTML version of the email body (optional)
mailbox: Drafts mailbox/folder override (default: auto-detect)
"""
result = service.save_draft(to, subject, body, cc, bcc, reply_to, html_body, mailbox)
return result.model_dump()
@mcp.tool(description="Edit an existing draft email. Only provided fields will be modified.")
def edit_draft(
email_id: str,
mailbox: Optional[str] = None,
to: Optional[list[str]] = None,
subject: Optional[str] = None,
body: Optional[str] = None,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
reply_to: Optional[str] = None,
html_body: Optional[str] = None,
) -> dict:
"""
Update an existing draft email.
Args:
email_id: The unique ID of the draft
mailbox: Drafts mailbox/folder override (default: auto-detect)
to: List of recipient email addresses
subject: Email subject line
body: Plain text email body
cc: List of CC recipients (optional)
bcc: List of BCC recipients (optional)
reply_to: Reply-to address (optional)
html_body: HTML version of the email body (optional)
"""
result = service.update_draft(
email_id,
mailbox,
to,
subject,
body,
cc,
bcc,
reply_to,
html_body,
)
return result.model_dump()
@mcp.tool(description="Send a new email via SMTP. Supports plain text and HTML content, CC, BCC, and reply-to.") @mcp.tool(description="Send a new email via SMTP. Supports plain text and HTML content, CC, BCC, and reply-to.")
async def send_email( async def send_email(
to: list[str], to: list[str],