diff --git a/README.md b/README.md index f18758c..c2b6b02 100644 --- a/README.md +++ b/README.md @@ -86,12 +86,18 @@ SMTP_FROM_NAME=Your Name CALDAV_URL=https://caldav.example.com/dav CALDAV_USERNAME=you@example.com CALDAV_PASSWORD=your-password +ICS_CALENDARS=Team|https://example.com/team.ics,Family|https://example.com/family.ics +ICS_CALENDAR_TIMEOUT=20 +ICS_CALENDARS=Team|https://example.com/team.ics,Family|https://example.com/family.ics +ICS_CALENDAR_TIMEOUT=20 CARDDAV_URL=https://carddav.example.com/dav CARDDAV_USERNAME=you@example.com CARDDAV_PASSWORD=your-password ``` +ICS calendars are optional and read-only. Set `ICS_CALENDARS` to a comma-separated list of entries, each as `name|url` or just `url` if you want the name inferred. + ### Email notifications (Poke webhook) Enable notifications to send new-email alerts to Poke. The server will use IMAP IDLE when available and fall back to polling. diff --git a/src/config.py b/src/config.py index 468b3e5..026083a 100644 --- a/src/config.py +++ b/src/config.py @@ -33,6 +33,8 @@ class Settings(BaseSettings): caldav_url: Optional[str] = Field(default=None, alias="CALDAV_URL") caldav_username: Optional[str] = Field(default=None, alias="CALDAV_USERNAME") caldav_password: Optional[SecretStr] = Field(default=None, alias="CALDAV_PASSWORD") + ics_calendars: Optional[str] = Field(default=None, alias="ICS_CALENDARS") + ics_calendar_timeout: int = Field(default=20, alias="ICS_CALENDAR_TIMEOUT") # CardDAV Configuration carddav_url: Optional[str] = Field(default=None, alias="CARDDAV_URL") @@ -107,7 +109,7 @@ class Settings(BaseSettings): self.smtp_from_email, ]) - def is_calendar_configured(self) -> bool: + def is_caldav_configured(self) -> bool: return all([ self.enable_calendar, self.caldav_url, @@ -115,6 +117,33 @@ class Settings(BaseSettings): self.caldav_password, ]) + def is_calendar_configured(self) -> bool: + return all([ + self.enable_calendar, + (self.is_caldav_configured() or self.get_ics_calendars()), + ]) + + def get_ics_calendars(self) -> list[tuple[Optional[str], str]]: + if not self.ics_calendars: + return [] + + calendars: list[tuple[Optional[str], str]] = [] + for entry in self.ics_calendars.split(","): + item = entry.strip() + if not item: + continue + if "|" in item: + name, url = item.split("|", 1) + name = name.strip() or None + url = url.strip() + else: + name = None + url = item + if url: + calendars.append((name, url)) + + return calendars + def is_contacts_configured(self) -> bool: return all([ self.enable_contacts, diff --git a/src/server.py b/src/server.py index 0ad7f89..be300d8 100644 --- a/src/server.py +++ b/src/server.py @@ -53,7 +53,10 @@ def setup_services(): if settings.is_calendar_configured(): from services.calendar_service import CalendarService calendar_service = CalendarService(settings) - print(f" Calendar service: enabled (CalDAV: {settings.caldav_url})") + if settings.is_caldav_configured(): + print(f" Calendar service: enabled (CalDAV: {settings.caldav_url})") + else: + print(" Calendar service: enabled (ICS calendars only)") else: print(" Calendar service: disabled (not configured)") @@ -116,6 +119,7 @@ def get_server_info() -> dict: "calendar": { "enabled": calendar_service is not None, "caldav_url": settings.caldav_url if calendar_service else None, + "ics_calendars": [c[1] for c in settings.get_ics_calendars()] if calendar_service else [], }, "contacts": { "enabled": contacts_service is not None, diff --git a/src/services/calendar_service.py b/src/services/calendar_service.py index 33f34c3..8f44e60 100644 --- a/src/services/calendar_service.py +++ b/src/services/calendar_service.py @@ -1,8 +1,10 @@ from datetime import datetime, timedelta from typing import Optional import uuid +from urllib.parse import urlparse import caldav +import httpx from icalendar import Calendar as iCalendar, Event as iEvent, vText from dateutil.parser import parse as parse_date from dateutil.rrule import rrulestr @@ -24,8 +26,39 @@ class CalendarService: self.settings = settings self._client: Optional[caldav.DAVClient] = None self._principal = None + self._ics_calendars = self._load_ics_calendars() + + def _load_ics_calendars(self) -> list[dict]: + calendars = [] + for idx, (name, url) in enumerate(self.settings.get_ics_calendars()): + cal_id = f"ics:{url}" + calendars.append( + { + "id": cal_id, + "name": name or self._derive_ics_name(url, idx), + "url": url, + } + ) + return calendars + + def _derive_ics_name(self, url: str, fallback_index: int) -> str: + parsed = urlparse(url) + if parsed.path and parsed.path != "/": + return parsed.path.rstrip("/").split("/")[-1] or f"ICS Calendar {fallback_index + 1}" + return parsed.netloc or f"ICS Calendar {fallback_index + 1}" + + def _is_ics_calendar(self, calendar_id: str) -> bool: + return calendar_id.startswith("ics:") + + def _get_ics_calendar(self, calendar_id: str) -> Optional[dict]: + for cal in self._ics_calendars: + if cal["id"] == calendar_id: + return cal + return None def _get_client(self) -> caldav.DAVClient: + if not self.settings.is_caldav_configured(): + raise ValueError("CalDAV is not configured") if self._client is None: self._client = caldav.DAVClient( url=self.settings.caldav_url, @@ -40,21 +73,33 @@ class CalendarService: return self._principal def list_calendars(self) -> list[Calendar]: - principal = self._get_principal() - calendars = principal.calendars() - result = [] - for cal in calendars: - props = cal.get_properties([caldav.dav.DisplayName()]) - name = props.get("{DAV:}displayname", cal.name or "Unnamed") + if self.settings.is_caldav_configured(): + principal = self._get_principal() + calendars = principal.calendars() + for cal in calendars: + props = cal.get_properties([caldav.dav.DisplayName()]) + name = props.get("{DAV:}displayname", cal.name or "Unnamed") + + result.append( + Calendar( + id=str(cal.url), + name=name, + color=None, + description=None, + is_readonly=False, + ) + ) + + for ics_cal in self._ics_calendars: result.append( Calendar( - id=str(cal.url), - name=name, + id=ics_cal["id"], + name=ics_cal["name"], color=None, description=None, - is_readonly=False, + is_readonly=True, ) ) @@ -77,6 +122,9 @@ class CalendarService: end_date: str, include_recurring: bool = True, ) -> EventList: + if self._is_ics_calendar(calendar_id): + return self._list_ics_events(calendar_id, start_date, end_date, include_recurring) + calendar = self._get_calendar_by_id(calendar_id) start = parse_date(start_date) @@ -101,6 +149,9 @@ class CalendarService: ) def get_event(self, calendar_id: str, event_id: str) -> Optional[Event]: + if self._is_ics_calendar(calendar_id): + return self._get_ics_event(calendar_id, event_id) + calendar = self._get_calendar_by_id(calendar_id) try: @@ -127,6 +178,9 @@ class CalendarService: reminders: Optional[list[int]] = None, recurrence: Optional[str] = None, ) -> Event: + if self._is_ics_calendar(calendar_id): + raise ValueError("ICS calendars are read-only") + calendar = self._get_calendar_by_id(calendar_id) # Create iCalendar event @@ -184,6 +238,9 @@ class CalendarService: location: Optional[str] = None, attendees: Optional[list[str]] = None, ) -> Optional[Event]: + if self._is_ics_calendar(calendar_id): + raise ValueError("ICS calendars are read-only") + calendar = self._get_calendar_by_id(calendar_id) # Find the event @@ -224,6 +281,13 @@ class CalendarService: def delete_event( self, calendar_id: str, event_id: str, notify_attendees: bool = True ) -> OperationResult: + if self._is_ics_calendar(calendar_id): + return OperationResult( + success=False, + message="ICS calendars are read-only", + id=event_id, + ) + try: calendar = self._get_calendar_by_id(calendar_id) @@ -314,3 +378,186 @@ class CalendarService: return None return None + + def _list_ics_events( + self, + calendar_id: str, + start_date: str, + end_date: str, + include_recurring: bool, + ) -> EventList: + ics_calendar = self._get_ics_calendar(calendar_id) + if not ics_calendar: + raise ValueError(f"Calendar not found: {calendar_id}") + + start = parse_date(start_date) + end = parse_date(end_date) + + ical = self._fetch_ics_calendar(ics_calendar["url"]) + events: list[Event] = [] + + for component in ical.walk(): + if component.name != "VEVENT": + continue + + parsed_events = self._parse_ics_component( + component, calendar_id, start, end, include_recurring + ) + events.extend(parsed_events) + + events.sort(key=lambda e: e.start) + + return EventList( + events=events, + calendar_id=calendar_id, + start_date=start_date, + end_date=end_date, + total=len(events), + ) + + def _get_ics_event(self, calendar_id: str, event_id: str) -> Optional[Event]: + ics_calendar = self._get_ics_calendar(calendar_id) + if not ics_calendar: + raise ValueError(f"Calendar not found: {calendar_id}") + + ical = self._fetch_ics_calendar(ics_calendar["url"]) + for component in ical.walk(): + if component.name != "VEVENT": + continue + uid = str(component.get("uid", "")) + if uid == event_id: + events = self._parse_ics_component( + component, + calendar_id, + datetime.min, + datetime.max, + include_recurring=False, + ) + return events[0] if events else None + + return None + + def _fetch_ics_calendar(self, url: str) -> iCalendar: + response = httpx.get(url, timeout=self.settings.ics_calendar_timeout) + response.raise_for_status() + return iCalendar.from_ical(response.text) + + def _parse_ics_component( + self, + component, + calendar_id: str, + range_start: datetime, + range_end: datetime, + include_recurring: bool, + ) -> list[Event]: + base_event = self._build_event_from_component(component, calendar_id) + if not base_event: + return [] + + range_start_cmp = range_start + range_end_cmp = range_end + if base_event.start.tzinfo and range_start.tzinfo is None: + range_start_cmp = range_start.replace(tzinfo=base_event.start.tzinfo) + range_end_cmp = range_end.replace(tzinfo=base_event.start.tzinfo) + elif base_event.start.tzinfo is None and range_start.tzinfo is not None: + range_start_cmp = range_start.replace(tzinfo=None) + range_end_cmp = range_end.replace(tzinfo=None) + + if not include_recurring or not base_event.recurrence_rule: + if base_event.start <= range_end_cmp and base_event.end >= range_start_cmp: + return [base_event] + return [] + + dtstart = base_event.start + duration = base_event.end - base_event.start + if duration.total_seconds() <= 0: + duration = timedelta(hours=1) + + rrule = rrulestr(base_event.recurrence_rule, dtstart=dtstart) + occurrences = rrule.between(range_start_cmp, range_end_cmp, inc=True) + excluded = self._extract_exdates(component) + + events = [] + for occ_start in occurrences: + if occ_start in excluded: + continue + occ_end = occ_start + duration + occurrence = base_event.model_copy() + occurrence.start = occ_start + occurrence.end = occ_end + events.append(occurrence) + + return events + + def _extract_exdates(self, component) -> set[datetime]: + exdates: set[datetime] = set() + exdate_prop = component.get("exdate") + if not exdate_prop: + return exdates + + exdate_list = exdate_prop if isinstance(exdate_prop, list) else [exdate_prop] + for exdate in exdate_list: + dates = getattr(exdate, "dts", []) + for dt in dates: + if isinstance(dt.dt, datetime): + exdates.add(dt.dt) + else: + exdates.add(datetime.combine(dt.dt, datetime.min.time())) + + return exdates + + def _build_event_from_component(self, component, calendar_id: str) -> Optional[Event]: + try: + uid = str(component.get("uid", "")) + dtstart = component.get("dtstart") + dtend = component.get("dtend") + + start = dtstart.dt if dtstart else datetime.now() + end = dtend.dt if dtend else start + timedelta(hours=1) + + all_day = False + if not isinstance(start, datetime): + all_day = True + start = datetime.combine(start, datetime.min.time()) + if not isinstance(end, datetime): + end = datetime.combine(end, datetime.min.time()) + + status_str = str(component.get("status", "CONFIRMED")).upper() + status = EventStatus.CONFIRMED + if status_str == "TENTATIVE": + status = EventStatus.TENTATIVE + elif status_str == "CANCELLED": + status = EventStatus.CANCELLED + + attendees = [] + for attendee in component.get("attendee", []): + if isinstance(attendee, list): + for a in attendee: + email = str(a).replace("mailto:", "") + attendees.append(Attendee(email=email)) + else: + email = str(attendee).replace("mailto:", "") + attendees.append(Attendee(email=email)) + + rrule = component.get("rrule") + recurrence_rule = None + if rrule: + recurrence_rule = rrule.to_ical().decode("utf-8") + + return Event( + id=uid, + calendar_id=calendar_id, + title=str(component.get("summary", "Untitled")), + start=start, + end=end, + all_day=all_day, + description=str(component.get("description", "")) or None, + location=str(component.get("location", "")) or None, + status=status, + attendees=attendees, + recurrence_rule=recurrence_rule, + organizer=str(component.get("organizer", "")).replace("mailto:", "") or None, + ) + except Exception as e: + print(f"Error parsing ICS event: {e}") + return None diff --git a/src/tools/calendar_tools.py b/src/tools/calendar_tools.py index 7e7ba1a..20b8c52 100644 --- a/src/tools/calendar_tools.py +++ b/src/tools/calendar_tools.py @@ -7,7 +7,7 @@ from services.calendar_service import CalendarService def register_calendar_tools(mcp: FastMCP, service: CalendarService): """Register all calendar-related MCP tools.""" - @mcp.tool(description="List all available calendars from the CalDAV server. Returns calendar ID, name, and properties.") + @mcp.tool(description="List all available calendars from CalDAV and configured ICS feeds. Returns calendar ID, name, and properties.") def list_calendars() -> list[dict]: """List all calendars.""" calendars = service.list_calendars() @@ -26,7 +26,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService): Args: start_date: Start of date range (ISO format: YYYY-MM-DD) end_date: End of date range (ISO format: YYYY-MM-DD) - calendar_id: The calendar ID (URL) to query. If not provided, lists from all calendars. + calendar_id: The calendar ID (CalDAV URL or ICS ID) to query. If not provided, lists from all calendars. include_recurring: Whether to expand recurring events (default: True) """ if calendar_id: @@ -67,7 +67,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService): Get a specific event. Args: - calendar_id: The calendar ID containing the event + calendar_id: The calendar ID (CalDAV URL or ICS ID) containing the event event_id: The unique ID (UID) of the event """ result = service.get_event(calendar_id, event_id) @@ -89,7 +89,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService): Create a new calendar event. Args: - calendar_id: The calendar ID to create the event in + calendar_id: The calendar ID to create the event in (CalDAV only) title: Event title/summary start: Start datetime (ISO format: YYYY-MM-DDTHH:MM:SS) end: End datetime (ISO format: YYYY-MM-DDTHH:MM:SS) @@ -119,7 +119,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService): Update an existing event. Args: - calendar_id: The calendar ID containing the event + calendar_id: The calendar ID containing the event (CalDAV only) event_id: The unique ID of the event to update title: New event title (optional) start: New start datetime (optional) @@ -143,7 +143,7 @@ def register_calendar_tools(mcp: FastMCP, service: CalendarService): Delete a calendar event. Args: - calendar_id: The calendar ID containing the event + calendar_id: The calendar ID containing the event (CalDAV only) event_id: The unique ID of the event to delete notify_attendees: Whether to notify attendees of cancellation (default: True) """