# SecretarX/tgbot.py
# 2024-09-10 13:57:34 +02:00
#
# 436 lines
# 17 KiB
# Python

import telebot
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
from datetime import datetime, timedelta, timezone
import time
from threading import Thread
import sqlite3
from xclient import XClient
class TelegramBot:
    """Telegram front end for listing, booking, cancelling and watching slots.

    Each chat is mapped to one ``XClient`` built from the credentials stored
    for that chat in the SQLite ``users`` table.  Slots that are full when the
    user selects them are put on an in-memory watchlist, which a background
    thread started by :meth:`run` polls and books automatically.

    NOTE(review): credentials are stored in plain text because they are
    replayed to ``XClient``; protect the database file accordingly.
    """

    # Seconds to wait between watchlist polls.
    _POLL_INTERVAL_SOON = 300  # a watched slot starts in less than an hour
    _POLL_INTERVAL_DEFAULT = 1800
    _SOON_THRESHOLD_SECONDS = 3600

    _NOT_LOGGED_IN = (
        "You are not logged in. Please use `/login username password` to log in."
    )

    def __init__(self, bot_token, db_path="bot_database.db"):
        """Create the bot, make sure the database exists and register handlers.

        Args:
            bot_token: Telegram bot API token passed to ``telebot.TeleBot``.
            db_path: Path of the SQLite file holding the ``users`` table.
        """
        self.bot = telebot.TeleBot(bot_token)
        self.xclients = {}  # chat ID -> XClient (only successful logins)
        self.user_selected_slot = {}  # chat ID -> slots last offered by /book
        self.user_bookings = {}  # chat ID -> bookings last offered by /cancel
        self.watchlist = {}  # chat ID -> list of full slots being watched
        self.db_path = db_path
        self.init_db()

        # The closures below only forward to the methods so that the
        # decorators can be applied to a bound instance.
        @self.bot.message_handler(commands=["start"])
        def _on_start(message):
            self.handle_login(message)

        # Every prompt tells the user to send `/login username password`, so
        # the command must work on its own and not only right after /start.
        @self.bot.message_handler(commands=["login"])
        def _on_login(message):
            self.process_login(message)

        @self.bot.message_handler(commands=["book"])
        def _on_book(message):
            self.make_booking(message)

        @self.bot.callback_query_handler(
            func=lambda call: call.data.startswith("book_")
        )
        def _on_book_callback(call):
            self.callback_booking(call)

        @self.bot.message_handler(commands=["cancel"])
        def _on_cancel(message):
            self.cancel_booking(message)

        @self.bot.callback_query_handler(
            func=lambda call: call.data.startswith("cancel_")
        )
        def _on_cancel_callback(call):
            self.callback_cancel_booking(call)

        @self.bot.message_handler(commands=["watchlist"])
        def _on_watchlist(message):
            self.manage_watchlist(message)

        @self.bot.callback_query_handler(
            func=lambda call: call.data.startswith("remove_")
        )
        def _on_remove_callback(call):
            self.callback_remove_slot(call)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _utc_now():
        """Return the current instant as a timezone-aware UTC datetime.

        NOTE(review): assumes ``slot.start`` is a genuine timezone-aware
        instant; confirm against XClient.
        """
        return datetime.now(timezone.utc)

    def _fetch_user(self, chat_id):
        """Return the ``(username, password)`` row for *chat_id*, or None."""
        conn = sqlite3.connect(self.db_path)
        try:
            return conn.execute(
                "SELECT username, password FROM users WHERE chat_id = ?", (chat_id,)
            ).fetchone()
        finally:
            # sqlite3's context manager only manages transactions; the
            # connection has to be closed explicitly.
            conn.close()

    def _require_xclient(self, message):
        """Return the chat's XClient, or tell the user why there is none.

        Returns None after replying to *message* when the client could not be
        created or the chat has no stored credentials.
        """
        try:
            xclient = self.get_xclient(message.chat.id)
        except Exception as e:
            self.bot.reply_to(message, f"Error: {str(e)}")
            return None
        if not xclient:
            self.bot.reply_to(message, self._NOT_LOGGED_IN)
            return None
        return xclient

    def _remove_buttons(self, call):
        """Strip the inline keyboard from the message the callback came from."""
        self.bot.edit_message_reply_markup(
            call.message.chat.id, call.message.message_id
        )

    # ------------------------------------------------------------------
    # Database / login
    # ------------------------------------------------------------------

    def init_db(self):
        """Create the ``users`` table if it does not exist yet."""
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commits on success, rolls back on error
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS users (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        chat_id INTEGER UNIQUE,
                        username TEXT,
                        password TEXT
                    )
                """)
        finally:
            conn.close()

    def get_xclient(self, chat_id):
        """Return the XClient for *chat_id*, creating and caching it on demand.

        Returns None when no credentials are stored for the chat.  A missing
        login is deliberately not cached, so a later login takes effect
        immediately.  Exceptions raised by ``XClient(...)`` propagate.
        """
        xclient = self.xclients.get(chat_id)
        if xclient is None:
            row = self._fetch_user(chat_id)
            if row is None:
                return None
            username, password = row
            xclient = XClient(username, password)
            self.xclients[chat_id] = xclient
        return xclient

    def handle_login(self, message):
        """Handle /start: report an existing login or ask for credentials."""
        if self._fetch_user(message.chat.id):
            self.bot.reply_to(message, "You are already logged in.")
            return
        self.bot.reply_to(
            message,
            "Please provide your username and password in the format:\n`/login username password`",
        )
        # The user's next message in this chat is treated as the login line.
        self.bot.register_next_step_handler(message, self.process_login)

    def process_login(self, message):
        """Store the credentials from a ``/login username password`` message.

        Inserts a row for a new chat or updates the existing one, and drops
        any cached XClient so the new credentials are used from now on.
        Replies with a format hint when the message is not exactly three
        whitespace-separated tokens (including non-text messages).
        """
        try:
            # Non-text messages (photos, stickers, ...) carry text=None.
            _command, username, password = (message.text or "").split()
        except ValueError:
            self.bot.reply_to(
                message, "Invalid format. Please use `/login username password`."
            )
            return

        chat_id = message.chat.id
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commits on success, rolls back on error
                already_known = conn.execute(
                    "SELECT 1 FROM users WHERE chat_id = ?",
                    (chat_id,),
                ).fetchone()
                if already_known:
                    conn.execute(
                        "UPDATE users SET username = ?, password = ? WHERE chat_id = ?",
                        (username, password, chat_id),
                    )
                else:
                    conn.execute(
                        "INSERT INTO users (chat_id, username, password) VALUES (?, ?, ?)",
                        (chat_id, username, password),
                    )
        finally:
            conn.close()

        # Forget any client built from previous (or missing) credentials.
        self.xclients.pop(chat_id, None)

        if already_known:
            self.bot.reply_to(message, f"Updated login details for {username}.")
        else:
            self.bot.reply_to(message, f"Registered new user {username}.")

    def send_welcome(self, message):
        """Reply with a short greeting that points at the /book command."""
        self.bot.reply_to(
            message, "Welcome! Use /book to view and book available slots."
        )

    # ------------------------------------------------------------------
    # Booking
    # ------------------------------------------------------------------

    def make_booking(self, message):
        """Handle /book: offer the slots of the next 24 hours as buttons.

        The offered slots are remembered per chat so that the ``book_<index>``
        callback can resolve the index back to a slot.
        """
        xclient = self._require_xclient(message)
        if not xclient:
            return

        start = datetime.now()
        end = start + timedelta(days=1)
        try:
            slots = xclient.list_slots(start, end)
        except Exception as e:
            self.bot.reply_to(message, f"Error: {str(e)}")
            return

        if not slots:
            self.bot.reply_to(message, "No slots found.")
            return

        markup = InlineKeyboardMarkup()
        for i, slot in enumerate(slots):
            status = "Available" if slot.available else "FULL"
            button_text = f"Slot {i + 1}: {slot.start_stamp} ({status})"
            markup.add(InlineKeyboardButton(button_text, callback_data=f"book_{i}"))
        # Remember the slots before the buttons become clickable.
        self.user_selected_slot[message.chat.id] = slots
        self.bot.reply_to(
            message, "Select a slot to book or watch:", reply_markup=markup
        )

    def callback_booking(self, call):
        """Handle a ``book_<index>`` button: book the slot or watch it if full."""
        xclient = self._require_xclient(call.message)
        if not xclient:
            return

        chat_id = call.message.chat.id
        try:
            # Extract slot index from callback data
            slot_index = int(call.data.split("_")[1])
            slots = self.user_selected_slot.get(chat_id, [])
            if 0 <= slot_index < len(slots):
                selected_slot = slots[slot_index]
                if selected_slot.available:
                    # Attempt to book the available slot
                    try:
                        xclient.make_booking(selected_slot)
                        self.bot.answer_callback_query(
                            call.id, "Slot booked successfully!"
                        )
                        self.bot.send_message(
                            chat_id,
                            f"Booking confirmed for: {selected_slot.start_stamp}",
                        )
                    except Exception as e:
                        self.bot.answer_callback_query(call.id, "Failed to book slot.")
                        self.bot.send_message(chat_id, f"Error: {str(e)}")
                else:
                    # Slot is full, add to watchlist
                    self.add_to_watchlist(chat_id, selected_slot)
                    self.bot.answer_callback_query(
                        call.id, "Slot is full. Added to watchlist."
                    )
                    self.bot.send_message(
                        chat_id,
                        f"Slot {selected_slot.start_stamp} is full. You will be notified when it becomes available.",
                    )
            else:
                self.bot.answer_callback_query(call.id, "Invalid slot selection.")
        except (IndexError, ValueError):
            self.bot.answer_callback_query(call.id, "Invalid data.")

        self._remove_buttons(call)

    def cancel_booking(self, message):
        """Handle /cancel: offer the user's bookings of the next 31 days."""
        xclient = self._require_xclient(message)
        if not xclient:
            return

        now = datetime.now()
        try:
            bookings = xclient.my_bookings(now, now + timedelta(days=31))
        except Exception as e:
            self.bot.reply_to(message, f"Error: {str(e)}")
            return

        if not bookings:
            self.bot.reply_to(message, "You have no bookings to cancel.")
            return

        markup = InlineKeyboardMarkup()
        for i, booking in enumerate(bookings):
            button_text = f"Booking {i + 1}: {booking.start_stamp}"
            markup.add(InlineKeyboardButton(button_text, callback_data=f"cancel_{i}"))
        self.user_bookings[message.chat.id] = bookings
        self.bot.reply_to(message, "Select a booking to cancel:", reply_markup=markup)

    def callback_cancel_booking(self, call):
        """Handle a ``cancel_<index>`` button: cancel the chosen booking."""
        xclient = self._require_xclient(call.message)
        if not xclient:
            return

        chat_id = call.message.chat.id
        try:
            # Extract booking index from callback data
            booking_index = int(call.data.split("_")[1])
            bookings = self.user_bookings.get(chat_id, [])
            if 0 <= booking_index < len(bookings):
                selected_booking = bookings[booking_index]
                try:
                    if xclient.cancel_booking(selected_booking):
                        self.bot.answer_callback_query(
                            call.id, "Booking canceled successfully!"
                        )
                        self.bot.send_message(
                            chat_id,
                            f"Booking for {selected_booking.start_stamp} has been canceled.",
                        )
                    else:
                        self.bot.answer_callback_query(
                            call.id, "Failed to cancel booking."
                        )
                except Exception as e:
                    self.bot.answer_callback_query(call.id, "Failed to cancel booking.")
                    self.bot.send_message(chat_id, f"Error: {str(e)}")
            else:
                self.bot.answer_callback_query(call.id, "Invalid booking selection.")
        except (IndexError, ValueError):
            self.bot.answer_callback_query(call.id, "Invalid data.")

        self._remove_buttons(call)

    # ------------------------------------------------------------------
    # Watchlist
    # ------------------------------------------------------------------

    def manage_watchlist(self, message):
        """Handle /watchlist: list watched slots with one remove button each."""
        chat_id = message.chat.id
        watched = self.watchlist.get(chat_id)
        if not watched:
            self.bot.reply_to(message, "Your watchlist is empty.")
            return

        markup = InlineKeyboardMarkup()
        for i, slot in enumerate(watched):
            button_text = f"Slot {i + 1}: {slot.start_stamp} (FULL)"
            markup.add(InlineKeyboardButton(button_text, callback_data=f"remove_{i}"))
        self.bot.reply_to(message, "Your watchlist:", reply_markup=markup)

    def callback_remove_slot(self, call):
        """Handle a ``remove_<index>`` button: drop the slot from the watchlist."""
        chat_id = call.message.chat.id
        try:
            # Extract slot index from callback data
            slot_index = int(call.data.split("_")[1])
            slots = self.watchlist.get(chat_id, [])
            if 0 <= slot_index < len(slots):
                removed_slot = slots.pop(slot_index)
                self.bot.answer_callback_query(
                    call.id, f"Removed slot: {removed_slot.start_stamp}"
                )
                self.bot.send_message(
                    chat_id,
                    f"Slot {removed_slot.start_stamp} has been removed from your watchlist.",
                )
                if not slots:
                    # Drop the chat entry once nothing is watched; pop() keeps
                    # this safe if the poller removed it in the meantime.
                    self.watchlist.pop(chat_id, None)
            else:
                self.bot.answer_callback_query(call.id, "Invalid slot selection.")
        except (IndexError, ValueError):
            self.bot.answer_callback_query(call.id, "Invalid data.")

        self._remove_buttons(call)

    def add_to_watchlist(self, chat_id, slot):
        """Add *slot* to the chat's watchlist unless it is already on it."""
        watched = self.watchlist.setdefault(chat_id, [])
        # Selecting the same full slot twice must not trigger two bookings.
        if slot not in watched:
            watched.append(slot)

    def check_watchlist(self):
        """Poll every watched slot once; book freed slots, drop expired ones.

        Chats whose XClient cannot be obtained are skipped for this pass, and
        a failing availability check only skips the affected slot.
        """
        now = self._utc_now()
        for chat_id, slots in list(self.watchlist.items()):
            try:
                xclient = self.get_xclient(chat_id)
            except Exception as e:
                print(f"Error polling watchlist: {str(e)}")
                continue
            if not xclient:
                # No stored credentials for this chat; nothing can be checked.
                continue

            finished = []  # slots that expired or were booked in this pass
            for slot in list(slots):
                if slot.start < now:
                    self.bot.send_message(
                        chat_id,
                        f"Slot {slot.start_stamp} has expired and is no longer available.",
                    )
                    finished.append(slot)
                    continue

                try:
                    is_free = xclient.check_booking_availability(slot)
                except Exception as e:
                    print(f"Error polling watchlist: {str(e)}")
                    continue
                if not is_free:
                    continue

                try:
                    xclient.make_booking(slot)
                except Exception as e:
                    self.bot.send_message(
                        chat_id, f"Error booking slot {slot.start_stamp}: {str(e)}"
                    )
                    continue
                # The booking succeeded, so stop watching the slot even if
                # the notification below fails.
                finished.append(slot)
                self.bot.send_message(
                    chat_id,
                    f"Slot {slot.start_stamp} is now available and has been booked for you.",
                )

            if not finished:
                continue
            remaining = [slot for slot in slots if slot not in finished]
            if remaining:
                self.watchlist[chat_id] = remaining
            else:
                self.watchlist.pop(chat_id, None)

    def calculate_polling_interval(self, slot_time):
        """Return the poll interval in seconds for a slot starting at *slot_time*.

        Slots starting in less than an hour are polled every 5 minutes, all
        others every 30 minutes.  *slot_time* must be timezone-aware.
        """
        time_until_slot = (slot_time - self._utc_now()).total_seconds()
        if time_until_slot < self._SOON_THRESHOLD_SECONDS:
            return self._POLL_INTERVAL_SOON
        return self._POLL_INTERVAL_DEFAULT

    def poll_periodically(self):
        """Check the watchlist forever, sleeping between passes.

        The sleep is the shortest interval required by any watched slot.  An
        exception during a pass is reported and does not stop the loop.
        """
        while True:
            try:
                self.check_watchlist()
            except Exception as e:
                print(f"Error polling watchlist: {str(e)}")

            polling_interval = self._POLL_INTERVAL_DEFAULT
            # Iterate over snapshots: handler threads may modify the
            # watchlist while this thread is reading it.
            for slots in list(self.watchlist.values()):
                for slot in list(slots):
                    polling_interval = min(
                        polling_interval, self.calculate_polling_interval(slot.start)
                    )
            time.sleep(polling_interval)

    def run(self):
        """Start the watchlist poller in a daemon thread and run the bot."""
        thread = Thread(target=self.poll_periodically, daemon=True)
        thread.start()
        self.bot.polling()