Compare commits

..

9 Commits

Author SHA1 Message Date
1543bc4174 Quote original message in reply drafts
All checks were successful
Build And Test / publish (push) Successful in 49s
2026-01-01 17:01:22 -08:00
64af784998 Use CARDDAV_URL as address book URL
All checks were successful
Build And Test / publish (push) Successful in 49s
2026-01-01 15:56:38 -08:00
bd8e1412e4 Use configured contacts address book
All checks were successful
Build And Test / publish (push) Successful in 47s
2026-01-01 15:55:00 -08:00
5a9ef0e48f Revise contacts and email tools
All checks were successful
Build And Test / publish (push) Successful in 48s
2026-01-01 15:46:44 -08:00
767f076048 Add tool call logging and reply fields
All checks were successful
Build And Test / publish (push) Successful in 49s
2026-01-01 15:24:06 -08:00
7966a4302d Add ICS calendar support
All checks were successful
Build And Test / publish (push) Successful in 48s
2026-01-01 15:06:44 -08:00
71c55f7289 Add reply_email tool and sender override
All checks were successful
Build And Test / publish (push) Successful in 1m0s
2026-01-01 12:16:06 -08:00
0459fb1d4c Rewrite README
All checks were successful
Build And Test / publish (push) Successful in 49s
2025-12-31 13:44:09 -08:00
3b687c1a4c drafts 2025-12-31 13:42:30 -08:00
12 changed files with 1169 additions and 324 deletions

View File

@@ -51,7 +51,7 @@ CALDAV_PASSWORD=your-caldav-password
# - Nextcloud: https://cloud.example.com/remote.php/dav
# - Fastmail: https://carddav.fastmail.com/dav/addressbooks/user/you@fastmail.com
# - Radicale: https://radicale.example.com/user/
CARDDAV_URL=https://carddav.example.com/dav
CARDDAV_URL=https://carddav.example.com/dav/addressbooks/users/user@example.com/contacts/
CARDDAV_USERNAME=user@example.com
CARDDAV_PASSWORD=your-carddav-password

313
README.md
View File

