From 468bf36f52e5c8b5d01afb19463b11729a0bac28 Mon Sep 17 00:00:00 2001 From: Yigit Colakoglu Date: Tue, 10 Sep 2024 13:08:06 +0200 Subject: [PATCH] Initial commit --- .gitignore | 9 ++ main.py | 7 ++ models.py | 61 +++++++++++ tgbot.py | 166 ++++++++++++++++++++++++++++ tgbot_multi.py | 236 +++++++++++++++++++++++++++++++++++++++ xclient.py | 292 +++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 771 insertions(+) create mode 100644 .gitignore create mode 100644 main.py create mode 100644 models.py create mode 100644 tgbot.py create mode 100644 tgbot_multi.py create mode 100644 xclient.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..db707bc --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +__pycache__ +.venv +.venv_wsl +.wsl +chromedriver.exe +chromedriver + +config.py +default_config.py \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..8938f87 --- /dev/null +++ b/main.py @@ -0,0 +1,7 @@ +from xclient import XClient +from tgbot import TelegramBot + +if __name__ == "__main__": + client = XClient("ycolakoglu", "mTJNhBZI8PBdIx0e0Lak") + telegram_bot = TelegramBot("7312187888:AAG9w2UhU8zu9CZTxRnh72ltSqa59friRs0") + telegram_bot.run() \ No newline at end of file diff --git a/models.py b/models.py new file mode 100644 index 0000000..6e46cec --- /dev/null +++ b/models.py @@ -0,0 +1,61 @@ +from datetime import datetime, timedelta, timezone + +class Booking: + def __init__( + self, + bookingId, + startDate, + endDate, + bookableProductId, + linkedProductId, + isAvailable=False, + **kwargs, + ): + self.booking_id = bookingId + self.start = datetime.strptime(startDate, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc) + self.end = datetime.strptime(endDate, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc) + self.bookable_product_id = bookableProductId + self.linked_product_id = linkedProductId + self.available = isAvailable + + self.booking_dict = kwargs + + + @property + def start_stamp(self): + ## Get string representation of the timestamp in GMT +2 with only Day of week and month, hour and minute + gmt_plus_2 = timezone(timedelta(hours=2)) + return self.start.astimezone(gmt_plus_2).strftime("%A %d %B %H:%M") + + @property + def end_stamp(self): + gmt_plus_2 = timezone(timedelta(hours=2)) + return self.start.astimezone(gmt_plus_2).strftime("%A %d %B %H:%M") + + def __str__(self): + return ( + f"Booking(booking_id={self.booking_id}, start={self.start}, end={self.end})" + ) + + def __eq__(self, other): + if isinstance(other, Booking): + return ( + self.booking_id == other.booking_id + and self.start == other.start + and self.end == other.end + ) + return False + + def __repr__(self) -> str: + return self.__str__() + + def __hash__(self): + return hash((self.booking_id, self.start, self.end)) + + def time_left_to_booking(self, current_date): + current_datetime = datetime.strptime(current_date, "%Y-%m-%dT%H:%M:%S.%fZ") + time_left = self.start - current_datetime + if time_left.total_seconds() > 0: + return time_left + else: + return timedelta(0) # No time left if the booking has already started \ No newline at end of file diff --git a/tgbot.py b/tgbot.py new file mode 100644 index 0000000..be5ac52 --- /dev/null +++ b/tgbot.py @@ -0,0 +1,166 @@ +import telebot +from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton +from datetime import datetime, timedelta, timezone +import time +from threading import Thread + +class TelegramBot: + def __init__(self, bot_token, xclient): + self.bot = telebot.TeleBot(bot_token) + self.xclient = xclient + self.user_selected_slot = {} # Store user selected slot by chat ID + self.user_bookings = {} # Store user bookings by chat ID + self.watchlist = {} # Watchlist to store full slots by chat ID + + # Set up the command handlers + @self.bot.message_handler(commands=['start']) + def send_welcome(message): + self.bot.reply_to(message, "Welcome! Use /make_booking to view and book available slots.") + + @self.bot.message_handler(commands=['make_booking']) + def make_booking(message): + start = datetime.now() + end = start + timedelta(days=1) + slots = self.xclient.list_slots(start, end) + + if slots: + markup = InlineKeyboardMarkup() + for i, slot in enumerate(slots): + # Create a button for each slot + status = "Available" if slot.available else "FULL" + button_text = f"Slot {i + 1}: {slot.start_stamp} ({status})" + markup.add(InlineKeyboardButton(button_text, callback_data=f"book_{i}")) + + self.bot.reply_to(message, "Select a slot to book or watch:", reply_markup=markup) + self.user_selected_slot[message.chat.id] = slots + else: + self.bot.reply_to(message, "No slots found.") + + @self.bot.callback_query_handler(func=lambda call: call.data.startswith("book_")) + def callback_booking(call): + try: + # Extract slot index from callback data + slot_index = int(call.data.split('_')[1]) + slots = self.user_selected_slot.get(call.message.chat.id, []) + + if 0 <= slot_index < len(slots): + selected_slot = slots[slot_index] + + if selected_slot.available: + # Attempt to book the available slot + try: + self.xclient.make_booking(selected_slot) + self.bot.answer_callback_query(call.id, "Slot booked successfully!") + self.bot.send_message(call.message.chat.id, f"Booking confirmed for: {selected_slot.start_stamp}") + except Exception as e: + self.bot.answer_callback_query(call.id, "Failed to book slot.") + self.bot.send_message(call.message.chat.id, f"Error: {str(e)}") + else: + # Slot is full, add to watchlist + self.add_to_watchlist(call.message.chat.id, selected_slot) + self.bot.answer_callback_query(call.id, "Slot is full. Added to watchlist.") + self.bot.send_message(call.message.chat.id, f"Slot {selected_slot.start_stamp} is full. You will be notified when it becomes available.") + else: + self.bot.answer_callback_query(call.id, "Invalid slot selection.") + except (IndexError, ValueError): + self.bot.answer_callback_query(call.id, "Invalid data.") + + @self.bot.message_handler(commands=['cancel_booking']) + def cancel_booking(message): + # Fetch user's current bookings + bookings = self.xclient.my_bookings(datetime.now(), datetime.now() + timedelta(days=31)) + + if bookings: + markup = InlineKeyboardMarkup() + for i, booking in enumerate(bookings): + # Create a button for each booking + button_text = f"Booking {i + 1}: {booking.start_stamp}" + markup.add(InlineKeyboardButton(button_text, callback_data=f"cancel_{i}")) + + self.user_bookings[message.chat.id] = bookings + self.bot.reply_to(message, "Select a booking to cancel:", reply_markup=markup) + else: + self.bot.reply_to(message, "You have no bookings to cancel.") + + @self.bot.callback_query_handler(func=lambda call: call.data.startswith("cancel_")) + def callback_cancel_booking(call): + try: + # Extract booking index from callback data + booking_index = int(call.data.split('_')[1]) + bookings = self.user_bookings.get(call.message.chat.id, []) + + if 0 <= booking_index < len(bookings): + selected_booking = bookings[booking_index] + # Attempt to cancel the selected booking + try: + if self.xclient.cancel_booking(selected_booking): + self.bot.answer_callback_query(call.id, "Booking canceled successfully!") + self.bot.send_message(call.message.chat.id, f"Booking for {selected_booking.start_stamp} has been canceled.") + else: + self.bot.answer_callback_query(call.id, "Failed to cancel booking.") + except Exception as e: + self.bot.answer_callback_query(call.id, "Failed to cancel booking.") + self.bot.send_message(call.message.chat.id, f"Error: {str(e)}") + else: + self.bot.answer_callback_query(call.id, "Invalid booking selection.") + except (IndexError, ValueError): + self.bot.answer_callback_query(call.id, "Invalid data.") + + def add_to_watchlist(self, chat_id, slot): + if chat_id not in self.watchlist: + self.watchlist[chat_id] = [] + self.watchlist[chat_id].append(slot) + + def check_watchlist(self): + now = datetime.now().replace(tzinfo=timezone.utc) + for chat_id, slots in list(self.watchlist.items()): + available_slots = [] + for slot in slots: + if slot.start < now: + # If the slot has expired + self.bot.send_message(chat_id, f"Slot {slot.start_stamp} has expired and is no longer available.") + available_slots.append(slot) # Mark it to remove from watchlist + elif self.xclient.check_booking_availability(slot): + # If the slot becomes available + try: + self.xclient.make_booking(slot) + self.bot.send_message(chat_id, f"Slot {slot.start_stamp} is now available and has been booked for you.") + available_slots.append(slot) # Mark it to remove from watchlist + except Exception as e: + self.bot.send_message(chat_id, f"Error booking slot {slot.start_stamp}: {str(e)}") + + # Remove the expired or booked slots from the watchlist + self.watchlist[chat_id] = [slot for slot in slots if slot not in available_slots] + if not self.watchlist[chat_id]: + del self.watchlist[chat_id] # Remove chat_id from watchlist if empty + + def calculate_polling_interval(self, slot_time): + now = datetime.now().replace(tzinfo=timezone.utc) + time_until_slot = (slot_time - now).total_seconds() + + if time_until_slot < 3600: # If less than 1 hour + return 300 # 5 minutes + else: + return 1800 # 30 minutes + + def poll_periodically(self): + # Continuously poll the watchlist at dynamically adjusted intervals + while True: + self.check_watchlist() + + # Calculate the minimum polling interval based on upcoming slots + polling_interval = 1800 # Default 30 minutes + for chat_id, slots in self.watchlist.items(): + for slot in slots: + interval = self.calculate_polling_interval(slot.start) + polling_interval = min(polling_interval, interval) + + time.sleep(polling_interval) # Wait before checking the watchlist again + + + def run(self): + ### start poll_periodically on seperate thread + thread = Thread(target = self.bot.polling, args = ()) + thread.start() + self.poll_periodically() + thread.kill() \ No newline at end of file diff --git a/tgbot_multi.py b/tgbot_multi.py new file mode 100644 index 0000000..d248f01 --- /dev/null +++ b/tgbot_multi.py @@ -0,0 +1,236 @@ +import telebot +from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton +from datetime import datetime, timedelta, timezone +import time +from threading import Thread + +class TelegramBot: + def __init__(self, bot_token, db_path='bot_database.db'): + self.bot = telebot.TeleBot(bot_token) + self.db_path = db_path + self.user_selected_slot = {} # Store user selected slot by chat ID + self.user_bookings = {} # Store user bookings by chat ID + self.watchlist = {} # Watchlist to store full slots by chat ID + + self.init_db() + + # Set up the command handlers + @self.bot.message_handler(commands=['start']) + def send_welcome(message): + self.handle_login(message) + + @self.bot.message_handler(commands=['make_booking']) + def make_booking(message): + self.handle_booking(message) + + @self.bot.callback_query_handler(func=lambda call: call.data.startswith("book_")) + def callback_booking(call): + self.handle_callback_booking(call) + + @self.bot.message_handler(commands=['cancel_booking']) + def cancel_booking(message): + self.handle_cancel_booking(message) + + @self.bot.callback_query_handler(func=lambda call: call.data.startswith("cancel_")) + def callback_cancel_booking(call): + self.handle_callback_cancel_booking(call) + + def init_db(self): + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + # Create tables + cursor.execute(''' + CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + chat_id INTEGER UNIQUE, + username TEXT, + password TEXT + ) + ''') + cursor.execute(''' + CREATE TABLE IF NOT EXISTS chats ( + chat_id INTEGER PRIMARY KEY, + username TEXT + ) + ''') + conn.commit() + + def handle_login(self, message): + chat_id = message.chat.id + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute('SELECT username FROM chats WHERE chat_id = ?', (chat_id,)) + result = cursor.fetchone() + + if result: + self.bot.reply_to(message, "You are already logged in.") + else: + self.bot.reply_to(message, "Please provide your username and password in the format:\n`/login username password`") + self.bot.register_next_step_handler(message, self.process_login) + + def process_login(self, message): + try: + command, username, password = message.text.split() + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute('SELECT * FROM users WHERE username = ? AND password = ?', (username, password)) + user = cursor.fetchone() + + if user: + cursor.execute('INSERT OR IGNORE INTO chats (chat_id, username) VALUES (?, ?)', (message.chat.id, username)) + conn.commit() + self.bot.reply_to(message, "Login successful!") + else: + self.bot.reply_to(message, "Invalid credentials. Please try again.") + self.handle_login(message) + except ValueError: + self.bot.reply_to(message, "Invalid format. Please use `/login username password`.") + + + def send_welcome(self,message): + self.bot.reply_to(message, "Welcome! Use /make_booking to view and book available slots.") + + def handle_booking(self,message): + start = datetime.now() + end = start + timedelta(days=1) + slots = self.xclient.list_slots(start, end) + + if slots: + markup = InlineKeyboardMarkup() + for i, slot in enumerate(slots): + # Create a button for each slot + status = "Available" if slot.available else "FULL" + button_text = f"Slot {i + 1}: {slot.start_stamp} ({status})" + markup.add(InlineKeyboardButton(button_text, callback_data=f"book_{i}")) + + self.bot.reply_to(message, "Select a slot to book or watch:", reply_markup=markup) + self.user_selected_slot[message.chat.id] = slots + else: + self.bot.reply_to(message, "No slots found.") + + def handle_callback_booking(self, call): + try: + # Extract slot index from callback data + slot_index = int(call.data.split('_')[1]) + slots = self.user_selected_slot.get(call.message.chat.id, []) + + if 0 <= slot_index < len(slots): + selected_slot = slots[slot_index] + + if selected_slot.available: + # Attempt to book the available slot + try: + self.xclient.make_booking(selected_slot) + self.bot.answer_callback_query(call.id, "Slot booked successfully!") + self.bot.send_message(call.message.chat.id, f"Booking confirmed for: {selected_slot.start_stamp}") + except Exception as e: + self.bot.answer_callback_query(call.id, "Failed to book slot.") + self.bot.send_message(call.message.chat.id, f"Error: {str(e)}") + else: + # Slot is full, add to watchlist + self.add_to_watchlist(call.message.chat.id, selected_slot) + self.bot.answer_callback_query(call.id, "Slot is full. Added to watchlist.") + self.bot.send_message(call.message.chat.id, f"Slot {selected_slot.start_stamp} is full. You will be notified when it becomes available.") + else: + self.bot.answer_callback_query(call.id, "Invalid slot selection.") + except (IndexError, ValueError): + self.bot.answer_callback_query(call.id, "Invalid data.") + + def handle_cancel_booking(self, message): + # Fetch user's current bookings + bookings = self.xclient.my_bookings(datetime.now(), datetime.now() + timedelta(days=31)) + + if bookings: + markup = InlineKeyboardMarkup() + for i, booking in enumerate(bookings): + # Create a button for each booking + button_text = f"Booking {i + 1}: {booking.start_stamp}" + markup.add(InlineKeyboardButton(button_text, callback_data=f"cancel_{i}")) + + self.user_bookings[message.chat.id] = bookings + self.bot.reply_to(message, "Select a booking to cancel:", reply_markup=markup) + else: + self.bot.reply_to(message, "You have no bookings to cancel.") + + def handle_callback_cancel_booking(self, call): + try: + # Extract booking index from callback data + booking_index = int(call.data.split('_')[1]) + bookings = self.user_bookings.get(call.message.chat.id, []) + + if 0 <= booking_index < len(bookings): + selected_booking = bookings[booking_index] + # Attempt to cancel the selected booking + try: + if self.xclient.cancel_booking(selected_booking): + self.bot.answer_callback_query(call.id, "Booking canceled successfully!") + self.bot.send_message(call.message.chat.id, f"Booking for {selected_booking.start_stamp} has been canceled.") + else: + self.bot.answer_callback_query(call.id, "Failed to cancel booking.") + except Exception as e: + self.bot.answer_callback_query(call.id, "Failed to cancel booking.") + self.bot.send_message(call.message.chat.id, f"Error: {str(e)}") + else: + self.bot.answer_callback_query(call.id, "Invalid booking selection.") + except (IndexError, ValueError): + self.bot.answer_callback_query(call.id, "Invalid data.") + + def add_to_watchlist(self, chat_id, slot): + if chat_id not in self.watchlist: + self.watchlist[chat_id] = [] + self.watchlist[chat_id].append(slot) + + def check_watchlist(self): + now = datetime.now().replace(tzinfo=timezone.utc) + for chat_id, slots in list(self.watchlist.items()): + available_slots = [] + for slot in slots: + if slot.start < now: + # If the slot has expired + self.bot.send_message(chat_id, f"Slot {slot.start_stamp} has expired and is no longer available.") + available_slots.append(slot) # Mark it to remove from watchlist + elif self.xclient.check_booking_availability(slot): + # If the slot becomes available + try: + self.xclient.make_booking(slot) + self.bot.send_message(chat_id, f"Slot {slot.start_stamp} is now available and has been booked for you.") + available_slots.append(slot) # Mark it to remove from watchlist + except Exception as e: + self.bot.send_message(chat_id, f"Error booking slot {slot.start_stamp}: {str(e)}") + + # Remove the expired or booked slots from the watchlist + self.watchlist[chat_id] = [slot for slot in slots if slot not in available_slots] + if not self.watchlist[chat_id]: + del self.watchlist[chat_id] # Remove chat_id from watchlist if empty + + def calculate_polling_interval(self, slot_time): + now = datetime.now().replace(tzinfo=timezone.utc) + time_until_slot = (slot_time - now).total_seconds() + + if time_until_slot < 3600: # If less than 1 hour + return 300 # 5 minutes + else: + return 1800 # 30 minutes + + def poll_periodically(self): + # Continuously poll the watchlist at dynamically adjusted intervals + while True: + self.check_watchlist() + + # Calculate the minimum polling interval based on upcoming slots + polling_interval = 1800 # Default 30 minutes + for chat_id, slots in self.watchlist.items(): + for slot in slots: + interval = self.calculate_polling_interval(slot.start) + polling_interval = min(polling_interval, interval) + + time.sleep(polling_interval) # Wait before checking the watchlist again + + + def run(self): + ### start poll_periodically on seperate thread + thread = Thread(target = self.poll_periodically, args = ()) + thread.daemon = True + thread.start() + self.bot.polling() + thread.kill() \ No newline at end of file diff --git a/xclient.py b/xclient.py new file mode 100644 index 0000000..dfe438d --- /dev/null +++ b/xclient.py @@ -0,0 +1,292 @@ +from selenium import webdriver +from models import Booking +import json +from selenium.webdriver.common.by import By +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +import time +import requests + + +class XClient: + def __init__(self, username, password): + self.username = username + self.password = password + self.access_token = None + self.id = -1 + + self.session = requests.Session() + # Default headers for session that mimicks browser + self.session.headers.update( + { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3" + } + ) + + def fetch_access_token(self): + # Set up the WebDriver (make sure to use the correct path for your WebDriver) + options = webdriver.ChromeOptions() + options.add_argument("--headless=new") + driver = webdriver.Chrome(options=options) + + driver.get("https://x.tudelft.nl") + + button = WebDriverWait(driver, 30).until( + EC.element_to_be_clickable( + (By.XPATH, "//span[contains(text(), 'TUDelft')]") + ) + ) + button.click() + + button = WebDriverWait(driver, 30).until( + EC.element_to_be_clickable( + ( + By.XPATH, + "//h3[contains(text(), 'Delft University of Technology')]", + ) + ) + ) + button.click() + + # Input the username + username_input = driver.find_element(By.ID, "username") + username_input.send_keys(self.username) # Replace with your actual username + + password_input = driver.find_element(By.ID, "password") + password_input.send_keys(self.password) + + password_input.submit() + + time.sleep(1) + + delcom_auth = json.loads( + driver.execute_script( + "return window.localStorage.getItem('delcom_auth');" + ) + ) + driver.quit() + + + access_token = delcom_auth["tokenResponse"].get("accessToken", None) + return access_token + + + def list_slots(self, start, end, tagIDs=[28]): + """ + URl: + https://backbone-web-api.production.delft.delcom.nl/bookable-slots + Get parameters: + s: {"startDate":"2024-09-09T14:23:24.934Z","endDate":"2024-09-09T22:00:00.000Z","tagIds":{"$in":[28]},"availableFromDate":{"$gt":"2024-09-09T14:23:24.935Z"},"availableTillDate":{"$gte":"2024-09-09T14:23:24.934Z"}} + join: product + """ + + self.login_if_necessary() + start_stamp = start.strftime("%Y-%m-%dT%H:%M:%S.000Z") + end_stamp = end.strftime("%Y-%m-%dT%H:%M:%S.000Z") + + response = self.session.get( + "https://backbone-web-api.production.delft.delcom.nl/bookable-slots", + params={ + "s": json.dumps( + { + "startDate": start_stamp, + "endDate": end_stamp, + "tagIds": {"$in": tagIDs}, + "availableFromDate": {"$gt": start_stamp}, + "availableTillDate": {"$gte": start_stamp}, + } + ), + "join": ["product", "linkedProduct"], + }, + headers={"Authorization": f"Bearer {self.access_token}"}, + ) + return [Booking(**x) for x in response.json()["data"]] + + def my_bookings(self, start, end): + # Ensure user is logged in + self.login_if_necessary() + + # Replace this with the actual member ID of the logged-in user + member_id = self.user_id # Assuming it's stored when the user logs in + + # Define the URL + url = "https://backbone-web-api.production.delft.delcom.nl/participations" + + # Create the parameters for the request + params = { + "s": json.dumps( + { + "$or": [ + {"memberId": member_id}, # Fetch bookings based on the user ID + ], + "booking.startDate": {"$gte": start.strftime("%Y-%m-%dT%H:%M:%S.%fZ")}, + "booking.endDate": {"$lte": end.strftime("%Y-%m-%dT%H:%M:%S.%fZ")}, + }, + ), + "join": ["booking", "linkedProduct", "product"], + "fetchOptimizedCustomerDashboard": "true", + "fetchTicket": "false", + "sort": "booking.startDate,ASC" + } + + # Make the GET request to the API + response = self.session.get( + url, + params=params, + headers={"Authorization": f"Bearer {self.access_token}"}, + ) + + # Check if the request was successful + if response.status_code == 200: + bookings = [Booking(**(x | {"bookingId": x["id"], "bookableProductId": None})) for x in response.json()["data"]["bookings"]] + return bookings + elif response.status_code == 403: + raise Exception( + f"Failed to fetch bookings: {response.json()['message']}" + ) + else: + raise Exception( + f"Failed to fetch bookings: {response.status_code}, {response.text}" + ) + + def make_booking(self, booking): + # Ensure that we're logged in + self.login_if_necessary() + + # Extract necessary details from the booking object + booking_data = { + "organizationId": None, # This is None in the example curl + "memberId": self.user_id, # This is fetched during login + "bookingId": booking.booking_id, # Booking ID from the booking object + "primaryPurchaseMessage": None, + "secondaryPurchaseMessage": None, + "params": { + "startDate": booking.start.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "endDate": booking.end.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "bookableProductId": booking.bookable_product_id, + "bookableLinkedProductId": booking.linked_product_id, + "bookingId": booking.booking_id, # Booking ID again + "invitedMemberEmails": [], + "invitedGuests": [], + "invitedOthers": [], + "primaryPurchaseMessage": None, + "secondaryPurchaseMessage": None, + }, + "dateOfRegistration": None, + } + + # Send the booking request + response = self.session.post( + "https://backbone-web-api.production.delft.delcom.nl/participations", + json=booking_data, + headers={"Authorization": f"Bearer {self.access_token}"}, + ) + + # Check the response status + if response.status_code == 201: + return response.json() # Return response if successful + elif response.status_code == 403: + raise Exception( + f"Failed to make booking: {response.json()['message']}" + ) + else: + raise Exception( + f"Failed to make booking: {response.status_code}, {response.text}" + ) + + def check_booking_availability(self, booking): + # Ensure user is logged in + self.login_if_necessary() + + # Define the URL + url = "https://backbone-web-api.production.delft.delcom.nl/bookable-slots" + + # Create the parameters for the request + params = { + "s": json.dumps( + { + "bookingId": booking.booking_id, + "tagIds": {"$in": [28]}, + }, + ), + "join": ["product", "linkedProduct"], + } + + # Make the GET request to the API + response = self.session.get( + url, + params=params, + headers={"Authorization": f"Bearer {self.access_token}"}, + ) + + # Check if the request was successful + if response.status_code == 200: + if len(response.json()["data"]) != 1: + return False + booking = Booking(**response.json()["data"][0]) + return booking.available + + elif response.status_code == 403: + raise Exception( + f"Failed to check booking availability: {response.json()['message']}" + ) + else: + raise Exception( + f"Failed to check booking availability: {response.status_code}, {response.text}" + ) + + @property + def user_id(self): + if self.id != -1: + return self.id + + self.login_if_necessary() + + response = self.session.get( + "https://backbone-web-api.production.delft.delcom.nl/auth?cf=0", + headers={"Authorization": f"Bearer {self.access_token}"}, + ) + self.id = response.json()["id"] + return self.id + + def cancel_booking(self, booking): + participation_id = None + for i in booking.booking_dict.get("participations", []): + if i["memberId"] == self.user_id: + participation_id = i["id"] + break + + assert participation_id is not None, "Booking not found in user's participations" + + ## Send a DELETE request to the URL https://backbone-web-api.production.delft.delcom.nl/participations/{id} + response = self.session.delete( + f"https://backbone-web-api.production.delft.delcom.nl/participations/{participation_id}", + headers={"Authorization": f"Bearer {self.access_token}"}, + ) + + if response.status_code == 200: + return True + elif response.status_code == 403: + raise Exception( + f"Failed to cancel booking: {response.json()['message']}" + ) + else: + raise Exception( + f"Failed to fetch bookings: {response.status_code}, {response.text}" + ) + + def login(self): + self.access_token = self.fetch_access_token() + + def is_logged_in(self): + # Send request to 'https://backbone-web-api.production.delft.delcom.nl/auth?cf=0 with access token as bearer, check status code + response = self.session.get( + "https://backbone-web-api.production.delft.delcom.nl/auth?cf=0", + headers={"Authorization": f"Bearer {self.access_token}"}, + ) + + return response.status_code == 200 + + def login_if_necessary(self): + if not self.is_logged_in(): + self.login()