import logging
import uuid
from datetime import date
from typing import Optional
from urllib.parse import urlparse
from xml.etree import ElementTree as ET

import httpx
import vobject

from models.contacts_models import (
    AddressBook,
    Contact,
    ContactList,
    EmailField,
    PhoneField,
    AddressField,
)
from models.common import OperationResult
from config import Settings

logger = logging.getLogger(__name__)

# PROPFIND body asking for exactly the properties list_addressbooks() reads.
# The XML declaration must be the very first characters of the body, so the
# literal starts immediately after the opening quotes.
PROPFIND_ADDRESSBOOKS = """<?xml version="1.0" encoding="utf-8"?>
<d:propfind xmlns:d="DAV:" xmlns:card="urn:ietf:params:xml:ns:carddav">
  <d:prop>
    <d:resourcetype/>
    <d:displayname/>
    <card:addressbook-description/>
  </d:prop>
</d:propfind>
"""

# addressbook-query REPORT body returning every card's vCard payload, which
# list_contacts() reads from <card:address-data>. The empty filter matches all
# cards; RFC 6352 requires the element to be present.
REPORT_CONTACTS = """<?xml version="1.0" encoding="utf-8"?>
<card:addressbook-query xmlns:d="DAV:" xmlns:card="urn:ietf:params:xml:ns:carddav">
  <d:prop>
    <d:getetag/>
    <card:address-data/>
  </d:prop>
  <card:filter/>
</card:addressbook-query>
"""

# Namespace prefixes used when querying multistatus responses.
_XML_NS = {
    "d": "DAV:",
    "card": "urn:ietf:params:xml:ns:carddav",
}

_OK_MULTISTATUS = (200, 207)
_OK_WRITE = (200, 201, 204)
_VCARD_HEADERS = {"Content-Type": "text/vcard; charset=utf-8"}