@@ -1,92 +1,56 @@
# PIM MCP Server
# DAV/IMAP MCP Server
A self-hosted MCP server for managing your email, calendar, and contacts through AI assistants.
A self-hosted MCP server that connects IMAP/SMTP, CalDAV, and CardDAV to MCP-compatible clients. It exposes email, calendar, and contacts tools and can optionally send new email notifications to a Poke webhook.
Connect your existing mail server (IMAP/SMTP) and CalDAV/CardDAV services to any MCP-compatible client.
## Features
## Prerequisites
- Email tools over IMAP/SMTP (list, read, search, drafts, send drafts, flags, unsubscribe)
- Calendar tools over CalDAV (list, create, update, delete)
- Contacts tools over CardDAV (list, create, update, delete)
- Optional email notifications via webhook with IMAP IDLE or polling
- SQLite cache with Alembic migrations
- API key auth for the MCP endpoint
- Docker and Docker Compose (recommended), OR Python 3.12+
- An email account with IMAP/SMTP access
- (Optional) A CalDAV server for calendars (Nextcloud, Fastmail, Radicale, etc.)
- (Optional) A CardDAV server for contacts
## Quickstart (Docker Compose)
## Setup
### Option 1: Docker Compose (Recommended)
1. **Clone the repository**
```bash
git clone https://github.com/your-repo/pim-mcp-server.git
cd pim-mcp-server
```
2. **Create your environment file**
1. Copy the environment template:
```bash
cp .env.example .env
```
3. **Edit `.env` with your credentials**
```bash
# Required: Generate an API key
MCP_API_KEY=$(python3 -c "import secrets; print(secrets.token_urlsafe(32))")
2. Edit `.env` with your credentials.
# Then edit .env with your mail server details
nano .env
```
4. **Start the server**
3. Start the server:
```bash
docker compose up -d
```
5. **Verify it's running**
4. Verify it is running:
```bash
curl http://localhost:8000/mcp
```
### Option 2: Local Python
## Local Run (Python)
1. **Clone and setup environment**
```bash
git clone https://github.com/your-repo/pim-mcp-server.git
cd pim-mcp-server
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install -r requirements.txt
```
2. **Configure**
```bash
cp .env.example .env
nano .env # Add your credentials
```
3. **Initialize database**
```bash
# Create initial migration (first time only)
alembic revision --autogenerate -m "Initial tables"
alembic upgrade head
```
4. **Run**
```bash
python src/server.py
```
```bash
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install -r requirements.txt
cp .env.example .env
# Edit .env
python src/server.py
```
## Configuration
Edit `.env` with your service credentials. Only configure the services you want to use.
All settings are read from `.env`. Configure only the services you want to enable.
### Minimal Setup (Email Only)
### Minimal email-only setup
```bash
# Server
MCP_API_KEY=your-secret-key-here
MCP_API_KEY=your-secret-key
PORT=8000
# Email
IMAP_HOST=imap.example.com
IMAP_USERNAME=you@example.com
IMAP_PASSWORD=your-password
@@ -95,121 +59,75 @@ SMTP_USERNAME=you@example.com
SMTP_PASSWORD=your-password
SMTP_FROM_EMAIL=you@example.com
# Disable other services
ENABLE_CALENDAR=false
ENABLE_CONTACTS=false
```
### Full Setup (Email + Calendar + Contacts)
### Full setup (email + calendar + contacts)
```bash
# Server
MCP_API_KEY=your-secret-key-here
MCP_API_KEY=your-secret-key
PORT=8000
# Email (IMAP/SMTP)
IMAP_HOST=imap.example.com
IMAP_PORT=993
IMAP_USERNAME=you@example.com
IMAP_PASSWORD=your-password
IMAP_USE_SSL=true
SMTP_HOST=smtp.example.com
SMTP_PORT=587
SMTP_USERNAME=you@example.com
SMTP_PASSWORD=your-password
SMTP_USE_TLS=true
SMTP_FROM_EMAIL=you@example.com
SMTP_FROM_NAME=Your Name
# Calendar (CalDAV)
CALDAV_URL=https://caldav.example.com/dav
CALDAV_USERNAME=you@example.com
CALDAV_PASSWORD=your-password
ICS_CALENDARS=Team|https://example.com/team.ics,Family|https://example.com/family.ics
ICS_CALENDAR_TIMEOUT=20
ICS_CALENDARS=Team|https://example.com/team.ics,Family|https://example.com/family.ics
ICS_CALENDAR_TIMEOUT=20
# Contacts (CardDAV)
CARDDAV_URL=https://carddav.example.com/dav
CARDDAV_URL=https://carddav.example.com/dav/addressbooks/users/you@example.com/contacts/
CARDDAV_USERNAME=you@example.com
CARDDAV_PASSWORD=your-password
```
### Provider-Specific Examples
Contacts tools always use `CARDDAV_URL` as the full CardDAV address book URL. Listing address books is not exposed via MCP.
<details>
<summary><strong>Fastmail</strong></summary>
ICS calendars are optional and read-only. Set `ICS_CALENDARS` to a comma-separated list of entries, each as `name|url` or just `url` if you want the name inferred.
### Email notifications (Poke webhook)
Enable notifications to send new-email alerts to Poke. The server will use IMAP IDLE when available and fall back to polling.
```bash
IMAP_HOST=imap.fastmail.com
IMAP_PORT=993
SMTP_HOST=smtp.fastmail.com
SMTP_PORT=587
CALDAV_URL=https://caldav.fastmail.com/dav/calendars/user/you@fastmail.com
CARDDAV_URL=https://carddav.fastmail.com/dav/addressbooks/user/you@fastmail.com
ENABLE_EMAIL_NOTIFICATIONS=true
NOTIFICATION_MAILBOXES=INBOX,Updates
NOTIFICATION_POLL_INTERVAL=60
NOTIFICATION_IDLE_TIMEOUT=1680
POKE_WEBHOOK_URL=https://poke.com/api/v1/inbound-sms/webhook
POKE_API_KEY=your-poke-api-key
POKE_WEBHOOK_TIMEOUT=30
POKE_WEBHOOK_MAX_RETRIES=3
```
Use an [app-specific password](https://www.fastmail.help/hc/en-us/articles/360058752854).
</details>
<details>
<summary><strong>Nextcloud</strong></summary>
## MCP Client Setup
```bash
# Use your mail server for IMAP/SMTP
IMAP_HOST=mail.example.com
SMTP_HOST=mail.example.com
# Nextcloud for CalDAV/CardDAV
CALDAV_URL=https://cloud.example.com/remote.php/dav
CARDDAV_URL=https://cloud.example.com/remote.php/dav
CALDAV_USERNAME=your-nextcloud-user
CARDDAV_USERNAME=your-nextcloud-user
```
</details>
<details>
<summary><strong>Gmail</strong></summary>
```bash
IMAP_HOST=imap.gmail.com
IMAP_PORT=993
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
```
You must use an [App Password](https://support.google.com/accounts/answer/185833) (not your regular password).
Note: Gmail's CalDAV/CardDAV requires OAuth2, which is not currently supported.
</details>
<details>
<summary><strong>Mailcow</strong></summary>
```bash
IMAP_HOST=mail.example.com
SMTP_HOST=mail.example.com
CALDAV_URL=https://mail.example.com/SOGo/dav
CARDDAV_URL=https://mail.example.com/SOGo/dav
```
</details>
<details>
<summary><strong>Radicale (Self-hosted)</strong></summary>
```bash
CALDAV_URL=https://radicale.example.com/user/
CARDDAV_URL=https://radicale.example.com/user/
```
</details>
## Connecting to MCP Clients
### MCP Inspector (Testing)
### MCP Inspector
```bash
npx @modelcontextprotocol/inspector
```
- Transport: **Streamable HTTP**
- Transport: Streamable HTTP
- URL: `http://localhost:8000/mcp`
### Claude Desktop
Add to your Claude Desktop config (`~/.config/claude/claude_desktop_config.json` on Linux/Mac):
```json
{
"mcpServers": {
@@ -223,99 +141,68 @@ Add to your Claude Desktop config (`~/.config/claude/claude_desktop_config.json`
### Poke
Go to [poke.com/settings/connections](https://poke.com/settings/connections) and add your server URL.
Add your MCP endpoint at https://poke.com/settings/connections.
## Available Tools
Once connected, your AI assistant can use these tools:
| Category | Tools |
|----------|-------|
| **Email** | `list_mailboxes`, `list_emails`, `read_email`, `search_emails`, `move_email`, `delete_email`, `send_email` |
| **Calendar** | `list_calendars`, `list_events`, `get_event`, `create_event`, `update_event`, `delete_event` |
| **Contacts** | `list_addressbooks`, `list_contacts`, `get_contact`, `create_contact`, `update_contact`, `delete_contact` |
| **System** | `get_server_info` |
| --- | --- |
| Email | `list_mailboxes`, `list_emails`, `list_drafts`, `read_email`, `search_emails`, `move_email`, `delete_email`, `delete_draft`, `save_draft`, `edit_draft`, `send_draft`, `set_email_flags`, `unsubscribe_maillist` |
| Calendar | `list_calendars`, `list_events`, `get_event`, `create_event`, `update_event`, `delete_event` |
| Contacts | `list_contacts`, `get_contact`, `create_contact`, `update_contact`, `delete_contact` |
| System | `get_server_info` |
## Database Migrations
### Sending email
The server uses SQLModel with Alembic for database migrations.
Emails are sent only from drafts. Create or edit a draft with `save_draft`/`edit_draft`, then send it with `send_draft` using the returned draft ID.
### When you update the code
### Replying to an email
```bash
# Pull latest changes
git pull
Use `in_reply_to_email_id` on `save_draft` or `edit_draft` to create a reply without a separate tool. The draft includes reply headers and a quoted original message so webmail clients can preserve threading on send. Then send it with `send_draft`.
# Run any new migrations
alembic upgrade head
- Provide `in_reply_to_email_id` (and optionally `in_reply_to_mailbox`, default `INBOX`).
- `reply_all=true` includes original recipients; otherwise it replies to the sender/Reply-To.
- If `to`/`subject` are omitted, they are derived from the original email; `body` is still required.
- `in_reply_to_email_id` is the email UID from `list_emails`/`read_email`, not the RFC Message-ID header.
# Restart
docker compose restart
# or: python src/server.py
Example (send a reply):
```json
{
"tool": "save_draft",
"args": {
"in_reply_to_email_id": "12345",
"in_reply_to_mailbox": "INBOX",
"reply_all": true,
"body": "Thanks — sounds good to me."
}
}
```
### Adding custom fields
Then send the draft by its returned ID:
```json
{
"tool": "send_draft",
"args": {
"email_id": "67890"
}
}
```
1. Edit `src/database/models.py`
2. Generate migration: `alembic revision --autogenerate -m "Description"`
3. Apply: `alembic upgrade head`
## Database and Migrations
The server uses SQLite (default: `/data/cache.db`) and Alembic.
```bash
alembic revision --autogenerate -m "Describe change"
alembic upgrade head
```
## Troubleshooting
### Connection refused
- Check the server is running: `docker compose ps` or `curl localhost:8000/mcp`
- Check logs: `docker compose logs -f`
### Authentication failed (IMAP/SMTP)
- Verify credentials in `.env`
- Many providers require app-specific passwords (Gmail, Fastmail, etc.)
- Check if 2FA is enabled on your account
### CalDAV/CardDAV not working
- Verify the URL is correct (try opening it in a browser)
- Check if your provider requires a specific URL format
- Some providers need the full path including username
### Service disabled
If you see "service: disabled (not configured)", check that:
- All required env vars are set (host, username, password)
- `ENABLE_*` is not set to `false`
### View logs
```bash
# Docker
docker compose logs -f
# Local
# Logs print to stdout
```
## Project Structure
```
├── src/
│ ├── server.py # Entry point
│ ├── config.py # Environment configuration
│ ├── database/ # SQLModel ORM
│ │ ├── models.py # Table definitions
│ │ └── connection.py # Database connection
│ ├── models/ # Pydantic models (API)
│ ├── services/ # Business logic
│ └── tools/ # MCP tool definitions
├── migrations/ # Alembic migrations
├── docker-compose.yml
├── Dockerfile
├── .env.example
└── requirements.txt
```
## Security Notes
- Never commit `.env` files
- Use app-specific passwords where available
- The Docker container runs as non-root
- Consider running behind a reverse proxy with HTTPS for remote access
- The `MCP_API_KEY` is optional but recommended for production
- Connection refused: check `docker compose ps` or `curl http://localhost:8000/mcp`
- Auth errors: confirm `MCP_API_KEY` and client config
- IMAP/SMTP failures: verify credentials and app-specific passwords
- CalDAV/CardDAV failures: confirm base URL and username
## License

View File

@@ -33,6 +33,8 @@ class Settings(BaseSettings):
caldav_url: Optional[str] = Field(default=None, alias="CALDAV_URL")
caldav_username: Optional[str] = Field(default=None, alias="CALDAV_USERNAME")
caldav_password: Optional[SecretStr] = Field(default=None, alias="CALDAV_PASSWORD")
ics_calendars: Optional[str] = Field(default=None, alias="ICS_CALENDARS")
ics_calendar_timeout: int = Field(default=20, alias="ICS_CALENDAR_TIMEOUT")
# CardDAV Configuration
carddav_url: Optional[str] = Field(default=None, alias="CARDDAV_URL")
@@ -107,7 +109,7 @@ class Settings(BaseSettings):
self.smtp_from_email,
])
def is_calendar_configured(self) -> bool:
def is_caldav_configured(self) -> bool:
return all([
self.enable_calendar,
self.caldav_url,
@@ -115,6 +117,33 @@ class Settings(BaseSettings):
self.caldav_password,
])
def is_calendar_configured(self) -> bool:
return all([
self.enable_calendar,
(self.is_caldav_configured() or self.get_ics_calendars()),
])
def get_ics_calendars(self) -> list[tuple[Optional[str], str]]:
if not self.ics_calendars:
return []
calendars: list[tuple[Optional[str], str]] = []
for entry in self.ics_calendars.split(","):
item = entry.strip()
if not item:
continue
if "|" in item:
name, url = item.split("|", 1)
name = name.strip() or None
url = url.strip()
else:
name = None
url = item
if url:
calendars.append((name, url))
return calendars
def is_contacts_configured(self) -> bool:
return all([
self.enable_contacts,

View File

@@ -19,6 +19,7 @@ from fastmcp import FastMCP
from config import settings
from database import init_db, close_db
from tools.logging_utils import log_tool_call
# Configure logging
logging.basicConfig(
@@ -53,7 +54,10 @@ def setup_services():
if settings.is_calendar_configured():
from services.calendar_service import CalendarService
calendar_service = CalendarService(settings)
if settings.is_caldav_configured():
print(f" Calendar service: enabled (CalDAV: {settings.caldav_url})")
else:
print(" Calendar service: enabled (ICS calendars only)")
else:
print(" Calendar service: disabled (not configured)")
@@ -101,6 +105,7 @@ def register_tools():
# Server info tool (always available)
@mcp.tool(description="Get information about this PIM MCP server including enabled services and version.")
@log_tool_call
def get_server_info() -> dict:
"""Get server information and status."""
return {
@@ -116,6 +121,7 @@ def get_server_info() -> dict:
"calendar": {
"enabled": calendar_service is not None,
"caldav_url": settings.caldav_url if calendar_service else None,
"ics_calendars": [c[1] for c in settings.get_ics_calendars()] if calendar_service else [],
},
"contacts": {
"enabled": contacts_service is not None,

View File

@@ -1,8 +1,10 @@
from datetime import datetime, timedelta
from typing import Optional
import uuid
from urllib.parse import urlparse
import caldav
import httpx
from icalendar import Calendar as iCalendar, Event as iEvent, vText
from dateutil.parser import parse as parse_date
from dateutil.rrule import rrulestr
@@ -24,8 +26,39 @@ class CalendarService:
self.settings = settings
self._client: Optional[caldav.DAVClient] = None
self._principal = None
self._ics_calendars = self._load_ics_calendars()
def _load_ics_calendars(self) -> list[dict]:
calendars = []
for idx, (name, url) in enumerate(self.settings.get_ics_calendars()):
cal_id = f"ics:{url}"
calendars.append(
{
"id": cal_id,
"name": name or self._derive_ics_name(url, idx),
"url": url,
}
)
return calendars
def _derive_ics_name(self, url: str, fallback_index: int) -> str:
parsed = urlparse(url)
if parsed.path and parsed.path != "/":
return parsed.path.rstrip("/").split("/")[-1] or f"ICS Calendar {fallback_index + 1}"
return parsed.netloc or f"ICS Calendar {fallback_index + 1}"
def _is_ics_calendar(self, calendar_id: str) -> bool:
return calendar_id.startswith("ics:")
def _get_ics_calendar(self, calendar_id: str) -> Optional[dict]:
for cal in self._ics_calendars:
if cal["id"] == calendar_id:
return cal
return None
def _get_client(self) -> caldav.DAVClient:
if not self.settings.is_caldav_configured():
raise ValueError("CalDAV is not configured")
if self._client is None:
self._client = caldav.DAVClient(
url=self.settings.caldav_url,
@@ -40,10 +73,11 @@ class CalendarService:
return self._principal
def list_calendars(self) -> list[Calendar]:
result = []
if self.settings.is_caldav_configured():
principal = self._get_principal()
calendars = principal.calendars()
result = []
for cal in calendars:
props = cal.get_properties([caldav.dav.DisplayName()])
name = props.get("{DAV:}displayname", cal.name or "Unnamed")
@@ -58,6 +92,17 @@ class CalendarService:
)
)
for ics_cal in self._ics_calendars:
result.append(
Calendar(
id=ics_cal["id"],
name=ics_cal["name"],
color=None,
description=None,
is_readonly=True,
)
)
return result
def _get_calendar_by_id(self, calendar_id: str) -> caldav.Calendar:
@@ -77,6 +122,9 @@ class CalendarService:
end_date: str,
include_recurring: bool = True,
) -> EventList:
if self._is_ics_calendar(calendar_id):
return self._list_ics_events(calendar_id, start_date, end_date, include_recurring)
calendar = self._get_calendar_by_id(calendar_id)
start = parse_date(start_date)
@@ -101,6 +149,9 @@ class CalendarService:
)
def get_event(self, calendar_id: str, event_id: str) -> Optional[Event]:
if self._is_ics_calendar(calendar_id):
return self._get_ics_event(calendar_id, event_id)
calendar = self._get_calendar_by_id(calendar_id)
try:
@@ -127,6 +178,9 @@ class CalendarService:
reminders: Optional[list[int]] = None,
recurrence: Optional[str] = None,
) -> Event:
if self._is_ics_calendar(calendar_id):
raise ValueError("ICS calendars are read-only")
calendar = self._get_calendar_by_id(calendar_id)
# Create iCalendar event
@@ -184,6 +238,9 @@ class CalendarService:
location: Optional[str] = None,
attendees: Optional[list[str]] = None,
) -> Optional[Event]:
if self._is_ics_calendar(calendar_id):
raise ValueError("ICS calendars are read-only")
calendar = self._get_calendar_by_id(calendar_id)
# Find the event
@@ -224,6 +281,13 @@ class CalendarService:
def delete_event(
self, calendar_id: str, event_id: str, notify_attendees: bool = True
) -> OperationResult:
if self._is_ics_calendar(calendar_id):
return OperationResult(
success=False,
message="ICS calendars are read-only",
id=event_id,
)
try:
calendar = self._get_calendar_by_id(calendar_id)
@@ -314,3 +378,186 @@ class CalendarService:
return None
return None
def _list_ics_events(
self,
calendar_id: str,
start_date: str,
end_date: str,
include_recurring: bool,
) -> EventList:
ics_calendar = self._get_ics_calendar(calendar_id)
if not ics_calendar:
raise ValueError(f"Calendar not found: {calendar_id}")
start = parse_date(start_date)
end = parse_date(end_date)
ical = self._fetch_ics_calendar(ics_calendar["url"])
events: list[Event] = []
for component in ical.walk():
if component.name != "VEVENT":
continue
parsed_events = self._parse_ics_component(
component, calendar_id, start, end, include_recurring
)
events.extend(parsed_events)
events.sort(key=lambda e: e.start)
return EventList(
events=events,
calendar_id=calendar_id,
start_date=start_date,
end_date=end_date,
total=len(events),
)
def _get_ics_event(self, calendar_id: str, event_id: str) -> Optional[Event]:
ics_calendar = self._get_ics_calendar(calendar_id)
if not ics_calendar:
raise ValueError(f"Calendar not found: {calendar_id}")
ical = self._fetch_ics_calendar(ics_calendar["url"])
for component in ical.walk():
if component.name != "VEVENT":
continue
uid = str(component.get("uid", ""))
if uid == event_id:
events = self._parse_ics_component(
component,
calendar_id,
datetime.min,
datetime.max,
include_recurring=False,
)
return events[0] if events else None
return None
def _fetch_ics_calendar(self, url: str) -> iCalendar:
response = httpx.get(url, timeout=self.settings.ics_calendar_timeout)
response.raise_for_status()
return iCalendar.from_ical(response.text)
def _parse_ics_component(
self,
component,
calendar_id: str,
range_start: datetime,
range_end: datetime,
include_recurring: bool,
) -> list[Event]:
base_event = self._build_event_from_component(component, calendar_id)
if not base_event:
return []
range_start_cmp = range_start
range_end_cmp = range_end
if base_event.start.tzinfo and range_start.tzinfo is None:
range_start_cmp = range_start.replace(tzinfo=base_event.start.tzinfo)
range_end_cmp = range_end.replace(tzinfo=base_event.start.tzinfo)
elif base_event.start.tzinfo is None and range_start.tzinfo is not None:
range_start_cmp = range_start.replace(tzinfo=None)
range_end_cmp = range_end.replace(tzinfo=None)
if not include_recurring or not base_event.recurrence_rule:
if base_event.start <= range_end_cmp and base_event.end >= range_start_cmp:
return [base_event]
return []
dtstart = base_event.start
duration = base_event.end - base_event.start
if duration.total_seconds() <= 0:
duration = timedelta(hours=1)
rrule = rrulestr(base_event.recurrence_rule, dtstart=dtstart)
occurrences = rrule.between(range_start_cmp, range_end_cmp, inc=True)
excluded = self._extract_exdates(component)
events = []
for occ_start in occurrences:
if occ_start in excluded:
continue
occ_end = occ_start + duration
occurrence = base_event.model_copy()
occurrence.start = occ_start
occurrence.end = occ_end
events.append(occurrence)
return events
def _extract_exdates(self, component) -> set[datetime]:
exdates: set[datetime] = set()
exdate_prop = component.get("exdate")
if not exdate_prop:
return exdates
exdate_list = exdate_prop if isinstance(exdate_prop, list) else [exdate_prop]
for exdate in exdate_list:
dates = getattr(exdate, "dts", [])
for dt in dates:
if isinstance(dt.dt, datetime):
exdates.add(dt.dt)
else:
exdates.add(datetime.combine(dt.dt, datetime.min.time()))
return exdates
def _build_event_from_component(self, component, calendar_id: str) -> Optional[Event]:
try:
uid = str(component.get("uid", ""))
dtstart = component.get("dtstart")
dtend = component.get("dtend")
start = dtstart.dt if dtstart else datetime.now()
end = dtend.dt if dtend else start + timedelta(hours=1)
all_day = False
if not isinstance(start, datetime):
all_day = True
start = datetime.combine(start, datetime.min.time())
if not isinstance(end, datetime):
end = datetime.combine(end, datetime.min.time())
status_str = str(component.get("status", "CONFIRMED")).upper()
status = EventStatus.CONFIRMED
if status_str == "TENTATIVE":
status = EventStatus.TENTATIVE
elif status_str == "CANCELLED":
status = EventStatus.CANCELLED
attendees = []
for attendee in component.get("attendee", []):
if isinstance(attendee, list):
for a in attendee:
email = str(a).replace("mailto:", "")
attendees.append(Attendee(email=email))
else:
email = str(attendee).replace("mailto:", "")
attendees.append(Attendee(email=email))
rrule = component.get("rrule")
recurrence_rule = None
if rrule:
recurrence_rule = rrule.to_ical().decode("utf-8")
return Event(
id=uid,
calendar_id=calendar_id,
title=str(component.get("summary", "Untitled")),
start=start,
end=end,
all_day=all_day,
description=str(component.get("description", "")) or None,
location=str(component.get("location", "")) or None,
status=status,
attendees=attendees,
recurrence_rule=recurrence_rule,
organizer=str(component.get("organizer", "")).replace("mailto:", "") or None,
)
except Exception as e:
print(f"Error parsing ICS event: {e}")
return None

