Compare commits

...

9 Commits

Author SHA1 Message Date
1543bc4174 Quote original message in reply drafts
All checks were successful
Build And Test / publish (push) Successful in 49s
2026-01-01 17:01:22 -08:00
64af784998 Use CARDDAV_URL as address book URL
All checks were successful
Build And Test / publish (push) Successful in 49s
2026-01-01 15:56:38 -08:00
bd8e1412e4 Use configured contacts address book
All checks were successful
Build And Test / publish (push) Successful in 47s
2026-01-01 15:55:00 -08:00
5a9ef0e48f Revise contacts and email tools
All checks were successful
Build And Test / publish (push) Successful in 48s
2026-01-01 15:46:44 -08:00
767f076048 Add tool call logging and reply fields
All checks were successful
Build And Test / publish (push) Successful in 49s
2026-01-01 15:24:06 -08:00
7966a4302d Add ICS calendar support
All checks were successful
Build And Test / publish (push) Successful in 48s
2026-01-01 15:06:44 -08:00
71c55f7289 Add reply_email tool and sender override
All checks were successful
Build And Test / publish (push) Successful in 1m0s
2026-01-01 12:16:06 -08:00
0459fb1d4c Rewrite README
All checks were successful
Build And Test / publish (push) Successful in 49s
2025-12-31 13:44:09 -08:00
3b687c1a4c drafts 2025-12-31 13:42:30 -08:00
12 changed files with 1169 additions and 324 deletions

View File

@@ -51,7 +51,7 @@ CALDAV_PASSWORD=your-caldav-password
# - Nextcloud: https://cloud.example.com/remote.php/dav # - Nextcloud: https://cloud.example.com/remote.php/dav
# - Fastmail: https://carddav.fastmail.com/dav/addressbooks/user/you@fastmail.com # - Fastmail: https://carddav.fastmail.com/dav/addressbooks/user/you@fastmail.com
# - Radicale: https://radicale.example.com/user/ # - Radicale: https://radicale.example.com/user/
CARDDAV_URL=https://carddav.example.com/dav CARDDAV_URL=https://carddav.example.com/dav/addressbooks/users/user@example.com/contacts/
CARDDAV_USERNAME=user@example.com CARDDAV_USERNAME=user@example.com
CARDDAV_PASSWORD=your-carddav-password CARDDAV_PASSWORD=your-carddav-password

299
README.md
View File

