From 0bbb5a5a2c373e342aef9af017fc77c5987f957a Mon Sep 17 00:00:00 2001 From: Yigit Colakoglu Date: Tue, 30 Dec 2025 15:45:50 -0800 Subject: [PATCH] Notifications --- requirements.txt | 1 + src/config.py | 47 ++++ src/database/__init__.py | 2 + src/database/models.py | 25 ++ src/middleware/__init__.py | 5 + src/middleware/auth.py | 49 ++++ src/models/notification_models.py | 60 +++++ src/server.py | 79 ++++++- src/services/email_monitor.py | 381 ++++++++++++++++++++++++++++++ src/services/webhook_service.py | 106 +++++++++ 10 files changed, 752 insertions(+), 3 deletions(-) create mode 100644 src/middleware/__init__.py create mode 100644 src/middleware/auth.py create mode 100644 src/models/notification_models.py create mode 100644 src/services/email_monitor.py create mode 100644 src/services/webhook_service.py diff --git a/requirements.txt b/requirements.txt index 5961369..38d0a16 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ # FastMCP framework fastmcp>=2.12.0 uvicorn>=0.35.0 +starlette>=0.41.0 # Email (IMAP/SMTP) imapclient>=3.0.1 diff --git a/src/config.py b/src/config.py index cde0dae..468b3e5 100644 --- a/src/config.py +++ b/src/config.py @@ -48,6 +48,42 @@ class Settings(BaseSettings): enable_calendar: bool = Field(default=True, alias="ENABLE_CALENDAR") enable_contacts: bool = Field(default=True, alias="ENABLE_CONTACTS") + # Email Notification Settings + enable_email_notifications: bool = Field( + default=False, + alias="ENABLE_EMAIL_NOTIFICATIONS" + ) + notification_mailboxes: str = Field( + default="INBOX", + alias="NOTIFICATION_MAILBOXES", + ) + notification_poll_interval: int = Field( + default=60, + alias="NOTIFICATION_POLL_INTERVAL", + ) + notification_idle_timeout: int = Field( + default=1680, # 28 minutes (RFC recommends refresh before 29 min) + alias="NOTIFICATION_IDLE_TIMEOUT", + ) + + # Poke Webhook Settings + poke_webhook_url: Optional[str] = Field( + default="https://poke.com/api/v1/inbound-sms/webhook", + alias="POKE_WEBHOOK_URL", + ) + poke_api_key: Optional[SecretStr] = Field( + default=None, + alias="POKE_API_KEY", + ) + poke_webhook_timeout: int = Field( + default=30, + alias="POKE_WEBHOOK_TIMEOUT", + ) + poke_webhook_max_retries: int = Field( + default=3, + alias="POKE_WEBHOOK_MAX_RETRIES", + ) + model_config = { "env_file": ".env", "env_file_encoding": "utf-8", @@ -87,5 +123,16 @@ class Settings(BaseSettings): self.carddav_password, ]) + def is_notification_configured(self) -> bool: + return all([ + self.enable_email_notifications, + self.is_email_configured(), + self.poke_api_key, + self.poke_webhook_url, + ]) + + def get_notification_mailboxes(self) -> list[str]: + return [m.strip() for m in self.notification_mailboxes.split(",") if m.strip()] + settings = Settings() diff --git a/src/database/__init__.py b/src/database/__init__.py index 3a1d8c6..cdece03 100644 --- a/src/database/__init__.py +++ b/src/database/__init__.py @@ -5,6 +5,7 @@ from .models import ( ContactCache, SyncState, CacheMeta, + SeenEmail, ) __all__ = [ @@ -19,4 +20,5 @@ __all__ = [ "ContactCache", "SyncState", "CacheMeta", + "SeenEmail", ] diff --git a/src/database/models.py b/src/database/models.py index 10a83fc..03e9317 100644 --- a/src/database/models.py +++ b/src/database/models.py @@ -68,3 +68,28 @@ class SyncState(SQLModel, table=True): resource_id: str = Field(primary_key=True) last_sync: Optional[datetime] = None sync_token: Optional[str] = None + + +class SeenEmail(SQLModel, table=True): + """Track emails that have been processed for notifications.""" + + __tablename__ = "seen_emails" + + id: Optional[int] = Field(default=None, primary_key=True) + email_uid: str = Field(index=True, description="IMAP UID of the email") + mailbox: str = Field(index=True, description="Mailbox path (e.g., INBOX)") + message_id: Optional[str] = Field( + default=None, + index=True, + description="RFC 2822 Message-ID header for cross-server dedup" + ) + from_address: Optional[str] = Field(default=None, description="Sender email for logging") + subject: Optional[str] = Field(default=None, description="Subject for logging") + email_date: Optional[datetime] = Field(default=None, description="Email date") + seen_at: datetime = Field(default_factory=datetime.utcnow) + notification_sent: bool = Field(default=False) + notification_sent_at: Optional[datetime] = None + notification_error: Optional[str] = Field( + default=None, + description="Last error if notification failed" + ) diff --git a/src/middleware/__init__.py b/src/middleware/__init__.py new file mode 100644 index 0000000..ed7a75d --- /dev/null +++ b/src/middleware/__init__.py @@ -0,0 +1,5 @@ +"""Middleware package for the MCP server.""" + +from .auth import APIKeyAuthMiddleware + +__all__ = ["APIKeyAuthMiddleware"] diff --git a/src/middleware/auth.py b/src/middleware/auth.py new file mode 100644 index 0000000..532fc6c --- /dev/null +++ b/src/middleware/auth.py @@ -0,0 +1,49 @@ +"""Authentication middleware for the MCP server.""" + +import json +from typing import Optional + +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.requests import Request +from starlette.responses import JSONResponse + + +class APIKeyAuthMiddleware(BaseHTTPMiddleware): + """Middleware to authenticate requests using API key.""" + + def __init__(self, app, api_key: Optional[str] = None): + super().__init__(app) + self.api_key = api_key + + async def dispatch(self, request: Request, call_next): + # Skip auth if no API key is configured + if not self.api_key: + return await call_next(request) + + # Skip auth for health check endpoints + if request.url.path in ["/health", "/healthz", "/"]: + return await call_next(request) + + # Get the Authorization header + auth_header = request.headers.get("Authorization") + + if not auth_header: + return JSONResponse( + status_code=401, + content={"error": "Missing Authorization header"}, + ) + + # Support both "Bearer " and raw "" formats + if auth_header.startswith("Bearer "): + provided_key = auth_header[7:] + else: + provided_key = auth_header + + # Validate the API key + if provided_key != self.api_key: + return JSONResponse( + status_code=401, + content={"error": "Invalid API key"}, + ) + + return await call_next(request) diff --git a/src/models/notification_models.py b/src/models/notification_models.py new file mode 100644 index 0000000..1168feb --- /dev/null +++ b/src/models/notification_models.py @@ -0,0 +1,60 @@ +"""Pydantic models for webhook notification payloads.""" + +from datetime import datetime +from typing import Optional +from pydantic import BaseModel + + +class EmailNotificationPayload(BaseModel): + """Webhook payload for new email notifications to Poke.""" + + # Required fields + event_type: str = "new_email" + timestamp: datetime + + # Email identification + email_id: str + mailbox: str + message_id: Optional[str] = None + + # Sender info + from_email: str + from_name: Optional[str] = None + + # Recipients + to_emails: list[str] + + # Content + subject: str + snippet: Optional[str] = None + + # Metadata + date: datetime + has_attachments: bool + is_flagged: bool + + # Threading + in_reply_to: Optional[str] = None + + def to_webhook_format(self) -> dict: + """Convert to the format expected by Poke webhook.""" + return { + "type": self.event_type, + "timestamp": self.timestamp.isoformat(), + "data": { + "id": self.email_id, + "mailbox": self.mailbox, + "message_id": self.message_id, + "from": { + "email": self.from_email, + "name": self.from_name, + }, + "to": self.to_emails, + "subject": self.subject, + "snippet": self.snippet, + "date": self.date.isoformat(), + "has_attachments": self.has_attachments, + "is_flagged": self.is_flagged, + "in_reply_to": self.in_reply_to, + } + } diff --git a/src/server.py b/src/server.py index 21e9368..111821a 100644 --- a/src/server.py +++ b/src/server.py @@ -8,6 +8,7 @@ A self-hosted MCP server that provides tools for managing: - Contacts (CardDAV) """ +import logging import os import sys @@ -19,6 +20,12 @@ from fastmcp import FastMCP from config import settings from database import init_db, close_db +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) + # Initialize MCP server mcp = FastMCP( settings.server_name, @@ -29,6 +36,7 @@ mcp = FastMCP( email_service = None calendar_service = None contacts_service = None +email_monitor = None def setup_services(): @@ -57,6 +65,22 @@ def setup_services(): print(" Contacts service: disabled (not configured)") +async def setup_email_monitor(): + """Initialize email notification monitoring.""" + global email_monitor + + if settings.is_notification_configured() and email_service: + from services.webhook_service import WebhookService + from services.email_monitor import EmailMonitor + + webhook_service = WebhookService(settings) + email_monitor = EmailMonitor(email_service, webhook_service, settings) + await email_monitor.start() + print(f" Email notifications: enabled (monitoring: {settings.notification_mailboxes})") + else: + print(" Email notifications: disabled (not configured)") + + def register_tools(): """Register MCP tools based on enabled services.""" if email_service: @@ -117,11 +141,31 @@ async def initialize(): print(f"\nRegistering tools...") register_tools() + print(f"\nStarting background services...") + await setup_email_monitor() + print(f"\n{'='*60}") +async def shutdown(): + """Shutdown the server gracefully.""" + global email_monitor + print("\nShutting down...") + + if email_monitor: + await email_monitor.stop() + email_monitor = None + + await close_db() + print("Shutdown complete") + + if __name__ == "__main__": import asyncio + import signal + import uvicorn + + from middleware import APIKeyAuthMiddleware async def main(): await initialize() @@ -129,15 +173,44 @@ if __name__ == "__main__": port = settings.server_port host = settings.server_host + # Get the underlying ASGI app from FastMCP + app = mcp.http_app(path="/mcp") + + # Add authentication middleware if API key is configured + if settings.mcp_api_key: + app.add_middleware( + APIKeyAuthMiddleware, + api_key=settings.mcp_api_key.get_secret_value(), + ) + print(f"\n Authentication: enabled (API key required)") + else: + print(f"\n Authentication: disabled (no MCP_API_KEY set)") + print(f"\nStarting server on {host}:{port}") print(f"MCP endpoint: http://{host}:{port}/mcp") print(f"{'='*60}\n") - mcp.run( - transport="http", + # Setup signal handlers for graceful shutdown + loop = asyncio.get_running_loop() + + def signal_handler(): + asyncio.create_task(shutdown()) + + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, signal_handler) + + # Run with uvicorn + config = uvicorn.Config( + app, host=host, port=port, - stateless_http=True, + log_level="info", ) + server = uvicorn.Server(config) + + try: + await server.serve() + finally: + await shutdown() asyncio.run(main()) diff --git a/src/services/email_monitor.py b/src/services/email_monitor.py new file mode 100644 index 0000000..b874965 --- /dev/null +++ b/src/services/email_monitor.py @@ -0,0 +1,381 @@ +"""Background service for monitoring new emails and sending notifications.""" + +import asyncio +import logging +from datetime import datetime +from typing import Optional + +from imapclient import IMAPClient +from sqlmodel import select + +from config import Settings +from database import get_session, SeenEmail, CacheMeta +from models.email_models import EmailSummary +from models.notification_models import EmailNotificationPayload +from services.email_service import EmailService +from services.webhook_service import WebhookService + +logger = logging.getLogger(__name__) + + +class EmailMonitor: + """Background service for monitoring new emails and sending notifications.""" + + def __init__( + self, + email_service: EmailService, + webhook_service: WebhookService, + settings: Settings, + ): + self.email_service = email_service + self.webhook_service = webhook_service + self.settings = settings + self._running = False + self._task: Optional[asyncio.Task] = None + self._idle_client: Optional[IMAPClient] = None + self._idle_supported: Optional[bool] = None + + async def start(self): + """Start the email monitoring background task.""" + if self._running: + logger.warning("Email monitor already running") + return + + self._running = True + + # Seed existing emails on first run + for mailbox in self.settings.get_notification_mailboxes(): + await self._seed_seen_emails(mailbox) + + # Start the monitoring task + self._task = asyncio.create_task(self._monitor_loop()) + logger.info( + f"Email monitor started for mailboxes: {self.settings.notification_mailboxes}" + ) + + async def stop(self): + """Gracefully stop the monitor.""" + self._running = False + + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + + self._close_idle_client() + await self.webhook_service.close() + logger.info("Email monitor stopped") + + def _get_idle_client(self) -> IMAPClient: + """Get or create a dedicated IMAP client for IDLE.""" + if self._idle_client is None: + self._idle_client = IMAPClient( + host=self.settings.imap_host, + port=self.settings.imap_port, + ssl=self.settings.imap_use_ssl, + ) + self._idle_client.login( + self.settings.imap_username, + self.settings.imap_password.get_secret_value(), + ) + return self._idle_client + + def _close_idle_client(self): + """Close the IDLE client.""" + if self._idle_client: + try: + self._idle_client.logout() + except Exception: + pass + self._idle_client = None + + async def _check_idle_support(self) -> bool: + """Check if the IMAP server supports IDLE.""" + if self._idle_supported is not None: + return self._idle_supported + + def _check(): + client = self._get_idle_client() + capabilities = client.capabilities() + return b"IDLE" in capabilities + + try: + self._idle_supported = await asyncio.to_thread(_check) + logger.info(f"IMAP IDLE support: {self._idle_supported}") + return self._idle_supported + except Exception as e: + logger.warning(f"Failed to check IDLE support: {e}") + self._idle_supported = False + return False + + async def _monitor_loop(self): + """Main monitoring loop - tries IDLE, falls back to polling.""" + while self._running: + try: + # Check IDLE support + if await self._check_idle_support(): + # Use IDLE for real-time monitoring + await self._idle_monitor() + else: + # Fall back to polling + await self._poll_monitor() + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Monitor loop error: {e}") + # Reset IDLE client on error + self._close_idle_client() + self._idle_supported = None + # Wait before retrying + await asyncio.sleep(self.settings.notification_poll_interval) + + async def _idle_monitor(self): + """Use IMAP IDLE for real-time monitoring.""" + mailboxes = self.settings.get_notification_mailboxes() + + for mailbox in mailboxes: + if not self._running: + break + + try: + # Check for new emails first + await self._check_and_notify_new_emails(mailbox) + + # Start IDLE and wait for changes + idle_started = await self._start_idle(mailbox) + if not idle_started: + # IDLE failed, fall back to polling for this cycle + await asyncio.sleep(self.settings.notification_poll_interval) + continue + + # Wait for IDLE responses or timeout + responses = await self._wait_for_idle( + timeout=min( + self.settings.notification_idle_timeout, + self.settings.notification_poll_interval * 10, + ) + ) + + # End IDLE + await self._end_idle() + + # If we got EXISTS responses, check for new emails + if responses: + has_new = any( + b"EXISTS" in str(r).encode() if isinstance(r, tuple) else False + for r in responses + ) + if has_new: + await self._check_and_notify_new_emails(mailbox) + + except Exception as e: + logger.error(f"IDLE monitor error for {mailbox}: {e}") + self._close_idle_client() + self._idle_supported = None + raise + + async def _poll_monitor(self): + """Fallback polling implementation.""" + mailboxes = self.settings.get_notification_mailboxes() + + for mailbox in mailboxes: + if not self._running: + break + + try: + await self._check_and_notify_new_emails(mailbox) + except Exception as e: + logger.error(f"Poll monitor error for {mailbox}: {e}") + + # Wait for next poll interval + await asyncio.sleep(self.settings.notification_poll_interval) + + async def _start_idle(self, mailbox: str) -> bool: + """Start IMAP IDLE mode.""" + + def _idle_start(): + client = self._get_idle_client() + client.select_folder(mailbox) + client.idle() + return True + + try: + return await asyncio.to_thread(_idle_start) + except Exception as e: + logger.warning(f"Failed to start IDLE: {e}") + return False + + async def _wait_for_idle(self, timeout: int = 30) -> list: + """Wait for IDLE responses.""" + + def _idle_check(): + client = self._get_idle_client() + return client.idle_check(timeout=timeout) + + try: + return await asyncio.to_thread(_idle_check) + except Exception as e: + logger.warning(f"IDLE check error: {e}") + return [] + + async def _end_idle(self): + """End IMAP IDLE mode.""" + + def _idle_done(): + client = self._get_idle_client() + client.idle_done() + + try: + await asyncio.to_thread(_idle_done) + except Exception as e: + logger.warning(f"Failed to end IDLE: {e}") + + async def _check_and_notify_new_emails(self, mailbox: str): + """Check for new emails and send notifications.""" + try: + # Get recent emails + email_list = self.email_service.list_emails( + mailbox=mailbox, limit=50, include_body=True + ) + + for email in email_list.emails: + # Check if we've already seen this email + if await self._is_email_seen(email.id, mailbox): + continue + + # Mark as seen first (to avoid duplicates on retry) + await self._mark_as_seen(email, mailbox, notification_sent=False) + + # Send notification + success, error = await self._send_notification(email, mailbox) + + # Update notification status + await self._update_notification_status( + email.id, mailbox, success, error + ) + + if success: + logger.info( + f"Notification sent for email {email.id}: {email.subject}" + ) + else: + logger.error( + f"Failed to send notification for email {email.id}: {error}" + ) + + except Exception as e: + logger.error(f"Error checking new emails in {mailbox}: {e}") + + async def _send_notification( + self, email: EmailSummary, mailbox: str + ) -> tuple[bool, Optional[str]]: + """Send webhook notification for a new email.""" + payload = EmailNotificationPayload( + timestamp=datetime.utcnow(), + email_id=email.id, + mailbox=mailbox, + from_email=email.from_address.email, + from_name=email.from_address.name, + to_emails=[addr.email for addr in email.to_addresses], + subject=email.subject, + snippet=email.snippet, + date=email.date, + has_attachments=email.has_attachments, + is_flagged=email.is_flagged, + ) + + return await self.webhook_service.send_new_email_notification(payload) + + async def _seed_seen_emails(self, mailbox: str): + """Seed the seen_emails table with existing emails on first run.""" + async with get_session() as session: + # Check if this mailbox has been seeded + result = await session.exec( + select(CacheMeta).where(CacheMeta.key == f"seeded_{mailbox}") + ) + if result.first(): + logger.debug(f"Mailbox {mailbox} already seeded") + return + + logger.info(f"Seeding existing emails in {mailbox}...") + + try: + # Get all current email UIDs + email_list = self.email_service.list_emails(mailbox, limit=10000) + + # Mark them all as seen (with notification_sent=True to skip) + for email in email_list.emails: + seen = SeenEmail( + email_uid=email.id, + mailbox=mailbox, + from_address=email.from_address.email, + subject=email.subject, + email_date=email.date, + notification_sent=True, # Don't notify for pre-existing + ) + session.add(seen) + + # Mark as seeded + session.add(CacheMeta(key=f"seeded_{mailbox}", value="true")) + await session.commit() + + logger.info( + f"Seeded {len(email_list.emails)} existing emails in {mailbox}" + ) + + except Exception as e: + logger.error(f"Error seeding emails for {mailbox}: {e}") + await session.rollback() + + async def _is_email_seen(self, email_uid: str, mailbox: str) -> bool: + """Check if email has already been processed.""" + async with get_session() as session: + result = await session.exec( + select(SeenEmail).where( + SeenEmail.email_uid == email_uid, SeenEmail.mailbox == mailbox + ) + ) + return result.first() is not None + + async def _mark_as_seen( + self, email: EmailSummary, mailbox: str, notification_sent: bool = False + ): + """Mark email as seen in our tracking database.""" + async with get_session() as session: + seen = SeenEmail( + email_uid=email.id, + mailbox=mailbox, + from_address=email.from_address.email, + subject=email.subject, + email_date=email.date, + notification_sent=notification_sent, + notification_sent_at=datetime.utcnow() if notification_sent else None, + ) + session.add(seen) + await session.commit() + + async def _update_notification_status( + self, + email_uid: str, + mailbox: str, + success: bool, + error: Optional[str] = None, + ): + """Update the notification status for a seen email.""" + async with get_session() as session: + result = await session.exec( + select(SeenEmail).where( + SeenEmail.email_uid == email_uid, SeenEmail.mailbox == mailbox + ) + ) + seen = result.first() + if seen: + seen.notification_sent = success + seen.notification_sent_at = datetime.utcnow() if success else None + seen.notification_error = error + session.add(seen) + await session.commit() diff --git a/src/services/webhook_service.py b/src/services/webhook_service.py new file mode 100644 index 0000000..9007b87 --- /dev/null +++ b/src/services/webhook_service.py @@ -0,0 +1,106 @@ +"""Service for sending webhook notifications to Poke.""" + +import asyncio +import logging +from typing import Optional + +import httpx + +from config import Settings +from models.notification_models import EmailNotificationPayload + +logger = logging.getLogger(__name__) + + +class WebhookService: + """Service for sending webhook notifications to Poke.""" + + def __init__(self, settings: Settings): + self.settings = settings + self._client: Optional[httpx.AsyncClient] = None + + async def _get_client(self) -> httpx.AsyncClient: + """Get or create the HTTP client.""" + if self._client is None: + self._client = httpx.AsyncClient( + timeout=self.settings.poke_webhook_timeout, + ) + return self._client + + async def close(self): + """Close the HTTP client.""" + if self._client: + await self._client.aclose() + self._client = None + + async def send_new_email_notification( + self, payload: EmailNotificationPayload + ) -> tuple[bool, Optional[str]]: + """ + Send notification about a new email to Poke webhook. + + Returns: + Tuple of (success, error_message) + """ + webhook_payload = payload.to_webhook_format() + return await self._send_webhook_with_retry(webhook_payload) + + async def _send_webhook_with_retry( + self, payload: dict + ) -> tuple[bool, Optional[str]]: + """Send webhook with exponential backoff retry.""" + client = await self._get_client() + last_error: Optional[str] = None + + for attempt in range(self.settings.poke_webhook_max_retries): + try: + response = await client.post( + self.settings.poke_webhook_url, + json=payload, + headers={ + "Authorization": f"Bearer {self.settings.poke_api_key.get_secret_value()}", + "Content-Type": "application/json", + "X-Webhook-Source": "pim-mcp-server", + }, + ) + + if response.status_code in (200, 201, 202, 204): + logger.info(f"Webhook sent successfully: {response.status_code}") + return True, None + + if response.status_code >= 500: + # Server error, retry + last_error = f"Server error {response.status_code}: {response.text}" + logger.warning( + f"Webhook server error (attempt {attempt + 1}): {last_error}" + ) + await asyncio.sleep(2**attempt) + else: + # Client error, don't retry + last_error = f"Client error {response.status_code}: {response.text}" + logger.error(f"Webhook failed: {last_error}") + return False, last_error + + except httpx.TimeoutException: + last_error = "Request timeout" + logger.warning( + f"Webhook timeout (attempt {attempt + 1}/{self.settings.poke_webhook_max_retries})" + ) + await asyncio.sleep(2**attempt) + + except httpx.RequestError as e: + last_error = f"Request error: {str(e)}" + logger.warning( + f"Webhook request error (attempt {attempt + 1}): {last_error}" + ) + await asyncio.sleep(2**attempt) + + except Exception as e: + last_error = f"Unexpected error: {str(e)}" + logger.error(f"Webhook unexpected error: {last_error}") + await asyncio.sleep(2**attempt) + + logger.error( + f"Webhook failed after {self.settings.poke_webhook_max_retries} attempts: {last_error}" + ) + return False, last_error