This commit is contained in:
381
src/services/email_monitor.py
Normal file
381
src/services/email_monitor.py
Normal file
@@ -0,0 +1,381 @@
|
||||
"""Background service for monitoring new emails and sending notifications."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from imapclient import IMAPClient
|
||||
from sqlmodel import select
|
||||
|
||||
from config import Settings
|
||||
from database import get_session, SeenEmail, CacheMeta
|
||||
from models.email_models import EmailSummary
|
||||
from models.notification_models import EmailNotificationPayload
|
||||
from services.email_service import EmailService
|
||||
from services.webhook_service import WebhookService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EmailMonitor:
|
||||
"""Background service for monitoring new emails and sending notifications."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
email_service: EmailService,
|
||||
webhook_service: WebhookService,
|
||||
settings: Settings,
|
||||
):
|
||||
self.email_service = email_service
|
||||
self.webhook_service = webhook_service
|
||||
self.settings = settings
|
||||
self._running = False
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
self._idle_client: Optional[IMAPClient] = None
|
||||
self._idle_supported: Optional[bool] = None
|
||||
|
||||
async def start(self):
|
||||
"""Start the email monitoring background task."""
|
||||
if self._running:
|
||||
logger.warning("Email monitor already running")
|
||||
return
|
||||
|
||||
self._running = True
|
||||
|
||||
# Seed existing emails on first run
|
||||
for mailbox in self.settings.get_notification_mailboxes():
|
||||
await self._seed_seen_emails(mailbox)
|
||||
|
||||
# Start the monitoring task
|
||||
self._task = asyncio.create_task(self._monitor_loop())
|
||||
logger.info(
|
||||
f"Email monitor started for mailboxes: {self.settings.notification_mailboxes}"
|
||||
)
|
||||
|
||||
async def stop(self):
|
||||
"""Gracefully stop the monitor."""
|
||||
self._running = False
|
||||
|
||||
if self._task:
|
||||
self._task.cancel()
|
||||
try:
|
||||
await self._task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._task = None
|
||||
|
||||
self._close_idle_client()
|
||||
await self.webhook_service.close()
|
||||
logger.info("Email monitor stopped")
|
||||
|
||||
def _get_idle_client(self) -> IMAPClient:
|
||||
"""Get or create a dedicated IMAP client for IDLE."""
|
||||
if self._idle_client is None:
|
||||
self._idle_client = IMAPClient(
|
||||
host=self.settings.imap_host,
|
||||
port=self.settings.imap_port,
|
||||
ssl=self.settings.imap_use_ssl,
|
||||
)
|
||||
self._idle_client.login(
|
||||
self.settings.imap_username,
|
||||
self.settings.imap_password.get_secret_value(),
|
||||
)
|
||||
return self._idle_client
|
||||
|
||||
def _close_idle_client(self):
|
||||
"""Close the IDLE client."""
|
||||
if self._idle_client:
|
||||
try:
|
||||
self._idle_client.logout()
|
||||
except Exception:
|
||||
pass
|
||||
self._idle_client = None
|
||||
|
||||
async def _check_idle_support(self) -> bool:
|
||||
"""Check if the IMAP server supports IDLE."""
|
||||
if self._idle_supported is not None:
|
||||
return self._idle_supported
|
||||
|
||||
def _check():
|
||||
client = self._get_idle_client()
|
||||
capabilities = client.capabilities()
|
||||
return b"IDLE" in capabilities
|
||||
|
||||
try:
|
||||
self._idle_supported = await asyncio.to_thread(_check)
|
||||
logger.info(f"IMAP IDLE support: {self._idle_supported}")
|
||||
return self._idle_supported
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to check IDLE support: {e}")
|
||||
self._idle_supported = False
|
||||
return False
|
||||
|
||||
async def _monitor_loop(self):
|
||||
"""Main monitoring loop - tries IDLE, falls back to polling."""
|
||||
while self._running:
|
||||
try:
|
||||
# Check IDLE support
|
||||
if await self._check_idle_support():
|
||||
# Use IDLE for real-time monitoring
|
||||
await self._idle_monitor()
|
||||
else:
|
||||
# Fall back to polling
|
||||
await self._poll_monitor()
|
||||
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Monitor loop error: {e}")
|
||||
# Reset IDLE client on error
|
||||
self._close_idle_client()
|
||||
self._idle_supported = None
|
||||
# Wait before retrying
|
||||
await asyncio.sleep(self.settings.notification_poll_interval)
|
||||
|
||||
async def _idle_monitor(self):
|
||||
"""Use IMAP IDLE for real-time monitoring."""
|
||||
mailboxes = self.settings.get_notification_mailboxes()
|
||||
|
||||
for mailbox in mailboxes:
|
||||
if not self._running:
|
||||
break
|
||||
|
||||
try:
|
||||
# Check for new emails first
|
||||
await self._check_and_notify_new_emails(mailbox)
|
||||
|
||||
# Start IDLE and wait for changes
|
||||
idle_started = await self._start_idle(mailbox)
|
||||
if not idle_started:
|
||||
# IDLE failed, fall back to polling for this cycle
|
||||
await asyncio.sleep(self.settings.notification_poll_interval)
|
||||
continue
|
||||
|
||||
# Wait for IDLE responses or timeout
|
||||
responses = await self._wait_for_idle(
|
||||
timeout=min(
|
||||
self.settings.notification_idle_timeout,
|
||||
self.settings.notification_poll_interval * 10,
|
||||
)
|
||||
)
|
||||
|
||||
# End IDLE
|
||||
await self._end_idle()
|
||||
|
||||
# If we got EXISTS responses, check for new emails
|
||||
if responses:
|
||||
has_new = any(
|
||||
b"EXISTS" in str(r).encode() if isinstance(r, tuple) else False
|
||||
for r in responses
|
||||
)
|
||||
if has_new:
|
||||
await self._check_and_notify_new_emails(mailbox)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"IDLE monitor error for {mailbox}: {e}")
|
||||
self._close_idle_client()
|
||||
self._idle_supported = None
|
||||
raise
|
||||
|
||||
async def _poll_monitor(self):
|
||||
"""Fallback polling implementation."""
|
||||
mailboxes = self.settings.get_notification_mailboxes()
|
||||
|
||||
for mailbox in mailboxes:
|
||||
if not self._running:
|
||||
break
|
||||
|
||||
try:
|
||||
await self._check_and_notify_new_emails(mailbox)
|
||||
except Exception as e:
|
||||
logger.error(f"Poll monitor error for {mailbox}: {e}")
|
||||
|
||||
# Wait for next poll interval
|
||||
await asyncio.sleep(self.settings.notification_poll_interval)
|
||||
|
||||
async def _start_idle(self, mailbox: str) -> bool:
|
||||
"""Start IMAP IDLE mode."""
|
||||
|
||||
def _idle_start():
|
||||
client = self._get_idle_client()
|
||||
client.select_folder(mailbox)
|
||||
client.idle()
|
||||
return True
|
||||
|
||||
try:
|
||||
return await asyncio.to_thread(_idle_start)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to start IDLE: {e}")
|
||||
return False
|
||||
|
||||
async def _wait_for_idle(self, timeout: int = 30) -> list:
|
||||
"""Wait for IDLE responses."""
|
||||
|
||||
def _idle_check():
|
||||
client = self._get_idle_client()
|
||||
return client.idle_check(timeout=timeout)
|
||||
|
||||
try:
|
||||
return await asyncio.to_thread(_idle_check)
|
||||
except Exception as e:
|
||||
logger.warning(f"IDLE check error: {e}")
|
||||
return []
|
||||
|
||||
async def _end_idle(self):
|
||||
"""End IMAP IDLE mode."""
|
||||
|
||||
def _idle_done():
|
||||
client = self._get_idle_client()
|
||||
client.idle_done()
|
||||
|
||||
try:
|
||||
await asyncio.to_thread(_idle_done)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to end IDLE: {e}")
|
||||
|
||||
async def _check_and_notify_new_emails(self, mailbox: str):
|
||||
"""Check for new emails and send notifications."""
|
||||
try:
|
||||
# Get recent emails
|
||||
email_list = self.email_service.list_emails(
|
||||
mailbox=mailbox, limit=50, include_body=True
|
||||
)
|
||||
|
||||
for email in email_list.emails:
|
||||
# Check if we've already seen this email
|
||||
if await self._is_email_seen(email.id, mailbox):
|
||||
continue
|
||||
|
||||
# Mark as seen first (to avoid duplicates on retry)
|
||||
await self._mark_as_seen(email, mailbox, notification_sent=False)
|
||||
|
||||
# Send notification
|
||||
success, error = await self._send_notification(email, mailbox)
|
||||
|
||||
# Update notification status
|
||||
await self._update_notification_status(
|
||||
email.id, mailbox, success, error
|
||||
)
|
||||
|
||||
if success:
|
||||
logger.info(
|
||||
f"Notification sent for email {email.id}: {email.subject}"
|
||||
)
|
||||
else:
|
||||
logger.error(
|
||||
f"Failed to send notification for email {email.id}: {error}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking new emails in {mailbox}: {e}")
|
||||
|
||||
async def _send_notification(
|
||||
self, email: EmailSummary, mailbox: str
|
||||
) -> tuple[bool, Optional[str]]:
|
||||
"""Send webhook notification for a new email."""
|
||||
payload = EmailNotificationPayload(
|
||||
timestamp=datetime.utcnow(),
|
||||
email_id=email.id,
|
||||
mailbox=mailbox,
|
||||
from_email=email.from_address.email,
|
||||
from_name=email.from_address.name,
|
||||
to_emails=[addr.email for addr in email.to_addresses],
|
||||
subject=email.subject,
|
||||
snippet=email.snippet,
|
||||
date=email.date,
|
||||
has_attachments=email.has_attachments,
|
||||
is_flagged=email.is_flagged,
|
||||
)
|
||||
|
||||
return await self.webhook_service.send_new_email_notification(payload)
|
||||
|
||||
async def _seed_seen_emails(self, mailbox: str):
|
||||
"""Seed the seen_emails table with existing emails on first run."""
|
||||
async with get_session() as session:
|
||||
# Check if this mailbox has been seeded
|
||||
result = await session.exec(
|
||||
select(CacheMeta).where(CacheMeta.key == f"seeded_{mailbox}")
|
||||
)
|
||||
if result.first():
|
||||
logger.debug(f"Mailbox {mailbox} already seeded")
|
||||
return
|
||||
|
||||
logger.info(f"Seeding existing emails in {mailbox}...")
|
||||
|
||||
try:
|
||||
# Get all current email UIDs
|
||||
email_list = self.email_service.list_emails(mailbox, limit=10000)
|
||||
|
||||
# Mark them all as seen (with notification_sent=True to skip)
|
||||
for email in email_list.emails:
|
||||
seen = SeenEmail(
|
||||
email_uid=email.id,
|
||||
mailbox=mailbox,
|
||||
from_address=email.from_address.email,
|
||||
subject=email.subject,
|
||||
email_date=email.date,
|
||||
notification_sent=True, # Don't notify for pre-existing
|
||||
)
|
||||
session.add(seen)
|
||||
|
||||
# Mark as seeded
|
||||
session.add(CacheMeta(key=f"seeded_{mailbox}", value="true"))
|
||||
await session.commit()
|
||||
|
||||
logger.info(
|
||||
f"Seeded {len(email_list.emails)} existing emails in {mailbox}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error seeding emails for {mailbox}: {e}")
|
||||
await session.rollback()
|
||||
|
||||
async def _is_email_seen(self, email_uid: str, mailbox: str) -> bool:
|
||||
"""Check if email has already been processed."""
|
||||
async with get_session() as session:
|
||||
result = await session.exec(
|
||||
select(SeenEmail).where(
|
||||
SeenEmail.email_uid == email_uid, SeenEmail.mailbox == mailbox
|
||||
)
|
||||
)
|
||||
return result.first() is not None
|
||||
|
||||
async def _mark_as_seen(
|
||||
self, email: EmailSummary, mailbox: str, notification_sent: bool = False
|
||||
):
|
||||
"""Mark email as seen in our tracking database."""
|
||||
async with get_session() as session:
|
||||
seen = SeenEmail(
|
||||
email_uid=email.id,
|
||||
mailbox=mailbox,
|
||||
from_address=email.from_address.email,
|
||||
subject=email.subject,
|
||||
email_date=email.date,
|
||||
notification_sent=notification_sent,
|
||||
notification_sent_at=datetime.utcnow() if notification_sent else None,
|
||||
)
|
||||
session.add(seen)
|
||||
await session.commit()
|
||||
|
||||
async def _update_notification_status(
|
||||
self,
|
||||
email_uid: str,
|
||||
mailbox: str,
|
||||
success: bool,
|
||||
error: Optional[str] = None,
|
||||
):
|
||||
"""Update the notification status for a seen email."""
|
||||
async with get_session() as session:
|
||||
result = await session.exec(
|
||||
select(SeenEmail).where(
|
||||
SeenEmail.email_uid == email_uid, SeenEmail.mailbox == mailbox
|
||||
)
|
||||
)
|
||||
seen = result.first()
|
||||
if seen:
|
||||
seen.notification_sent = success
|
||||
seen.notification_sent_at = datetime.utcnow() if success else None
|
||||
seen.notification_error = error
|
||||
session.add(seen)
|
||||
await session.commit()
|
||||
106
src/services/webhook_service.py
Normal file
106
src/services/webhook_service.py
Normal file
@@ -0,0 +1,106 @@
|
||||
"""Service for sending webhook notifications to Poke."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
import httpx
|
||||
|
||||
from config import Settings
|
||||
from models.notification_models import EmailNotificationPayload
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class WebhookService:
|
||||
"""Service for sending webhook notifications to Poke."""
|
||||
|
||||
def __init__(self, settings: Settings):
|
||||
self.settings = settings
|
||||
self._client: Optional[httpx.AsyncClient] = None
|
||||
|
||||
async def _get_client(self) -> httpx.AsyncClient:
|
||||
"""Get or create the HTTP client."""
|
||||
if self._client is None:
|
||||
self._client = httpx.AsyncClient(
|
||||
timeout=self.settings.poke_webhook_timeout,
|
||||
)
|
||||
return self._client
|
||||
|
||||
async def close(self):
|
||||
"""Close the HTTP client."""
|
||||
if self._client:
|
||||
await self._client.aclose()
|
||||
self._client = None
|
||||
|
||||
async def send_new_email_notification(
|
||||
self, payload: EmailNotificationPayload
|
||||
) -> tuple[bool, Optional[str]]:
|
||||
"""
|
||||
Send notification about a new email to Poke webhook.
|
||||
|
||||
Returns:
|
||||
Tuple of (success, error_message)
|
||||
"""
|
||||
webhook_payload = payload.to_webhook_format()
|
||||
return await self._send_webhook_with_retry(webhook_payload)
|
||||
|
||||
async def _send_webhook_with_retry(
|
||||
self, payload: dict
|
||||
) -> tuple[bool, Optional[str]]:
|
||||
"""Send webhook with exponential backoff retry."""
|
||||
client = await self._get_client()
|
||||
last_error: Optional[str] = None
|
||||
|
||||
for attempt in range(self.settings.poke_webhook_max_retries):
|
||||
try:
|
||||
response = await client.post(
|
||||
self.settings.poke_webhook_url,
|
||||
json=payload,
|
||||
headers={
|
||||
"Authorization": f"Bearer {self.settings.poke_api_key.get_secret_value()}",
|
||||
"Content-Type": "application/json",
|
||||
"X-Webhook-Source": "pim-mcp-server",
|
||||
},
|
||||
)
|
||||
|
||||
if response.status_code in (200, 201, 202, 204):
|
||||
logger.info(f"Webhook sent successfully: {response.status_code}")
|
||||
return True, None
|
||||
|
||||
if response.status_code >= 500:
|
||||
# Server error, retry
|
||||
last_error = f"Server error {response.status_code}: {response.text}"
|
||||
logger.warning(
|
||||
f"Webhook server error (attempt {attempt + 1}): {last_error}"
|
||||
)
|
||||
await asyncio.sleep(2**attempt)
|
||||
else:
|
||||
# Client error, don't retry
|
||||
last_error = f"Client error {response.status_code}: {response.text}"
|
||||
logger.error(f"Webhook failed: {last_error}")
|
||||
return False, last_error
|
||||
|
||||
except httpx.TimeoutException:
|
||||
last_error = "Request timeout"
|
||||
logger.warning(
|
||||
f"Webhook timeout (attempt {attempt + 1}/{self.settings.poke_webhook_max_retries})"
|
||||
)
|
||||
await asyncio.sleep(2**attempt)
|
||||
|
||||
except httpx.RequestError as e:
|
||||
last_error = f"Request error: {str(e)}"
|
||||
logger.warning(
|
||||
f"Webhook request error (attempt {attempt + 1}): {last_error}"
|
||||
)
|
||||
await asyncio.sleep(2**attempt)
|
||||
|
||||
except Exception as e:
|
||||
last_error = f"Unexpected error: {str(e)}"
|
||||
logger.error(f"Webhook unexpected error: {last_error}")
|
||||
await asyncio.sleep(2**attempt)
|
||||
|
||||
logger.error(
|
||||
f"Webhook failed after {self.settings.poke_webhook_max_retries} attempts: {last_error}"
|
||||
)
|
||||
return False, last_error
|
||||
Reference in New Issue
Block a user