@@ -1,92 +1,56 @@
# PIM MCP Server # DAV/IMAP MCP Server
A self-hosted MCP server for managing your email, calendar, and contacts through AI assistants. A self-hosted MCP server that connects IMAP/SMTP, CalDAV, and CardDAV to MCP-compatible clients. It exposes email, calendar, and contacts tools and can optionally send new email notifications to a Poke webhook.
Connect your existing mail server (IMAP/SMTP) and CalDAV/CardDAV services to any MCP-compatible client. ## Features
## Prerequisites - Email tools over IMAP/SMTP (list, read, search, drafts, send drafts, flags, unsubscribe)
- Calendar tools over CalDAV (list, create, update, delete)
- Contacts tools over CardDAV (list, create, update, delete)
- Optional email notifications via webhook with IMAP IDLE or polling
- SQLite cache with Alembic migrations
- API key auth for the MCP endpoint
- Docker and Docker Compose (recommended), OR Python 3.12+ ## Quickstart (Docker Compose)
- An email account with IMAP/SMTP access
- (Optional) A CalDAV server for calendars (Nextcloud, Fastmail, Radicale, etc.)
- (Optional) A CardDAV server for contacts
## Setup 1. Copy the environment template:
### Option 1: Docker Compose (Recommended)
1. **Clone the repository**
```bash
git clone https://github.com/your-repo/pim-mcp-server.git
cd pim-mcp-server
```
2. **Create your environment file**
```bash ```bash
cp .env.example .env cp .env.example .env
``` ```
3. **Edit `.env` with your credentials** 2. Edit `.env` with your credentials.
```bash
# Required: Generate an API key
MCP_API_KEY=$(python3 -c "import secrets; print(secrets.token_urlsafe(32))")
# Then edit .env with your mail server details 3. Start the server:
nano .env
```
4. **Start the server**
```bash ```bash
docker compose up -d docker compose up -d
``` ```
5. **Verify it's running** 4. Verify it is running:
```bash ```bash
curl http://localhost:8000/mcp curl http://localhost:8000/mcp
``` ```
### Option 2: Local Python ## Local Run (Python)
1. **Clone and setup environment**
```bash ```bash
git clone https://github.com/your-repo/pim-mcp-server.git
cd pim-mcp-server
python -m venv venv python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate source venv/bin/activate # Windows: venv\Scripts\activate
pip install -r requirements.txt pip install -r requirements.txt
```
2. **Configure**
```bash
cp .env.example .env cp .env.example .env
nano .env # Add your credentials # Edit .env
```
3. **Initialize database**
```bash
# Create initial migration (first time only)
alembic revision --autogenerate -m "Initial tables"
alembic upgrade head
```
4. **Run**
```bash
python src/server.py python src/server.py
``` ```
## Configuration ## Configuration
Edit `.env` with your service credentials. Only configure the services you want to use. All settings are read from `.env`. Configure only the services you want to enable.
### Minimal Setup (Email Only) ### Minimal email-only setup
```bash ```bash
# Server MCP_API_KEY=your-secret-key
MCP_API_KEY=your-secret-key-here
PORT=8000 PORT=8000
# Email
IMAP_HOST=imap.example.com IMAP_HOST=imap.example.com
IMAP_USERNAME=you@example.com IMAP_USERNAME=you@example.com
IMAP_PASSWORD=your-password IMAP_PASSWORD=your-password
@@ -95,121 +59,75 @@ SMTP_USERNAME=you@example.com
SMTP_PASSWORD=your-password SMTP_PASSWORD=your-password
SMTP_FROM_EMAIL=you@example.com SMTP_FROM_EMAIL=you@example.com
# Disable other services
ENABLE_CALENDAR=false ENABLE_CALENDAR=false
ENABLE_CONTACTS=false ENABLE_CONTACTS=false
``` ```
### Full Setup (Email + Calendar + Contacts) ### Full setup (email + calendar + contacts)
```bash ```bash
# Server MCP_API_KEY=your-secret-key
MCP_API_KEY=your-secret-key-here
PORT=8000 PORT=8000
# Email (IMAP/SMTP)
IMAP_HOST=imap.example.com IMAP_HOST=imap.example.com
IMAP_PORT=993 IMAP_PORT=993
IMAP_USERNAME=you@example.com IMAP_USERNAME=you@example.com
IMAP_PASSWORD=your-password IMAP_PASSWORD=your-password
IMAP_USE_SSL=true
SMTP_HOST=smtp.example.com SMTP_HOST=smtp.example.com
SMTP_PORT=587 SMTP_PORT=587
SMTP_USERNAME=you@example.com SMTP_USERNAME=you@example.com
SMTP_PASSWORD=your-password SMTP_PASSWORD=your-password
SMTP_USE_TLS=true
SMTP_FROM_EMAIL=you@example.com SMTP_FROM_EMAIL=you@example.com
SMTP_FROM_NAME=Your Name SMTP_FROM_NAME=Your Name
# Calendar (CalDAV)
CALDAV_URL=https://caldav.example.com/dav CALDAV_URL=https://caldav.example.com/dav
CALDAV_USERNAME=you@example.com CALDAV_USERNAME=you@example.com
CALDAV_PASSWORD=your-password CALDAV_PASSWORD=your-password
ICS_CALENDARS=Team|https://example.com/team.ics,Family|https://example.com/family.ics
ICS_CALENDAR_TIMEOUT=20
ICS_CALENDARS=Team|https://example.com/team.ics,Family|https://example.com/family.ics
ICS_CALENDAR_TIMEOUT=20
# Contacts (CardDAV) CARDDAV_URL=https://carddav.example.com/dav/addressbooks/users/you@example.com/contacts/
CARDDAV_URL=https://carddav.example.com/dav
CARDDAV_USERNAME=you@example.com CARDDAV_USERNAME=you@example.com
CARDDAV_PASSWORD=your-password CARDDAV_PASSWORD=your-password
``` ```
### Provider-Specific Examples Contacts tools always use `CARDDAV_URL` as the full CardDAV address book URL. Listing address books is not exposed via MCP.
<details> ICS calendars are optional and read-only. Set `ICS_CALENDARS` to a comma-separated list of entries, each as `name|url` or just `url` if you want the name inferred.
<summary><strong>Fastmail</strong></summary>
### Email notifications (Poke webhook)
Enable notifications to send new-email alerts to Poke. The server will use IMAP IDLE when available and fall back to polling.
```bash ```bash
IMAP_HOST=imap.fastmail.com ENABLE_EMAIL_NOTIFICATIONS=true
IMAP_PORT=993 NOTIFICATION_MAILBOXES=INBOX,Updates
SMTP_HOST=smtp.fastmail.com NOTIFICATION_POLL_INTERVAL=60
SMTP_PORT=587 NOTIFICATION_IDLE_TIMEOUT=1680
CALDAV_URL=https://caldav.fastmail.com/dav/calendars/user/you@fastmail.com
CARDDAV_URL=https://carddav.fastmail.com/dav/addressbooks/user/you@fastmail.com POKE_WEBHOOK_URL=https://poke.com/api/v1/inbound-sms/webhook
POKE_API_KEY=your-poke-api-key
POKE_WEBHOOK_TIMEOUT=30
POKE_WEBHOOK_MAX_RETRIES=3
``` ```
Use an [app-specific password](https://www.fastmail.help/hc/en-us/articles/360058752854).
</details>
<details> ## MCP Client Setup
<summary><strong>Nextcloud</strong></summary>
```bash ### MCP Inspector
# Use your mail server for IMAP/SMTP
IMAP_HOST=mail.example.com
SMTP_HOST=mail.example.com
# Nextcloud for CalDAV/CardDAV
CALDAV_URL=https://cloud.example.com/remote.php/dav
CARDDAV_URL=https://cloud.example.com/remote.php/dav
CALDAV_USERNAME=your-nextcloud-user
CARDDAV_USERNAME=your-nextcloud-user
```
</details>
<details>
<summary><strong>Gmail</strong></summary>
```bash
IMAP_HOST=imap.gmail.com
IMAP_PORT=993
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
```
You must use an [App Password](https://support.google.com/accounts/answer/185833) (not your regular password).
Note: Gmail's CalDAV/CardDAV requires OAuth2, which is not currently supported.
</details>
<details>
<summary><strong>Mailcow</strong></summary>
```bash
IMAP_HOST=mail.example.com
SMTP_HOST=mail.example.com
CALDAV_URL=https://mail.example.com/SOGo/dav
CARDDAV_URL=https://mail.example.com/SOGo/dav
```
</details>
<details>
<summary><strong>Radicale (Self-hosted)</strong></summary>
```bash
CALDAV_URL=https://radicale.example.com/user/
CARDDAV_URL=https://radicale.example.com/user/
```
</details>
## Connecting to MCP Clients
### MCP Inspector (Testing)
```bash ```bash
npx @modelcontextprotocol/inspector npx @modelcontextprotocol/inspector
``` ```
- Transport: **Streamable HTTP** - Transport: Streamable HTTP
- URL: `http://localhost:8000/mcp` - URL: `http://localhost:8000/mcp`
### Claude Desktop ### Claude Desktop
Add to your Claude Desktop config (`~/.config/claude/claude_desktop_config.json` on Linux/Mac):
```json ```json
{ {
"mcpServers": { "mcpServers": {
@@ -223,99 +141,68 @@ Add to your Claude Desktop config (`~/.config/claude/claude_desktop_config.json`
### Poke ### Poke
Go to [poke.com/settings/connections](https://poke.com/settings/connections) and add your server URL. Add your MCP endpoint at https://poke.com/settings/connections.
## Available Tools ## Available Tools
Once connected, your AI assistant can use these tools:
| Category | Tools | | Category | Tools |
|----------|-------| | --- | --- |
| **Email** | `list_mailboxes`, `list_emails`, `read_email`, `search_emails`, `move_email`, `delete_email`, `send_email` | | Email | `list_mailboxes`, `list_emails`, `list_drafts`, `read_email`, `search_emails`, `move_email`, `delete_email`, `delete_draft`, `save_draft`, `edit_draft`, `send_draft`, `set_email_flags`, `unsubscribe_maillist` |
| **Calendar** | `list_calendars`, `list_events`, `get_event`, `create_event`, `update_event`, `delete_event` | | Calendar | `list_calendars`, `list_events`, `get_event`, `create_event`, `update_event`, `delete_event` |
| **Contacts** | `list_addressbooks`, `list_contacts`, `get_contact`, `create_contact`, `update_contact`, `delete_contact` | | Contacts | `list_contacts`, `get_contact`, `create_contact`, `update_contact`, `delete_contact` |
| **System** | `get_server_info` | | System | `get_server_info` |
## Database Migrations ### Sending email
The server uses SQLModel with Alembic for database migrations. Emails are sent only from drafts. Create or edit a draft with `save_draft`/`edit_draft`, then send it with `send_draft` using the returned draft ID.
### When you update the code ### Replying to an email
```bash Use `in_reply_to_email_id` on `save_draft` or `edit_draft` to create a reply without a separate tool. The draft includes reply headers and a quoted original message so webmail clients can preserve threading on send. Then send it with `send_draft`.
# Pull latest changes
git pull
# Run any new migrations - Provide `in_reply_to_email_id` (and optionally `in_reply_to_mailbox`, default `INBOX`).
alembic upgrade head - `reply_all=true` includes original recipients; otherwise it replies to the sender/Reply-To.
- If `to`/`subject` are omitted, they are derived from the original email; `body` is still required.
- `in_reply_to_email_id` is the email UID from `list_emails`/`read_email`, not the RFC Message-ID header.
# Restart Example (send a reply):
docker compose restart ```json
# or: python src/server.py {
"tool": "save_draft",
"args": {
"in_reply_to_email_id": "12345",
"in_reply_to_mailbox": "INBOX",
"reply_all": true,
"body": "Thanks — sounds good to me."
}
}
``` ```
### Adding custom fields Then send the draft by its returned ID:
```json
{
"tool": "send_draft",
"args": {
"email_id": "67890"
}
}
```
1. Edit `src/database/models.py` ## Database and Migrations
2. Generate migration: `alembic revision --autogenerate -m "Description"`
3. Apply: `alembic upgrade head` The server uses SQLite (default: `/data/cache.db`) and Alembic.
```bash
alembic revision --autogenerate -m "Describe change"
alembic upgrade head
```
## Troubleshooting ## Troubleshooting
### Connection refused - Connection refused: check `docker compose ps` or `curl http://localhost:8000/mcp`
- Check the server is running: `docker compose ps` or `curl localhost:8000/mcp` - Auth errors: confirm `MCP_API_KEY` and client config
- Check logs: `docker compose logs -f` - IMAP/SMTP failures: verify credentials and app-specific passwords
- CalDAV/CardDAV failures: confirm base URL and username
### Authentication failed (IMAP/SMTP)
- Verify credentials in `.env`
- Many providers require app-specific passwords (Gmail, Fastmail, etc.)
- Check if 2FA is enabled on your account
### CalDAV/CardDAV not working
- Verify the URL is correct (try opening it in a browser)
- Check if your provider requires a specific URL format
- Some providers need the full path including username
### Service disabled
If you see "service: disabled (not configured)", check that:
- All required env vars are set (host, username, password)
- `ENABLE_*` is not set to `false`
### View logs
```bash
# Docker
docker compose logs -f
# Local
# Logs print to stdout
```
## Project Structure
```
├── src/
│ ├── server.py # Entry point
│ ├── config.py # Environment configuration
│ ├── database/ # SQLModel ORM
│ │ ├── models.py # Table definitions
│ │ └── connection.py # Database connection
│ ├── models/ # Pydantic models (API)
│ ├── services/ # Business logic
│ └── tools/ # MCP tool definitions
├── migrations/ # Alembic migrations
├── docker-compose.yml
├── Dockerfile
├── .env.example
└── requirements.txt
```
## Security Notes
- Never commit `.env` files
- Use app-specific passwords where available
- The Docker container runs as non-root
- Consider running behind a reverse proxy with HTTPS for remote access
- The `MCP_API_KEY` is optional but recommended for production
## License ## License

View File

@@ -33,6 +33,8 @@ class Settings(BaseSettings):
caldav_url: Optional[str] = Field(default=None, alias="CALDAV_URL") caldav_url: Optional[str] = Field(default=None, alias="CALDAV_URL")
caldav_username: Optional[str] = Field(default=None, alias="CALDAV_USERNAME") caldav_username: Optional[str] = Field(default=None, alias="CALDAV_USERNAME")
caldav_password: Optional[SecretStr] = Field(default=None, alias="CALDAV_PASSWORD") caldav_password: Optional[SecretStr] = Field(default=None, alias="CALDAV_PASSWORD")
ics_calendars: Optional[str] = Field(default=None, alias="ICS_CALENDARS")
ics_calendar_timeout: int = Field(default=20, alias="ICS_CALENDAR_TIMEOUT")
# CardDAV Configuration # CardDAV Configuration
carddav_url: Optional[str] = Field(default=None, alias="CARDDAV_URL") carddav_url: Optional[str] = Field(default=None, alias="CARDDAV_URL")
@@ -107,7 +109,7 @@ class Settings(BaseSettings):
self.smtp_from_email, self.smtp_from_email,
]) ])
def is_calendar_configured(self) -> bool: def is_caldav_configured(self) -> bool:
return all([ return all([
self.enable_calendar, self.enable_calendar,
self.caldav_url, self.caldav_url,
@@ -115,6 +117,33 @@ class Settings(BaseSettings):
self.caldav_password, self.caldav_password,
]) ])
def is_calendar_configured(self) -> bool:
return all([
self.enable_calendar,
(self.is_caldav_configured() or self.get_ics_calendars()),
])
def get_ics_calendars(self) -> list[tuple[Optional[str], str]]:
if not self.ics_calendars:
return []
calendars: list[tuple[Optional[str], str]] = []
for entry in self.ics_calendars.split(","):
item = entry.strip()
if not item:
continue
if "|" in item:
name, url = item.split("|", 1)
name = name.strip() or None
url = url.strip()
else:
name = None
url = item
if url:
calendars.append((name, url))
return calendars
def is_contacts_configured(self) -> bool: def is_contacts_configured(self) -> bool:
return all([ return all([
self.enable_contacts, self.enable_contacts,

View File

@@ -19,6 +19,7 @@ from fastmcp import FastMCP
from config import settings from config import settings
from database import init_db, close_db from database import init_db, close_db
from tools.logging_utils import log_tool_call
# Configure logging # Configure logging
logging.basicConfig( logging.basicConfig(
@@ -53,7 +54,10 @@ def setup_services():
if settings.is_calendar_configured(): if settings.is_calendar_configured():
from services.calendar_service import CalendarService from services.calendar_service import CalendarService
calendar_service = CalendarService(settings) calendar_service = CalendarService(settings)
if settings.is_caldav_configured():
print(f" Calendar service: enabled (CalDAV: {settings.caldav_url})") print(f" Calendar service: enabled (CalDAV: {settings.caldav_url})")
else:
print(" Calendar service: enabled (ICS calendars only)")
else: else:
print(" Calendar service: disabled (not configured)") print(" Calendar service: disabled (not configured)")
@@ -101,6 +105,7 @@ def register_tools():
# Server info tool (always available) # Server info tool (always available)
@mcp.tool(description="Get information about this PIM MCP server including enabled services and version.") @mcp.tool(description="Get information about this PIM MCP server including enabled services and version.")
@log_tool_call
def get_server_info() -> dict: def get_server_info() -> dict:
"""Get server information and status.""" """Get server information and status."""
return { return {
@@ -116,6 +121,7 @@ def get_server_info() -> dict:
"calendar": { "calendar": {
"enabled": calendar_service is not None, "enabled": calendar_service is not None,
"caldav_url": settings.caldav_url if calendar_service else None, "caldav_url": settings.caldav_url if calendar_service else None,
"ics_calendars": [c[1] for c in settings.get_ics_calendars()] if calendar_service else [],
}, },
"contacts": { "contacts": {
"enabled": contacts_service is not None, "enabled": contacts_service is not None,

View File

@@ -1,8 +1,10 @@
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Optional from typing import Optional
import uuid import uuid
from urllib.parse import urlparse
import caldav import caldav
import httpx
from icalendar import Calendar as iCalendar, Event as iEvent, vText from icalendar import Calendar as iCalendar, Event as iEvent, vText
from dateutil.parser import parse as parse_date from dateutil.parser import parse as parse_date
from dateutil.rrule import rrulestr from dateutil.rrule import rrulestr
@@ -24,8 +26,39 @@ class CalendarService:
self.settings = settings self.settings = settings
self._client: Optional[caldav.DAVClient] = None self._client: Optional[caldav.DAVClient] = None
self._principal = None self._principal = None
self._ics_calendars = self._load_ics_calendars()
def _load_ics_calendars(self) -> list[dict]:
calendars = []
for idx, (name, url) in enumerate(self.settings.get_ics_calendars()):
cal_id = f"ics:{url}"
calendars.append(
{
"id": cal_id,
"name": name or self._derive_ics_name(url, idx),
"url": url,
}
)
return calendars
def _derive_ics_name(self, url: str, fallback_index: int) -> str:
parsed = urlparse(url)
if parsed.path and parsed.path != "/":
return parsed.path.rstrip("/").split("/")[-1] or f"ICS Calendar {fallback_index + 1}"
return parsed.netloc or f"ICS Calendar {fallback_index + 1}"
def _is_ics_calendar(self, calendar_id: str) -> bool:
return calendar_id.startswith("ics:")
def _get_ics_calendar(self, calendar_id: str) -> Optional[dict]:
for cal in self._ics_calendars:
if cal["id"] == calendar_id:
return cal
return None
def _get_client(self) -> caldav.DAVClient: def _get_client(self) -> caldav.DAVClient:
if not self.settings.is_caldav_configured():
raise ValueError("CalDAV is not configured")
if self._client is None: if self._client is None:
self._client = caldav.DAVClient( self._client = caldav.DAVClient(
url=self.settings.caldav_url, url=self.settings.caldav_url,
@@ -40,10 +73,11 @@ class CalendarService:
return self._principal return self._principal
def list_calendars(self) -> list[Calendar]: def list_calendars(self) -> list[Calendar]:
result = []
if self.settings.is_caldav_configured():
principal = self._get_principal() principal = self._get_principal()
calendars = principal.calendars() calendars = principal.calendars()
result = []
for cal in calendars: for cal in calendars:
props = cal.get_properties([caldav.dav.DisplayName()]) props = cal.get_properties([caldav.dav.DisplayName()])
name = props.get("{DAV:}displayname", cal.name or "Unnamed") name = props.get("{DAV:}displayname", cal.name or "Unnamed")
@@ -58,6 +92,17 @@ class CalendarService:
) )
) )
for ics_cal in self._ics_calendars:
result.append(
Calendar(
id=ics_cal["id"],
name=ics_cal["name"],
color=None,
description=None,
is_readonly=True,
)
)
return result return result
def _get_calendar_by_id(self, calendar_id: str) -> caldav.Calendar: def _get_calendar_by_id(self, calendar_id: str) -> caldav.Calendar:
@@ -77,6 +122,9 @@ class CalendarService:
end_date: str, end_date: str,
include_recurring: bool = True, include_recurring: bool = True,
) -> EventList: ) -> EventList:
if self._is_ics_calendar(calendar_id):
return self._list_ics_events(calendar_id, start_date, end_date, include_recurring)
calendar = self._get_calendar_by_id(calendar_id) calendar = self._get_calendar_by_id(calendar_id)
start = parse_date(start_date) start = parse_date(start_date)
@@ -101,6 +149,9 @@ class CalendarService:
) )
def get_event(self, calendar_id: str, event_id: str) -> Optional[Event]: def get_event(self, calendar_id: str, event_id: str) -> Optional[Event]:
if self._is_ics_calendar(calendar_id):
return self._get_ics_event(calendar_id, event_id)
calendar = self._get_calendar_by_id(calendar_id) calendar = self._get_calendar_by_id(calendar_id)
try: try:
@@ -127,6 +178,9 @@ class CalendarService:
reminders: Optional[list[int]] = None, reminders: Optional[list[int]] = None,
recurrence: Optional[str] = None, recurrence: Optional[str] = None,
) -> Event: ) -> Event:
if self._is_ics_calendar(calendar_id):
raise ValueError("ICS calendars are read-only")
calendar = self._get_calendar_by_id(calendar_id) calendar = self._get_calendar_by_id(calendar_id)
# Create iCalendar event # Create iCalendar event
@@ -184,6 +238,9 @@ class CalendarService:
location: Optional[str] = None, location: Optional[str] = None,
attendees: Optional[list[str]] = None, attendees: Optional[list[str]] = None,
) -> Optional[Event]: ) -> Optional[Event]:
if self._is_ics_calendar(calendar_id):
raise ValueError("ICS calendars are read-only")
calendar = self._get_calendar_by_id(calendar_id) calendar = self._get_calendar_by_id(calendar_id)
# Find the event # Find the event
@@ -224,6 +281,13 @@ class CalendarService:
def delete_event( def delete_event(
self, calendar_id: str, event_id: str, notify_attendees: bool = True self, calendar_id: str, event_id: str, notify_attendees: bool = True
) -> OperationResult: ) -> OperationResult:
if self._is_ics_calendar(calendar_id):
return OperationResult(
success=False,
message="ICS calendars are read-only",
id=event_id,
)
try: try:
calendar = self._get_calendar_by_id(calendar_id) calendar = self._get_calendar_by_id(calendar_id)
@@ -314,3 +378,186 @@ class CalendarService:
return None return None
return None return None
def _list_ics_events(
self,
calendar_id: str,
start_date: str,
end_date: str,
include_recurring: bool,
) -> EventList:
ics_calendar = self._get_ics_calendar(calendar_id)
if not ics_calendar:
raise ValueError(f"Calendar not found: {calendar_id}")
start = parse_date(start_date)
end = parse_date(end_date)
ical = self._fetch_ics_calendar(ics_calendar["url"])
events: list[Event] = []
for component in ical.walk():
if component.name != "VEVENT":
continue
parsed_events = self._parse_ics_component(
component, calendar_id, start, end, include_recurring
)
events.extend(parsed_events)
events.sort(key=lambda e: e.start)
return EventList(
events=events,
calendar_id=calendar_id,
start_date=start_date,
end_date=end_date,
total=len(events),
)
def _get_ics_event(self, calendar_id: str, event_id: str) -> Optional[Event]:
ics_calendar = self._get_ics_calendar(calendar_id)
if not ics_calendar:
raise ValueError(f"Calendar not found: {calendar_id}")
ical = self._fetch_ics_calendar(ics_calendar["url"])
for component in ical.walk():
if component.name != "VEVENT":
continue
uid = str(component.get("uid", ""))
if uid == event_id:
events = self._parse_ics_component(
component,
calendar_id,
datetime.min,
datetime.max,
include_recurring=False,
)
return events[0] if events else None
return None
def _fetch_ics_calendar(self, url: str) -> iCalendar:
response = httpx.get(url, timeout=self.settings.ics_calendar_timeout)
response.raise_for_status()
return iCalendar.from_ical(response.text)
def _parse_ics_component(
self,
component,
calendar_id: str,
range_start: datetime,
range_end: datetime,
include_recurring: bool,
) -> list[Event]:
base_event = self._build_event_from_component(component, calendar_id)
if not base_event:
return []
range_start_cmp = range_start
range_end_cmp = range_end
if base_event.start.tzinfo and range_start.tzinfo is None:
range_start_cmp = range_start.replace(tzinfo=base_event.start.tzinfo)
range_end_cmp = range_end.replace(tzinfo=base_event.start.tzinfo)
elif base_event.start.tzinfo is None and range_start.tzinfo is not None:
range_start_cmp = range_start.replace(tzinfo=None)
range_end_cmp = range_end.replace(tzinfo=None)
if not include_recurring or not base_event.recurrence_rule:
if base_event.start <= range_end_cmp and base_event.end >= range_start_cmp:
return [base_event]
return []
dtstart = base_event.start
duration = base_event.end - base_event.start
if duration.total_seconds() <= 0:
duration = timedelta(hours=1)
rrule = rrulestr(base_event.recurrence_rule, dtstart=dtstart)
occurrences = rrule.between(range_start_cmp, range_end_cmp, inc=True)
excluded = self._extract_exdates(component)
events = []
for occ_start in occurrences:
if occ_start in excluded:
continue
occ_end = occ_start + duration
occurrence = base_event.model_copy()
occurrence.start = occ_start
occurrence.end = occ_end
events.append(occurrence)
return events
def _extract_exdates(self, component) -> set[datetime]:
exdates: set[datetime] = set()
exdate_prop = component.get("exdate")
if not exdate_prop:
return exdates
exdate_list = exdate_prop if isinstance(exdate_prop, list) else [exdate_prop]
for exdate in exdate_list:
dates = getattr(exdate, "dts", [])
for dt in dates:
if isinstance(dt.dt, datetime):
exdates.add(dt.dt)
else:
exdates.add(datetime.combine(dt.dt, datetime.min.time()))
return exdates
def _build_event_from_component(self, component, calendar_id: str) -> Optional[Event]:
try:
uid = str(component.get("uid", ""))
dtstart = component.get("dtstart")
dtend = component.get("dtend")
start = dtstart.dt if dtstart else datetime.now()
end = dtend.dt if dtend else start + timedelta(hours=1)
all_day = False
if not isinstance(start, datetime):
all_day = True
start = datetime.combine(start, datetime.min.time())
if not isinstance(end, datetime):
end = datetime.combine(end, datetime.min.time())
status_str = str(component.get("status", "CONFIRMED")).upper()
status = EventStatus.CONFIRMED
if status_str == "TENTATIVE":
status = EventStatus.TENTATIVE
elif status_str == "CANCELLED":
status = EventStatus.CANCELLED
attendees = []
for attendee in component.get("attendee", []):
if isinstance(attendee, list):
for a in attendee:
email = str(a).replace("mailto:", "")
attendees.append(Attendee(email=email))
else:
email = str(attendee).replace("mailto:", "")
attendees.append(Attendee(email=email))
rrule = component.get("rrule")
recurrence_rule = None
if rrule:
recurrence_rule = rrule.to_ical().decode("utf-8")
return Event(
id=uid,
calendar_id=calendar_id,
title=str(component.get("summary", "Untitled")),
start=start,
end=end,
all_day=all_day,
description=str(component.get("description", "")) or None,
location=str(component.get("location", "")) or None,
status=status,
attendees=attendees,
recurrence_rule=recurrence_rule,
organizer=str(component.get("organizer", "")).replace("mailto:", "") or None,
)
except Exception as e:
print(f"Error parsing ICS event: {e}")
return None

View File

@@ -114,12 +114,13 @@ class ContactsService:
def list_contacts( def list_contacts(
self, self,
addressbook_id: str, addressbook_id: Optional[str] = None,
search: Optional[str] = None, search: Optional[str] = None,
limit: int = 100, limit: int = 100,
offset: int = 0, offset: int = 0,
) -> ContactList: ) -> ContactList:
client = self._get_client() client = self._get_client()
addressbook_id = self._resolve_addressbook_id(addressbook_id)
# Build URL # Build URL
addressbook_url = self._build_url(addressbook_id) addressbook_url = self._build_url(addressbook_id)
@@ -188,8 +189,9 @@ class ContactsService:
offset=offset, offset=offset,
) )
def get_contact(self, addressbook_id: str, contact_id: str) -> Optional[Contact]: def get_contact(self, contact_id: str, addressbook_id: Optional[str] = None) -> Optional[Contact]:
client = self._get_client() client = self._get_client()
addressbook_id = self._resolve_addressbook_id(addressbook_id)
# Build URL # Build URL
contact_url = self._build_url(contact_id) contact_url = self._build_url(contact_id)
@@ -206,7 +208,7 @@ class ContactsService:
def create_contact( def create_contact(
self, self,
addressbook_id: str, addressbook_id: Optional[str] = None,
first_name: Optional[str] = None, first_name: Optional[str] = None,
last_name: Optional[str] = None, last_name: Optional[str] = None,
display_name: Optional[str] = None, display_name: Optional[str] = None,
@@ -219,6 +221,7 @@ class ContactsService:
birthday: Optional[str] = None, birthday: Optional[str] = None,
) -> Contact: ) -> Contact:
client = self._get_client() client = self._get_client()
addressbook_id = self._resolve_addressbook_id(addressbook_id)
# Create vCard # Create vCard
vcard = vobject.vCard() vcard = vobject.vCard()
@@ -311,8 +314,8 @@ class ContactsService:
def update_contact( def update_contact(
self, self,
addressbook_id: str,
contact_id: str, contact_id: str,
addressbook_id: Optional[str] = None,
first_name: Optional[str] = None, first_name: Optional[str] = None,
last_name: Optional[str] = None, last_name: Optional[str] = None,
display_name: Optional[str] = None, display_name: Optional[str] = None,
@@ -324,7 +327,7 @@ class ContactsService:
notes: Optional[str] = None, notes: Optional[str] = None,
) -> Optional[Contact]: ) -> Optional[Contact]:
# Get existing contact # Get existing contact
existing = self.get_contact(addressbook_id, contact_id) existing = self.get_contact(contact_id, addressbook_id)
if not existing: if not existing:
return None return None
@@ -342,10 +345,10 @@ class ContactsService:
} }
# Delete and recreate (simpler than partial update) # Delete and recreate (simpler than partial update)
self.delete_contact(addressbook_id, contact_id) self.delete_contact(contact_id, addressbook_id)
return self.create_contact(addressbook_id, **updated_data) return self.create_contact(addressbook_id, **updated_data)
def delete_contact(self, addressbook_id: str, contact_id: str) -> OperationResult: def delete_contact(self, contact_id: str, addressbook_id: Optional[str] = None) -> OperationResult:
try: try:
client = self._get_client() client = self._get_client()
@@ -371,7 +374,7 @@ class ContactsService:
return OperationResult(success=False, message=str(e)) return OperationResult(success=False, message=str(e))
def _parse_vcard( def _parse_vcard(
self, vcard_data: str, addressbook_id: str, href: str self, vcard_data: str, addressbook_id: Optional[str], href: str
) -> Optional[Contact]: ) -> Optional[Contact]:
try: try:
vcard = vobject.readOne(vcard_data) vcard = vobject.readOne(vcard_data)
@@ -467,9 +470,10 @@ class ContactsService:
except Exception: except Exception:
pass pass
resolved_addressbook_id = addressbook_id or self._derive_addressbook_id(href)
return Contact( return Contact(
id=href, id=href,
addressbook_id=addressbook_id, addressbook_id=resolved_addressbook_id,
first_name=first_name, first_name=first_name,
last_name=last_name, last_name=last_name,
display_name=display_name, display_name=display_name,
@@ -481,3 +485,16 @@ class ContactsService:
notes=notes, notes=notes,
birthday=birthday, birthday=birthday,
) )
def _derive_addressbook_id(self, contact_href: str) -> str:
if "/" not in contact_href:
return contact_href
base = contact_href.rsplit("/", 1)[0]
return f"{base}/"
def _resolve_addressbook_id(self, addressbook_id: Optional[str]) -> str:
if addressbook_id:
return addressbook_id
if self.settings.carddav_url:
return self.settings.carddav_url
raise ValueError("CARDDAV_URL must be set to use contacts tools")

