from collections.abc import Mapping
import datetime
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
import os.path
import socketserver
from threading import Thread
from time import sleep
from urllib.parse import urlparse, parse_qs
import multiprocessing
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from config import GOOGLE_CLIENT_SECRET_FILE
# If modifying these scopes, delete the file token.json.
SCOPES = ["https://www.googleapis.com/auth/calendar"]
class GoogleCalendar:
class TCPServer(socketserver.ForkingTCPServer):
flow_callback = None
class HTTPHandler(BaseHTTPRequestHandler):
def do_GET(self):
url_parts = urlparse(self.path)
query_params = parse_qs(url_parts.query)
if "code" not in query_params:
self.send_response(500)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write("
Missing OAuth Code
".encode("utf-8"))
return
if "state" not in query_params:
self.send_response(500)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write("Missing OAuth State
".encode("utf-8"))
return
code = query_params["code"][0]
state = query_params["state"][0]
result = self.server.flow_callback(code=code, state=state)
if result is None:
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write("OAuth Success
".encode("utf-8"))
return
self.send_response(500)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(f"{result}
".encode("utf-8"))
return
def __init__(
self,
http_port: int,
client_secret_file: str,
redirect_uri: str,
flow_callback: callable,
calendar_summary: str,
token_refresh_callback: callable = None,
) -> None:
self.flow = InstalledAppFlow.from_client_secrets_file(
client_secret_file, SCOPES, redirect_uri=redirect_uri
)
self.httpd = ThreadingHTTPServer(("", http_port), self.HTTPHandler)
self.httpd.flow_callback = self.finish_flow
manager = multiprocessing.Manager()
self.state_callback_kwargs = manager.dict()
self.flow_callback = flow_callback
self.calendar_summary = calendar_summary
self.token_refresh_callback = token_refresh_callback
def finish_flow(self, state: str, code: str):
if (kwargs := self.state_callback_kwargs.get(state, None)) is None:
return "Invalid OAuth State"
token = self.flow.fetch_token(code=code)
self.flow_callback(token, **kwargs)
def flow_url(self, callback_kwargs={}):
url, state = self.flow.authorization_url(prompt="consent")
self.state_callback_kwargs[state] = callback_kwargs
return url
def service(self, token: Mapping[str, str]):
# Convert the token dict into a Credentials object
creds = Credentials(
token=token.get("access_token"),
refresh_token=token.get("refresh_token"),
token_uri=self.flow.client_config["token_uri"],
client_id=self.flow.client_config["client_id"],
client_secret=self.flow.client_config["client_secret"],
scopes=SCOPES,
)
if not creds or not creds.valid:
return None
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
self.token_refresh_callback(
token,
{
"access_token": creds.token,
"expires_in": creds.expiry,
"refresh_token": creds.refresh_token,
"scope": creds.scopes,
"token_type": creds.token,
"expires_at": creds.expiry,
},
)
service = build("calendar", "v3", credentials=creds)
# Return the credentials object
return service
def get_calendar_id(self, token: Mapping[str, str]):
# Create calendar with id
service = self.service(token)
if not service:
return None
calendars = service.calendarList().list().execute().get("items")
for calendar in calendars:
if calendar["summary"] == self.calendar_summary:
return calendar["id"]
calendar = (
service.calendars()
.insert(
body={
"summary": self.calendar_summary,
"timeZone": "Europe/Amsterdam",
}
)
.execute()
)
return calendar["id"]
def get_first_event(self, token: Mapping[str, str], start: datetime.datetime):
service = self.service(token)
if not service:
return None
calendar_id = self.get_calendar_id(token)
print(start.isoformat() + "Z")
try:
events_result = (
service.events()
.list(
calendarId=calendar_id,
timeMin=start.isoformat(),
maxResults=1,
singleEvents=True,
orderBy="startTime",
)
.execute()
)
except HttpError as e:
if e.resp.status == 404:
return []
print(e)
items = events_result.get("items", [])
if items:
return items[0]
return []
def create_event(self, token: Mapping[str, str], event: dict):
service = self.service(token)
if not service:
return None
calendar_id = self.get_calendar_id(token)
event = service.events().insert(calendarId=calendar_id, body=event).execute()
return event
def delete_event(self, token: Mapping[str, str], event_id: str):
service = self.service(token)
if not service:
return None
calendar_id = self.get_calendar_id(token)
service.events().delete(calendarId=calendar_id, eventId=event_id).execute()
def httpd_listen(self):
# Start httpd as a daemon thread
thread = Thread(target=self.httpd.serve_forever, args=())
thread.daemon = True
thread.start()