Initial commit
All checks were successful
Build And Test / publish (push) Successful in 1m30s

This commit is contained in:
2025-12-30 15:16:45 -08:00
parent 4df9a7229e
commit 4f6098e8c2
28 changed files with 3080 additions and 0 deletions

View File

@@ -0,0 +1,477 @@
from typing import Optional
import uuid
import httpx
import vobject
from models.contacts_models import (
AddressBook,
Contact,
ContactList,
EmailField,
PhoneField,
AddressField,
)
from models.common import OperationResult
from config import Settings
PROPFIND_ADDRESSBOOKS = """<?xml version="1.0" encoding="utf-8"?>
<d:propfind xmlns:d="DAV:" xmlns:card="urn:ietf:params:xml:ns:carddav">
<d:prop>
<d:displayname/>
<d:resourcetype/>
<card:addressbook-description/>
</d:prop>
</d:propfind>"""
REPORT_CONTACTS = """<?xml version="1.0" encoding="utf-8"?>
<card:addressbook-query xmlns:d="DAV:" xmlns:card="urn:ietf:params:xml:ns:carddav">
<d:prop>
<d:getetag/>
<card:address-data/>
</d:prop>
</card:addressbook-query>"""
class ContactsService:
def __init__(self, settings: Settings):
self.settings = settings
self._client: Optional[httpx.Client] = None
def _get_client(self) -> httpx.Client:
if self._client is None:
self._client = httpx.Client(
auth=(
self.settings.carddav_username,
self.settings.carddav_password.get_secret_value(),
),
headers={"Content-Type": "application/xml; charset=utf-8"},
timeout=30.0,
)
return self._client
def list_addressbooks(self) -> list[AddressBook]:
client = self._get_client()
response = client.request(
"PROPFIND",
self.settings.carddav_url,
headers={"Depth": "1"},
content=PROPFIND_ADDRESSBOOKS,
)
if response.status_code not in [200, 207]:
raise Exception(f"Failed to list addressbooks: {response.status_code}")
# Parse XML response
addressbooks = []
from xml.etree import ElementTree as ET
root = ET.fromstring(response.text)
ns = {
"d": "DAV:",
"card": "urn:ietf:params:xml:ns:carddav",
}
for response_elem in root.findall(".//d:response", ns):
href = response_elem.find("d:href", ns)
if href is None:
continue
resourcetype = response_elem.find(".//d:resourcetype", ns)
is_addressbook = (
resourcetype is not None
and resourcetype.find("card:addressbook", ns) is not None
)
if not is_addressbook:
continue
displayname = response_elem.find(".//d:displayname", ns)
description = response_elem.find(".//card:addressbook-description", ns)
addressbooks.append(
AddressBook(
id=href.text,
name=displayname.text if displayname is not None and displayname.text else "Unnamed",
description=description.text if description is not None else None,
contact_count=0,
)
)
return addressbooks
def list_contacts(
self,
addressbook_id: str,
search: Optional[str] = None,
limit: int = 100,
offset: int = 0,
) -> ContactList:
client = self._get_client()
# Build URL
base_url = self.settings.carddav_url.rstrip("/")
addressbook_url = f"{base_url}{addressbook_id}" if addressbook_id.startswith("/") else addressbook_id
response = client.request(
"REPORT",
addressbook_url,
headers={"Depth": "1"},
content=REPORT_CONTACTS,
)
if response.status_code not in [200, 207]:
raise Exception(f"Failed to list contacts: {response.status_code}")
# Parse XML response
contacts = []
from xml.etree import ElementTree as ET
root = ET.fromstring(response.text)
ns = {
"d": "DAV:",
"card": "urn:ietf:params:xml:ns:carddav",
}
for response_elem in root.findall(".//d:response", ns):
href = response_elem.find("d:href", ns)
address_data = response_elem.find(".//card:address-data", ns)
if href is None or address_data is None or address_data.text is None:
continue
try:
contact = self._parse_vcard(address_data.text, addressbook_id, href.text)
if contact:
# Apply search filter
if search:
search_lower = search.lower()
match = False
if contact.display_name and search_lower in contact.display_name.lower():
match = True
elif contact.first_name and search_lower in contact.first_name.lower():
match = True
elif contact.last_name and search_lower in contact.last_name.lower():
match = True
elif any(search_lower in e.email.lower() for e in contact.emails):
match = True
if not match:
continue
contacts.append(contact)
except Exception as e:
print(f"Error parsing contact: {e}")
continue
# Sort by display name
contacts.sort(key=lambda c: c.display_name or c.first_name or c.last_name or "")
total = len(contacts)
contacts = contacts[offset : offset + limit]
return ContactList(
contacts=contacts,
addressbook_id=addressbook_id,
total=total,
limit=limit,
offset=offset,
)
def get_contact(self, addressbook_id: str, contact_id: str) -> Optional[Contact]:
client = self._get_client()
# Build URL
base_url = self.settings.carddav_url.rstrip("/")
contact_url = f"{base_url}{contact_id}" if contact_id.startswith("/") else contact_id
response = client.get(contact_url)
if response.status_code == 404:
return None
if response.status_code != 200:
raise Exception(f"Failed to get contact: {response.status_code}")
return self._parse_vcard(response.text, addressbook_id, contact_id)
def create_contact(
self,
addressbook_id: str,
first_name: Optional[str] = None,
last_name: Optional[str] = None,
display_name: Optional[str] = None,
emails: Optional[list[dict]] = None,
phones: Optional[list[dict]] = None,
addresses: Optional[list[dict]] = None,
organization: Optional[str] = None,
title: Optional[str] = None,
notes: Optional[str] = None,
birthday: Optional[str] = None,
) -> Contact:
client = self._get_client()
# Create vCard
vcard = vobject.vCard()
# Generate UID
uid = str(uuid.uuid4())
vcard.add("uid").value = uid
# Name
n = vcard.add("n")
n.value = vobject.vcard.Name(
family=last_name or "",
given=first_name or "",
)
# Full name
fn = display_name or " ".join(filter(None, [first_name, last_name])) or "Unnamed"
vcard.add("fn").value = fn
# Organization
if organization:
org = vcard.add("org")
org.value = [organization]
# Title
if title:
vcard.add("title").value = title
# Notes
if notes:
vcard.add("note").value = notes
# Birthday
if birthday:
vcard.add("bday").value = birthday
# Emails
if emails:
for email_data in emails:
email = vcard.add("email")
email.value = email_data.get("email", "")
email.type_param = email_data.get("type", "home").upper()
# Phones
if phones:
for phone_data in phones:
tel = vcard.add("tel")
tel.value = phone_data.get("number", "")
tel.type_param = phone_data.get("type", "cell").upper()
# Addresses
if addresses:
for addr_data in addresses:
adr = vcard.add("adr")
adr.value = vobject.vcard.Address(
street=addr_data.get("street", ""),
city=addr_data.get("city", ""),
region=addr_data.get("state", ""),
code=addr_data.get("postal_code", ""),
country=addr_data.get("country", ""),
)
adr.type_param = addr_data.get("type", "home").upper()
# Build URL and save
base_url = self.settings.carddav_url.rstrip("/")
addressbook_url = f"{base_url}{addressbook_id}" if addressbook_id.startswith("/") else addressbook_id
contact_url = f"{addressbook_url.rstrip('/')}/{uid}.vcf"
response = client.put(
contact_url,
content=vcard.serialize(),
headers={"Content-Type": "text/vcard; charset=utf-8"},
)
if response.status_code not in [200, 201, 204]:
raise Exception(f"Failed to create contact: {response.status_code}")
return Contact(
id=contact_url,
addressbook_id=addressbook_id,
first_name=first_name,
last_name=last_name,
display_name=fn,
emails=[EmailField(**e) for e in (emails or [])],
phones=[PhoneField(**p) for p in (phones or [])],
addresses=[AddressField(**a) for a in (addresses or [])],
organization=organization,
title=title,
notes=notes,
)
def update_contact(
self,
addressbook_id: str,
contact_id: str,
first_name: Optional[str] = None,
last_name: Optional[str] = None,
display_name: Optional[str] = None,
emails: Optional[list[dict]] = None,
phones: Optional[list[dict]] = None,
addresses: Optional[list[dict]] = None,
organization: Optional[str] = None,
title: Optional[str] = None,
notes: Optional[str] = None,
) -> Optional[Contact]:
# Get existing contact
existing = self.get_contact(addressbook_id, contact_id)
if not existing:
return None
# Merge with updates
updated_data = {
"first_name": first_name if first_name is not None else existing.first_name,
"last_name": last_name if last_name is not None else existing.last_name,
"display_name": display_name if display_name is not None else existing.display_name,
"emails": emails if emails is not None else [e.model_dump() for e in existing.emails],
"phones": phones if phones is not None else [p.model_dump() for p in existing.phones],
"addresses": addresses if addresses is not None else [a.model_dump() for a in existing.addresses],
"organization": organization if organization is not None else existing.organization,
"title": title if title is not None else existing.title,
"notes": notes if notes is not None else existing.notes,
}
# Delete and recreate (simpler than partial update)
self.delete_contact(addressbook_id, contact_id)
return self.create_contact(addressbook_id, **updated_data)
def delete_contact(self, addressbook_id: str, contact_id: str) -> OperationResult:
try:
client = self._get_client()
# Build URL
base_url = self.settings.carddav_url.rstrip("/")
contact_url = f"{base_url}{contact_id}" if contact_id.startswith("/") else contact_id
response = client.delete(contact_url)
if response.status_code in [200, 204]:
return OperationResult(
success=True, message="Contact deleted successfully", id=contact_id
)
elif response.status_code == 404:
return OperationResult(
success=False, message="Contact not found", id=contact_id
)
else:
return OperationResult(
success=False,
message=f"Failed to delete contact: {response.status_code}",
)
except Exception as e:
return OperationResult(success=False, message=str(e))
def _parse_vcard(
self, vcard_data: str, addressbook_id: str, href: str
) -> Optional[Contact]:
try:
vcard = vobject.readOne(vcard_data)
except Exception:
return None
# Get UID
uid = href
if hasattr(vcard, "uid"):
uid = vcard.uid.value
# Get name components
first_name = None
last_name = None
if hasattr(vcard, "n"):
first_name = vcard.n.value.given or None
last_name = vcard.n.value.family or None
# Get display name
display_name = None
if hasattr(vcard, "fn"):
display_name = vcard.fn.value
# Get emails
emails = []
if hasattr(vcard, "email_list"):
for email in vcard.email_list:
email_type = "home"
if hasattr(email, "type_param"):
email_type = str(email.type_param).lower()
emails.append(
EmailField(type=email_type, email=email.value, primary=len(emails) == 0)
)
# Get phones
phones = []
if hasattr(vcard, "tel_list"):
for tel in vcard.tel_list:
phone_type = "mobile"
if hasattr(tel, "type_param"):
phone_type = str(tel.type_param).lower()
phones.append(
PhoneField(type=phone_type, number=tel.value, primary=len(phones) == 0)
)
# Get addresses
addresses = []
if hasattr(vcard, "adr_list"):
for adr in vcard.adr_list:
addr_type = "home"
if hasattr(adr, "type_param"):
addr_type = str(adr.type_param).lower()
addresses.append(
AddressField(
type=addr_type,
street=adr.value.street or None,
city=adr.value.city or None,
state=adr.value.region or None,
postal_code=adr.value.code or None,
country=adr.value.country or None,
)
)
# Get organization
organization = None
if hasattr(vcard, "org"):
org_value = vcard.org.value
if isinstance(org_value, list) and len(org_value) > 0:
organization = org_value[0]
else:
organization = str(org_value)
# Get title
title = None
if hasattr(vcard, "title"):
title = vcard.title.value
# Get notes
notes = None
if hasattr(vcard, "note"):
notes = vcard.note.value
# Get birthday
birthday = None
if hasattr(vcard, "bday"):
try:
from datetime import date
bday_value = vcard.bday.value
if isinstance(bday_value, str):
birthday = date.fromisoformat(bday_value)
else:
birthday = bday_value
except Exception:
pass
return Contact(
id=href,
addressbook_id=addressbook_id,
first_name=first_name,
last_name=last_name,
display_name=display_name,
emails=emails,
phones=phones,
addresses=addresses,
organization=organization,
title=title,
notes=notes,
birthday=birthday,
)