View File

@@ -1,8 +1,9 @@
import email import email
import html
from email.header import decode_header from email.header import decode_header
from email.mime.multipart import MIMEMultipart from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText from email.mime.text import MIMEText
from email.utils import formataddr, parseaddr from email.utils import formataddr, parseaddr, formatdate, make_msgid
from datetime import datetime from datetime import datetime
from typing import Optional from typing import Optional
import re import re
@@ -200,6 +201,25 @@ class EmailService:
emails=emails, total=total, mailbox=mailbox, limit=limit, offset=offset emails=emails, total=total, mailbox=mailbox, limit=limit, offset=offset
) )
def list_drafts(
self,
mailbox: Optional[str] = None,
limit: int = 50,
offset: int = 0,
include_body: bool = False,
) -> EmailList:
draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
try:
return self.list_emails(draft_mailbox, limit, offset, include_body)
except Exception:
return EmailList(
emails=[],
total=0,
mailbox=draft_mailbox,
limit=limit,
offset=offset,
)
def read_email( def read_email(
self, mailbox: str, email_id: str, format: str = "text" self, mailbox: str, email_id: str, format: str = "text"
) -> Optional[Email]: ) -> Optional[Email]:
@@ -241,7 +261,7 @@ class EmailService:
# Get headers # Get headers
headers = {} headers = {}
for key in ["Message-ID", "In-Reply-To", "References", "X-Priority", "List-Unsubscribe", "List-Unsubscribe-Post"]: for key in ["Message-ID", "In-Reply-To", "References", "Reply-To", "X-Priority", "List-Unsubscribe", "List-Unsubscribe-Post"]:
value = msg.get(key) value = msg.get(key)
if value: if value:
headers[key] = decode_mime_header(value) headers[key] = decode_mime_header(value)
@@ -406,28 +426,238 @@ class EmailService:
except Exception as e: except Exception as e:
return OperationResult(success=False, message=str(e)) return OperationResult(success=False, message=str(e))
async def send_email( def delete_draft(
self, email_id: str, mailbox: Optional[str] = None, permanent: bool = False
) -> OperationResult:
draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
return self.delete_email(email_id, draft_mailbox, permanent)
def save_draft(
self, self,
to: list[str], to: Optional[list[str]] = None,
subject: str, subject: Optional[str] = None,
body: str, body: Optional[str] = None,
cc: Optional[list[str]] = None, cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None, bcc: Optional[list[str]] = None,
reply_to: Optional[str] = None,
html_body: Optional[str] = None, html_body: Optional[str] = None,
mailbox: Optional[str] = None,
in_reply_to_email_id: Optional[str] = None,
in_reply_to_mailbox: Optional[str] = None,
reply_all: bool = False,
) -> OperationResult: ) -> OperationResult:
try: try:
draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
if in_reply_to_email_id:
context, error = self._get_reply_context(
in_reply_to_mailbox or "INBOX",
in_reply_to_email_id,
reply_all,
cc,
self.settings.smtp_from_email,
)
if error:
return error
if not to:
to = context["to"]
if subject is None:
subject = context["subject"]
if cc is None:
cc = context["cc"]
in_reply_to = context["in_reply_to"]
references = context["references"]
original = context["original"]
else:
in_reply_to = None
references = None
original = None
if not to:
return OperationResult(success=False, message="'to' is required for drafts")
if subject is None:
return OperationResult(success=False, message="'subject' is required for drafts")
if body is None:
return OperationResult(success=False, message="'body' is required for drafts")
if original:
body, html_body = self._build_reply_bodies(original, body)
msg = self._build_draft_message(
to=to,
subject=subject,
body=body,
cc=cc,
bcc=bcc,
html_body=html_body,
in_reply_to=in_reply_to,
references=references,
)
client = self._get_imap_client()
append_result = client.append(
draft_mailbox,
msg.as_bytes(),
flags=["\\Draft"],
)
draft_id = None
if append_result and isinstance(append_result, tuple) and len(append_result) > 1:
draft_id = str(append_result[1])
return OperationResult(
success=True,
message=f"Draft saved to {draft_mailbox}",
id=draft_id,
)
except Exception as e:
return OperationResult(success=False, message=str(e))
def update_draft(
self,
email_id: str,
mailbox: Optional[str] = None,
to: Optional[list[str]] = None,
subject: Optional[str] = None,
body: Optional[str] = None,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
html_body: Optional[str] = None,
in_reply_to_email_id: Optional[str] = None,
in_reply_to_mailbox: Optional[str] = None,
reply_all: bool = False,
) -> OperationResult:
draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
try:
existing = self.read_email(draft_mailbox, email_id, format="both")
except Exception as e:
return OperationResult(success=False, message=str(e), id=email_id)
if not existing:
return OperationResult(
success=False,
message=f"Draft {email_id} not found in {draft_mailbox}",
id=email_id,
)
resolved_to = to if to is not None else [addr.email for addr in existing.to_addresses]
resolved_cc = cc if cc is not None else [addr.email for addr in existing.cc_addresses]
resolved_bcc = bcc if bcc is not None else [addr.email for addr in existing.bcc_addresses]
resolved_subject = subject if subject is not None else existing.subject
resolved_body = body if body is not None else (existing.body_text or "")
resolved_html = html_body if html_body is not None else existing.body_html
if in_reply_to_email_id:
context, error = self._get_reply_context(
in_reply_to_mailbox or "INBOX",
in_reply_to_email_id,
reply_all,
cc,
self.settings.smtp_from_email,
)
if error:
return error
if to is None:
resolved_to = context["to"]
if subject is None:
resolved_subject = context["subject"]
if cc is None:
resolved_cc = context["cc"]
in_reply_to = context["in_reply_to"]
references = context["references"]
original = context["original"]
else:
in_reply_to = None
references = None
original = None
try:
client = self._get_imap_client()
client.select_folder(draft_mailbox)
uid = int(email_id)
client.delete_messages([uid])
client.expunge()
if original:
resolved_body, resolved_html = self._build_reply_bodies(original, resolved_body)
msg = self._build_draft_message(
to=resolved_to,
subject=resolved_subject,
body=resolved_body,
cc=resolved_cc,
bcc=resolved_bcc,
html_body=resolved_html,
in_reply_to=in_reply_to,
references=references,
)
append_result = client.append(
draft_mailbox,
msg.as_bytes(),
flags=["\\Draft"],
)
draft_id = None
if append_result and isinstance(append_result, tuple) and len(append_result) > 1:
draft_id = str(append_result[1])
return OperationResult(
success=True,
message=f"Draft {email_id} updated in {draft_mailbox}",
id=draft_id or email_id,
)
except Exception as e:
return OperationResult(success=False, message=str(e), id=email_id)
async def send_email(
self,
to: Optional[list[str]] = None,
subject: Optional[str] = None,
body: Optional[str] = None,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
html_body: Optional[str] = None,
sender_email: Optional[str] = None,
sender_name: Optional[str] = None,
in_reply_to: Optional[str] = None,
references: Optional[list[str]] = None,
in_reply_to_email_id: Optional[str] = None,
in_reply_to_mailbox: Optional[str] = None,
reply_all: bool = False,
) -> OperationResult:
try:
if in_reply_to_email_id:
resolved_name, resolved_email = self._resolve_sender(sender_email, sender_name)
context, error = self._get_reply_context(
in_reply_to_mailbox or "INBOX",
in_reply_to_email_id,
reply_all,
cc,
resolved_email,
)
if error:
return error
if not to:
to = context["to"]
if subject is None:
subject = context["subject"]
if cc is None:
cc = context["cc"]
in_reply_to = context["in_reply_to"]
references = context["references"]
if not to:
return OperationResult(success=False, message="'to' is required to send email")
if subject is None:
return OperationResult(success=False, message="'subject' is required to send email")
if body is None:
return OperationResult(success=False, message="'body' is required to send email")
msg = MIMEMultipart("alternative") msg = MIMEMultipart("alternative")
msg["Subject"] = subject msg["Subject"] = subject
msg["From"] = formataddr( resolved_name, resolved_email = self._resolve_sender(sender_email, sender_name)
(self.settings.smtp_from_name or "", self.settings.smtp_from_email) msg["From"] = formataddr((resolved_name or "", resolved_email))
)
msg["To"] = ", ".join(to) msg["To"] = ", ".join(to)
if cc: if cc:
msg["Cc"] = ", ".join(cc) msg["Cc"] = ", ".join(cc)
if reply_to: if in_reply_to:
msg["Reply-To"] = reply_to msg["In-Reply-To"] = in_reply_to
if references:
msg["References"] = " ".join(references)
# Add plain text body # Add plain text body
msg.attach(MIMEText(body, "plain", "utf-8")) msg.attach(MIMEText(body, "plain", "utf-8"))
@@ -461,6 +691,86 @@ class EmailService:
except Exception as e: except Exception as e:
return OperationResult(success=False, message=str(e)) return OperationResult(success=False, message=str(e))
async def send_draft(
self,
email_id: str,
mailbox: Optional[str] = None,
) -> OperationResult:
draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
try:
draft = self.read_email(draft_mailbox, email_id, format="both")
except Exception as e:
return OperationResult(success=False, message=str(e), id=email_id)
if not draft:
return OperationResult(
success=False,
message=f"Draft {email_id} not found in {draft_mailbox}",
id=email_id,
)
to = [addr.email for addr in draft.to_addresses]
cc = [addr.email for addr in draft.cc_addresses]
bcc = [addr.email for addr in draft.bcc_addresses]
if not to and not cc and not bcc:
return OperationResult(
success=False,
message="Draft has no recipients",
id=email_id,
)
subject = draft.subject or "(No Subject)"
body = draft.body_text or ""
html_body = draft.body_html
result = await self.send_email(
to=to or None,
subject=subject,
body=body,
cc=cc or None,
bcc=bcc or None,
html_body=html_body,
in_reply_to=draft.in_reply_to,
references=draft.references or None,
)
if not result.success:
return result
try:
client = self._get_imap_client()
client.select_folder(draft_mailbox)
client.delete_messages([int(email_id)])
client.expunge()
except Exception:
pass
return result
async def reply_email(
self,
mailbox: str,
email_id: str,
body: str,
reply_all: bool = False,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
html_body: Optional[str] = None,
sender_email: Optional[str] = None,
sender_name: Optional[str] = None,
) -> OperationResult:
return await self.send_email(
body=body,
bcc=bcc,
html_body=html_body,
sender_email=sender_email,
sender_name=sender_name,
in_reply_to_email_id=email_id,
in_reply_to_mailbox=mailbox,
reply_all=reply_all,
cc=cc,
)
def _parse_envelope_addresses(self, addresses) -> list[EmailAddress]: def _parse_envelope_addresses(self, addresses) -> list[EmailAddress]:
if not addresses: if not addresses:
return [] return []
@@ -575,6 +885,179 @@ class EmailService:
return name return name
return None return None
def _find_drafts_folder(self) -> Optional[str]:
client = self._get_imap_client()
folders = client.list_folders()
draft_names = ["Drafts", "Draft", "INBOX.Drafts", "[Gmail]/Drafts"]
for flags, delimiter, name in folders:
if name in draft_names or b"\\Drafts" in flags:
return name
return None
def _build_draft_message(
self,
to: list[str],
subject: str,
body: str,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
html_body: Optional[str] = None,
in_reply_to: Optional[str] = None,
references: Optional[list[str]] = None,
) -> MIMEMultipart:
msg = MIMEMultipart("alternative")
msg["Subject"] = subject
msg["From"] = formataddr(
(self.settings.smtp_from_name or "", self.settings.smtp_from_email)
)
msg["To"] = ", ".join(to)
msg["Date"] = formatdate(localtime=True)
msg["Message-ID"] = make_msgid()
if cc:
msg["Cc"] = ", ".join(cc)
if bcc:
msg["Bcc"] = ", ".join(bcc)
if in_reply_to:
msg["In-Reply-To"] = in_reply_to
if references:
msg["References"] = " ".join(references)
msg.attach(MIMEText(body or "", "plain", "utf-8"))
if html_body:
msg.attach(MIMEText(html_body, "html", "utf-8"))
return msg
def _get_reply_context(
self,
mailbox: str,
email_id: str,
reply_all: bool,
cc: Optional[list[str]],
sender_email: Optional[str],
) -> tuple[dict, Optional[OperationResult]]:
original = self.read_email(mailbox, email_id, format="both")
if not original:
return {}, OperationResult(
success=False,
message=f"Email {email_id} not found in {mailbox}",
id=email_id,
)
reply_to_header = original.headers.get("Reply-To")
reply_to_email = None
if reply_to_header:
_, reply_to_email = parseaddr(reply_to_header)
if not reply_to_email:
reply_to_email = original.from_address.email
to = [reply_to_email] if reply_to_email else []
reply_cc: list[str] = []
if reply_all:
for addr in original.to_addresses + original.cc_addresses:
if addr.email and addr.email not in to:
reply_cc.append(addr.email)
if cc:
reply_cc.extend(cc)
to = self._dedupe_emails(to, sender_email)
reply_cc = self._dedupe_emails(reply_cc, sender_email)
if not to and reply_cc:
to = [reply_cc.pop(0)]
if not to:
return {}, OperationResult(
success=False,
message="No valid recipients found for reply",
id=email_id,
)
subject = original.subject or "(No Subject)"
if not subject.lower().startswith("re:"):
subject = f"Re: {subject}"
in_reply_to = original.headers.get("Message-ID") or original.in_reply_to
references = list(original.references)
if in_reply_to and in_reply_to not in references:
references.append(in_reply_to)
return {
"original": original,
"to": to,
"cc": reply_cc or None,
"subject": subject,
"in_reply_to": in_reply_to,
"references": references or None,
}, None
def _build_reply_bodies(self, original: Email, body_text: str) -> tuple[str, Optional[str]]:
intro = self._format_reply_intro(original)
quoted_text = original.body_text or ""
text = body_text or ""
if intro:
text = f"{text}\n\n{intro}\n\n{quoted_text}".rstrip()
html_body = None
quoted_html = original.body_html
if not quoted_html and quoted_text:
quoted_html = html.escape(quoted_text).replace("\n", "<br/>")
if quoted_html:
cite = original.headers.get("Message-ID") or original.in_reply_to or ""
cite_attr = f' cite="{html.escape(cite)}"' if cite else ""
html_intro = html.escape(intro).replace("\n", "<br/>")
html_body = (
f"<div>{html.escape(body_text).replace('\\n', '<br/>')}</div>"
f"<br/><br/>{html_intro}<br/><br/>"
f"<blockquote type=\"cite\"{cite_attr}>"
f"<div dir=\"ltr\">{quoted_html}</div>"
f"</blockquote><br/><br/><br/>"
)
return text, html_body
def _format_reply_intro(self, original: Email) -> str:
date_str = ""
if original.date:
try:
date_str = original.date.strftime("%A, %B %d, %Y %H:%M %Z").strip()
except Exception:
date_str = str(original.date)
from_name = original.from_address.name or original.from_address.email
from_email = original.from_address.email
if date_str:
return f"On {date_str}, {from_name} <{from_email}> wrote:"
return f"On {from_name} <{from_email}> wrote:"
def _resolve_sender(
self, sender_email: Optional[str], sender_name: Optional[str]
) -> tuple[Optional[str], str]:
if sender_email:
return sender_name, sender_email
if sender_name:
name, email_addr = parseaddr(sender_name)
if email_addr:
return name or None, email_addr
return self.settings.smtp_from_name, self.settings.smtp_from_email
def _dedupe_emails(self, emails: list[str], self_email: Optional[str]) -> list[str]:
seen = set()
cleaned = []
for addr in emails:
if not addr:
continue
addr_lower = addr.lower()
if self_email and addr_lower == self_email.lower():
continue
if addr_lower in seen:
continue
seen.add(addr_lower)
cleaned.append(addr)
return cleaned
def set_flags( def set_flags(
self, self,
email_id: str, email_id: str,

View File

@@ -2,18 +2,21 @@ from typing import Optional
from fastmcp import FastMCP from fastmcp import FastMCP
from services.calendar_service import CalendarService from services.calendar_service import CalendarService
from tools.logging_utils import log_tool_call
def register_calendar_tools(mcp: FastMCP, service: CalendarService): def register_calendar_tools(mcp: FastMCP, service: CalendarService):
"""Register all calendar-related MCP tools.""" """Register all calendar-related MCP tools."""
@mcp.tool(description="List all available calendars from the CalDAV server. Returns calendar ID, name, and properties.") @mcp.tool(description="List all available calendars from CalDAV and configured ICS feeds. Returns calendar ID, name, and properties.")
@log_tool_call
def list_calendars() -> list[dict]: def list_calendars() -> list[dict]:
"""List all calendars.""" """List all calendars."""
calendars = service.list_calendars() calendars = service.list_calendars()
return [c.model_dump() for c in calendars] return [c.model_dump() for c in calendars]
@mcp.tool(description="List events within a date range. If no calendar is specified, lists events from all calendars.") @mcp.tool(description="List events within a date range. If no calendar is specified, lists events from all calendars.")
@log_tool_call
def list_events( def list_events(
start_date: str, start_date: str,
end_date: str, end_date: str,
@@ -26,7 +29,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService):
Args: Args:
start_date: Start of date range (ISO format: YYYY-MM-DD) start_date: Start of date range (ISO format: YYYY-MM-DD)
end_date: End of date range (ISO format: YYYY-MM-DD) end_date: End of date range (ISO format: YYYY-MM-DD)
calendar_id: The calendar ID (URL) to query. If not provided, lists from all calendars. calendar_id: The calendar ID (CalDAV URL or ICS ID) to query. If not provided, lists from all calendars.
include_recurring: Whether to expand recurring events (default: True) include_recurring: Whether to expand recurring events (default: True)
""" """
if calendar_id: if calendar_id:
@@ -59,6 +62,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService):
} }
@mcp.tool(description="Get detailed information about a specific calendar event including attendees and recurrence.") @mcp.tool(description="Get detailed information about a specific calendar event including attendees and recurrence.")
@log_tool_call
def get_event( def get_event(
calendar_id: str, calendar_id: str,
event_id: str, event_id: str,
@@ -67,13 +71,14 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService):
Get a specific event. Get a specific event.
Args: Args:
calendar_id: The calendar ID containing the event calendar_id: The calendar ID (CalDAV URL or ICS ID) containing the event
event_id: The unique ID (UID) of the event event_id: The unique ID (UID) of the event
""" """
result = service.get_event(calendar_id, event_id) result = service.get_event(calendar_id, event_id)
return result.model_dump() if result else None return result.model_dump() if result else None
@mcp.tool(description="Create a new calendar event with title, time, location, attendees, and optional recurrence.") @mcp.tool(description="Create a new calendar event with title, time, location, attendees, and optional recurrence.")
@log_tool_call
def create_event( def create_event(
calendar_id: str, calendar_id: str,
title: str, title: str,
@@ -89,7 +94,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService):
Create a new calendar event. Create a new calendar event.
Args: Args:
calendar_id: The calendar ID to create the event in calendar_id: The calendar ID to create the event in (CalDAV only)
title: Event title/summary title: Event title/summary
start: Start datetime (ISO format: YYYY-MM-DDTHH:MM:SS) start: Start datetime (ISO format: YYYY-MM-DDTHH:MM:SS)
end: End datetime (ISO format: YYYY-MM-DDTHH:MM:SS) end: End datetime (ISO format: YYYY-MM-DDTHH:MM:SS)
@@ -105,6 +110,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService):
return result.model_dump() return result.model_dump()
@mcp.tool(description="Update an existing calendar event. Only provided fields will be modified.") @mcp.tool(description="Update an existing calendar event. Only provided fields will be modified.")
@log_tool_call
def update_event( def update_event(
calendar_id: str, calendar_id: str,
event_id: str, event_id: str,
@@ -119,7 +125,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService):
Update an existing event. Update an existing event.
Args: Args:
calendar_id: The calendar ID containing the event calendar_id: The calendar ID containing the event (CalDAV only)
event_id: The unique ID of the event to update event_id: The unique ID of the event to update
title: New event title (optional) title: New event title (optional)
start: New start datetime (optional) start: New start datetime (optional)
@@ -134,6 +140,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService):
return result.model_dump() if result else None return result.model_dump() if result else None
@mcp.tool(description="Delete a calendar event by ID.") @mcp.tool(description="Delete a calendar event by ID.")
@log_tool_call
def delete_event( def delete_event(
calendar_id: str, calendar_id: str,
event_id: str, event_id: str,
@@ -143,7 +150,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService):
Delete a calendar event. Delete a calendar event.
Args: Args:
calendar_id: The calendar ID containing the event calendar_id: The calendar ID containing the event (CalDAV only)
event_id: The unique ID of the event to delete event_id: The unique ID of the event to delete
notify_attendees: Whether to notify attendees of cancellation (default: True) notify_attendees: Whether to notify attendees of cancellation (default: True)
""" """

View File

@@ -2,54 +2,47 @@ from typing import Optional
from fastmcp import FastMCP from fastmcp import FastMCP
from services.contacts_service import ContactsService from services.contacts_service import ContactsService
from tools.logging_utils import log_tool_call
def register_contacts_tools(mcp: FastMCP, service: ContactsService): def register_contacts_tools(mcp: FastMCP, service: ContactsService):
"""Register all contacts-related MCP tools.""" """Register all contacts-related MCP tools."""
@mcp.tool(description="List all available address books from the CardDAV server.") @mcp.tool(description="List contacts in the configured address book with optional search filtering and pagination.")
def list_addressbooks() -> list[dict]: @log_tool_call
"""List all address books."""
addressbooks = service.list_addressbooks()
return [a.model_dump() for a in addressbooks]
@mcp.tool(description="List contacts in an address book with optional search filtering and pagination.")
def list_contacts( def list_contacts(
addressbook_id: str,
search: Optional[str] = None, search: Optional[str] = None,
limit: int = 100, limit: int = 100,
offset: int = 0, offset: int = 0,
) -> dict: ) -> dict:
""" """
List contacts in an address book. List contacts in the configured address book.
Args: Args:
addressbook_id: The address book ID (URL path) to query
search: Optional search term to filter contacts by name or email search: Optional search term to filter contacts by name or email
limit: Maximum number of contacts to return (default: 100) limit: Maximum number of contacts to return (default: 100)
offset: Number of contacts to skip for pagination (default: 0) offset: Number of contacts to skip for pagination (default: 0)
""" """
result = service.list_contacts(addressbook_id, search, limit, offset) result = service.list_contacts(search=search, limit=limit, offset=offset)
return result.model_dump() return result.model_dump()
@mcp.tool(description="Get detailed information about a specific contact including all fields.") @mcp.tool(description="Get detailed information about a specific contact including all fields.")
@log_tool_call
def get_contact( def get_contact(
addressbook_id: str,
contact_id: str, contact_id: str,
) -> Optional[dict]: ) -> Optional[dict]:
""" """
Get a specific contact. Get a specific contact.
Args: Args:
addressbook_id: The address book containing the contact
contact_id: The unique ID (URL) of the contact contact_id: The unique ID (URL) of the contact
""" """
result = service.get_contact(addressbook_id, contact_id) result = service.get_contact(contact_id)
return result.model_dump() if result else None return result.model_dump() if result else None
@mcp.tool(description="Create a new contact with name, emails, phones, addresses, and other details.") @mcp.tool(description="Create a new contact with name, emails, phones, addresses, and other details.")
@log_tool_call
def create_contact( def create_contact(
addressbook_id: str,
first_name: Optional[str] = None, first_name: Optional[str] = None,
last_name: Optional[str] = None, last_name: Optional[str] = None,
display_name: Optional[str] = None, display_name: Optional[str] = None,
@@ -65,7 +58,6 @@ def register_contacts_tools(mcp: FastMCP, service: ContactsService):
Create a new contact. Create a new contact.
Args: Args:
addressbook_id: The address book ID to create the contact in
first_name: Contact's first/given name first_name: Contact's first/given name
last_name: Contact's last/family name last_name: Contact's last/family name
display_name: Full display name (auto-generated if not provided) display_name: Full display name (auto-generated if not provided)
@@ -78,23 +70,22 @@ def register_contacts_tools(mcp: FastMCP, service: ContactsService):
birthday: Birthday in ISO format (YYYY-MM-DD) birthday: Birthday in ISO format (YYYY-MM-DD)
""" """
result = service.create_contact( result = service.create_contact(
addressbook_id, first_name=first_name,
first_name, last_name=last_name,
last_name, display_name=display_name,
display_name, emails=emails,
emails, phones=phones,
phones, addresses=addresses,
addresses, organization=organization,
organization, title=title,
title, notes=notes,
notes, birthday=birthday,
birthday,
) )
return result.model_dump() return result.model_dump()
@mcp.tool(description="Update an existing contact. Only provided fields will be modified.") @mcp.tool(description="Update an existing contact. Only provided fields will be modified.")
@log_tool_call
def update_contact( def update_contact(
addressbook_id: str,
contact_id: str, contact_id: str,
first_name: Optional[str] = None, first_name: Optional[str] = None,
last_name: Optional[str] = None, last_name: Optional[str] = None,
@@ -110,7 +101,6 @@ def register_contacts_tools(mcp: FastMCP, service: ContactsService):
Update an existing contact. Update an existing contact.
Args: Args:
addressbook_id: The address book containing the contact
contact_id: The unique ID of the contact to update contact_id: The unique ID of the contact to update
first_name: New first name (optional) first_name: New first name (optional)
last_name: New last name (optional) last_name: New last name (optional)
@@ -123,31 +113,29 @@ def register_contacts_tools(mcp: FastMCP, service: ContactsService):
notes: New notes (optional) notes: New notes (optional)
""" """
result = service.update_contact( result = service.update_contact(
addressbook_id,
contact_id, contact_id,
first_name, first_name=first_name,
last_name, last_name=last_name,
display_name, display_name=display_name,
emails, emails=emails,
phones, phones=phones,
addresses, addresses=addresses,
organization, organization=organization,
title, title=title,
notes, notes=notes,
) )
return result.model_dump() if result else None return result.model_dump() if result else None
@mcp.tool(description="Delete a contact from an address book.") @mcp.tool(description="Delete a contact from an address book.")
@log_tool_call
def delete_contact( def delete_contact(
addressbook_id: str,
contact_id: str, contact_id: str,
) -> dict: ) -> dict:
""" """
Delete a contact. Delete a contact.
Args: Args:
addressbook_id: The address book containing the contact
contact_id: The unique ID of the contact to delete contact_id: The unique ID of the contact to delete
""" """
result = service.delete_contact(addressbook_id, contact_id) result = service.delete_contact(contact_id)
return result.model_dump() return result.model_dump()

View File

@@ -2,18 +2,21 @@ from typing import Optional
from fastmcp import FastMCP from fastmcp import FastMCP
from services.email_service import EmailService from services.email_service import EmailService
from tools.logging_utils import log_tool_call
def register_email_tools(mcp: FastMCP, service: EmailService): def register_email_tools(mcp: FastMCP, service: EmailService):
"""Register all email-related MCP tools.""" """Register all email-related MCP tools."""
@mcp.tool(description="List all mailboxes/folders in the email account. Returns name, path, message count, and unread count for each mailbox.") @mcp.tool(description="List all mailboxes/folders in the email account. Returns name, path, message count, and unread count for each mailbox.")
@log_tool_call
def list_mailboxes() -> list[dict]: def list_mailboxes() -> list[dict]:
"""List all IMAP mailboxes/folders.""" """List all IMAP mailboxes/folders."""
mailboxes = service.list_mailboxes() mailboxes = service.list_mailboxes()
return [m.model_dump() for m in mailboxes] return [m.model_dump() for m in mailboxes]
@mcp.tool(description="List emails in a mailbox with pagination. Returns email summaries including subject, from, date, and read status.") @mcp.tool(description="List emails in a mailbox with pagination. Returns email summaries including subject, from, date, and read status.")
@log_tool_call
def list_emails( def list_emails(
mailbox: str = "INBOX", mailbox: str = "INBOX",
limit: int = 50, limit: int = 50,
@@ -32,7 +35,28 @@ def register_email_tools(mcp: FastMCP, service: EmailService):
result = service.list_emails(mailbox, limit, offset, include_body) result = service.list_emails(mailbox, limit, offset, include_body)
return result.model_dump() return result.model_dump()
@mcp.tool(description="List draft emails in the Drafts mailbox with pagination.")
@log_tool_call
def list_drafts(
mailbox: Optional[str] = None,
limit: int = 50,
offset: int = 0,
include_body: bool = False,
) -> dict:
"""
List draft emails.
Args:
mailbox: Drafts mailbox/folder override (default: auto-detect)
limit: Maximum number of drafts to return (default: 50)
offset: Number of drafts to skip for pagination (default: 0)
include_body: Whether to include body snippets (default: False)
"""
result = service.list_drafts(mailbox, limit, offset, include_body)
return result.model_dump()
@mcp.tool(description="Read a specific email by ID with full body content and attachment information.") @mcp.tool(description="Read a specific email by ID with full body content and attachment information.")
@log_tool_call
def read_email( def read_email(
mailbox: str, mailbox: str,
email_id: str, email_id: str,
@@ -50,6 +74,7 @@ def register_email_tools(mcp: FastMCP, service: EmailService):
return result.model_dump() if result else None return result.model_dump() if result else None
@mcp.tool(description="Search emails in a mailbox using various criteria like subject, sender, or body content.") @mcp.tool(description="Search emails in a mailbox using various criteria like subject, sender, or body content.")
@log_tool_call
def search_emails( def search_emails(
query: str, query: str,
mailbox: str = "INBOX", mailbox: str = "INBOX",
@@ -75,6 +100,7 @@ def register_email_tools(mcp: FastMCP, service: EmailService):
return result.model_dump() return result.model_dump()
@mcp.tool(description="Move an email from one mailbox/folder to another.") @mcp.tool(description="Move an email from one mailbox/folder to another.")
@log_tool_call
def move_email( def move_email(
email_id: str, email_id: str,
source_mailbox: str, source_mailbox: str,
@@ -92,6 +118,7 @@ def register_email_tools(mcp: FastMCP, service: EmailService):
return result.model_dump() return result.model_dump()
@mcp.tool(description="Delete an email, either moving it to trash or permanently deleting it.") @mcp.tool(description="Delete an email, either moving it to trash or permanently deleting it.")
@log_tool_call
def delete_email( def delete_email(
email_id: str, email_id: str,
mailbox: str, mailbox: str,
@@ -108,32 +135,127 @@ def register_email_tools(mcp: FastMCP, service: EmailService):
result = service.delete_email(email_id, mailbox, permanent) result = service.delete_email(email_id, mailbox, permanent)
return result.model_dump() return result.model_dump()
@mcp.tool(description="Send a new email via SMTP. Supports plain text and HTML content, CC, BCC, and reply-to.") @mcp.tool(description="Delete a drafted email by ID, optionally permanently.")
async def send_email( @log_tool_call
to: list[str], def delete_draft(
subject: str, email_id: str,
body: str, mailbox: Optional[str] = None,
cc: Optional[list[str]] = None, permanent: bool = False,
bcc: Optional[list[str]] = None,
reply_to: Optional[str] = None,
html_body: Optional[str] = None,
) -> dict: ) -> dict:
""" """
Send a new email. Delete a draft email.
Args: Args:
email_id: The unique ID of the draft
mailbox: Drafts mailbox/folder override (default: auto-detect)
permanent: If True, permanently delete; if False, move to Trash (default: False)
"""
result = service.delete_draft(email_id, mailbox, permanent)
return result.model_dump()
@mcp.tool(description="Save a new draft email to the Drafts mailbox. Supports reply threading via in_reply_to_email_id.")
@log_tool_call
def save_draft(
to: Optional[list[str]] = None,
subject: Optional[str] = None,
body: Optional[str] = None,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
mailbox: Optional[str] = None,
in_reply_to_email_id: Optional[str] = None,
in_reply_to_mailbox: Optional[str] = None,
reply_all: bool = False,
) -> dict:
"""
Save a new email draft.
Args:
to: List of recipient email addresses (required unless in_reply_to_email_id is set)
subject: Email subject line (required unless in_reply_to_email_id is set)
body: Plain text email body (required unless in_reply_to_email_id is set)
cc: List of CC recipients (optional)
bcc: List of BCC recipients (optional)
mailbox: Drafts mailbox/folder override (default: auto-detect)
in_reply_to_email_id: Email UID to reply to (optional, derives recipients/subject and sets threading headers)
in_reply_to_mailbox: Mailbox containing the in_reply_to_email_id (default: INBOX)
reply_all: Whether to include original recipients when replying (default: False)
"""
result = service.save_draft(
to=to,
subject=subject,
body=body,
cc=cc,
bcc=bcc,
html_body=None,
mailbox=mailbox,
in_reply_to_email_id=in_reply_to_email_id,
in_reply_to_mailbox=in_reply_to_mailbox,
reply_all=reply_all,
)
return result.model_dump()
@mcp.tool(description="Edit an existing draft email. Only provided fields will be modified. Supports reply threading via in_reply_to_email_id.")
@log_tool_call
def edit_draft(
email_id: str,
mailbox: Optional[str] = None,
to: Optional[list[str]] = None,
subject: Optional[str] = None,
body: Optional[str] = None,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
in_reply_to_email_id: Optional[str] = None,
in_reply_to_mailbox: Optional[str] = None,
reply_all: bool = False,
) -> dict:
"""
Update an existing draft email.
Args:
email_id: The unique ID of the draft
mailbox: Drafts mailbox/folder override (default: auto-detect)
to: List of recipient email addresses to: List of recipient email addresses
subject: Email subject line subject: Email subject line
body: Plain text email body body: Plain text email body
cc: List of CC recipients (optional) cc: List of CC recipients (optional)
bcc: List of BCC recipients (optional) bcc: List of BCC recipients (optional)
reply_to: Reply-to address (optional) in_reply_to_email_id: Email UID to reply to (optional, derives recipients/subject and sets threading headers)
html_body: HTML version of the email body (optional) in_reply_to_mailbox: Mailbox containing the in_reply_to_email_id (default: INBOX)
reply_all: Whether to include original recipients when replying (default: False)
""" """
result = await service.send_email(to, subject, body, cc, bcc, reply_to, html_body) result = service.update_draft(
email_id=email_id,
mailbox=mailbox,
to=to,
subject=subject,
body=body,
cc=cc,
bcc=bcc,
html_body=None,
in_reply_to_email_id=in_reply_to_email_id,
in_reply_to_mailbox=in_reply_to_mailbox,
reply_all=reply_all,
)
return result.model_dump()
@mcp.tool(description="Send an existing draft by ID. Only drafts can be sent.")
@log_tool_call
async def send_draft(
email_id: str,
mailbox: Optional[str] = None,
) -> dict:
"""
Send a draft email.
Args:
email_id: The unique ID of the draft to send
mailbox: Drafts mailbox/folder override (default: auto-detect)
"""
result = await service.send_draft(email_id, mailbox)
return result.model_dump() return result.model_dump()
@mcp.tool(description="Set or remove IMAP flags on an email. Standard flags: \\Seen, \\Answered, \\Flagged, \\Deleted, \\Draft. Custom keywords are also supported.") @mcp.tool(description="Set or remove IMAP flags on an email. Standard flags: \\Seen, \\Answered, \\Flagged, \\Deleted, \\Draft. Custom keywords are also supported.")
@log_tool_call
def set_email_flags( def set_email_flags(
email_id: str, email_id: str,
mailbox: str, mailbox: str,
@@ -153,7 +275,8 @@ def register_email_tools(mcp: FastMCP, service: EmailService):
return result.model_dump() return result.model_dump()
@mcp.tool(description="Unsubscribe from a mailing list. Parses List-Unsubscribe headers and attempts automatic unsubscribe via HTTP or provides mailto instructions.") @mcp.tool(description="Unsubscribe from a mailing list. Parses List-Unsubscribe headers and attempts automatic unsubscribe via HTTP or provides mailto instructions.")
async def unsubscribe_email( @log_tool_call
async def unsubscribe_maillist(
email_id: str, email_id: str,
mailbox: str = "INBOX", mailbox: str = "INBOX",
) -> dict: ) -> dict:

View File

@@ -0,0 +1,60 @@
import functools
import inspect
import json
import logging
import os
logger = logging.getLogger("mcp.tools")
_MAX_LOG_CHARS = int(os.getenv("TOOL_LOG_MAX_CHARS", "4000"))
def _serialize(value) -> str:
try:
return json.dumps(value, default=str, ensure_ascii=True)
except TypeError:
return repr(value)
def _truncate(text: str) -> str:
if len(text) <= _MAX_LOG_CHARS:
return text
truncated = len(text) - _MAX_LOG_CHARS
return f"{text[:_MAX_LOG_CHARS]}...(truncated {truncated} chars)"
def log_tool_call(func):
"""Log tool calls and responses with optional truncation."""
if inspect.iscoroutinefunction(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
logger.info(
"Tool call %s args=%s kwargs=%s",
func.__name__,
_truncate(_serialize(args)),
_truncate(_serialize(kwargs)),
)
result = await func(*args, **kwargs)
logger.info(
"Tool response %s result=%s",
func.__name__,
_truncate(_serialize(result)),
)
return result
return wrapper
@functools.wraps(func)
def wrapper(*args, **kwargs):
logger.info(
"Tool call %s args=%s kwargs=%s",
func.__name__,
_truncate(_serialize(args)),
_truncate(_serialize(kwargs)),
)
result = func(*args, **kwargs)
logger.info(
"Tool response %s result=%s",
func.__name__,
_truncate(_serialize(result)),
)
return result
return wrapper

30
test.sh
View File

@@ -17,12 +17,12 @@ echo "Testing MCP server at $BASE_URL"
echo "================================" echo "================================"
# Test health endpoint # Test health endpoint
echo -e "\n[1/5] Testing health endpoint..." echo -e "\n[1/6] Testing health endpoint..."
HEALTH=$(curl -s "$BASE_URL/health") HEALTH=$(curl -s "$BASE_URL/health")
echo "Response: $HEALTH" echo "Response: $HEALTH"
# Initialize session and capture session ID # Initialize session and capture session ID
echo -e "\n[2/5] Initializing MCP session..." echo -e "\n[2/6] Initializing MCP session..."
INIT_RESPONSE=$(curl -s -D - -X POST "$MCP_ENDPOINT" \ INIT_RESPONSE=$(curl -s -D - -X POST "$MCP_ENDPOINT" \
-H "Content-Type: application/json" \ -H "Content-Type: application/json" \
-H "Accept: application/json, text/event-stream" \ -H "Accept: application/json, text/event-stream" \
@@ -55,12 +55,12 @@ mcp_request() {
} }
# List available tools # List available tools
echo -e "\n[3/5] Listing available tools..." echo -e "\n[3/6] Listing available tools..."
TOOLS=$(mcp_request 2 "tools/list" "{}") TOOLS=$(mcp_request 2 "tools/list" "{}")
echo "$TOOLS" | grep -o '"name":"[^"]*"' | head -20 || echo "$TOOLS" echo "$TOOLS" | grep -o '"name":"[^"]*"' | head -20 || echo "$TOOLS"
# Get server info # Get server info
echo -e "\n[4/5] Getting server info..." echo -e "\n[4/6] Getting server info..."
SERVER_INFO=$(mcp_request 3 "tools/call" '{"name":"get_server_info","arguments":{}}') SERVER_INFO=$(mcp_request 3 "tools/call" '{"name":"get_server_info","arguments":{}}')
echo "$SERVER_INFO" echo "$SERVER_INFO"
@@ -69,20 +69,18 @@ echo -e "\n[5/7] Listing mailboxes..."
MAILBOXES=$(mcp_request 4 "tools/call" '{"name":"list_mailboxes","arguments":{}}') MAILBOXES=$(mcp_request 4 "tools/call" '{"name":"list_mailboxes","arguments":{}}')
echo "$MAILBOXES" echo "$MAILBOXES"
# List address books # List contacts
echo -e "\n[6/7] Listing address books..." echo -e "\n[6/7] Listing contacts..."
ADDRESSBOOKS=$(mcp_request 5 "tools/call" '{"name":"list_addressbooks","arguments":{}}') CONTACTS=$(mcp_request 5 "tools/call" '{"name":"list_contacts","arguments":{"limit":10}}')
echo "$ADDRESSBOOKS"
# List contacts (using first addressbook from previous response)
echo -e "\n[7/7] Listing contacts..."
# Extract first addressbook ID from previous response
ADDRESSBOOK_ID=$(echo "$ADDRESSBOOKS" | grep -o '"id":"[^"]*"' | head -1 | cut -d'"' -f4)
if [ -n "$ADDRESSBOOK_ID" ]; then
CONTACTS=$(mcp_request 6 "tools/call" "{\"name\":\"list_contacts\",\"arguments\":{\"addressbook_id\":\"$ADDRESSBOOK_ID\",\"limit\":10}}")
echo "$CONTACTS" echo "$CONTACTS"
# Draft a reply (requires REPLY_EMAIL_ID)
echo -e "\n[7/7] Drafting reply..."
if [ -n "$REPLY_EMAIL_ID" ]; then
DRAFT_REPLY=$(mcp_request 6 "tools/call" "{\"name\":\"save_draft\",\"arguments\":{\"in_reply_to_email_id\":\"$REPLY_EMAIL_ID\",\"in_reply_to_mailbox\":\"INBOX\",\"reply_all\":false,\"body\":\"Test reply draft\"}}")
echo "$DRAFT_REPLY"
else else
echo "No addressbook found to list contacts from" echo "Skipping reply draft: set REPLY_EMAIL_ID to an email UID to test threading"
fi fi
echo -e "\n================================" echo -e "\n================================"