Initial commit
This commit is contained in:
commit
468bf36f52
9
.gitignore
vendored
Normal file
9
.gitignore
vendored
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
__pycache__
|
||||||
|
.venv
|
||||||
|
.venv_wsl
|
||||||
|
.wsl
|
||||||
|
chromedriver.exe
|
||||||
|
chromedriver
|
||||||
|
|
||||||
|
config.py
|
||||||
|
default_config.py
|
||||||
7
main.py
Normal file
7
main.py
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
from xclient import XClient
|
||||||
|
from tgbot import TelegramBot
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
client = XClient("ycolakoglu", "mTJNhBZI8PBdIx0e0Lak")
|
||||||
|
telegram_bot = TelegramBot("7312187888:AAG9w2UhU8zu9CZTxRnh72ltSqa59friRs0")
|
||||||
|
telegram_bot.run()
|
||||||
61
models.py
Normal file
61
models.py
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
|
||||||
|
class Booking:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
bookingId,
|
||||||
|
startDate,
|
||||||
|
endDate,
|
||||||
|
bookableProductId,
|
||||||
|
linkedProductId,
|
||||||
|
isAvailable=False,
|
||||||
|
**kwargs,
|
||||||
|
):
|
||||||
|
self.booking_id = bookingId
|
||||||
|
self.start = datetime.strptime(startDate, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
|
||||||
|
self.end = datetime.strptime(endDate, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
|
||||||
|
self.bookable_product_id = bookableProductId
|
||||||
|
self.linked_product_id = linkedProductId
|
||||||
|
self.available = isAvailable
|
||||||
|
|
||||||
|
self.booking_dict = kwargs
|
||||||
|
|
||||||
|
|
||||||
|
@property
|
||||||
|
def start_stamp(self):
|
||||||
|
## Get string representation of the timestamp in GMT +2 with only Day of week and month, hour and minute
|
||||||
|
gmt_plus_2 = timezone(timedelta(hours=2))
|
||||||
|
return self.start.astimezone(gmt_plus_2).strftime("%A %d %B %H:%M")
|
||||||
|
|
||||||
|
@property
|
||||||
|
def end_stamp(self):
|
||||||
|
gmt_plus_2 = timezone(timedelta(hours=2))
|
||||||
|
return self.start.astimezone(gmt_plus_2).strftime("%A %d %B %H:%M")
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return (
|
||||||
|
f"Booking(booking_id={self.booking_id}, start={self.start}, end={self.end})"
|
||||||
|
)
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
if isinstance(other, Booking):
|
||||||
|
return (
|
||||||
|
self.booking_id == other.booking_id
|
||||||
|
and self.start == other.start
|
||||||
|
and self.end == other.end
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
return self.__str__()
|
||||||
|
|
||||||
|
def __hash__(self):
|
||||||
|
return hash((self.booking_id, self.start, self.end))
|
||||||
|
|
||||||
|
def time_left_to_booking(self, current_date):
|
||||||
|
current_datetime = datetime.strptime(current_date, "%Y-%m-%dT%H:%M:%S.%fZ")
|
||||||
|
time_left = self.start - current_datetime
|
||||||
|
if time_left.total_seconds() > 0:
|
||||||
|
return time_left
|
||||||
|
else:
|
||||||
|
return timedelta(0) # No time left if the booking has already started
|
||||||
166
tgbot.py
Normal file
166
tgbot.py
Normal file
@ -0,0 +1,166 @@
|
|||||||
|
import telebot
|
||||||
|
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
import time
|
||||||
|
from threading import Thread
|
||||||
|
|
||||||
|
class TelegramBot:
|
||||||
|
def __init__(self, bot_token, xclient):
|
||||||
|
self.bot = telebot.TeleBot(bot_token)
|
||||||
|
self.xclient = xclient
|
||||||
|
self.user_selected_slot = {} # Store user selected slot by chat ID
|
||||||
|
self.user_bookings = {} # Store user bookings by chat ID
|
||||||
|
self.watchlist = {} # Watchlist to store full slots by chat ID
|
||||||
|
|
||||||
|
# Set up the command handlers
|
||||||
|
@self.bot.message_handler(commands=['start'])
|
||||||
|
def send_welcome(message):
|
||||||
|
self.bot.reply_to(message, "Welcome! Use /make_booking to view and book available slots.")
|
||||||
|
|
||||||
|
@self.bot.message_handler(commands=['make_booking'])
|
||||||
|
def make_booking(message):
|
||||||
|
start = datetime.now()
|
||||||
|
end = start + timedelta(days=1)
|
||||||
|
slots = self.xclient.list_slots(start, end)
|
||||||
|
|
||||||
|
if slots:
|
||||||
|
markup = InlineKeyboardMarkup()
|
||||||
|
for i, slot in enumerate(slots):
|
||||||
|
# Create a button for each slot
|
||||||
|
status = "Available" if slot.available else "FULL"
|
||||||
|
button_text = f"Slot {i + 1}: {slot.start_stamp} ({status})"
|
||||||
|
markup.add(InlineKeyboardButton(button_text, callback_data=f"book_{i}"))
|
||||||
|
|
||||||
|
self.bot.reply_to(message, "Select a slot to book or watch:", reply_markup=markup)
|
||||||
|
self.user_selected_slot[message.chat.id] = slots
|
||||||
|
else:
|
||||||
|
self.bot.reply_to(message, "No slots found.")
|
||||||
|
|
||||||
|
@self.bot.callback_query_handler(func=lambda call: call.data.startswith("book_"))
|
||||||
|
def callback_booking(call):
|
||||||
|
try:
|
||||||
|
# Extract slot index from callback data
|
||||||
|
slot_index = int(call.data.split('_')[1])
|
||||||
|
slots = self.user_selected_slot.get(call.message.chat.id, [])
|
||||||
|
|
||||||
|
if 0 <= slot_index < len(slots):
|
||||||
|
selected_slot = slots[slot_index]
|
||||||
|
|
||||||
|
if selected_slot.available:
|
||||||
|
# Attempt to book the available slot
|
||||||
|
try:
|
||||||
|
self.xclient.make_booking(selected_slot)
|
||||||
|
self.bot.answer_callback_query(call.id, "Slot booked successfully!")
|
||||||
|
self.bot.send_message(call.message.chat.id, f"Booking confirmed for: {selected_slot.start_stamp}")
|
||||||
|
except Exception as e:
|
||||||
|
self.bot.answer_callback_query(call.id, "Failed to book slot.")
|
||||||
|
self.bot.send_message(call.message.chat.id, f"Error: {str(e)}")
|
||||||
|
else:
|
||||||
|
# Slot is full, add to watchlist
|
||||||
|
self.add_to_watchlist(call.message.chat.id, selected_slot)
|
||||||
|
self.bot.answer_callback_query(call.id, "Slot is full. Added to watchlist.")
|
||||||
|
self.bot.send_message(call.message.chat.id, f"Slot {selected_slot.start_stamp} is full. You will be notified when it becomes available.")
|
||||||
|
else:
|
||||||
|
self.bot.answer_callback_query(call.id, "Invalid slot selection.")
|
||||||
|
except (IndexError, ValueError):
|
||||||
|
self.bot.answer_callback_query(call.id, "Invalid data.")
|
||||||
|
|
||||||
|
@self.bot.message_handler(commands=['cancel_booking'])
|
||||||
|
def cancel_booking(message):
|
||||||
|
# Fetch user's current bookings
|
||||||
|
bookings = self.xclient.my_bookings(datetime.now(), datetime.now() + timedelta(days=31))
|
||||||
|
|
||||||
|
if bookings:
|
||||||
|
markup = InlineKeyboardMarkup()
|
||||||
|
for i, booking in enumerate(bookings):
|
||||||
|
# Create a button for each booking
|
||||||
|
button_text = f"Booking {i + 1}: {booking.start_stamp}"
|
||||||
|
markup.add(InlineKeyboardButton(button_text, callback_data=f"cancel_{i}"))
|
||||||
|
|
||||||
|
self.user_bookings[message.chat.id] = bookings
|
||||||
|
self.bot.reply_to(message, "Select a booking to cancel:", reply_markup=markup)
|
||||||
|
else:
|
||||||
|
self.bot.reply_to(message, "You have no bookings to cancel.")
|
||||||
|
|
||||||
|
@self.bot.callback_query_handler(func=lambda call: call.data.startswith("cancel_"))
|
||||||
|
def callback_cancel_booking(call):
|
||||||
|
try:
|
||||||
|
# Extract booking index from callback data
|
||||||
|
booking_index = int(call.data.split('_')[1])
|
||||||
|
bookings = self.user_bookings.get(call.message.chat.id, [])
|
||||||
|
|
||||||
|
if 0 <= booking_index < len(bookings):
|
||||||
|
selected_booking = bookings[booking_index]
|
||||||
|
# Attempt to cancel the selected booking
|
||||||
|
try:
|
||||||
|
if self.xclient.cancel_booking(selected_booking):
|
||||||
|
self.bot.answer_callback_query(call.id, "Booking canceled successfully!")
|
||||||
|
self.bot.send_message(call.message.chat.id, f"Booking for {selected_booking.start_stamp} has been canceled.")
|
||||||
|
else:
|
||||||
|
self.bot.answer_callback_query(call.id, "Failed to cancel booking.")
|
||||||
|
except Exception as e:
|
||||||
|
self.bot.answer_callback_query(call.id, "Failed to cancel booking.")
|
||||||
|
self.bot.send_message(call.message.chat.id, f"Error: {str(e)}")
|
||||||
|
else:
|
||||||
|
self.bot.answer_callback_query(call.id, "Invalid booking selection.")
|
||||||
|
except (IndexError, ValueError):
|
||||||
|
self.bot.answer_callback_query(call.id, "Invalid data.")
|
||||||
|
|
||||||
|
def add_to_watchlist(self, chat_id, slot):
|
||||||
|
if chat_id not in self.watchlist:
|
||||||
|
self.watchlist[chat_id] = []
|
||||||
|
self.watchlist[chat_id].append(slot)
|
||||||
|
|
||||||
|
def check_watchlist(self):
|
||||||
|
now = datetime.now().replace(tzinfo=timezone.utc)
|
||||||
|
for chat_id, slots in list(self.watchlist.items()):
|
||||||
|
available_slots = []
|
||||||
|
for slot in slots:
|
||||||
|
if slot.start < now:
|
||||||
|
# If the slot has expired
|
||||||
|
self.bot.send_message(chat_id, f"Slot {slot.start_stamp} has expired and is no longer available.")
|
||||||
|
available_slots.append(slot) # Mark it to remove from watchlist
|
||||||
|
elif self.xclient.check_booking_availability(slot):
|
||||||
|
# If the slot becomes available
|
||||||
|
try:
|
||||||
|
self.xclient.make_booking(slot)
|
||||||
|
self.bot.send_message(chat_id, f"Slot {slot.start_stamp} is now available and has been booked for you.")
|
||||||
|
available_slots.append(slot) # Mark it to remove from watchlist
|
||||||
|
except Exception as e:
|
||||||
|
self.bot.send_message(chat_id, f"Error booking slot {slot.start_stamp}: {str(e)}")
|
||||||
|
|
||||||
|
# Remove the expired or booked slots from the watchlist
|
||||||
|
self.watchlist[chat_id] = [slot for slot in slots if slot not in available_slots]
|
||||||
|
if not self.watchlist[chat_id]:
|
||||||
|
del self.watchlist[chat_id] # Remove chat_id from watchlist if empty
|
||||||
|
|
||||||
|
def calculate_polling_interval(self, slot_time):
|
||||||
|
now = datetime.now().replace(tzinfo=timezone.utc)
|
||||||
|
time_until_slot = (slot_time - now).total_seconds()
|
||||||
|
|
||||||
|
if time_until_slot < 3600: # If less than 1 hour
|
||||||
|
return 300 # 5 minutes
|
||||||
|
else:
|
||||||
|
return 1800 # 30 minutes
|
||||||
|
|
||||||
|
def poll_periodically(self):
|
||||||
|
# Continuously poll the watchlist at dynamically adjusted intervals
|
||||||
|
while True:
|
||||||
|
self.check_watchlist()
|
||||||
|
|
||||||
|
# Calculate the minimum polling interval based on upcoming slots
|
||||||
|
polling_interval = 1800 # Default 30 minutes
|
||||||
|
for chat_id, slots in self.watchlist.items():
|
||||||
|
for slot in slots:
|
||||||
|
interval = self.calculate_polling_interval(slot.start)
|
||||||
|
polling_interval = min(polling_interval, interval)
|
||||||
|
|
||||||
|
time.sleep(polling_interval) # Wait before checking the watchlist again
|
||||||
|
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
### start poll_periodically on seperate thread
|
||||||
|
thread = Thread(target = self.bot.polling, args = ())
|
||||||
|
thread.start()
|
||||||
|
self.poll_periodically()
|
||||||
|
thread.kill()
|
||||||
236
tgbot_multi.py
Normal file
236
tgbot_multi.py
Normal file
@ -0,0 +1,236 @@
|
|||||||
|
import telebot
|
||||||
|
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
import time
|
||||||
|
from threading import Thread
|
||||||
|
|
||||||
|
class TelegramBot:
|
||||||
|
def __init__(self, bot_token, db_path='bot_database.db'):
|
||||||
|
self.bot = telebot.TeleBot(bot_token)
|
||||||
|
self.db_path = db_path
|
||||||
|
self.user_selected_slot = {} # Store user selected slot by chat ID
|
||||||
|
self.user_bookings = {} # Store user bookings by chat ID
|
||||||
|
self.watchlist = {} # Watchlist to store full slots by chat ID
|
||||||
|
|
||||||
|
self.init_db()
|
||||||
|
|
||||||
|
# Set up the command handlers
|
||||||
|
@self.bot.message_handler(commands=['start'])
|
||||||
|
def send_welcome(message):
|
||||||
|
self.handle_login(message)
|
||||||
|
|
||||||
|
@self.bot.message_handler(commands=['make_booking'])
|
||||||
|
def make_booking(message):
|
||||||
|
self.handle_booking(message)
|
||||||
|
|
||||||
|
@self.bot.callback_query_handler(func=lambda call: call.data.startswith("book_"))
|
||||||
|
def callback_booking(call):
|
||||||
|
self.handle_callback_booking(call)
|
||||||
|
|
||||||
|
@self.bot.message_handler(commands=['cancel_booking'])
|
||||||
|
def cancel_booking(message):
|
||||||
|
self.handle_cancel_booking(message)
|
||||||
|
|
||||||
|
@self.bot.callback_query_handler(func=lambda call: call.data.startswith("cancel_"))
|
||||||
|
def callback_cancel_booking(call):
|
||||||
|
self.handle_callback_cancel_booking(call)
|
||||||
|
|
||||||
|
def init_db(self):
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
# Create tables
|
||||||
|
cursor.execute('''
|
||||||
|
CREATE TABLE IF NOT EXISTS users (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
chat_id INTEGER UNIQUE,
|
||||||
|
username TEXT,
|
||||||
|
password TEXT
|
||||||
|
)
|
||||||
|
''')
|
||||||
|
cursor.execute('''
|
||||||
|
CREATE TABLE IF NOT EXISTS chats (
|
||||||
|
chat_id INTEGER PRIMARY KEY,
|
||||||
|
username TEXT
|
||||||
|
)
|
||||||
|
''')
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
def handle_login(self, message):
|
||||||
|
chat_id = message.chat.id
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute('SELECT username FROM chats WHERE chat_id = ?', (chat_id,))
|
||||||
|
result = cursor.fetchone()
|
||||||
|
|
||||||
|
if result:
|
||||||
|
self.bot.reply_to(message, "You are already logged in.")
|
||||||
|
else:
|
||||||
|
self.bot.reply_to(message, "Please provide your username and password in the format:\n`/login username password`")
|
||||||
|
self.bot.register_next_step_handler(message, self.process_login)
|
||||||
|
|
||||||
|
def process_login(self, message):
|
||||||
|
try:
|
||||||
|
command, username, password = message.text.split()
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute('SELECT * FROM users WHERE username = ? AND password = ?', (username, password))
|
||||||
|
user = cursor.fetchone()
|
||||||
|
|
||||||
|
if user:
|
||||||
|
cursor.execute('INSERT OR IGNORE INTO chats (chat_id, username) VALUES (?, ?)', (message.chat.id, username))
|
||||||
|
conn.commit()
|
||||||
|
self.bot.reply_to(message, "Login successful!")
|
||||||
|
else:
|
||||||
|
self.bot.reply_to(message, "Invalid credentials. Please try again.")
|
||||||
|
self.handle_login(message)
|
||||||
|
except ValueError:
|
||||||
|
self.bot.reply_to(message, "Invalid format. Please use `/login username password`.")
|
||||||
|
|
||||||
|
|
||||||
|
def send_welcome(self,message):
|
||||||
|
self.bot.reply_to(message, "Welcome! Use /make_booking to view and book available slots.")
|
||||||
|
|
||||||
|
def handle_booking(self,message):
|
||||||
|
start = datetime.now()
|
||||||
|
end = start + timedelta(days=1)
|
||||||
|
slots = self.xclient.list_slots(start, end)
|
||||||
|
|
||||||
|
if slots:
|
||||||
|
markup = InlineKeyboardMarkup()
|
||||||
|
for i, slot in enumerate(slots):
|
||||||
|
# Create a button for each slot
|
||||||
|
status = "Available" if slot.available else "FULL"
|
||||||
|
button_text = f"Slot {i + 1}: {slot.start_stamp} ({status})"
|
||||||
|
markup.add(InlineKeyboardButton(button_text, callback_data=f"book_{i}"))
|
||||||
|
|
||||||
|
self.bot.reply_to(message, "Select a slot to book or watch:", reply_markup=markup)
|
||||||
|
self.user_selected_slot[message.chat.id] = slots
|
||||||
|
else:
|
||||||
|
self.bot.reply_to(message, "No slots found.")
|
||||||
|
|
||||||
|
def handle_callback_booking(self, call):
|
||||||
|
try:
|
||||||
|
# Extract slot index from callback data
|
||||||
|
slot_index = int(call.data.split('_')[1])
|
||||||
|
slots = self.user_selected_slot.get(call.message.chat.id, [])
|
||||||
|
|
||||||
|
if 0 <= slot_index < len(slots):
|
||||||
|
selected_slot = slots[slot_index]
|
||||||
|
|
||||||
|
if selected_slot.available:
|
||||||
|
# Attempt to book the available slot
|
||||||
|
try:
|
||||||
|
self.xclient.make_booking(selected_slot)
|
||||||
|
self.bot.answer_callback_query(call.id, "Slot booked successfully!")
|
||||||
|
self.bot.send_message(call.message.chat.id, f"Booking confirmed for: {selected_slot.start_stamp}")
|
||||||
|
except Exception as e:
|
||||||
|
self.bot.answer_callback_query(call.id, "Failed to book slot.")
|
||||||
|
self.bot.send_message(call.message.chat.id, f"Error: {str(e)}")
|
||||||
|
else:
|
||||||
|
# Slot is full, add to watchlist
|
||||||
|
self.add_to_watchlist(call.message.chat.id, selected_slot)
|
||||||
|
self.bot.answer_callback_query(call.id, "Slot is full. Added to watchlist.")
|
||||||
|
self.bot.send_message(call.message.chat.id, f"Slot {selected_slot.start_stamp} is full. You will be notified when it becomes available.")
|
||||||
|
else:
|
||||||
|
self.bot.answer_callback_query(call.id, "Invalid slot selection.")
|
||||||
|
except (IndexError, ValueError):
|
||||||
|
self.bot.answer_callback_query(call.id, "Invalid data.")
|
||||||
|
|
||||||
|
def handle_cancel_booking(self, message):
|
||||||
|
# Fetch user's current bookings
|
||||||
|
bookings = self.xclient.my_bookings(datetime.now(), datetime.now() + timedelta(days=31))
|
||||||
|
|
||||||
|
if bookings:
|
||||||
|
markup = InlineKeyboardMarkup()
|
||||||
|
for i, booking in enumerate(bookings):
|
||||||
|
# Create a button for each booking
|
||||||
|
button_text = f"Booking {i + 1}: {booking.start_stamp}"
|
||||||
|
markup.add(InlineKeyboardButton(button_text, callback_data=f"cancel_{i}"))
|
||||||
|
|
||||||
|
self.user_bookings[message.chat.id] = bookings
|
||||||
|
self.bot.reply_to(message, "Select a booking to cancel:", reply_markup=markup)
|
||||||
|
else:
|
||||||
|
self.bot.reply_to(message, "You have no bookings to cancel.")
|
||||||
|
|
||||||
|
def handle_callback_cancel_booking(self, call):
|
||||||
|
try:
|
||||||
|
# Extract booking index from callback data
|
||||||
|
booking_index = int(call.data.split('_')[1])
|
||||||
|
bookings = self.user_bookings.get(call.message.chat.id, [])
|
||||||
|
|
||||||
|
if 0 <= booking_index < len(bookings):
|
||||||
|
selected_booking = bookings[booking_index]
|
||||||
|
# Attempt to cancel the selected booking
|
||||||
|
try:
|
||||||
|
if self.xclient.cancel_booking(selected_booking):
|
||||||
|
self.bot.answer_callback_query(call.id, "Booking canceled successfully!")
|
||||||
|
self.bot.send_message(call.message.chat.id, f"Booking for {selected_booking.start_stamp} has been canceled.")
|
||||||
|
else:
|
||||||
|
self.bot.answer_callback_query(call.id, "Failed to cancel booking.")
|
||||||
|
except Exception as e:
|
||||||
|
self.bot.answer_callback_query(call.id, "Failed to cancel booking.")
|
||||||
|
self.bot.send_message(call.message.chat.id, f"Error: {str(e)}")
|
||||||
|
else:
|
||||||
|
self.bot.answer_callback_query(call.id, "Invalid booking selection.")
|
||||||
|
except (IndexError, ValueError):
|
||||||
|
self.bot.answer_callback_query(call.id, "Invalid data.")
|
||||||
|
|
||||||
|
def add_to_watchlist(self, chat_id, slot):
|
||||||
|
if chat_id not in self.watchlist:
|
||||||
|
self.watchlist[chat_id] = []
|
||||||
|
self.watchlist[chat_id].append(slot)
|
||||||
|
|
||||||
|
def check_watchlist(self):
|
||||||
|
now = datetime.now().replace(tzinfo=timezone.utc)
|
||||||
|
for chat_id, slots in list(self.watchlist.items()):
|
||||||
|
available_slots = []
|
||||||
|
for slot in slots:
|
||||||
|
if slot.start < now:
|
||||||
|
# If the slot has expired
|
||||||
|
self.bot.send_message(chat_id, f"Slot {slot.start_stamp} has expired and is no longer available.")
|
||||||
|
available_slots.append(slot) # Mark it to remove from watchlist
|
||||||
|
elif self.xclient.check_booking_availability(slot):
|
||||||
|
# If the slot becomes available
|
||||||
|
try:
|
||||||
|
self.xclient.make_booking(slot)
|
||||||
|
self.bot.send_message(chat_id, f"Slot {slot.start_stamp} is now available and has been booked for you.")
|
||||||
|
available_slots.append(slot) # Mark it to remove from watchlist
|
||||||
|
except Exception as e:
|
||||||
|
self.bot.send_message(chat_id, f"Error booking slot {slot.start_stamp}: {str(e)}")
|
||||||
|
|
||||||
|
# Remove the expired or booked slots from the watchlist
|
||||||
|
self.watchlist[chat_id] = [slot for slot in slots if slot not in available_slots]
|
||||||
|
if not self.watchlist[chat_id]:
|
||||||
|
del self.watchlist[chat_id] # Remove chat_id from watchlist if empty
|
||||||
|
|
||||||
|
def calculate_polling_interval(self, slot_time):
|
||||||
|
now = datetime.now().replace(tzinfo=timezone.utc)
|
||||||
|
time_until_slot = (slot_time - now).total_seconds()
|
||||||
|
|
||||||
|
if time_until_slot < 3600: # If less than 1 hour
|
||||||
|
return 300 # 5 minutes
|
||||||
|
else:
|
||||||
|
return 1800 # 30 minutes
|
||||||
|
|
||||||
|
def poll_periodically(self):
|
||||||
|
# Continuously poll the watchlist at dynamically adjusted intervals
|
||||||
|
while True:
|
||||||
|
self.check_watchlist()
|
||||||
|
|
||||||
|
# Calculate the minimum polling interval based on upcoming slots
|
||||||
|
polling_interval = 1800 # Default 30 minutes
|
||||||
|
for chat_id, slots in self.watchlist.items():
|
||||||
|
for slot in slots:
|
||||||
|
interval = self.calculate_polling_interval(slot.start)
|
||||||
|
polling_interval = min(polling_interval, interval)
|
||||||
|
|
||||||
|
time.sleep(polling_interval) # Wait before checking the watchlist again
|
||||||
|
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
### start poll_periodically on seperate thread
|
||||||
|
thread = Thread(target = self.poll_periodically, args = ())
|
||||||
|
thread.daemon = True
|
||||||
|
thread.start()
|
||||||
|
self.bot.polling()
|
||||||
|
thread.kill()
|
||||||
292
xclient.py
Normal file
292
xclient.py
Normal file
@ -0,0 +1,292 @@
|
|||||||
|
from selenium import webdriver
|
||||||
|
from models import Booking
|
||||||
|
import json
|
||||||
|
from selenium.webdriver.common.by import By
|
||||||
|
from selenium.webdriver.support.ui import WebDriverWait
|
||||||
|
from selenium.webdriver.support import expected_conditions as EC
|
||||||
|
import time
|
||||||
|
import requests
|
||||||
|
|
||||||
|
|
||||||
|
class XClient:
|
||||||
|
def __init__(self, username, password):
|
||||||
|
self.username = username
|
||||||
|
self.password = password
|
||||||
|
self.access_token = None
|
||||||
|
self.id = -1
|
||||||
|
|
||||||
|
self.session = requests.Session()
|
||||||
|
# Default headers for session that mimicks browser
|
||||||
|
self.session.headers.update(
|
||||||
|
{
|
||||||
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
def fetch_access_token(self):
|
||||||
|
# Set up the WebDriver (make sure to use the correct path for your WebDriver)
|
||||||
|
options = webdriver.ChromeOptions()
|
||||||
|
options.add_argument("--headless=new")
|
||||||
|
driver = webdriver.Chrome(options=options)
|
||||||
|
|
||||||
|
driver.get("https://x.tudelft.nl")
|
||||||
|
|
||||||
|
button = WebDriverWait(driver, 30).until(
|
||||||
|
EC.element_to_be_clickable(
|
||||||
|
(By.XPATH, "//span[contains(text(), 'TUDelft')]")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
button.click()
|
||||||
|
|
||||||
|
button = WebDriverWait(driver, 30).until(
|
||||||
|
EC.element_to_be_clickable(
|
||||||
|
(
|
||||||
|
By.XPATH,
|
||||||
|
"//h3[contains(text(), 'Delft University of Technology')]",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
button.click()
|
||||||
|
|
||||||
|
# Input the username
|
||||||
|
username_input = driver.find_element(By.ID, "username")
|
||||||
|
username_input.send_keys(self.username) # Replace with your actual username
|
||||||
|
|
||||||
|
password_input = driver.find_element(By.ID, "password")
|
||||||
|
password_input.send_keys(self.password)
|
||||||
|
|
||||||
|
password_input.submit()
|
||||||
|
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
delcom_auth = json.loads(
|
||||||
|
driver.execute_script(
|
||||||
|
"return window.localStorage.getItem('delcom_auth');"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
driver.quit()
|
||||||
|
|
||||||
|
|
||||||
|
access_token = delcom_auth["tokenResponse"].get("accessToken", None)
|
||||||
|
return access_token
|
||||||
|
|
||||||
|
|
||||||
|
def list_slots(self, start, end, tagIDs=[28]):
|
||||||
|
"""
|
||||||
|
URl:
|
||||||
|
https://backbone-web-api.production.delft.delcom.nl/bookable-slots
|
||||||
|
Get parameters:
|
||||||
|
s: {"startDate":"2024-09-09T14:23:24.934Z","endDate":"2024-09-09T22:00:00.000Z","tagIds":{"$in":[28]},"availableFromDate":{"$gt":"2024-09-09T14:23:24.935Z"},"availableTillDate":{"$gte":"2024-09-09T14:23:24.934Z"}}
|
||||||
|
join: product
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.login_if_necessary()
|
||||||
|
start_stamp = start.strftime("%Y-%m-%dT%H:%M:%S.000Z")
|
||||||
|
end_stamp = end.strftime("%Y-%m-%dT%H:%M:%S.000Z")
|
||||||
|
|
||||||
|
response = self.session.get(
|
||||||
|
"https://backbone-web-api.production.delft.delcom.nl/bookable-slots",
|
||||||
|
params={
|
||||||
|
"s": json.dumps(
|
||||||
|
{
|
||||||
|
"startDate": start_stamp,
|
||||||
|
"endDate": end_stamp,
|
||||||
|
"tagIds": {"$in": tagIDs},
|
||||||
|
"availableFromDate": {"$gt": start_stamp},
|
||||||
|
"availableTillDate": {"$gte": start_stamp},
|
||||||
|
}
|
||||||
|
),
|
||||||
|
"join": ["product", "linkedProduct"],
|
||||||
|
},
|
||||||
|
headers={"Authorization": f"Bearer {self.access_token}"},
|
||||||
|
)
|
||||||
|
return [Booking(**x) for x in response.json()["data"]]
|
||||||
|
|
||||||
|
def my_bookings(self, start, end):
|
||||||
|
# Ensure user is logged in
|
||||||
|
self.login_if_necessary()
|
||||||
|
|
||||||
|
# Replace this with the actual member ID of the logged-in user
|
||||||
|
member_id = self.user_id # Assuming it's stored when the user logs in
|
||||||
|
|
||||||
|
# Define the URL
|
||||||
|
url = "https://backbone-web-api.production.delft.delcom.nl/participations"
|
||||||
|
|
||||||
|
# Create the parameters for the request
|
||||||
|
params = {
|
||||||
|
"s": json.dumps(
|
||||||
|
{
|
||||||
|
"$or": [
|
||||||
|
{"memberId": member_id}, # Fetch bookings based on the user ID
|
||||||
|
],
|
||||||
|
"booking.startDate": {"$gte": start.strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
|
||||||
|
"booking.endDate": {"$lte": end.strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
|
||||||
|
},
|
||||||
|
),
|
||||||
|
"join": ["booking", "linkedProduct", "product"],
|
||||||
|
"fetchOptimizedCustomerDashboard": "true",
|
||||||
|
"fetchTicket": "false",
|
||||||
|
"sort": "booking.startDate,ASC"
|
||||||
|
}
|
||||||
|
|
||||||
|
# Make the GET request to the API
|
||||||
|
response = self.session.get(
|
||||||
|
url,
|
||||||
|
params=params,
|
||||||
|
headers={"Authorization": f"Bearer {self.access_token}"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check if the request was successful
|
||||||
|
if response.status_code == 200:
|
||||||
|
bookings = [Booking(**(x | {"bookingId": x["id"], "bookableProductId": None})) for x in response.json()["data"]["bookings"]]
|
||||||
|
return bookings
|
||||||
|
elif response.status_code == 403:
|
||||||
|
raise Exception(
|
||||||
|
f"Failed to fetch bookings: {response.json()['message']}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise Exception(
|
||||||
|
f"Failed to fetch bookings: {response.status_code}, {response.text}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def make_booking(self, booking):
|
||||||
|
# Ensure that we're logged in
|
||||||
|
self.login_if_necessary()
|
||||||
|
|
||||||
|
# Extract necessary details from the booking object
|
||||||
|
booking_data = {
|
||||||
|
"organizationId": None, # This is None in the example curl
|
||||||
|
"memberId": self.user_id, # This is fetched during login
|
||||||
|
"bookingId": booking.booking_id, # Booking ID from the booking object
|
||||||
|
"primaryPurchaseMessage": None,
|
||||||
|
"secondaryPurchaseMessage": None,
|
||||||
|
"params": {
|
||||||
|
"startDate": booking.start.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
|
||||||
|
"endDate": booking.end.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
|
||||||
|
"bookableProductId": booking.bookable_product_id,
|
||||||
|
"bookableLinkedProductId": booking.linked_product_id,
|
||||||
|
"bookingId": booking.booking_id, # Booking ID again
|
||||||
|
"invitedMemberEmails": [],
|
||||||
|
"invitedGuests": [],
|
||||||
|
"invitedOthers": [],
|
||||||
|
"primaryPurchaseMessage": None,
|
||||||
|
"secondaryPurchaseMessage": None,
|
||||||
|
},
|
||||||
|
"dateOfRegistration": None,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Send the booking request
|
||||||
|
response = self.session.post(
|
||||||
|
"https://backbone-web-api.production.delft.delcom.nl/participations",
|
||||||
|
json=booking_data,
|
||||||
|
headers={"Authorization": f"Bearer {self.access_token}"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check the response status
|
||||||
|
if response.status_code == 201:
|
||||||
|
return response.json() # Return response if successful
|
||||||
|
elif response.status_code == 403:
|
||||||
|
raise Exception(
|
||||||
|
f"Failed to make booking: {response.json()['message']}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise Exception(
|
||||||
|
f"Failed to make booking: {response.status_code}, {response.text}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def check_booking_availability(self, booking):
|
||||||
|
# Ensure user is logged in
|
||||||
|
self.login_if_necessary()
|
||||||
|
|
||||||
|
# Define the URL
|
||||||
|
url = "https://backbone-web-api.production.delft.delcom.nl/bookable-slots"
|
||||||
|
|
||||||
|
# Create the parameters for the request
|
||||||
|
params = {
|
||||||
|
"s": json.dumps(
|
||||||
|
{
|
||||||
|
"bookingId": booking.booking_id,
|
||||||
|
"tagIds": {"$in": [28]},
|
||||||
|
},
|
||||||
|
),
|
||||||
|
"join": ["product", "linkedProduct"],
|
||||||
|
}
|
||||||
|
|
||||||
|
# Make the GET request to the API
|
||||||
|
response = self.session.get(
|
||||||
|
url,
|
||||||
|
params=params,
|
||||||
|
headers={"Authorization": f"Bearer {self.access_token}"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check if the request was successful
|
||||||
|
if response.status_code == 200:
|
||||||
|
if len(response.json()["data"]) != 1:
|
||||||
|
return False
|
||||||
|
booking = Booking(**response.json()["data"][0])
|
||||||
|
return booking.available
|
||||||
|
|
||||||
|
elif response.status_code == 403:
|
||||||
|
raise Exception(
|
||||||
|
f"Failed to check booking availability: {response.json()['message']}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise Exception(
|
||||||
|
f"Failed to check booking availability: {response.status_code}, {response.text}"
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def user_id(self):
|
||||||
|
if self.id != -1:
|
||||||
|
return self.id
|
||||||
|
|
||||||
|
self.login_if_necessary()
|
||||||
|
|
||||||
|
response = self.session.get(
|
||||||
|
"https://backbone-web-api.production.delft.delcom.nl/auth?cf=0",
|
||||||
|
headers={"Authorization": f"Bearer {self.access_token}"},
|
||||||
|
)
|
||||||
|
self.id = response.json()["id"]
|
||||||
|
return self.id
|
||||||
|
|
||||||
|
def cancel_booking(self, booking):
|
||||||
|
participation_id = None
|
||||||
|
for i in booking.booking_dict.get("participations", []):
|
||||||
|
if i["memberId"] == self.user_id:
|
||||||
|
participation_id = i["id"]
|
||||||
|
break
|
||||||
|
|
||||||
|
assert participation_id is not None, "Booking not found in user's participations"
|
||||||
|
|
||||||
|
## Send a DELETE request to the URL https://backbone-web-api.production.delft.delcom.nl/participations/{id}
|
||||||
|
response = self.session.delete(
|
||||||
|
f"https://backbone-web-api.production.delft.delcom.nl/participations/{participation_id}",
|
||||||
|
headers={"Authorization": f"Bearer {self.access_token}"},
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
return True
|
||||||
|
elif response.status_code == 403:
|
||||||
|
raise Exception(
|
||||||
|
f"Failed to cancel booking: {response.json()['message']}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise Exception(
|
||||||
|
f"Failed to fetch bookings: {response.status_code}, {response.text}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def login(self):
|
||||||
|
self.access_token = self.fetch_access_token()
|
||||||
|
|
||||||
|
def is_logged_in(self):
|
||||||
|
# Send request to 'https://backbone-web-api.production.delft.delcom.nl/auth?cf=0 with access token as bearer, check status code
|
||||||
|
response = self.session.get(
|
||||||
|
"https://backbone-web-api.production.delft.delcom.nl/auth?cf=0",
|
||||||
|
headers={"Authorization": f"Bearer {self.access_token}"},
|
||||||
|
)
|
||||||
|
|
||||||
|
return response.status_code == 200
|
||||||
|
|
||||||
|
def login_if_necessary(self):
|
||||||
|
if not self.is_logged_in():
|
||||||
|
self.login()
|
||||||
Loading…
x
Reference in New Issue
Block a user