from collections.abc import Mapping import datetime from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler import os.path import socketserver from threading import Thread from time import sleep from urllib.parse import urlparse, parse_qs import multiprocessing from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build from googleapiclient.errors import HttpError from config import GOOGLE_CLIENT_SECRET_FILE # If modifying these scopes, delete the file token.json. SCOPES = ["https://www.googleapis.com/auth/calendar"] class GoogleCalendar: class TCPServer(socketserver.ForkingTCPServer): flow_callback = None class HTTPHandler(BaseHTTPRequestHandler): def do_GET(self): url_parts = urlparse(self.path) query_params = parse_qs(url_parts.query) if "code" not in query_params: self.send_response(500) self.send_header("Content-type", "text/html") self.end_headers() self.wfile.write("

Missing OAuth Code

".encode("utf-8")) return if "state" not in query_params: self.send_response(500) self.send_header("Content-type", "text/html") self.end_headers() self.wfile.write("

Missing OAuth State

".encode("utf-8")) return code = query_params["code"][0] state = query_params["state"][0] result = self.server.flow_callback(code=code, state=state) if result is None: self.send_response(200) self.send_header("Content-type", "text/html") self.end_headers() self.wfile.write("

OAuth Success

".encode("utf-8")) return self.send_response(500) self.send_header("Content-type", "text/html") self.end_headers() self.wfile.write(f"

{result}

".encode("utf-8")) return def __init__( self, http_port: int, client_secret_file: str, redirect_uri: str, flow_callback: callable, calendar_summary: str, token_refresh_callback: callable = None, ) -> None: self.flow = InstalledAppFlow.from_client_secrets_file( client_secret_file, SCOPES, redirect_uri=redirect_uri ) self.httpd = ThreadingHTTPServer(("", http_port), self.HTTPHandler) self.httpd.flow_callback = self.finish_flow manager = multiprocessing.Manager() self.state_callback_kwargs = manager.dict() self.flow_callback = flow_callback self.calendar_summary = calendar_summary self.token_refresh_callback = token_refresh_callback def finish_flow(self, state: str, code: str): if (kwargs := self.state_callback_kwargs.get(state, None)) is None: return "Invalid OAuth State" token = self.flow.fetch_token(code=code) self.flow_callback(token, **kwargs) def flow_url(self, callback_kwargs={}): url, state = self.flow.authorization_url(prompt="consent") self.state_callback_kwargs[state] = callback_kwargs return url def service(self, token: Mapping[str, str]): # Convert the token dict into a Credentials object creds = Credentials( token=token.get("access_token"), refresh_token=token.get("refresh_token"), token_uri=self.flow.client_config["token_uri"], client_id=self.flow.client_config["client_id"], client_secret=self.flow.client_config["client_secret"], scopes=SCOPES, ) if not creds or not creds.valid: return None if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) self.token_refresh_callback( token, { "access_token": creds.token, "expires_in": creds.expiry, "refresh_token": creds.refresh_token, "scope": creds.scopes, "token_type": creds.token, "expires_at": creds.expiry, }, ) service = build("calendar", "v3", credentials=creds) # Return the credentials object return service def get_calendar_id(self, token: Mapping[str, str]): # Create calendar with id service = self.service(token) if not service: return None calendars = service.calendarList().list().execute().get("items") for calendar in calendars: if calendar["summary"] == self.calendar_summary: return calendar["id"] calendar = ( service.calendars() .insert( body={ "summary": self.calendar_summary, "timeZone": "Europe/Amsterdam", } ) .execute() ) return calendar["id"] def get_first_event(self, token: Mapping[str, str], start: datetime.datetime): service = self.service(token) if not service: return None calendar_id = self.get_calendar_id(token) print(start.isoformat() + "Z") try: events_result = ( service.events() .list( calendarId=calendar_id, timeMin=start.isoformat(), maxResults=1, singleEvents=True, orderBy="startTime", ) .execute() ) except HttpError as e: if e.resp.status == 404: return [] print(e) items = events_result.get("items", []) if items: return items[0] return [] def create_event(self, token: Mapping[str, str], event: dict): service = self.service(token) if not service: return None calendar_id = self.get_calendar_id(token) event = service.events().insert(calendarId=calendar_id, body=event).execute() return event def delete_event(self, token: Mapping[str, str], event_id: str): service = self.service(token) if not service: return None calendar_id = self.get_calendar_id(token) service.events().delete(calendarId=calendar_id, eventId=event_id).execute() def httpd_listen(self): # Start httpd as a daemon thread thread = Thread(target=self.httpd.serve_forever, args=()) thread.daemon = True thread.start()