class ContactsService:
    """CardDAV-backed contact store: list, read, create, update and delete.

    Contacts and address books are identified by the href the server reports
    for them (or by an absolute URL); see ``_build_url`` for how an id is
    turned into a request URL.
    """

    def __init__(self, settings: Settings):
        self.settings = settings
        # Created lazily by _get_client() so construction does no I/O.
        self._client: Optional[httpx.Client] = None

    def _get_client(self) -> httpx.Client:
        """Return the shared HTTP client, creating it on first use."""
        if self._client is None:
            self._client = httpx.Client(
                auth=(
                    self.settings.carddav_username,
                    self.settings.carddav_password.get_secret_value(),
                ),
                headers={"Content-Type": "application/xml; charset=utf-8"},
                timeout=30.0,
            )
        return self._client

    def close(self) -> None:
        """Close the shared HTTP client, if one was created.

        A later call to any request method transparently creates a new client.
        """
        if self._client is not None:
            self._client.close()
            self._client = None

    def _build_url(self, path: str) -> str:
        """Build full URL from a path, handling both relative and absolute paths.

        An ``http(s)://`` value is returned untouched; anything else is
        appended to the scheme and host of ``settings.carddav_url``.
        """
        if path.startswith(("http://", "https://")):
            return path
        # Only scheme + host are reused; the path of carddav_url is dropped
        # because server hrefs already carry their full path.
        parsed = urlparse(self.settings.carddav_url)
        return f"{parsed.scheme}://{parsed.netloc}{path}"

    def list_addressbooks(self) -> list[AddressBook]:
        """Return the address-book collections found under ``carddav_url``.

        Raises:
            Exception: if the PROPFIND does not answer 200 or 207.
        """
        response = self._get_client().request(
            "PROPFIND",
            self.settings.carddav_url,
            headers={"Depth": "1"},
            content=PROPFIND_ADDRESSBOOKS,
        )
        if response.status_code not in _OK_MULTISTATUS:
            raise Exception(f"Failed to list addressbooks: {response.status_code}")

        root = ET.fromstring(response.text)
        addressbooks = []
        for response_elem in root.findall(".//d:response", _XML_NS):
            href = response_elem.find("d:href", _XML_NS)
            # An entry without a usable href cannot be addressed later.
            if href is None or not href.text:
                continue

            # Depth: 1 also returns non-addressbook resources; keep only
            # entries whose resourcetype contains <card:addressbook/>.
            resourcetype = response_elem.find(".//d:resourcetype", _XML_NS)
            if (
                resourcetype is None
                or resourcetype.find("card:addressbook", _XML_NS) is None
            ):
                continue

            displayname = response_elem.find(".//d:displayname", _XML_NS)
            description = response_elem.find(
                ".//card:addressbook-description", _XML_NS
            )
            has_name = displayname is not None and displayname.text
            addressbooks.append(
                AddressBook(
                    id=href.text,
                    name=displayname.text if has_name else "Unnamed",
                    description=description.text if description is not None else None,
                    # The PROPFIND does not report a count; 0 is a placeholder.
                    contact_count=0,
                )
            )
        return addressbooks

    def list_contacts(
        self,
        addressbook_id: Optional[str] = None,
        search: Optional[str] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> ContactList:
        """Return one page of contacts from an address book.

        Args:
            addressbook_id: Address book href/URL; falls back to
                ``settings.contacts_addressbook_url`` when omitted.
            search: Optional case-insensitive substring matched against the
                display name, first name, last name and e-mail addresses.
            limit: Maximum number of contacts to return (negative acts as 0).
            offset: Number of sorted matches to skip (negative acts as 0).

        Returns:
            A ``ContactList`` whose ``total`` counts all matches before paging.

        Raises:
            ValueError: if no address book is given or configured.
            Exception: if the REPORT does not answer 200 or 207.
        """
        client = self._get_client()
        addressbook_id = self._resolve_addressbook_id(addressbook_id)

        response = client.request(
            "REPORT",
            self._build_url(addressbook_id),
            headers={"Depth": "1"},
            content=REPORT_CONTACTS,
        )
        if response.status_code not in _OK_MULTISTATUS:
            raise Exception(f"Failed to list contacts: {response.status_code}")

        needle = search.lower() if search else None
        contacts = []
        root = ET.fromstring(response.text)
        for response_elem in root.findall(".//d:response", _XML_NS):
            href = response_elem.find("d:href", _XML_NS)
            address_data = response_elem.find(".//card:address-data", _XML_NS)
            if (
                href is None
                or not href.text
                or address_data is None
                or address_data.text is None
            ):
                continue

            # Best effort: one malformed card must not fail the whole listing.
            try:
                contact = self._parse_vcard(address_data.text, addressbook_id, href.text)
                if not contact:
                    continue
                if needle is not None and not self._matches_search(contact, needle):
                    continue
                contacts.append(contact)
            except Exception as e:
                logger.warning("Error parsing contact: %s", e)

        contacts.sort(key=lambda c: c.display_name or c.first_name or c.last_name or "")

        total = len(contacts)
        # Negative values would slice from the end of the list; clamp them.
        start = max(offset, 0)
        size = max(limit, 0)
        return ContactList(
            contacts=contacts[start : start + size],
            addressbook_id=addressbook_id,
            total=total,
            limit=size,
            offset=start,
        )

    @staticmethod
    def _matches_search(contact: Contact, needle: str) -> bool:
        """Return True if lower-cased ``needle`` occurs in a name or e-mail."""
        names = (contact.display_name, contact.first_name, contact.last_name)
        if any(name and needle in name.lower() for name in names):
            return True
        return any(needle in e.email.lower() for e in contact.emails)

    def _fetch_vcard(self, contact_id: str) -> Optional[str]:
        """GET the raw vCard text of a contact; None when the server says 404.

        Raises:
            Exception: for any status other than 200 or 404.
        """
        response = self._get_client().get(self._build_url(contact_id))
        if response.status_code == 404:
            return None
        if response.status_code != 200:
            raise Exception(f"Failed to get contact: {response.status_code}")
        return response.text

    def get_contact(self, contact_id: str, addressbook_id: Optional[str] = None) -> Optional[Contact]:
        """Fetch and parse a single contact.

        Returns:
            The parsed contact, or None if the server answers 404 or the
            payload cannot be parsed as a vCard.

        Raises:
            ValueError: if no address book is given or configured.
            Exception: for any status other than 200 or 404.
        """
        addressbook_id = self._resolve_addressbook_id(addressbook_id)
        vcard_text = self._fetch_vcard(contact_id)
        if vcard_text is None:
            return None
        return self._parse_vcard(vcard_text, addressbook_id, contact_id)

    def create_contact(
        self,
        addressbook_id: Optional[str] = None,
        first_name: Optional[str] = None,
        last_name: Optional[str] = None,
        display_name: Optional[str] = None,
        emails: Optional[list[dict]] = None,
        phones: Optional[list[dict]] = None,
        addresses: Optional[list[dict]] = None,
        organization: Optional[str] = None,
        title: Optional[str] = None,
        notes: Optional[str] = None,
        birthday: Optional[str] = None,
    ) -> Contact:
        """Create a new contact under a freshly generated UID.

        The card is stored at ``<addressbook>/<uid>.vcf``. ``emails``,
        ``phones`` and ``addresses`` are lists of dicts that are also passed
        as keyword arguments to ``EmailField`` / ``PhoneField`` /
        ``AddressField`` for the returned object.

        Returns:
            The created contact; its ``id`` is the full URL that was PUT.

        Raises:
            ValueError: if no address book is given or configured.
            Exception: if the PUT does not answer 200, 201 or 204.
        """
        addressbook_id = self._resolve_addressbook_id(addressbook_id)
        uid = str(uuid.uuid4())
        full_name = self._full_name(display_name, first_name, last_name)
        vcard = self._build_vcard(
            uid=uid,
            first_name=first_name,
            last_name=last_name,
            full_name=full_name,
            emails=emails,
            phones=phones,
            addresses=addresses,
            organization=organization,
            title=title,
            notes=notes,
            birthday=birthday,
        )

        addressbook_url = self._build_url(addressbook_id)
        contact_url = f"{addressbook_url.rstrip('/')}/{uid}.vcf"
        self._put_vcard(contact_url, vcard, "create")

        return Contact(
            id=contact_url,
            addressbook_id=addressbook_id,
            first_name=first_name,
            last_name=last_name,
            display_name=full_name,
            emails=[EmailField(**e) for e in (emails or [])],
            phones=[PhoneField(**p) for p in (phones or [])],
            addresses=[AddressField(**a) for a in (addresses or [])],
            organization=organization,
            title=title,
            notes=notes,
            # Same conversion _parse_vcard applies when reading a card back.
            birthday=self._parse_birthday(birthday) if birthday else None,
        )

    def update_contact(
        self,
        contact_id: str,
        addressbook_id: Optional[str] = None,
        first_name: Optional[str] = None,
        last_name: Optional[str] = None,
        display_name: Optional[str] = None,
        emails: Optional[list[dict]] = None,
        phones: Optional[list[dict]] = None,
        addresses: Optional[list[dict]] = None,
        organization: Optional[str] = None,
        title: Optional[str] = None,
        notes: Optional[str] = None,
    ) -> Optional[Contact]:
        """Update a contact in place, keeping its URL, UID and birthday.

        Arguments left as None keep the stored value. The merged card is PUT
        back to the contact's own URL, so a failed write leaves the existing
        card untouched instead of deleting it first.

        Returns:
            The updated contact (``id`` is ``contact_id``), or None when the
            contact does not exist or its stored vCard cannot be parsed.

        Raises:
            ValueError: if no address book is given or configured.
            Exception: if the GET or the PUT answers an unexpected status.
        """
        addressbook_id = self._resolve_addressbook_id(addressbook_id)
        vcard_text = self._fetch_vcard(contact_id)
        if vcard_text is None:
            return None
        existing = self._parse_vcard(vcard_text, addressbook_id, contact_id)
        if not existing:
            return None

        def pick(new, old):
            # None means "not supplied"; an explicit empty value overrides.
            return new if new is not None else old

        first_name = pick(first_name, existing.first_name)
        last_name = pick(last_name, existing.last_name)
        display_name = pick(display_name, existing.display_name)
        emails = pick(emails, [e.model_dump() for e in existing.emails])
        phones = pick(phones, [p.model_dump() for p in existing.phones])
        addresses = pick(addresses, [a.model_dump() for a in existing.addresses])
        organization = pick(organization, existing.organization)
        title = pick(title, existing.title)
        notes = pick(notes, existing.notes)

        full_name = self._full_name(display_name, first_name, last_name)
        vcard = self._build_vcard(
            # Reuse the stored UID so the card keeps its identity; only a card
            # that never had one gets a new UID.
            uid=self._read_uid(vcard_text) or str(uuid.uuid4()),
            first_name=first_name,
            last_name=last_name,
            full_name=full_name,
            emails=emails,
            phones=phones,
            addresses=addresses,
            organization=organization,
            title=title,
            notes=notes,
            birthday=self._birthday_to_text(existing.birthday),
        )
        self._put_vcard(self._build_url(contact_id), vcard, "update")

        return Contact(
            id=contact_id,
            addressbook_id=addressbook_id,
            first_name=first_name,
            last_name=last_name,
            display_name=full_name,
            emails=[EmailField(**e) for e in emails],
            phones=[PhoneField(**p) for p in phones],
            addresses=[AddressField(**a) for a in addresses],
            organization=organization,
            title=title,
            notes=notes,
            birthday=existing.birthday,
        )

    def delete_contact(self, contact_id: str, addressbook_id: Optional[str] = None) -> OperationResult:
        """Delete a contact, reporting the outcome instead of raising.

        ``addressbook_id`` is accepted for signature symmetry but not used:
        the contact id alone determines the URL that is deleted.
        """
        try:
            response = self._get_client().delete(self._build_url(contact_id))
        except Exception as e:
            # Deliberate catch-all: this method always reports via the result.
            return OperationResult(success=False, message=str(e), id=contact_id)

        if response.status_code in (200, 204):
            return OperationResult(
                success=True, message="Contact deleted successfully", id=contact_id
            )
        if response.status_code == 404:
            return OperationResult(
                success=False, message="Contact not found", id=contact_id
            )
        return OperationResult(
            success=False,
            message=f"Failed to delete contact: {response.status_code}",
            id=contact_id,
        )

    @staticmethod
    def _full_name(
        display_name: Optional[str],
        first_name: Optional[str],
        last_name: Optional[str],
    ) -> str:
        """Return the FN value: display name, else "first last", else "Unnamed"."""
        return display_name or " ".join(filter(None, [first_name, last_name])) or "Unnamed"

    @staticmethod
    def _build_vcard(
        uid: str,
        first_name: Optional[str],
        last_name: Optional[str],
        full_name: str,
        emails: Optional[list[dict]],
        phones: Optional[list[dict]],
        addresses: Optional[list[dict]],
        organization: Optional[str],
        title: Optional[str],
        notes: Optional[str],
        birthday: Optional[str],
    ):
        """Assemble a vobject vCard from plain field values."""
        vcard = vobject.vCard()
        vcard.add("uid").value = uid
        vcard.add("n").value = vobject.vcard.Name(
            family=last_name or "",
            given=first_name or "",
        )
        vcard.add("fn").value = full_name

        if organization:
            # ORG is a structured value, so it is stored as a list.
            vcard.add("org").value = [organization]
        if title:
            vcard.add("title").value = title
        if notes:
            vcard.add("note").value = notes
        if birthday:
            vcard.add("bday").value = birthday

        for email_data in emails or []:
            email = vcard.add("email")
            email.value = email_data.get("email", "")
            email.type_param = email_data.get("type", "home").upper()

        for phone_data in phones or []:
            tel = vcard.add("tel")
            tel.value = phone_data.get("number", "")
            tel.type_param = phone_data.get("type", "cell").upper()

        for addr_data in addresses or []:
            adr = vcard.add("adr")
            adr.value = vobject.vcard.Address(
                street=addr_data.get("street", ""),
                city=addr_data.get("city", ""),
                region=addr_data.get("state", ""),
                code=addr_data.get("postal_code", ""),
                country=addr_data.get("country", ""),
            )
            adr.type_param = addr_data.get("type", "home").upper()

        return vcard

    def _put_vcard(self, url: str, vcard, action: str) -> None:
        """PUT a serialized vCard to ``url``.

        Raises:
            Exception: "Failed to <action> contact: <status>" unless the
                server answers 200, 201 or 204.
        """
        response = self._get_client().put(
            url,
            content=vcard.serialize(),
            headers=_VCARD_HEADERS,
        )
        if response.status_code not in _OK_WRITE:
            raise Exception(f"Failed to {action} contact: {response.status_code}")

    @staticmethod
    def _read_uid(vcard_data: str) -> Optional[str]:
        """Return the UID stored in raw vCard text, or None if unavailable."""
        try:
            vcard = vobject.readOne(vcard_data)
        except Exception:
            return None
        if hasattr(vcard, "uid"):
            return vcard.uid.value or None
        return None

    @staticmethod
    def _parse_birthday(value):
        """Convert a BDAY value: ISO strings become ``date``, bad strings None.

        Non-string values are passed through unchanged.
        """
        if not isinstance(value, str):
            return value
        try:
            return date.fromisoformat(value)
        except ValueError:
            # Best effort: an unparseable birthday is dropped, not fatal.
            return None

    @staticmethod
    def _birthday_to_text(value) -> Optional[str]:
        """Render a stored birthday as text for BDAY (None when unset)."""
        if value is None:
            return None
        if hasattr(value, "isoformat"):
            return value.isoformat()
        return str(value)

    @staticmethod
    def _type_of(prop, default: str) -> str:
        """Return the lower-cased TYPE parameter of a property, or ``default``."""
        if hasattr(prop, "type_param"):
            return str(prop.type_param).lower()
        return default

    def _parse_vcard(
        self, vcard_data: str, addressbook_id: Optional[str], href: str
    ) -> Optional[Contact]:
        """Parse raw vCard text into a ``Contact``.

        Args:
            vcard_data: The vCard text.
            addressbook_id: Owning address book; when falsy it is derived
                from ``href`` by dropping the last path segment.
            href: The contact's href, used as the contact id.

        Returns:
            The contact, or None if vobject cannot parse ``vcard_data``.
        """
        try:
            vcard = vobject.readOne(vcard_data)
        except Exception:
            return None

        first_name = None
        last_name = None
        if hasattr(vcard, "n"):
            first_name = vcard.n.value.given or None
            last_name = vcard.n.value.family or None

        display_name = vcard.fn.value if hasattr(vcard, "fn") else None

        # The first entry of each multi-valued field is flagged as primary.
        emails = []
        if hasattr(vcard, "email_list"):
            for email in vcard.email_list:
                emails.append(
                    EmailField(
                        type=self._type_of(email, "home"),
                        email=email.value,
                        primary=not emails,
                    )
                )

        phones = []
        if hasattr(vcard, "tel_list"):
            for tel in vcard.tel_list:
                phones.append(
                    PhoneField(
                        type=self._type_of(tel, "mobile"),
                        number=tel.value,
                        primary=not phones,
                    )
                )

        addresses = []
        if hasattr(vcard, "adr_list"):
            for adr in vcard.adr_list:
                addresses.append(
                    AddressField(
                        type=self._type_of(adr, "home"),
                        street=adr.value.street or None,
                        city=adr.value.city or None,
                        state=adr.value.region or None,
                        postal_code=adr.value.code or None,
                        country=adr.value.country or None,
                    )
                )

        organization = None
        if hasattr(vcard, "org"):
            org_value = vcard.org.value
            if isinstance(org_value, list) and len(org_value) > 0:
                # Only the first ORG component is kept.
                organization = org_value[0]
            else:
                organization = str(org_value)

        title = vcard.title.value if hasattr(vcard, "title") else None
        notes = vcard.note.value if hasattr(vcard, "note") else None
        birthday = self._parse_birthday(vcard.bday.value) if hasattr(vcard, "bday") else None

        return Contact(
            id=href,
            addressbook_id=addressbook_id or self._derive_addressbook_id(href),
            first_name=first_name,
            last_name=last_name,
            display_name=display_name,
            emails=emails,
            phones=phones,
            addresses=addresses,
            organization=organization,
            title=title,
            notes=notes,
            birthday=birthday,
        )

    def _derive_addressbook_id(self, contact_href: str) -> str:
        """Return the href's parent collection (with trailing slash).

        An href without any ``/`` is returned unchanged.
        """
        if "/" not in contact_href:
            return contact_href
        base = contact_href.rsplit("/", 1)[0]
        return f"{base}/"

    def _resolve_addressbook_id(self, addressbook_id: Optional[str]) -> str:
        """Return the given address book id or the configured default.

        Raises:
            ValueError: if neither is available.
        """
        if addressbook_id:
            return addressbook_id
        if self.settings.contacts_addressbook_url:
            return self.settings.contacts_addressbook_url
        raise ValueError("CONTACTS_ADDRESSBOOK_URL must be set to use contacts tools")