View File

@@ -114,12 +114,13 @@ class ContactsService:
def list_contacts(
self,
addressbook_id: str,
addressbook_id: Optional[str] = None,
search: Optional[str] = None,
limit: int = 100,
offset: int = 0,
) -> ContactList:
client = self._get_client()
addressbook_id = self._resolve_addressbook_id(addressbook_id)
# Build URL
addressbook_url = self._build_url(addressbook_id)
@@ -188,8 +189,9 @@ class ContactsService:
offset=offset,
)
def get_contact(self, addressbook_id: str, contact_id: str) -> Optional[Contact]:
def get_contact(self, contact_id: str, addressbook_id: Optional[str] = None) -> Optional[Contact]:
client = self._get_client()
addressbook_id = self._resolve_addressbook_id(addressbook_id)
# Build URL
contact_url = self._build_url(contact_id)
@@ -206,7 +208,7 @@ class ContactsService:
def create_contact(
self,
addressbook_id: str,
addressbook_id: Optional[str] = None,
first_name: Optional[str] = None,
last_name: Optional[str] = None,
display_name: Optional[str] = None,
@@ -219,6 +221,7 @@ class ContactsService:
birthday: Optional[str] = None,
) -> Contact:
client = self._get_client()
addressbook_id = self._resolve_addressbook_id(addressbook_id)
# Create vCard
vcard = vobject.vCard()
@@ -311,8 +314,8 @@ class ContactsService:
def update_contact(
self,
addressbook_id: str,
contact_id: str,
addressbook_id: Optional[str] = None,
first_name: Optional[str] = None,
last_name: Optional[str] = None,
display_name: Optional[str] = None,
@@ -324,7 +327,7 @@ class ContactsService:
notes: Optional[str] = None,
) -> Optional[Contact]:
# Get existing contact
existing = self.get_contact(addressbook_id, contact_id)
existing = self.get_contact(contact_id, addressbook_id)
if not existing:
return None
@@ -342,10 +345,10 @@ class ContactsService:
}
# Delete and recreate (simpler than partial update)
self.delete_contact(addressbook_id, contact_id)
self.delete_contact(contact_id, addressbook_id)
return self.create_contact(addressbook_id, **updated_data)
def delete_contact(self, addressbook_id: str, contact_id: str) -> OperationResult:
def delete_contact(self, contact_id: str, addressbook_id: Optional[str] = None) -> OperationResult:
try:
client = self._get_client()
@@ -371,7 +374,7 @@ class ContactsService:
return OperationResult(success=False, message=str(e))
def _parse_vcard(
self, vcard_data: str, addressbook_id: str, href: str
self, vcard_data: str, addressbook_id: Optional[str], href: str
) -> Optional[Contact]:
try:
vcard = vobject.readOne(vcard_data)
@@ -467,9 +470,10 @@ class ContactsService:
except Exception:
pass
resolved_addressbook_id = addressbook_id or self._derive_addressbook_id(href)
return Contact(
id=href,
addressbook_id=addressbook_id,
addressbook_id=resolved_addressbook_id,
first_name=first_name,
last_name=last_name,
display_name=display_name,
@@ -481,3 +485,16 @@ class ContactsService:
notes=notes,
birthday=birthday,
)
def _derive_addressbook_id(self, contact_href: str) -> str:
if "/" not in contact_href:
return contact_href
base = contact_href.rsplit("/", 1)[0]
return f"{base}/"
def _resolve_addressbook_id(self, addressbook_id: Optional[str]) -> str:
if addressbook_id:
return addressbook_id
if self.settings.carddav_url:
return self.settings.carddav_url
raise ValueError("CARDDAV_URL must be set to use contacts tools")

