import logging
import sqlite3
import time
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from threading import Thread

import telebot
from telebot.types import InlineKeyboardButton, InlineKeyboardMarkup

import config
from xclient import XClient

logger = logging.getLogger(__name__)


class TelegramBot:
    """Telegram front-end for booking, cancelling and watching X gym slots.

    Credentials and access tokens are kept in a SQLite ``users`` table keyed
    by chat id.  Slots the user picked while they were full are kept in an
    in-memory watchlist that a background thread polls and books when they
    become available.

    NOTE(security): passwords are stored in plaintext because ``XClient`` is
    constructed from the raw username/password.  Protect the database file
    accordingly.
    """

    # Upper bound (seconds) between two watchlist polls.
    DEFAULT_POLL_INTERVAL = 1800
    # Poll interval (seconds) used while nobody is watching anything.
    IDLE_POLL_INTERVAL = 15
    NOT_LOGGED_IN_TEXT = (
        "You are not logged in. Please use `/login username password` to log in."
    )

    def __init__(self, bot_token, db_path="bot_database.db"):
        """Create the bot, make sure the database exists and register handlers.

        Args:
            bot_token: Telegram bot API token.
            db_path: Path of the SQLite database file.
        """
        self.bot = telebot.TeleBot(bot_token)
        self.xclients = {}  # XClient instances by chat ID
        self.user_selected_slot = {}  # Slots last offered to the user, by chat ID
        self.user_bookings = {}  # Bookings last offered to the user, by chat ID
        self.watchlist = {}  # Full slots being watched, by chat ID
        self.db_path = db_path
        self.init_db()
        self._register_handlers()

    # ------------------------------------------------------------------ setup

    @staticmethod
    def _data_prefix_filter(prefix):
        """Return a callback-query filter matching data that starts with *prefix*."""

        def matches(call):
            # call.data can be None for non-data callbacks (e.g. games).
            return bool(call.data) and call.data.startswith(prefix)

        return matches

    def _register_handlers(self):
        """Wire the bot commands and inline-button callbacks to the methods."""
        command_handlers = (
            ("start", self.send_welcome),
            # The bot's own messages tell users to send `/login ...`, so the
            # command has to work outside the /start next-step flow as well.
            ("login", self.process_login),
            ("book", self.make_booking),
            ("cancel", self.cancel_booking),
            ("watchlist", self.manage_watchlist),
        )
        for command, handler in command_handlers:
            self.bot.message_handler(commands=[command])(handler)

        callback_handlers = (
            ("book_", self.callback_booking),
            ("cancel_", self.callback_cancel_booking),
            ("remove_", self.callback_remove_slot),
        )
        for prefix, handler in callback_handlers:
            self.bot.callback_query_handler(
                func=self._data_prefix_filter(prefix)
            )(handler)

    @contextmanager
    def _connection(self):
        """Yield a SQLite connection that is committed (or rolled back) and closed.

        ``sqlite3.Connection`` used as a context manager only manages the
        transaction; it does not close the connection, so that is done here.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def init_db(self):
        """Create the ``users`` table if it does not exist yet."""
        with self._connection() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    chat_id INTEGER UNIQUE,
                    username TEXT,
                    password TEXT,
                    access_token TEXT
                )
            """)

    # ------------------------------------------------------------ credentials

    def update_user_access_token_callback(self, chat_id):
        """Return a callable that persists a new access token for *chat_id*."""

        def callback(access_token):
            with self._connection() as conn:
                conn.execute(
                    "UPDATE users SET access_token = ? WHERE chat_id = ?",
                    (access_token, chat_id),
                )

        return callback

    def get_xclient(self, chat_id):
        """Return the ``XClient`` for *chat_id*, or ``None`` if not registered.

        Clients are cached once built.  A missing user is deliberately NOT
        cached, otherwise a user who tried a command before logging in would
        stay "not logged in" for the lifetime of the process.
        """
        cached = self.xclients.get(chat_id)
        if cached is not None:
            return cached

        with self._connection() as conn:
            row = conn.execute(
                "SELECT username, password, access_token FROM users WHERE chat_id = ?",
                (chat_id,),
            ).fetchone()
        if row is None:
            return None

        username, password, access_token = row
        xclient = XClient(
            username,
            password,
            on_access_token_change=self.update_user_access_token_callback(chat_id),
        )
        xclient.access_token = access_token
        self.xclients[chat_id] = xclient
        return xclient

    def _require_xclient(self, message):
        """Return the sender's ``XClient``; reply with the reason and return
        ``None`` when it cannot be obtained."""
        try:
            xclient = self.get_xclient(message.chat.id)
        except Exception as e:
            self.bot.reply_to(message, f"Error: {str(e)}")
            return None
        if not xclient:
            self.bot.reply_to(message, self.NOT_LOGGED_IN_TEXT)
            return None
        return xclient

    def handle_login(self, message):
        """Ask an unregistered user for credentials; tell a registered one so."""
        chat_id = message.chat.id
        with self._connection() as conn:
            known = conn.execute(
                "SELECT username FROM users WHERE chat_id = ?", (chat_id,)
            ).fetchone()

        if known:
            self.bot.reply_to(message, "You are already logged in.")
            return

        self.bot.reply_to(
            message,
            "Please provide your X username and password in the format:\n`/login username password`. This is necessary so that the bot can login to X as you.",
        )
        self.bot.register_next_step_handler(message, self.process_login)

    def process_login(self, message):
        """Store credentials sent as ``/login username password``.

        Inserts a new user or updates the existing row for this chat, then
        drops any cached client so the next command uses the new credentials.
        """
        # message.text is None for non-text messages (photos, stickers, ...).
        parts = (message.text or "").split()
        if len(parts) != 3:
            self.bot.reply_to(
                message, "Invalid format. Please use `/login username password`."
            )
            return
        _command, username, password = parts
        chat_id = message.chat.id

        with self._connection() as conn:
            exists = conn.execute(
                "SELECT 1 FROM users WHERE chat_id = ?", (chat_id,)
            ).fetchone()
            if exists:
                # A token issued for the previous credentials must not be
                # reused with the new ones.
                conn.execute(
                    "UPDATE users SET username = ?, password = ?, access_token = NULL WHERE chat_id = ?",
                    (username, password, chat_id),
                )
            else:
                conn.execute(
                    "INSERT INTO users (chat_id, username, password) VALUES (?, ?, ?)",
                    (chat_id, username, password),
                )

        # Forget a client built from the old credentials.
        self.xclients.pop(chat_id, None)

        # Confirm only after the transaction has been committed.
        if exists:
            self.bot.reply_to(message, f"Updated login details for {username}.")
        else:
            self.bot.reply_to(message, f"Registered new user {username}.")

    def send_welcome(self, message):
        """Greet the user and start the login flow."""
        self.bot.reply_to(
            message,
            "Welcome! Using this bot you can create and cancel X gym slots as well as place them on a watchlist to be booked when available.",
        )
        self.handle_login(message)

    # ---------------------------------------------------------------- helpers

    @staticmethod
    def _callback_index(call):
        """Return the integer after the first ``_`` in ``call.data``, or ``None``."""
        try:
            return int(call.data.split("_")[1])
        except (AttributeError, IndexError, ValueError):
            return None

    def _clear_buttons(self, call):
        """Remove the inline keyboard from the message the callback came from.

        Purely cosmetic, so a Telegram API failure here is logged instead of
        being allowed to abort the handler.
        """
        try:
            self.bot.edit_message_reply_markup(
                call.message.chat.id, call.message.message_id
            )
        except Exception:
            logger.warning("Could not remove inline keyboard", exc_info=True)

    def _notify(self, chat_id, text):
        """Send *text* to *chat_id* from the polling thread, logging failures."""
        try:
            self.bot.send_message(chat_id, text)
        except Exception:
            logger.exception("Could not notify chat %s", chat_id)

    # ---------------------------------------------------------------- booking

    def make_booking(self, message):
        """Offer the slots of the next 24 hours as inline buttons."""
        xclient = self._require_xclient(message)
        if xclient is None:
            return

        # NOTE(review): naive local time is passed through unchanged because
        # the timezone contract of XClient.list_slots is not visible here.
        start = datetime.now()
        end = start + timedelta(days=1)
        try:
            slots = xclient.list_slots(start, end)
        except Exception as e:
            self.bot.reply_to(message, f"Error: {str(e)}")
            return

        if not slots:
            self.bot.reply_to(message, "No slots found.")
            return

        markup = InlineKeyboardMarkup()
        for i, slot in enumerate(slots):
            status = "Available" if slot.available else "FULL"
            button_text = f"Slot {i + 1}: {slot.start_stamp} ({status})"
            markup.add(InlineKeyboardButton(button_text, callback_data=f"book_{i}"))

        # Remember the offer before sending it so a fast click finds it.
        self.user_selected_slot[message.chat.id] = slots
        self.bot.reply_to(
            message, "Select a slot to book or watch:", reply_markup=markup
        )

    def callback_booking(self, call):
        """Book the chosen slot, or put it on the watchlist when it is full."""
        xclient = self._require_xclient(call.message)
        if xclient is None:
            return

        chat_id = call.message.chat.id
        slot_index = self._callback_index(call)
        slots = self.user_selected_slot.get(chat_id, [])

        if slot_index is None:
            self.bot.answer_callback_query(call.id, "Invalid data.")
        elif not 0 <= slot_index < len(slots):
            self.bot.answer_callback_query(call.id, "Invalid slot selection.")
        else:
            selected_slot = slots[slot_index]
            if selected_slot.available:
                self._book_selected_slot(call, xclient, selected_slot)
            else:
                self.add_to_watchlist(chat_id, selected_slot)
                self.bot.answer_callback_query(
                    call.id, "Slot is full. Added to watchlist."
                )
                self.bot.send_message(
                    chat_id,
                    f"Slot {selected_slot.start_stamp} is full. You will be notified when it becomes available.",
                )

        self._clear_buttons(call)

    def _book_selected_slot(self, call, xclient, slot):
        """Try to book *slot* and report the outcome to the user."""
        chat_id = call.message.chat.id
        try:
            xclient.make_booking(slot)
        except Exception as e:
            self.bot.answer_callback_query(call.id, "Failed to book slot.")
            self.bot.send_message(chat_id, f"Error: {str(e)}")
            return
        self.bot.answer_callback_query(call.id, "Slot booked successfully!")
        self.bot.send_message(chat_id, f"Booking confirmed for: {slot.start_stamp}")

    def cancel_booking(self, message):
        """Offer the user's bookings of the next 31 days for cancellation."""
        xclient = self._require_xclient(message)
        if xclient is None:
            return

        now = datetime.now()
        try:
            bookings = xclient.my_bookings(now, now + timedelta(days=31))
        except Exception as e:
            self.bot.reply_to(message, f"Error: {str(e)}")
            return

        if not bookings:
            self.bot.reply_to(message, "You have no bookings to cancel.")
            return

        markup = InlineKeyboardMarkup()
        for i, booking in enumerate(bookings):
            button_text = f"Booking {i + 1}: {booking.start_stamp}"
            markup.add(InlineKeyboardButton(button_text, callback_data=f"cancel_{i}"))

        self.user_bookings[message.chat.id] = bookings
        self.bot.reply_to(message, "Select a booking to cancel:", reply_markup=markup)

    def callback_cancel_booking(self, call):
        """Cancel the booking the user picked from the /cancel list."""
        xclient = self._require_xclient(call.message)
        if xclient is None:
            return

        chat_id = call.message.chat.id
        booking_index = self._callback_index(call)
        bookings = self.user_bookings.get(chat_id, [])

        if booking_index is None:
            self.bot.answer_callback_query(call.id, "Invalid data.")
        elif not 0 <= booking_index < len(bookings):
            self.bot.answer_callback_query(call.id, "Invalid booking selection.")
        else:
            selected_booking = bookings[booking_index]
            try:
                cancelled = xclient.cancel_booking(selected_booking)
            except Exception as e:
                self.bot.answer_callback_query(call.id, "Failed to cancel booking.")
                self.bot.send_message(chat_id, f"Error: {str(e)}")
            else:
                if cancelled:
                    self.bot.answer_callback_query(
                        call.id, "Booking canceled successfully!"
                    )
                    self.bot.send_message(
                        chat_id,
                        f"Booking for {selected_booking.start_stamp} has been canceled.",
                    )
                else:
                    self.bot.answer_callback_query(
                        call.id, "Failed to cancel booking."
                    )

        self._clear_buttons(call)

    # -------------------------------------------------------------- watchlist

    def manage_watchlist(self, message):
        """Show the user's watchlist with one remove button per slot."""
        watched = self.watchlist.get(message.chat.id)
        if not watched:
            self.bot.reply_to(message, "Your watchlist is empty.")
            return

        markup = InlineKeyboardMarkup()
        for i, slot in enumerate(watched):
            button_text = f"Slot {i + 1}: {slot.start_stamp} (FULL)"
            markup.add(InlineKeyboardButton(button_text, callback_data=f"remove_{i}"))

        self.bot.reply_to(message, "Your watchlist:", reply_markup=markup)

    def callback_remove_slot(self, call):
        """Remove the slot the user picked from their watchlist."""
        chat_id = call.message.chat.id
        slot_index = self._callback_index(call)
        slots = self.watchlist.get(chat_id, [])

        if slot_index is None:
            self.bot.answer_callback_query(call.id, "Invalid data.")
        elif not 0 <= slot_index < len(slots):
            self.bot.answer_callback_query(call.id, "Invalid slot selection.")
        else:
            removed_slot = slots.pop(slot_index)
            if not slots:
                # Drop the chat entry once its watchlist is empty.
                self.watchlist.pop(chat_id, None)
            self.bot.answer_callback_query(
                call.id, f"Removed slot: {removed_slot.start_stamp}"
            )
            self.bot.send_message(
                chat_id,
                f"Slot {removed_slot.start_stamp} has been removed from your watchlist.",
            )

        self._clear_buttons(call)

    def add_to_watchlist(self, chat_id, slot):
        """Add *slot* to the watchlist of *chat_id* (no-op if already there)."""
        watched = self.watchlist.setdefault(chat_id, [])
        # A duplicate entry would make the poller book the same slot twice.
        if slot not in watched:
            watched.append(slot)

    def _drop_from_watchlist(self, chat_id, finished):
        """Remove *finished* slots from the current watchlist of *chat_id*.

        Works on the live list rather than the poller's snapshot so that slots
        the user added or removed during the poll are respected.
        """
        current = self.watchlist.get(chat_id)
        if current is None:
            return
        remaining = [slot for slot in current if slot not in finished]
        if remaining:
            self.watchlist[chat_id] = remaining
        else:
            self.watchlist.pop(chat_id, None)

    def _process_watched_slot(self, chat_id, xclient, slot, now):
        """Handle one watched slot; return True when it should stop being watched."""
        if slot.start < now:
            self._notify(
                chat_id,
                f"Slot {slot.start_stamp} has expired and is no longer available.",
            )
            return True

        if not xclient.check_booking_availability(slot):
            return False

        try:
            xclient.make_booking(slot)
        except Exception as e:
            self._notify(chat_id, f"Error booking slot {slot.start_stamp}: {str(e)}")
            return False

        # The booking succeeded: stop watching even if the notification fails,
        # otherwise the slot would be booked again on the next poll.
        self._notify(
            chat_id,
            f"Slot {slot.start_stamp} is now available and has been booked for you.",
        )
        return True

    def check_watchlist(self):
        """Book watched slots that became available and drop expired ones."""
        # A real UTC "now"; stamping local wall-clock time as UTC would shift
        # every comparison by the host's UTC offset.
        now = datetime.now(timezone.utc)

        # Snapshot: handler threads may mutate the watchlist while we poll.
        snapshot = [
            (chat_id, list(slots)) for chat_id, slots in list(self.watchlist.items())
        ]
        for chat_id, slots in snapshot:
            try:
                xclient = self.get_xclient(chat_id)
            except Exception:
                logger.exception("Error polling watchlist for chat %s", chat_id)
                continue
            if not xclient:
                logger.warning("No credentials for chat %s; skipping watchlist", chat_id)
                continue

            finished = []
            for slot in slots:
                try:
                    if self._process_watched_slot(chat_id, xclient, slot, now):
                        finished.append(slot)
                except Exception:
                    # One failing slot must not stop the others being checked.
                    logger.exception(
                        "Error checking slot %s for chat %s", slot, chat_id
                    )

            self._drop_from_watchlist(chat_id, finished)

    # ---------------------------------------------------------------- polling

    def calculate_polling_interval(self, slot_time):
        """Return the poll interval (seconds) suited to a slot at *slot_time*.

        ``config.INTERVALS`` maps "seconds until the slot" thresholds to wait
        times.  The wait of the smallest threshold the slot is still under is
        returned; past every threshold, the wait of the largest threshold is
        used (or the default when the mapping is empty).
        """
        seconds_left = (slot_time - datetime.now(timezone.utc)).total_seconds()
        fallback = self.DEFAULT_POLL_INTERVAL
        for threshold, wait in sorted(
            config.INTERVALS.items(), key=lambda item: item[0]
        ):
            if seconds_left < threshold:
                return wait
            fallback = wait
        return fallback

    def _next_polling_interval(self):
        """Return how long to sleep before the next watchlist poll."""
        # Snapshot so concurrent handler threads cannot break the iteration.
        watched = []
        for slots in list(self.watchlist.values()):
            watched.extend(slots)
        if not watched:
            return self.IDLE_POLL_INTERVAL
        intervals = [self.calculate_polling_interval(slot.start) for slot in watched]
        return min([self.DEFAULT_POLL_INTERVAL] + intervals)

    def poll_periodically(self):
        """Poll the watchlist forever at dynamically adjusted intervals."""
        logger.info("POLLING %s", self.watchlist)
        while True:
            try:
                self.check_watchlist()
                polling_interval = self._next_polling_interval()
            except Exception:
                # Keep the background thread alive; a dead poller would
                # silently stop all watchlist bookings.
                logger.exception("Watchlist poll failed")
                polling_interval = self.IDLE_POLL_INTERVAL
            time.sleep(polling_interval)

    def run(self):
        """Start the watchlist poller on a daemon thread and run the bot."""
        thread = Thread(target=self.poll_periodically, args=())
        thread.daemon = True
        thread.start()
        self.bot.polling()