diff --git a/tgbot.py b/tgbot.py index c12a822..2c0ba3b 100644 --- a/tgbot.py +++ b/tgbot.py @@ -71,6 +71,10 @@ class TelegramBot: def calendar_auth(message): self.calendar_auth(message) + @self.bot.message_handler(commands=["login"]) + def cancel_booking(message): + self.process_login(message) + def init_db(self): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() @@ -229,210 +233,224 @@ class TelegramBot: def make_booking(self, message): try: xclient = self.get_xclient(message.chat.id) + + if not xclient: + self.bot.reply_to( + message, + "You are not logged in. Please use `/login username password` to log in.", + ) + return + + start = datetime.now() + end = start + timedelta(days=1) + + slots = xclient.list_slots(start, end) + + if slots: + markup = InlineKeyboardMarkup() + for i, slot in enumerate(slots): + # Create a button for each slot + status = "Available" if slot.available else "FULL" + button_text = f"Slot {i + 1}: {slot.start_stamp} ({status})" + markup.add( + InlineKeyboardButton(button_text, callback_data=f"book_{i}") + ) + + self.bot.reply_to( + message, "Select a slot to book or watch:", reply_markup=markup + ) + self.user_selected_slot[message.chat.id] = slots + else: + self.bot.reply_to(message, "No slots found.") + except Exception as e: self.bot.reply_to(message, f"Error: {str(e)}") return - if not xclient: - self.bot.reply_to( - message, - "You are not logged in. Please use `/login username password` to log in.", - ) - return - - start = datetime.now() - end = start + timedelta(days=1) - slots = xclient.list_slots(start, end) - - if slots: - markup = InlineKeyboardMarkup() - for i, slot in enumerate(slots): - # Create a button for each slot - status = "Available" if slot.available else "FULL" - button_text = f"Slot {i + 1}: {slot.start_stamp} ({status})" - markup.add(InlineKeyboardButton(button_text, callback_data=f"book_{i}")) - - self.bot.reply_to( - message, "Select a slot to book or watch:", reply_markup=markup - ) - self.user_selected_slot[message.chat.id] = slots - else: - self.bot.reply_to(message, "No slots found.") - def callback_booking(self, call): try: xclient = self.get_xclient(call.message.chat.id) - except Exception as e: - self.bot.reply_to(call.message, f"Error: {str(e)}") - return - if not xclient: - self.bot.reply_to( - call.message, - "You are not logged in. Please use `/login username password` to log in.", - ) - return + if not xclient: + self.bot.reply_to( + call.message, + "You are not logged in. Please use `/login username password` to log in.", + ) + return - try: - # Extract slot index from callback data - slot_index = int(call.data.split("_")[1]) - slots = self.user_selected_slot.get(call.message.chat.id, []) + try: + # Extract slot index from callback data + slot_index = int(call.data.split("_")[1]) + slots = self.user_selected_slot.get(call.message.chat.id, []) - if 0 <= slot_index < len(slots): - selected_slot = slots[slot_index] + if 0 <= slot_index < len(slots): + selected_slot = slots[slot_index] - if selected_slot.available: - # Attempt to book the available slot - try: - xclient.make_booking(selected_slot) + if selected_slot.available: + # Attempt to book the available slot + try: + xclient.make_booking(selected_slot) - if ( - token := self.get_calendar_token(call.message.chat.id) - ) is not None: - self.calendar.create_event( - token, - { - "summary": "Gym Booking", - "start": { - "dateTime": selected_slot.start.strftime( - "%Y-%m-%dT%H:%M:%S.%fZ" - ), - "timeZone": "Europe/Amsterdam", + if ( + token := self.get_calendar_token(call.message.chat.id) + ) is not None: + self.calendar.create_event( + token, + { + "summary": "Gym Booking", + "start": { + "dateTime": selected_slot.start.strftime( + "%Y-%m-%dT%H:%M:%S.%fZ" + ), + "timeZone": "Europe/Amsterdam", + }, + "end": { + "dateTime": selected_slot.end.strftime( + "%Y-%m-%dT%H:%M:%S.%fZ" + ), + "timeZone": "Europe/Amsterdam", + }, }, - "end": { - "dateTime": selected_slot.end.strftime( - "%Y-%m-%dT%H:%M:%S.%fZ" - ), - "timeZone": "Europe/Amsterdam", - }, - }, + ) + + self.bot.answer_callback_query( + call.id, "Slot booked successfully!" ) - + self.bot.send_message( + call.message.chat.id, + f"Booking confirmed for: {selected_slot.start_stamp}", + ) + except Exception as e: + self.bot.answer_callback_query( + call.id, "Failed to book slot." + ) + self.bot.send_message( + call.message.chat.id, f"Error: {str(e)}" + ) + else: + # Slot is full, add to watchlist + self.add_to_watchlist(call.message.chat.id, selected_slot) self.bot.answer_callback_query( - call.id, "Slot booked successfully!" + call.id, "Slot is full. Added to watchlist." ) self.bot.send_message( call.message.chat.id, - f"Booking confirmed for: {selected_slot.start_stamp}", + f"Slot {selected_slot.start_stamp} is full. You will be notified when it becomes available.", ) - except Exception as e: - self.bot.answer_callback_query(call.id, "Failed to book slot.") - self.bot.send_message(call.message.chat.id, f"Error: {str(e)}") else: - # Slot is full, add to watchlist - self.add_to_watchlist(call.message.chat.id, selected_slot) - self.bot.answer_callback_query( - call.id, "Slot is full. Added to watchlist." - ) - self.bot.send_message( - call.message.chat.id, - f"Slot {selected_slot.start_stamp} is full. You will be notified when it becomes available.", - ) - else: - self.bot.answer_callback_query(call.id, "Invalid slot selection.") - except (IndexError, ValueError): - self.bot.answer_callback_query(call.id, "Invalid data.") + self.bot.answer_callback_query(call.id, "Invalid slot selection.") + except (IndexError, ValueError): + self.bot.answer_callback_query(call.id, "Invalid data.") - ## Remove the buttons from the previous message - self.bot.edit_message_reply_markup( - call.message.chat.id, call.message.message_id - ) + ## Remove the buttons from the previous message + self.bot.edit_message_reply_markup( + call.message.chat.id, call.message.message_id + ) + except Exception as e: + self.bot.reply_to(call.message, f"Error: {str(e)}") + return def cancel_booking(self, message): try: xclient = self.get_xclient(message.chat.id) + + if not xclient: + self.bot.reply_to( + message, + "You are not logged in. Please use `/login username password` to log in.", + ) + return + + # Fetch user's current bookings + bookings = xclient.my_bookings( + datetime.now(), datetime.now() + timedelta(days=31) + ) + + if bookings: + markup = InlineKeyboardMarkup() + for i, booking in enumerate(bookings): + # Create a button for each booking + button_text = f"Booking {i + 1}: {booking.start_stamp}" + markup.add( + InlineKeyboardButton(button_text, callback_data=f"cancel_{i}") + ) + + self.user_bookings[message.chat.id] = bookings + self.bot.reply_to( + message, "Select a booking to cancel:", reply_markup=markup + ) + else: + self.bot.reply_to(message, "You have no bookings to cancel.") + except Exception as e: self.bot.reply_to(message, f"Error: {str(e)}") return - if not xclient: - self.bot.reply_to( - message, - "You are not logged in. Please use `/login username password` to log in.", - ) - return - - # Fetch user's current bookings - bookings = xclient.my_bookings( - datetime.now(), datetime.now() + timedelta(days=31) - ) - - if bookings: - markup = InlineKeyboardMarkup() - for i, booking in enumerate(bookings): - # Create a button for each booking - button_text = f"Booking {i + 1}: {booking.start_stamp}" - markup.add( - InlineKeyboardButton(button_text, callback_data=f"cancel_{i}") - ) - - self.user_bookings[message.chat.id] = bookings - self.bot.reply_to( - message, "Select a booking to cancel:", reply_markup=markup - ) - else: - self.bot.reply_to(message, "You have no bookings to cancel.") - def callback_cancel_booking(self, call): try: xclient = self.get_xclient(call.message.chat.id) - except Exception as e: - self.bot.reply_to(call.message, f"Error: {str(e)}") - return - if not xclient: - self.bot.reply_to( - call.message, - "You are not logged in. Please use `/login username password` to log in.", - ) - return + if not xclient: + self.bot.reply_to( + call.message, + "You are not logged in. Please use `/login username password` to log in.", + ) + return - try: - # Extract booking index from callback data - booking_index = int(call.data.split("_")[1]) - bookings = self.user_bookings.get(call.message.chat.id, []) + try: + # Extract booking index from callback data + booking_index = int(call.data.split("_")[1]) + bookings = self.user_bookings.get(call.message.chat.id, []) - if 0 <= booking_index < len(bookings): - selected_booking = bookings[booking_index] - # Attempt to cancel the selected booking - try: - if xclient.cancel_booking(selected_booking): - self.bot.answer_callback_query( - call.id, "Booking canceled successfully!" - ) - self.bot.send_message( - call.message.chat.id, - f"Booking for {selected_booking.start_stamp} has been canceled.", - ) - - if ( - token := self.get_calendar_token(call.message.chat.id) - ) is not None: - gmt_plus_2 = timezone(timedelta(hours=2)) - - event = self.calendar.get_first_event( - token, selected_booking.start.astimezone(gmt_plus_2) + if 0 <= booking_index < len(bookings): + selected_booking = bookings[booking_index] + # Attempt to cancel the selected booking + try: + if xclient.cancel_booking(selected_booking): + self.bot.answer_callback_query( + call.id, "Booking canceled successfully!" + ) + self.bot.send_message( + call.message.chat.id, + f"Booking for {selected_booking.start_stamp} has been canceled.", ) - if event: - self.calendar.delete_event(token, event["id"]) + if ( + token := self.get_calendar_token(call.message.chat.id) + ) is not None: + gmt_plus_2 = timezone(timedelta(hours=2)) - else: + event = self.calendar.get_first_event( + token, selected_booking.start.astimezone(gmt_plus_2) + ) + + if event: + self.calendar.delete_event(token, event["id"]) + + else: + self.bot.answer_callback_query( + call.id, "Failed to cancel booking." + ) + except Exception as e: self.bot.answer_callback_query( call.id, "Failed to cancel booking." ) - except Exception as e: - self.bot.answer_callback_query(call.id, "Failed to cancel booking.") - self.bot.send_message(call.message.chat.id, f"Error: {str(e)}") - else: - self.bot.answer_callback_query(call.id, "Invalid booking selection.") - except (IndexError, ValueError): - self.bot.answer_callback_query(call.id, "Invalid data.") + self.bot.send_message(call.message.chat.id, f"Error: {str(e)}") + else: + self.bot.answer_callback_query( + call.id, "Invalid booking selection." + ) + except (IndexError, ValueError): + self.bot.answer_callback_query(call.id, "Invalid data.") - ## Remove the buttons from the previous message - self.bot.edit_message_reply_markup( - call.message.chat.id, call.message.message_id - ) + ## Remove the buttons from the previous message + self.bot.edit_message_reply_markup( + call.message.chat.id, call.message.message_id + ) + + except Exception as e: + self.bot.reply_to(call.message, f"Error: {str(e)}") + return def manage_watchlist(self, message): chat_id = message.chat.id diff --git a/xclient.py b/xclient.py index 7463217..31fdbf5 100644 --- a/xclient.py +++ b/xclient.py @@ -11,7 +11,9 @@ from selenium.webdriver.chrome.service import Service from webdriver_manager.chrome import ChromeDriverManager -service = Service(ChromeDriverManager().install()) +class LoginFailedException(Exception): + def __init__(self, *args: object) -> None: + super().__init__(*args) class XClient: @@ -32,19 +34,16 @@ class XClient: def fetch_access_token(self): # Set up the WebDriver (make sure to use the correct path for your WebDriver) - print("starting chromedriver") + service = Service(executable_path=config.CHROMEDRIVER_PATH) + options = webdriver.ChromeOptions() options.add_argument("--headless=new") options.add_argument("--no-sandbox") - print("started chromedriver") - driver = webdriver.Chrome(options=options, service=service) driver.get("https://x.tudelft.nl") - print("Check1") - button = WebDriverWait(driver, 30).until( EC.element_to_be_clickable( (By.XPATH, "//span[contains(text(), 'TUDelft')]") @@ -61,7 +60,6 @@ class XClient: ) ) - print("Check2") button.click() # Input the username @@ -75,11 +73,23 @@ class XClient: time.sleep(1) - delcom_auth = json.loads( - driver.execute_script("return window.localStorage.getItem('delcom_auth');") + cookie = driver.execute_script( + "return window.localStorage.getItem('delcom_auth');" ) + + if cookie is None: + raise LoginFailedException("Logging in to X has failed") + + delcom_auth = json.loads(cookie) + print(cookie, delcom_auth) + driver.quit() + if delcom_auth.get("tokenResponse") is None: + raise LoginFailedException( + "Logging in to X has failed, missing tokenResponse" + ) + access_token = delcom_auth["tokenResponse"].get("accessToken", None) return access_token