import telebot from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton from datetime import datetime, timedelta, timezone import time from threading import Thread import sqlite3 from xclient import XClient class TelegramBot: def __init__(self, bot_token, db_path="bot_database.db"): self.bot = telebot.TeleBot(bot_token) self.xclients = {} self.user_selected_slot = {} # Store user selected slot by chat ID self.user_bookings = {} # Store user bookings by chat ID self.watchlist = {} # Watchlist to store full slots by chat ID self.db_path = db_path self.init_db() # Set up the command handlers @self.bot.message_handler(commands=["start"]) def send_welcome(message): self.send_welcome(message) @self.bot.message_handler(commands=["book"]) def make_booking(message): self.make_booking(message) @self.bot.callback_query_handler( func=lambda call: call.data.startswith("book_") ) def callback_booking(call): self.callback_booking(call) @self.bot.message_handler(commands=["cancel"]) def cancel_booking(message): self.cancel_booking(message) @self.bot.callback_query_handler( func=lambda call: call.data.startswith("cancel_") ) def callback_cancel_booking(call): self.callback_cancel_booking(call) @self.bot.message_handler(commands=["watchlist"]) def manage_watchlist(call): self.manage_watchlist(call) @self.bot.callback_query_handler( func=lambda call: call.data.startswith("remove_") ) def callback_remove_slot(call): self.callback_remove_slot(call) def init_db(self): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Create tables cursor.execute(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, chat_id INTEGER UNIQUE, username TEXT, password TEXT ) """) conn.commit() def get_xclient(self, chat_id): if chat_id not in self.xclients: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute( "SELECT username, password FROM users WHERE chat_id = ?", (chat_id,) ) result = cursor.fetchone() if result: username, password = result self.xclients[chat_id] = XClient(username, password) else: self.xclients[chat_id] = None return self.xclients[chat_id] def handle_login(self, message): chat_id = message.chat.id with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT username FROM users WHERE chat_id = ?", (chat_id,)) result = cursor.fetchone() if result: self.bot.reply_to(message, "You are already logged in.") else: self.bot.reply_to( message, "Please provide your X username and password in the format:\n`/login username password`. This is necessary so that the bot can login to X as you.", ) self.bot.register_next_step_handler(message, self.process_login) def process_login(self, message): try: command, username, password = message.text.split() chat_id = message.chat.id with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute( "SELECT * FROM users WHERE chat_id = ?", (chat_id,), ) user = cursor.fetchone() if user: # Update the row cursor.execute( "UPDATE users SET username = ?, password = ? WHERE chat_id = ?", (username, password, chat_id), ) self.bot.reply_to(message, f"Updated login details for {username}.") else: # Insert new row cursor.execute( "INSERT INTO users (chat_id, username, password) VALUES (?, ?, ?)", (chat_id, username, password), ) self.bot.reply_to(message, f"Registered new user {username}.") except ValueError: self.bot.reply_to( message, "Invalid format. Please use `/login username password`." ) def send_welcome(self, message): self.bot.reply_to( message, "Welcome! Using this bot you can create and cancel X gym slots as well as place them on a watchlist to be booked when available.", ) self.handle_login(message) def make_booking(self, message): try: xclient = self.get_xclient(message.chat.id) except Exception as e: self.bot.reply_to(message, f"Error: {str(e)}") return if not xclient: self.bot.reply_to( message, "You are not logged in. Please use `/login username password` to log in.", ) return start = datetime.now() end = start + timedelta(days=1) slots = xclient.list_slots(start, end) if slots: markup = InlineKeyboardMarkup() for i, slot in enumerate(slots): # Create a button for each slot status = "Available" if slot.available else "FULL" button_text = f"Slot {i + 1}: {slot.start_stamp} ({status})" markup.add(InlineKeyboardButton(button_text, callback_data=f"book_{i}")) self.bot.reply_to( message, "Select a slot to book or watch:", reply_markup=markup ) self.user_selected_slot[message.chat.id] = slots else: self.bot.reply_to(message, "No slots found.") def callback_booking(self, call): try: xclient = self.get_xclient(call.message.chat.id) except Exception as e: self.bot.reply_to(call.message, f"Error: {str(e)}") return if not xclient: self.bot.reply_to( call.message, "You are not logged in. Please use `/login username password` to log in.", ) return try: # Extract slot index from callback data slot_index = int(call.data.split("_")[1]) slots = self.user_selected_slot.get(call.message.chat.id, []) if 0 <= slot_index < len(slots): selected_slot = slots[slot_index] if selected_slot.available: # Attempt to book the available slot try: xclient.make_booking(selected_slot) self.bot.answer_callback_query( call.id, "Slot booked successfully!" ) self.bot.send_message( call.message.chat.id, f"Booking confirmed for: {selected_slot.start_stamp}", ) except Exception as e: self.bot.answer_callback_query(call.id, "Failed to book slot.") self.bot.send_message(call.message.chat.id, f"Error: {str(e)}") else: # Slot is full, add to watchlist self.add_to_watchlist(call.message.chat.id, selected_slot) self.bot.answer_callback_query( call.id, "Slot is full. Added to watchlist." ) self.bot.send_message( call.message.chat.id, f"Slot {selected_slot.start_stamp} is full. You will be notified when it becomes available.", ) else: self.bot.answer_callback_query(call.id, "Invalid slot selection.") except (IndexError, ValueError): self.bot.answer_callback_query(call.id, "Invalid data.") ## Remove the buttons from the previous message self.bot.edit_message_reply_markup( call.message.chat.id, call.message.message_id ) def cancel_booking(self, message): try: xclient = self.get_xclient(message.chat.id) except Exception as e: self.bot.reply_to(message, f"Error: {str(e)}") return if not xclient: self.bot.reply_to( message, "You are not logged in. Please use `/login username password` to log in.", ) return # Fetch user's current bookings bookings = xclient.my_bookings( datetime.now(), datetime.now() + timedelta(days=31) ) if bookings: markup = InlineKeyboardMarkup() for i, booking in enumerate(bookings): # Create a button for each booking button_text = f"Booking {i + 1}: {booking.start_stamp}" markup.add( InlineKeyboardButton(button_text, callback_data=f"cancel_{i}") ) self.user_bookings[message.chat.id] = bookings self.bot.reply_to( message, "Select a booking to cancel:", reply_markup=markup ) else: self.bot.reply_to(message, "You have no bookings to cancel.") def callback_cancel_booking(self, call): try: xclient = self.get_xclient(call.message.chat.id) except Exception as e: self.bot.reply_to(call.message, f"Error: {str(e)}") return if not xclient: self.bot.reply_to( call.message, "You are not logged in. Please use `/login username password` to log in.", ) return try: # Extract booking index from callback data booking_index = int(call.data.split("_")[1]) bookings = self.user_bookings.get(call.message.chat.id, []) if 0 <= booking_index < len(bookings): selected_booking = bookings[booking_index] # Attempt to cancel the selected booking try: if xclient.cancel_booking(selected_booking): self.bot.answer_callback_query( call.id, "Booking canceled successfully!" ) self.bot.send_message( call.message.chat.id, f"Booking for {selected_booking.start_stamp} has been canceled.", ) else: self.bot.answer_callback_query( call.id, "Failed to cancel booking." ) except Exception as e: self.bot.answer_callback_query(call.id, "Failed to cancel booking.") self.bot.send_message(call.message.chat.id, f"Error: {str(e)}") else: self.bot.answer_callback_query(call.id, "Invalid booking selection.") except (IndexError, ValueError): self.bot.answer_callback_query(call.id, "Invalid data.") ## Remove the buttons from the previous message self.bot.edit_message_reply_markup( call.message.chat.id, call.message.message_id ) def manage_watchlist(self, message): chat_id = message.chat.id # Check if the user has any slots in the watchlist if chat_id not in self.watchlist or len(self.watchlist[chat_id]) == 0: self.bot.reply_to(message, "Your watchlist is empty.") return markup = InlineKeyboardMarkup() # List the slots in the watchlist for i, slot in enumerate(self.watchlist[chat_id]): button_text = f"Slot {i + 1}: {slot.start_stamp} (FULL)" markup.add(InlineKeyboardButton(button_text, callback_data=f"remove_{i}")) # Send the list of slots to the user with remove buttons self.bot.reply_to(message, "Your watchlist:", reply_markup=markup) def callback_remove_slot(self, call): try: # Extract slot index from callback data slot_index = int(call.data.split("_")[1]) slots = self.watchlist.get(call.message.chat.id, []) if 0 <= slot_index < len(slots): removed_slot = slots.pop(slot_index) # Inform the user that the slot has been removed self.bot.answer_callback_query( call.id, f"Removed slot: {removed_slot.start_stamp}" ) self.bot.send_message( call.message.chat.id, f"Slot {removed_slot.start_stamp} has been removed from your watchlist.", ) # Update watchlist after removal if not slots: del self.watchlist[ call.message.chat.id ] # Remove the chat ID if the watchlist is empty else: self.bot.answer_callback_query(call.id, "Invalid slot selection.") except (IndexError, ValueError): self.bot.answer_callback_query(call.id, "Invalid data.") # Remove the buttons from the previous message self.bot.edit_message_reply_markup( call.message.chat.id, call.message.message_id ) def add_to_watchlist(self, chat_id, slot): if chat_id not in self.watchlist: self.watchlist[chat_id] = [] self.watchlist[chat_id].append(slot) def check_watchlist(self): now = datetime.now().replace(tzinfo=timezone.utc) for chat_id, slots in list(self.watchlist.items()): try: xclient = self.get_xclient(chat_id) except Exception as e: print(f"Error polling watchlist: {str(e)}") available_slots = [] for slot in slots: if slot.start < now: # If the slot has expired self.bot.send_message( chat_id, f"Slot {slot.start_stamp} has expired and is no longer available.", ) available_slots.append(slot) # Mark it to remove from watchlist elif xclient.check_booking_availability(slot): # If the slot becomes available try: xclient.make_booking(slot) self.bot.send_message( chat_id, f"Slot {slot.start_stamp} is now available and has been booked for you.", ) available_slots.append(slot) # Mark it to remove from watchlist except Exception as e: self.bot.send_message( chat_id, f"Error booking slot {slot.start_stamp}: {str(e)}" ) # Remove the expired or booked slots from the watchlist self.watchlist[chat_id] = [ slot for slot in slots if slot not in available_slots ] if not self.watchlist[chat_id]: del self.watchlist[chat_id] # Remove chat_id from watchlist if empty def calculate_polling_interval(self, slot_time): now = datetime.now().replace(tzinfo=timezone.utc) time_until_slot = (slot_time - now).total_seconds() if time_until_slot < 3600: # If less than 1 hour return 300 # 5 minutes if time_until_slot < 3600 * 2: # If less than 2 hour return 600 # 10 minutes if time_until_slot < 3600 * 3: # If less than 3 hour return 900 # 15 minutes else: return 1800 # 30 minutes def poll_periodically(self): # Continuously poll the watchlist at dynamically adjusted intervals while True: self.check_watchlist() # Calculate the minimum polling interval based on upcoming slots polling_interval = 1800 # Default 30 minutes for chat_id, slots in self.watchlist.items(): for slot in slots: interval = self.calculate_polling_interval(slot.start) polling_interval = min(polling_interval, interval) time.sleep(polling_interval) # Wait before checking the watchlist again def run(self): ### start poll_periodically on seperate thread thread = Thread(target=self.poll_periodically, args=()) thread.daemon = True thread.start() self.bot.polling()