View File

@@ -1,8 +1,9 @@
import email
import html
from email.header import decode_header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr, parseaddr
from email.utils import formataddr, parseaddr, formatdate, make_msgid
from datetime import datetime
from typing import Optional
import re
@@ -200,6 +201,25 @@ class EmailService:
emails=emails, total=total, mailbox=mailbox, limit=limit, offset=offset
)
def list_drafts(
self,
mailbox: Optional[str] = None,
limit: int = 50,
offset: int = 0,
include_body: bool = False,
) -> EmailList:
draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
try:
return self.list_emails(draft_mailbox, limit, offset, include_body)
except Exception:
return EmailList(
emails=[],
total=0,
mailbox=draft_mailbox,
limit=limit,
offset=offset,
)
def read_email(
self, mailbox: str, email_id: str, format: str = "text"
) -> Optional[Email]:
@@ -241,7 +261,7 @@ class EmailService:
# Get headers
headers = {}
for key in ["Message-ID", "In-Reply-To", "References", "X-Priority", "List-Unsubscribe", "List-Unsubscribe-Post"]:
for key in ["Message-ID", "In-Reply-To", "References", "Reply-To", "X-Priority", "List-Unsubscribe", "List-Unsubscribe-Post"]:
value = msg.get(key)
if value:
headers[key] = decode_mime_header(value)
@@ -406,28 +426,238 @@ class EmailService:
except Exception as e:
return OperationResult(success=False, message=str(e))
async def send_email(
def delete_draft(
self, email_id: str, mailbox: Optional[str] = None, permanent: bool = False
) -> OperationResult:
draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
return self.delete_email(email_id, draft_mailbox, permanent)
def save_draft(
self,
to: list[str],
subject: str,
body: str,
to: Optional[list[str]] = None,
subject: Optional[str] = None,
body: Optional[str] = None,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
reply_to: Optional[str] = None,
html_body: Optional[str] = None,
mailbox: Optional[str] = None,
in_reply_to_email_id: Optional[str] = None,
in_reply_to_mailbox: Optional[str] = None,
reply_all: bool = False,
) -> OperationResult:
try:
draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
if in_reply_to_email_id:
context, error = self._get_reply_context(
in_reply_to_mailbox or "INBOX",
in_reply_to_email_id,
reply_all,
cc,
self.settings.smtp_from_email,
)
if error:
return error
if not to:
to = context["to"]
if subject is None:
subject = context["subject"]
if cc is None:
cc = context["cc"]
in_reply_to = context["in_reply_to"]
references = context["references"]
original = context["original"]
else:
in_reply_to = None
references = None
original = None
if not to:
return OperationResult(success=False, message="'to' is required for drafts")
if subject is None:
return OperationResult(success=False, message="'subject' is required for drafts")
if body is None:
return OperationResult(success=False, message="'body' is required for drafts")
if original:
body, html_body = self._build_reply_bodies(original, body)
msg = self._build_draft_message(
to=to,
subject=subject,
body=body,
cc=cc,
bcc=bcc,
html_body=html_body,
in_reply_to=in_reply_to,
references=references,
)
client = self._get_imap_client()
append_result = client.append(
draft_mailbox,
msg.as_bytes(),
flags=["\\Draft"],
)
draft_id = None
if append_result and isinstance(append_result, tuple) and len(append_result) > 1:
draft_id = str(append_result[1])
return OperationResult(
success=True,
message=f"Draft saved to {draft_mailbox}",
id=draft_id,
)
except Exception as e:
return OperationResult(success=False, message=str(e))
def update_draft(
self,
email_id: str,
mailbox: Optional[str] = None,
to: Optional[list[str]] = None,
subject: Optional[str] = None,
body: Optional[str] = None,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
html_body: Optional[str] = None,
in_reply_to_email_id: Optional[str] = None,
in_reply_to_mailbox: Optional[str] = None,
reply_all: bool = False,
) -> OperationResult:
draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
try:
existing = self.read_email(draft_mailbox, email_id, format="both")
except Exception as e:
return OperationResult(success=False, message=str(e), id=email_id)
if not existing:
return OperationResult(
success=False,
message=f"Draft {email_id} not found in {draft_mailbox}",
id=email_id,
)
resolved_to = to if to is not None else [addr.email for addr in existing.to_addresses]
resolved_cc = cc if cc is not None else [addr.email for addr in existing.cc_addresses]
resolved_bcc = bcc if bcc is not None else [addr.email for addr in existing.bcc_addresses]
resolved_subject = subject if subject is not None else existing.subject
resolved_body = body if body is not None else (existing.body_text or "")
resolved_html = html_body if html_body is not None else existing.body_html
if in_reply_to_email_id:
context, error = self._get_reply_context(
in_reply_to_mailbox or "INBOX",
in_reply_to_email_id,
reply_all,
cc,
self.settings.smtp_from_email,
)
if error:
return error
if to is None:
resolved_to = context["to"]
if subject is None:
resolved_subject = context["subject"]
if cc is None:
resolved_cc = context["cc"]
in_reply_to = context["in_reply_to"]
references = context["references"]
original = context["original"]
else:
in_reply_to = None
references = None
original = None
try:
client = self._get_imap_client()
client.select_folder(draft_mailbox)
uid = int(email_id)
client.delete_messages([uid])
client.expunge()
if original:
resolved_body, resolved_html = self._build_reply_bodies(original, resolved_body)
msg = self._build_draft_message(
to=resolved_to,
subject=resolved_subject,
body=resolved_body,
cc=resolved_cc,
bcc=resolved_bcc,
html_body=resolved_html,
in_reply_to=in_reply_to,
references=references,
)
append_result = client.append(
draft_mailbox,
msg.as_bytes(),
flags=["\\Draft"],
)
draft_id = None
if append_result and isinstance(append_result, tuple) and len(append_result) > 1:
draft_id = str(append_result[1])
return OperationResult(
success=True,
message=f"Draft {email_id} updated in {draft_mailbox}",
id=draft_id or email_id,
)
except Exception as e:
return OperationResult(success=False, message=str(e), id=email_id)
async def send_email(
self,
to: Optional[list[str]] = None,
subject: Optional[str] = None,
body: Optional[str] = None,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
html_body: Optional[str] = None,
sender_email: Optional[str] = None,
sender_name: Optional[str] = None,
in_reply_to: Optional[str] = None,
references: Optional[list[str]] = None,
in_reply_to_email_id: Optional[str] = None,
in_reply_to_mailbox: Optional[str] = None,
reply_all: bool = False,
) -> OperationResult:
try:
if in_reply_to_email_id:
resolved_name, resolved_email = self._resolve_sender(sender_email, sender_name)
context, error = self._get_reply_context(
in_reply_to_mailbox or "INBOX",
in_reply_to_email_id,
reply_all,
cc,
resolved_email,
)
if error:
return error
if not to:
to = context["to"]
if subject is None:
subject = context["subject"]
if cc is None:
cc = context["cc"]
in_reply_to = context["in_reply_to"]
references = context["references"]
if not to:
return OperationResult(success=False, message="'to' is required to send email")
if subject is None:
return OperationResult(success=False, message="'subject' is required to send email")
if body is None:
return OperationResult(success=False, message="'body' is required to send email")
msg = MIMEMultipart("alternative")
msg["Subject"] = subject
msg["From"] = formataddr(
(self.settings.smtp_from_name or "", self.settings.smtp_from_email)
)
resolved_name, resolved_email = self._resolve_sender(sender_email, sender_name)
msg["From"] = formataddr((resolved_name or "", resolved_email))
msg["To"] = ", ".join(to)
if cc:
msg["Cc"] = ", ".join(cc)
if reply_to:
msg["Reply-To"] = reply_to
if in_reply_to:
msg["In-Reply-To"] = in_reply_to
if references:
msg["References"] = " ".join(references)
# Add plain text body
msg.attach(MIMEText(body, "plain", "utf-8"))
@@ -461,6 +691,86 @@ class EmailService:
except Exception as e:
return OperationResult(success=False, message=str(e))
async def send_draft(
self,
email_id: str,
mailbox: Optional[str] = None,
) -> OperationResult:
draft_mailbox = mailbox or self._find_drafts_folder() or "Drafts"
try:
draft = self.read_email(draft_mailbox, email_id, format="both")
except Exception as e:
return OperationResult(success=False, message=str(e), id=email_id)
if not draft:
return OperationResult(
success=False,
message=f"Draft {email_id} not found in {draft_mailbox}",
id=email_id,
)
to = [addr.email for addr in draft.to_addresses]
cc = [addr.email for addr in draft.cc_addresses]
bcc = [addr.email for addr in draft.bcc_addresses]
if not to and not cc and not bcc:
return OperationResult(
success=False,
message="Draft has no recipients",
id=email_id,
)
subject = draft.subject or "(No Subject)"
body = draft.body_text or ""
html_body = draft.body_html
result = await self.send_email(
to=to or None,
subject=subject,
body=body,
cc=cc or None,
bcc=bcc or None,
html_body=html_body,
in_reply_to=draft.in_reply_to,
references=draft.references or None,
)
if not result.success:
return result
try:
client = self._get_imap_client()
client.select_folder(draft_mailbox)
client.delete_messages([int(email_id)])
client.expunge()
except Exception:
pass
return result
async def reply_email(
self,
mailbox: str,
email_id: str,
body: str,
reply_all: bool = False,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
html_body: Optional[str] = None,
sender_email: Optional[str] = None,
sender_name: Optional[str] = None,
) -> OperationResult:
return await self.send_email(
body=body,
bcc=bcc,
html_body=html_body,
sender_email=sender_email,
sender_name=sender_name,
in_reply_to_email_id=email_id,
in_reply_to_mailbox=mailbox,
reply_all=reply_all,
cc=cc,
)
def _parse_envelope_addresses(self, addresses) -> list[EmailAddress]:
if not addresses:
return []
@@ -575,6 +885,179 @@ class EmailService:
return name
return None
def _find_drafts_folder(self) -> Optional[str]:
client = self._get_imap_client()
folders = client.list_folders()
draft_names = ["Drafts", "Draft", "INBOX.Drafts", "[Gmail]/Drafts"]
for flags, delimiter, name in folders:
if name in draft_names or b"\\Drafts" in flags:
return name
return None
def _build_draft_message(
self,
to: list[str],
subject: str,
body: str,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
html_body: Optional[str] = None,
in_reply_to: Optional[str] = None,
references: Optional[list[str]] = None,
) -> MIMEMultipart:
msg = MIMEMultipart("alternative")
msg["Subject"] = subject
msg["From"] = formataddr(
(self.settings.smtp_from_name or "", self.settings.smtp_from_email)
)
msg["To"] = ", ".join(to)
msg["Date"] = formatdate(localtime=True)
msg["Message-ID"] = make_msgid()
if cc:
msg["Cc"] = ", ".join(cc)
if bcc:
msg["Bcc"] = ", ".join(bcc)
if in_reply_to:
msg["In-Reply-To"] = in_reply_to
if references:
msg["References"] = " ".join(references)
msg.attach(MIMEText(body or "", "plain", "utf-8"))
if html_body:
msg.attach(MIMEText(html_body, "html", "utf-8"))
return msg
def _get_reply_context(
self,
mailbox: str,
email_id: str,
reply_all: bool,
cc: Optional[list[str]],
sender_email: Optional[str],
) -> tuple[dict, Optional[OperationResult]]:
original = self.read_email(mailbox, email_id, format="both")
if not original:
return {}, OperationResult(
success=False,
message=f"Email {email_id} not found in {mailbox}",
id=email_id,
)
reply_to_header = original.headers.get("Reply-To")
reply_to_email = None
if reply_to_header:
_, reply_to_email = parseaddr(reply_to_header)
if not reply_to_email:
reply_to_email = original.from_address.email
to = [reply_to_email] if reply_to_email else []
reply_cc: list[str] = []
if reply_all:
for addr in original.to_addresses + original.cc_addresses:
if addr.email and addr.email not in to:
reply_cc.append(addr.email)
if cc:
reply_cc.extend(cc)
to = self._dedupe_emails(to, sender_email)
reply_cc = self._dedupe_emails(reply_cc, sender_email)
if not to and reply_cc:
to = [reply_cc.pop(0)]
if not to:
return {}, OperationResult(
success=False,
message="No valid recipients found for reply",
id=email_id,
)
subject = original.subject or "(No Subject)"
if not subject.lower().startswith("re:"):
subject = f"Re: {subject}"
in_reply_to = original.headers.get("Message-ID") or original.in_reply_to
references = list(original.references)
if in_reply_to and in_reply_to not in references:
references.append(in_reply_to)
return {
"original": original,
"to": to,
"cc": reply_cc or None,
"subject": subject,
"in_reply_to": in_reply_to,
"references": references or None,
}, None
def _build_reply_bodies(self, original: Email, body_text: str) -> tuple[str, Optional[str]]:
intro = self._format_reply_intro(original)
quoted_text = original.body_text or ""
text = body_text or ""
if intro:
text = f"{text}\n\n{intro}\n\n{quoted_text}".rstrip()
html_body = None
quoted_html = original.body_html
if not quoted_html and quoted_text:
quoted_html = html.escape(quoted_text).replace("\n", "<br/>")
if quoted_html:
cite = original.headers.get("Message-ID") or original.in_reply_to or ""
cite_attr = f' cite="{html.escape(cite)}"' if cite else ""
html_intro = html.escape(intro).replace("\n", "<br/>")
html_body = (
f"<div>{html.escape(body_text).replace('\\n', '<br/>')}</div>"
f"<br/><br/>{html_intro}<br/><br/>"
f"<blockquote type=\"cite\"{cite_attr}>"
f"<div dir=\"ltr\">{quoted_html}</div>"
f"</blockquote><br/><br/><br/>"
)
return text, html_body
def _format_reply_intro(self, original: Email) -> str:
date_str = ""
if original.date:
try:
date_str = original.date.strftime("%A, %B %d, %Y %H:%M %Z").strip()
except Exception:
date_str = str(original.date)
from_name = original.from_address.name or original.from_address.email
from_email = original.from_address.email
if date_str:
return f"On {date_str}, {from_name} <{from_email}> wrote:"
return f"On {from_name} <{from_email}> wrote:"
def _resolve_sender(
self, sender_email: Optional[str], sender_name: Optional[str]
) -> tuple[Optional[str], str]:
if sender_email:
return sender_name, sender_email
if sender_name:
name, email_addr = parseaddr(sender_name)
if email_addr:
return name or None, email_addr
return self.settings.smtp_from_name, self.settings.smtp_from_email
def _dedupe_emails(self, emails: list[str], self_email: Optional[str]) -> list[str]:
seen = set()
cleaned = []
for addr in emails:
if not addr:
continue
addr_lower = addr.lower()
if self_email and addr_lower == self_email.lower():
continue
if addr_lower in seen:
continue
seen.add(addr_lower)
cleaned.append(addr)
return cleaned
def set_flags(
self,
email_id: str,

View File

@@ -2,18 +2,21 @@ from typing import Optional
from fastmcp import FastMCP
from services.calendar_service import CalendarService
from tools.logging_utils import log_tool_call
def register_calendar_tools(mcp: FastMCP, service: CalendarService):
"""Register all calendar-related MCP tools."""
@mcp.tool(description="List all available calendars from the CalDAV server. Returns calendar ID, name, and properties.")
@mcp.tool(description="List all available calendars from CalDAV and configured ICS feeds. Returns calendar ID, name, and properties.")
@log_tool_call
def list_calendars() -> list[dict]:
"""List all calendars."""
calendars = service.list_calendars()
return [c.model_dump() for c in calendars]
@mcp.tool(description="List events within a date range. If no calendar is specified, lists events from all calendars.")
@log_tool_call
def list_events(
start_date: str,
end_date: str,
@@ -26,7 +29,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService):
Args:
start_date: Start of date range (ISO format: YYYY-MM-DD)
end_date: End of date range (ISO format: YYYY-MM-DD)
calendar_id: The calendar ID (URL) to query. If not provided, lists from all calendars.
calendar_id: The calendar ID (CalDAV URL or ICS ID) to query. If not provided, lists from all calendars.
include_recurring: Whether to expand recurring events (default: True)
"""
if calendar_id:
@@ -59,6 +62,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService):
}
@mcp.tool(description="Get detailed information about a specific calendar event including attendees and recurrence.")
@log_tool_call
def get_event(
calendar_id: str,
event_id: str,
@@ -67,13 +71,14 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService):
Get a specific event.
Args:
calendar_id: The calendar ID containing the event
calendar_id: The calendar ID (CalDAV URL or ICS ID) containing the event
event_id: The unique ID (UID) of the event
"""
result = service.get_event(calendar_id, event_id)
return result.model_dump() if result else None
@mcp.tool(description="Create a new calendar event with title, time, location, attendees, and optional recurrence.")
@log_tool_call
def create_event(
calendar_id: str,
title: str,
@@ -89,7 +94,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService):
Create a new calendar event.
Args:
calendar_id: The calendar ID to create the event in
calendar_id: The calendar ID to create the event in (CalDAV only)
title: Event title/summary
start: Start datetime (ISO format: YYYY-MM-DDTHH:MM:SS)
end: End datetime (ISO format: YYYY-MM-DDTHH:MM:SS)
@@ -105,6 +110,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService):
return result.model_dump()
@mcp.tool(description="Update an existing calendar event. Only provided fields will be modified.")
@log_tool_call
def update_event(
calendar_id: str,
event_id: str,
@@ -119,7 +125,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService):
Update an existing event.
Args:
calendar_id: The calendar ID containing the event
calendar_id: The calendar ID containing the event (CalDAV only)
event_id: The unique ID of the event to update
title: New event title (optional)
start: New start datetime (optional)
@@ -134,6 +140,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService):
return result.model_dump() if result else None
@mcp.tool(description="Delete a calendar event by ID.")
@log_tool_call
def delete_event(
calendar_id: str,
event_id: str,
@@ -143,7 +150,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService):
Delete a calendar event.
Args:
calendar_id: The calendar ID containing the event
calendar_id: The calendar ID containing the event (CalDAV only)
event_id: The unique ID of the event to delete
notify_attendees: Whether to notify attendees of cancellation (default: True)
"""

View File

@@ -2,54 +2,47 @@ from typing import Optional
from fastmcp import FastMCP
from services.contacts_service import ContactsService
from tools.logging_utils import log_tool_call
def register_contacts_tools(mcp: FastMCP, service: ContactsService):
"""Register all contacts-related MCP tools."""
@mcp.tool(description="List all available address books from the CardDAV server.")
def list_addressbooks() -> list[dict]:
"""List all address books."""
addressbooks = service.list_addressbooks()
return [a.model_dump() for a in addressbooks]
@mcp.tool(description="List contacts in an address book with optional search filtering and pagination.")
@mcp.tool(description="List contacts in the configured address book with optional search filtering and pagination.")
@log_tool_call
def list_contacts(
addressbook_id: str,
search: Optional[str] = None,
limit: int = 100,
offset: int = 0,
) -> dict:
"""
List contacts in an address book.
List contacts in the configured address book.
Args:
addressbook_id: The address book ID (URL path) to query
search: Optional search term to filter contacts by name or email
limit: Maximum number of contacts to return (default: 100)
offset: Number of contacts to skip for pagination (default: 0)
"""
result = service.list_contacts(addressbook_id, search, limit, offset)
result = service.list_contacts(search=search, limit=limit, offset=offset)
return result.model_dump()
@mcp.tool(description="Get detailed information about a specific contact including all fields.")
@log_tool_call
def get_contact(
addressbook_id: str,
contact_id: str,
) -> Optional[dict]:
"""
Get a specific contact.
Args:
addressbook_id: The address book containing the contact
contact_id: The unique ID (URL) of the contact
"""
result = service.get_contact(addressbook_id, contact_id)
result = service.get_contact(contact_id)
return result.model_dump() if result else None
@mcp.tool(description="Create a new contact with name, emails, phones, addresses, and other details.")
@log_tool_call
def create_contact(
addressbook_id: str,
first_name: Optional[str] = None,
last_name: Optional[str] = None,
display_name: Optional[str] = None,
@@ -65,7 +58,6 @@ def register_contacts_tools(mcp: FastMCP, service: ContactsService):
Create a new contact.
Args:
addressbook_id: The address book ID to create the contact in
first_name: Contact's first/given name
last_name: Contact's last/family name
display_name: Full display name (auto-generated if not provided)
@@ -78,23 +70,22 @@ def register_contacts_tools(mcp: FastMCP, service: ContactsService):
birthday: Birthday in ISO format (YYYY-MM-DD)
"""
result = service.create_contact(
addressbook_id,
first_name,
last_name,
display_name,
emails,
phones,
addresses,
organization,
title,
notes,
birthday,
first_name=first_name,
last_name=last_name,
display_name=display_name,
emails=emails,
phones=phones,
addresses=addresses,
organization=organization,
title=title,
notes=notes,
birthday=birthday,
)
return result.model_dump()
@mcp.tool(description="Update an existing contact. Only provided fields will be modified.")
@log_tool_call
def update_contact(
addressbook_id: str,
contact_id: str,
first_name: Optional[str] = None,
last_name: Optional[str] = None,
@@ -110,7 +101,6 @@ def register_contacts_tools(mcp: FastMCP, service: ContactsService):
Update an existing contact.
Args:
addressbook_id: The address book containing the contact
contact_id: The unique ID of the contact to update
first_name: New first name (optional)
last_name: New last name (optional)
@@ -123,31 +113,29 @@ def register_contacts_tools(mcp: FastMCP, service: ContactsService):
notes: New notes (optional)
"""
result = service.update_contact(
addressbook_id,
contact_id,
first_name,
last_name,
display_name,
emails,
phones,
addresses,
organization,
title,
notes,
first_name=first_name,
last_name=last_name,
display_name=display_name,
emails=emails,
phones=phones,
addresses=addresses,
organization=organization,
title=title,
notes=notes,
)
return result.model_dump() if result else None
@mcp.tool(description="Delete a contact from an address book.")
@log_tool_call
def delete_contact(
addressbook_id: str,
contact_id: str,
) -> dict:
"""
Delete a contact.
Args:
addressbook_id: The address book containing the contact
contact_id: The unique ID of the contact to delete
"""
result = service.delete_contact(addressbook_id, contact_id)
result = service.delete_contact(contact_id)
return result.model_dump()

View File

@@ -2,18 +2,21 @@ from typing import Optional
from fastmcp import FastMCP
from services.email_service import EmailService
from tools.logging_utils import log_tool_call
def register_email_tools(mcp: FastMCP, service: EmailService):
"""Register all email-related MCP tools."""
@mcp.tool(description="List all mailboxes/folders in the email account. Returns name, path, message count, and unread count for each mailbox.")
@log_tool_call
def list_mailboxes() -> list[dict]:
"""List all IMAP mailboxes/folders."""
mailboxes = service.list_mailboxes()
return [m.model_dump() for m in mailboxes]
@mcp.tool(description="List emails in a mailbox with pagination. Returns email summaries including subject, from, date, and read status.")
@log_tool_call
def list_emails(
mailbox: str = "INBOX",
limit: int = 50,
@@ -32,7 +35,28 @@ def register_email_tools(mcp: FastMCP, service: EmailService):
result = service.list_emails(mailbox, limit, offset, include_body)
return result.model_dump()
@mcp.tool(description="List draft emails in the Drafts mailbox with pagination.")
@log_tool_call
def list_drafts(
mailbox: Optional[str] = None,
limit: int = 50,
offset: int = 0,
include_body: bool = False,
) -> dict:
"""
List draft emails.
Args:
mailbox: Drafts mailbox/folder override (default: auto-detect)
limit: Maximum number of drafts to return (default: 50)
offset: Number of drafts to skip for pagination (default: 0)
include_body: Whether to include body snippets (default: False)
"""
result = service.list_drafts(mailbox, limit, offset, include_body)
return result.model_dump()
@mcp.tool(description="Read a specific email by ID with full body content and attachment information.")
@log_tool_call
def read_email(
mailbox: str,
email_id: str,
@@ -50,6 +74,7 @@ def register_email_tools(mcp: FastMCP, service: EmailService):
return result.model_dump() if result else None
@mcp.tool(description="Search emails in a mailbox using various criteria like subject, sender, or body content.")
@log_tool_call
def search_emails(
query: str,
mailbox: str = "INBOX",
@@ -75,6 +100,7 @@ def register_email_tools(mcp: FastMCP, service: EmailService):
return result.model_dump()
@mcp.tool(description="Move an email from one mailbox/folder to another.")
@log_tool_call
def move_email(
email_id: str,
source_mailbox: str,
@@ -92,6 +118,7 @@ def register_email_tools(mcp: FastMCP, service: EmailService):
return result.model_dump()
@mcp.tool(description="Delete an email, either moving it to trash or permanently deleting it.")
@log_tool_call
def delete_email(
email_id: str,
mailbox: str,
@@ -108,32 +135,127 @@ def register_email_tools(mcp: FastMCP, service: EmailService):
result = service.delete_email(email_id, mailbox, permanent)
return result.model_dump()
@mcp.tool(description="Send a new email via SMTP. Supports plain text and HTML content, CC, BCC, and reply-to.")
async def send_email(
to: list[str],
subject: str,
body: str,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
reply_to: Optional[str] = None,
html_body: Optional[str] = None,
@mcp.tool(description="Delete a drafted email by ID, optionally permanently.")
@log_tool_call
def delete_draft(
email_id: str,
mailbox: Optional[str] = None,
permanent: bool = False,
) -> dict:
"""
Send a new email.
Delete a draft email.
Args:
email_id: The unique ID of the draft
mailbox: Drafts mailbox/folder override (default: auto-detect)
permanent: If True, permanently delete; if False, move to Trash (default: False)
"""
result = service.delete_draft(email_id, mailbox, permanent)
return result.model_dump()
@mcp.tool(description="Save a new draft email to the Drafts mailbox. Supports reply threading via in_reply_to_email_id.")
@log_tool_call
def save_draft(
to: Optional[list[str]] = None,
subject: Optional[str] = None,
body: Optional[str] = None,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
mailbox: Optional[str] = None,
in_reply_to_email_id: Optional[str] = None,
in_reply_to_mailbox: Optional[str] = None,
reply_all: bool = False,
) -> dict:
"""
Save a new email draft.
Args:
to: List of recipient email addresses (required unless in_reply_to_email_id is set)
subject: Email subject line (required unless in_reply_to_email_id is set)
body: Plain text email body (required unless in_reply_to_email_id is set)
cc: List of CC recipients (optional)
bcc: List of BCC recipients (optional)
mailbox: Drafts mailbox/folder override (default: auto-detect)
in_reply_to_email_id: Email UID to reply to (optional, derives recipients/subject and sets threading headers)
in_reply_to_mailbox: Mailbox containing the in_reply_to_email_id (default: INBOX)
reply_all: Whether to include original recipients when replying (default: False)
"""
result = service.save_draft(
to=to,
subject=subject,
body=body,
cc=cc,
bcc=bcc,
html_body=None,
mailbox=mailbox,
in_reply_to_email_id=in_reply_to_email_id,
in_reply_to_mailbox=in_reply_to_mailbox,
reply_all=reply_all,
)
return result.model_dump()
@mcp.tool(description="Edit an existing draft email. Only provided fields will be modified. Supports reply threading via in_reply_to_email_id.")
@log_tool_call
def edit_draft(
email_id: str,
mailbox: Optional[str] = None,
to: Optional[list[str]] = None,
subject: Optional[str] = None,
body: Optional[str] = None,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
in_reply_to_email_id: Optional[str] = None,
in_reply_to_mailbox: Optional[str] = None,
reply_all: bool = False,
) -> dict:
"""
Update an existing draft email.
Args:
email_id: The unique ID of the draft
mailbox: Drafts mailbox/folder override (default: auto-detect)
to: List of recipient email addresses
subject: Email subject line
body: Plain text email body
cc: List of CC recipients (optional)
bcc: List of BCC recipients (optional)
reply_to: Reply-to address (optional)
html_body: HTML version of the email body (optional)
in_reply_to_email_id: Email UID to reply to (optional, derives recipients/subject and sets threading headers)
in_reply_to_mailbox: Mailbox containing the in_reply_to_email_id (default: INBOX)
reply_all: Whether to include original recipients when replying (default: False)
"""
result = await service.send_email(to, subject, body, cc, bcc, reply_to, html_body)
result = service.update_draft(
email_id=email_id,
mailbox=mailbox,
to=to,
subject=subject,
body=body,
cc=cc,
bcc=bcc,
html_body=None,
in_reply_to_email_id=in_reply_to_email_id,
in_reply_to_mailbox=in_reply_to_mailbox,
reply_all=reply_all,
)
return result.model_dump()
@mcp.tool(description="Send an existing draft by ID. Only drafts can be sent.")
@log_tool_call
async def send_draft(
email_id: str,
mailbox: Optional[str] = None,
) -> dict:
"""
Send a draft email.
Args:
email_id: The unique ID of the draft to send
mailbox: Drafts mailbox/folder override (default: auto-detect)
"""
result = await service.send_draft(email_id, mailbox)
return result.model_dump()
@mcp.tool(description="Set or remove IMAP flags on an email. Standard flags: \\Seen, \\Answered, \\Flagged, \\Deleted, \\Draft. Custom keywords are also supported.")
@log_tool_call
def set_email_flags(
email_id: str,
mailbox: str,
@@ -153,7 +275,8 @@ def register_email_tools(mcp: FastMCP, service: EmailService):
return result.model_dump()
@mcp.tool(description="Unsubscribe from a mailing list. Parses List-Unsubscribe headers and attempts automatic unsubscribe via HTTP or provides mailto instructions.")
async def unsubscribe_email(
@log_tool_call
async def unsubscribe_maillist(
email_id: str,
mailbox: str = "INBOX",
) -> dict:

View File

@@ -0,0 +1,60 @@
import functools
import inspect
import json
import logging
import os
logger = logging.getLogger("mcp.tools")
_MAX_LOG_CHARS = int(os.getenv("TOOL_LOG_MAX_CHARS", "4000"))
def _serialize(value) -> str:
try:
return json.dumps(value, default=str, ensure_ascii=True)
except TypeError:
return repr(value)
def _truncate(text: str) -> str:
if len(text) <= _MAX_LOG_CHARS:
return text
truncated = len(text) - _MAX_LOG_CHARS
return f"{text[:_MAX_LOG_CHARS]}...(truncated {truncated} chars)"
def log_tool_call(func):
"""Log tool calls and responses with optional truncation."""
if inspect.iscoroutinefunction(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
logger.info(
"Tool call %s args=%s kwargs=%s",
func.__name__,
_truncate(_serialize(args)),
_truncate(_serialize(kwargs)),
)
result = await func(*args, **kwargs)
logger.info(
"Tool response %s result=%s",
func.__name__,
_truncate(_serialize(result)),
)
return result
return wrapper
@functools.wraps(func)
def wrapper(*args, **kwargs):
logger.info(
"Tool call %s args=%s kwargs=%s",
func.__name__,
_truncate(_serialize(args)),
_truncate(_serialize(kwargs)),
)
result = func(*args, **kwargs)
logger.info(
"Tool response %s result=%s",
func.__name__,
_truncate(_serialize(result)),
)
return result
return wrapper

30
test.sh
View File

@@ -17,12 +17,12 @@ echo "Testing MCP server at $BASE_URL"
echo "================================"
# Test health endpoint
echo -e "\n[1/5] Testing health endpoint..."
echo -e "\n[1/6] Testing health endpoint..."
HEALTH=$(curl -s "$BASE_URL/health")
echo "Response: $HEALTH"
# Initialize session and capture session ID
echo -e "\n[2/5] Initializing MCP session..."
echo -e "\n[2/6] Initializing MCP session..."
INIT_RESPONSE=$(curl -s -D - -X POST "$MCP_ENDPOINT" \
-H "Content-Type: application/json" \
-H "Accept: application/json, text/event-stream" \
@@ -55,12 +55,12 @@ mcp_request() {
}
# List available tools
echo -e "\n[3/5] Listing available tools..."
echo -e "\n[3/6] Listing available tools..."
TOOLS=$(mcp_request 2 "tools/list" "{}")
echo "$TOOLS" | grep -o '"name":"[^"]*"' | head -20 || echo "$TOOLS"
# Get server info
echo -e "\n[4/5] Getting server info..."
echo -e "\n[4/6] Getting server info..."
SERVER_INFO=$(mcp_request 3 "tools/call" '{"name":"get_server_info","arguments":{}}')
echo "$SERVER_INFO"
@@ -69,20 +69,18 @@ echo -e "\n[5/7] Listing mailboxes..."
MAILBOXES=$(mcp_request 4 "tools/call" '{"name":"list_mailboxes","arguments":{}}')
echo "$MAILBOXES"
# List address books
echo -e "\n[6/7] Listing address books..."
ADDRESSBOOKS=$(mcp_request 5 "tools/call" '{"name":"list_addressbooks","arguments":{}}')
echo "$ADDRESSBOOKS"
# List contacts
echo -e "\n[6/7] Listing contacts..."
CONTACTS=$(mcp_request 5 "tools/call" '{"name":"list_contacts","arguments":{"limit":10}}')
echo "$CONTACTS"
# List contacts (using first addressbook from previous response)
echo -e "\n[7/7] Listing contacts..."
# Extract first addressbook ID from previous response
ADDRESSBOOK_ID=$(echo "$ADDRESSBOOKS" | grep -o '"id":"[^"]*"' | head -1 | cut -d'"' -f4)
if [ -n "$ADDRESSBOOK_ID" ]; then
CONTACTS=$(mcp_request 6 "tools/call" "{\"name\":\"list_contacts\",\"arguments\":{\"addressbook_id\":\"$ADDRESSBOOK_ID\",\"limit\":10}}")
echo "$CONTACTS"
# Draft a reply (requires REPLY_EMAIL_ID)
echo -e "\n[7/7] Drafting reply..."
if [ -n "$REPLY_EMAIL_ID" ]; then
DRAFT_REPLY=$(mcp_request 6 "tools/call" "{\"name\":\"save_draft\",\"arguments\":{\"in_reply_to_email_id\":\"$REPLY_EMAIL_ID\",\"in_reply_to_mailbox\":\"INBOX\",\"reply_all\":false,\"body\":\"Test reply draft\"}}")
echo "$DRAFT_REPLY"
else
echo "No addressbook found to list contacts from"
echo "Skipping reply draft: set REPLY_EMAIL_ID to an email UID to test threading"
fi
echo -e "\n================================"