Compare commits
3 Commits
d63a39b49a
...
083ea29c21
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
083ea29c21 | ||
|
|
10fb02c90d | ||
|
|
00f3bc4ffd |
@ -19,20 +19,13 @@ RUN apt-get update && \
|
||||
build-essential \
|
||||
libsqlite3-dev \
|
||||
sqlite3 \
|
||||
chromium \
|
||||
firefox-esr \
|
||||
&& apt-get clean \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Install the required Python packages
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
# Set up Chrome WebDriver for Selenium
|
||||
RUN wget -q -O /app/chromedriver.zip https://storage.googleapis.com/chrome-for-testing-public/129.0.6668.70/linux64/chromedriver-linux64.zip && \
|
||||
unzip /app/chromedriver.zip && \
|
||||
mv /app/chromedriver-linux64/chromedriver /app && \
|
||||
chmod +x /app/chromedriver && \
|
||||
rm -rf /app/chromedriver.zip /app/chromedriver-linux64
|
||||
|
||||
# Set environment variables for Chrome and ChromeDriver
|
||||
ENV PATH=/app:$PATH
|
||||
ENV DISPLAY=:99
|
||||
|
||||
@ -2,8 +2,6 @@ import os
|
||||
|
||||
TELEGRAM_TOKEN = os.environ.get("SECRETARX_TG_TOKEN", None)
|
||||
DB_PATH = "data.db"
|
||||
CHROMEDRIVER_PATH = "chromedriver"
|
||||
|
||||
|
||||
LISTEN_PORT = 9090
|
||||
GOOGLE_CLIENT_SECRET_FILE = "credentials.json"
|
||||
|
||||
424
tgbot.py
424
tgbot.py
@ -71,6 +71,10 @@ class TelegramBot:
|
||||
def calendar_auth(message):
|
||||
self.calendar_auth(message)
|
||||
|
||||
@self.bot.message_handler(commands=["login"])
|
||||
def cancel_booking(message):
|
||||
self.process_login(message)
|
||||
|
||||
def init_db(self):
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
cursor = conn.cursor()
|
||||
@ -229,210 +233,224 @@ class TelegramBot:
|
||||
def make_booking(self, message):
|
||||
try:
|
||||
xclient = self.get_xclient(message.chat.id)
|
||||
|
||||
if not xclient:
|
||||
self.bot.reply_to(
|
||||
message,
|
||||
"You are not logged in. Please use `/login username password` to log in.",
|
||||
)
|
||||
return
|
||||
|
||||
start = datetime.now()
|
||||
end = start + timedelta(days=1)
|
||||
slots = xclient.list_slots(start, end)
|
||||
|
||||
if slots:
|
||||
markup = InlineKeyboardMarkup()
|
||||
for i, slot in enumerate(slots):
|
||||
# Create a button for each slot
|
||||
status = "Available" if slot.available else "FULL"
|
||||
button_text = f"Slot {i + 1}: {slot.start_stamp} ({status})"
|
||||
markup.add(
|
||||
InlineKeyboardButton(button_text, callback_data=f"book_{i}")
|
||||
)
|
||||
|
||||
self.bot.reply_to(
|
||||
message, "Select a slot to book or watch:", reply_markup=markup
|
||||
)
|
||||
self.user_selected_slot[message.chat.id] = slots
|
||||
else:
|
||||
self.bot.reply_to(message, "No slots found.")
|
||||
|
||||
except Exception as e:
|
||||
self.bot.reply_to(message, f"Error: {str(e)}")
|
||||
return
|
||||
|
||||
if not xclient:
|
||||
self.bot.reply_to(
|
||||
message,
|
||||
"You are not logged in. Please use `/login username password` to log in.",
|
||||
)
|
||||
return
|
||||
|
||||
start = datetime.now()
|
||||
end = start + timedelta(days=1)
|
||||
slots = xclient.list_slots(start, end)
|
||||
|
||||
if slots:
|
||||
markup = InlineKeyboardMarkup()
|
||||
for i, slot in enumerate(slots):
|
||||
# Create a button for each slot
|
||||
status = "Available" if slot.available else "FULL"
|
||||
button_text = f"Slot {i + 1}: {slot.start_stamp} ({status})"
|
||||
markup.add(InlineKeyboardButton(button_text, callback_data=f"book_{i}"))
|
||||
|
||||
self.bot.reply_to(
|
||||
message, "Select a slot to book or watch:", reply_markup=markup
|
||||
)
|
||||
self.user_selected_slot[message.chat.id] = slots
|
||||
else:
|
||||
self.bot.reply_to(message, "No slots found.")
|
||||
|
||||
def callback_booking(self, call):
|
||||
try:
|
||||
xclient = self.get_xclient(call.message.chat.id)
|
||||
except Exception as e:
|
||||
self.bot.reply_to(call.message, f"Error: {str(e)}")
|
||||
return
|
||||
|
||||
if not xclient:
|
||||
self.bot.reply_to(
|
||||
call.message,
|
||||
"You are not logged in. Please use `/login username password` to log in.",
|
||||
)
|
||||
return
|
||||
if not xclient:
|
||||
self.bot.reply_to(
|
||||
call.message,
|
||||
"You are not logged in. Please use `/login username password` to log in.",
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
# Extract slot index from callback data
|
||||
slot_index = int(call.data.split("_")[1])
|
||||
slots = self.user_selected_slot.get(call.message.chat.id, [])
|
||||
try:
|
||||
# Extract slot index from callback data
|
||||
slot_index = int(call.data.split("_")[1])
|
||||
slots = self.user_selected_slot.get(call.message.chat.id, [])
|
||||
|
||||
if 0 <= slot_index < len(slots):
|
||||
selected_slot = slots[slot_index]
|
||||
if 0 <= slot_index < len(slots):
|
||||
selected_slot = slots[slot_index]
|
||||
|
||||
if selected_slot.available:
|
||||
# Attempt to book the available slot
|
||||
try:
|
||||
xclient.make_booking(selected_slot)
|
||||
if selected_slot.available:
|
||||
# Attempt to book the available slot
|
||||
try:
|
||||
xclient.make_booking(selected_slot)
|
||||
|
||||
if (
|
||||
token := self.get_calendar_token(call.message.chat.id)
|
||||
) is not None:
|
||||
self.calendar.create_event(
|
||||
token,
|
||||
{
|
||||
"summary": "Gym Booking",
|
||||
"start": {
|
||||
"dateTime": selected_slot.start.strftime(
|
||||
"%Y-%m-%dT%H:%M:%S.%fZ"
|
||||
),
|
||||
"timeZone": "Europe/Amsterdam",
|
||||
if (
|
||||
token := self.get_calendar_token(call.message.chat.id)
|
||||
) is not None:
|
||||
self.calendar.create_event(
|
||||
token,
|
||||
{
|
||||
"summary": "Gym Booking",
|
||||
"start": {
|
||||
"dateTime": selected_slot.start.strftime(
|
||||
"%Y-%m-%dT%H:%M:%S.%fZ"
|
||||
),
|
||||
"timeZone": "Europe/Amsterdam",
|
||||
},
|
||||
"end": {
|
||||
"dateTime": selected_slot.end.strftime(
|
||||
"%Y-%m-%dT%H:%M:%S.%fZ"
|
||||
),
|
||||
"timeZone": "Europe/Amsterdam",
|
||||
},
|
||||
},
|
||||
"end": {
|
||||
"dateTime": selected_slot.end.strftime(
|
||||
"%Y-%m-%dT%H:%M:%S.%fZ"
|
||||
),
|
||||
"timeZone": "Europe/Amsterdam",
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
self.bot.answer_callback_query(
|
||||
call.id, "Slot booked successfully!"
|
||||
)
|
||||
|
||||
self.bot.send_message(
|
||||
call.message.chat.id,
|
||||
f"Booking confirmed for: {selected_slot.start_stamp}",
|
||||
)
|
||||
except Exception as e:
|
||||
self.bot.answer_callback_query(
|
||||
call.id, "Failed to book slot."
|
||||
)
|
||||
self.bot.send_message(
|
||||
call.message.chat.id, f"Error: {str(e)}"
|
||||
)
|
||||
else:
|
||||
# Slot is full, add to watchlist
|
||||
self.add_to_watchlist(call.message.chat.id, selected_slot)
|
||||
self.bot.answer_callback_query(
|
||||
call.id, "Slot booked successfully!"
|
||||
call.id, "Slot is full. Added to watchlist."
|
||||
)
|
||||
self.bot.send_message(
|
||||
call.message.chat.id,
|
||||
f"Booking confirmed for: {selected_slot.start_stamp}",
|
||||
f"Slot {selected_slot.start_stamp} is full. You will be notified when it becomes available.",
|
||||
)
|
||||
except Exception as e:
|
||||
self.bot.answer_callback_query(call.id, "Failed to book slot.")
|
||||
self.bot.send_message(call.message.chat.id, f"Error: {str(e)}")
|
||||
else:
|
||||
# Slot is full, add to watchlist
|
||||
self.add_to_watchlist(call.message.chat.id, selected_slot)
|
||||
self.bot.answer_callback_query(
|
||||
call.id, "Slot is full. Added to watchlist."
|
||||
)
|
||||
self.bot.send_message(
|
||||
call.message.chat.id,
|
||||
f"Slot {selected_slot.start_stamp} is full. You will be notified when it becomes available.",
|
||||
)
|
||||
else:
|
||||
self.bot.answer_callback_query(call.id, "Invalid slot selection.")
|
||||
except (IndexError, ValueError):
|
||||
self.bot.answer_callback_query(call.id, "Invalid data.")
|
||||
self.bot.answer_callback_query(call.id, "Invalid slot selection.")
|
||||
except (IndexError, ValueError):
|
||||
self.bot.answer_callback_query(call.id, "Invalid data.")
|
||||
|
||||
## Remove the buttons from the previous message
|
||||
self.bot.edit_message_reply_markup(
|
||||
call.message.chat.id, call.message.message_id
|
||||
)
|
||||
## Remove the buttons from the previous message
|
||||
self.bot.edit_message_reply_markup(
|
||||
call.message.chat.id, call.message.message_id
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
self.bot.reply_to(call.message, f"Error: {str(e)}")
|
||||
return
|
||||
|
||||
def cancel_booking(self, message):
|
||||
try:
|
||||
xclient = self.get_xclient(message.chat.id)
|
||||
|
||||
if not xclient:
|
||||
self.bot.reply_to(
|
||||
message,
|
||||
"You are not logged in. Please use `/login username password` to log in.",
|
||||
)
|
||||
return
|
||||
|
||||
# Fetch user's current bookings
|
||||
bookings = xclient.my_bookings(
|
||||
datetime.now(), datetime.now() + timedelta(days=31)
|
||||
)
|
||||
|
||||
if bookings:
|
||||
markup = InlineKeyboardMarkup()
|
||||
for i, booking in enumerate(bookings):
|
||||
# Create a button for each booking
|
||||
button_text = f"Booking {i + 1}: {booking.start_stamp}"
|
||||
markup.add(
|
||||
InlineKeyboardButton(button_text, callback_data=f"cancel_{i}")
|
||||
)
|
||||
|
||||
self.user_bookings[message.chat.id] = bookings
|
||||
self.bot.reply_to(
|
||||
message, "Select a booking to cancel:", reply_markup=markup
|
||||
)
|
||||
else:
|
||||
self.bot.reply_to(message, "You have no bookings to cancel.")
|
||||
|
||||
except Exception as e:
|
||||
self.bot.reply_to(message, f"Error: {str(e)}")
|
||||
return
|
||||
|
||||
if not xclient:
|
||||
self.bot.reply_to(
|
||||
message,
|
||||
"You are not logged in. Please use `/login username password` to log in.",
|
||||
)
|
||||
return
|
||||
|
||||
# Fetch user's current bookings
|
||||
bookings = xclient.my_bookings(
|
||||
datetime.now(), datetime.now() + timedelta(days=31)
|
||||
)
|
||||
|
||||
if bookings:
|
||||
markup = InlineKeyboardMarkup()
|
||||
for i, booking in enumerate(bookings):
|
||||
# Create a button for each booking
|
||||
button_text = f"Booking {i + 1}: {booking.start_stamp}"
|
||||
markup.add(
|
||||
InlineKeyboardButton(button_text, callback_data=f"cancel_{i}")
|
||||
)
|
||||
|
||||
self.user_bookings[message.chat.id] = bookings
|
||||
self.bot.reply_to(
|
||||
message, "Select a booking to cancel:", reply_markup=markup
|
||||
)
|
||||
else:
|
||||
self.bot.reply_to(message, "You have no bookings to cancel.")
|
||||
|
||||
def callback_cancel_booking(self, call):
|
||||
try:
|
||||
xclient = self.get_xclient(call.message.chat.id)
|
||||
except Exception as e:
|
||||
self.bot.reply_to(call.message, f"Error: {str(e)}")
|
||||
return
|
||||
|
||||
if not xclient:
|
||||
self.bot.reply_to(
|
||||
call.message,
|
||||
"You are not logged in. Please use `/login username password` to log in.",
|
||||
)
|
||||
return
|
||||
if not xclient:
|
||||
self.bot.reply_to(
|
||||
call.message,
|
||||
"You are not logged in. Please use `/login username password` to log in.",
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
# Extract booking index from callback data
|
||||
booking_index = int(call.data.split("_")[1])
|
||||
bookings = self.user_bookings.get(call.message.chat.id, [])
|
||||
try:
|
||||
# Extract booking index from callback data
|
||||
booking_index = int(call.data.split("_")[1])
|
||||
bookings = self.user_bookings.get(call.message.chat.id, [])
|
||||
|
||||
if 0 <= booking_index < len(bookings):
|
||||
selected_booking = bookings[booking_index]
|
||||
# Attempt to cancel the selected booking
|
||||
try:
|
||||
if xclient.cancel_booking(selected_booking):
|
||||
self.bot.answer_callback_query(
|
||||
call.id, "Booking canceled successfully!"
|
||||
)
|
||||
self.bot.send_message(
|
||||
call.message.chat.id,
|
||||
f"Booking for {selected_booking.start_stamp} has been canceled.",
|
||||
)
|
||||
|
||||
if (
|
||||
token := self.get_calendar_token(call.message.chat.id)
|
||||
) is not None:
|
||||
gmt_plus_2 = timezone(timedelta(hours=2))
|
||||
|
||||
event = self.calendar.get_first_event(
|
||||
token, selected_booking.start.astimezone(gmt_plus_2)
|
||||
if 0 <= booking_index < len(bookings):
|
||||
selected_booking = bookings[booking_index]
|
||||
# Attempt to cancel the selected booking
|
||||
try:
|
||||
if xclient.cancel_booking(selected_booking):
|
||||
self.bot.answer_callback_query(
|
||||
call.id, "Booking canceled successfully!"
|
||||
)
|
||||
self.bot.send_message(
|
||||
call.message.chat.id,
|
||||
f"Booking for {selected_booking.start_stamp} has been canceled.",
|
||||
)
|
||||
|
||||
if event:
|
||||
self.calendar.delete_event(token, event["id"])
|
||||
if (
|
||||
token := self.get_calendar_token(call.message.chat.id)
|
||||
) is not None:
|
||||
gmt_plus_2 = timezone(timedelta(hours=2))
|
||||
|
||||
else:
|
||||
event = self.calendar.get_first_event(
|
||||
token, selected_booking.start.astimezone(gmt_plus_2)
|
||||
)
|
||||
|
||||
if event:
|
||||
self.calendar.delete_event(token, event["id"])
|
||||
|
||||
else:
|
||||
self.bot.answer_callback_query(
|
||||
call.id, "Failed to cancel booking."
|
||||
)
|
||||
except Exception as e:
|
||||
self.bot.answer_callback_query(
|
||||
call.id, "Failed to cancel booking."
|
||||
)
|
||||
except Exception as e:
|
||||
self.bot.answer_callback_query(call.id, "Failed to cancel booking.")
|
||||
self.bot.send_message(call.message.chat.id, f"Error: {str(e)}")
|
||||
else:
|
||||
self.bot.answer_callback_query(call.id, "Invalid booking selection.")
|
||||
except (IndexError, ValueError):
|
||||
self.bot.answer_callback_query(call.id, "Invalid data.")
|
||||
self.bot.send_message(call.message.chat.id, f"Error: {str(e)}")
|
||||
else:
|
||||
self.bot.answer_callback_query(
|
||||
call.id, "Invalid booking selection."
|
||||
)
|
||||
except (IndexError, ValueError):
|
||||
self.bot.answer_callback_query(call.id, "Invalid data.")
|
||||
|
||||
## Remove the buttons from the previous message
|
||||
self.bot.edit_message_reply_markup(
|
||||
call.message.chat.id, call.message.message_id
|
||||
)
|
||||
## Remove the buttons from the previous message
|
||||
self.bot.edit_message_reply_markup(
|
||||
call.message.chat.id, call.message.message_id
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
self.bot.reply_to(call.message, f"Error: {str(e)}")
|
||||
return
|
||||
|
||||
def manage_watchlist(self, message):
|
||||
chat_id = message.chat.id
|
||||
@ -495,50 +513,54 @@ class TelegramBot:
|
||||
for chat_id, slots in list(self.watchlist.items()):
|
||||
try:
|
||||
xclient = self.get_xclient(chat_id)
|
||||
except Exception as e:
|
||||
print(f"Error polling watchlist: {str(e)}")
|
||||
|
||||
available_slots = []
|
||||
for slot in slots:
|
||||
if slot.start < now:
|
||||
# If the slot has expired
|
||||
self.bot.send_message(
|
||||
chat_id,
|
||||
f"Slot {slot.start_stamp} has expired and is no longer available.",
|
||||
)
|
||||
available_slots.append(slot) # Mark it to remove from watchlist
|
||||
elif xclient.check_booking_availability(slot):
|
||||
# If the slot becomes available
|
||||
try:
|
||||
xclient.make_booking(slot)
|
||||
if (token := self.get_calendar_token(chat_id)) is not None:
|
||||
self.calendar.create_event(
|
||||
token,
|
||||
{
|
||||
"summary": "Gym Booking",
|
||||
"start": {
|
||||
"dateTime": slot.start.strftime(
|
||||
"%Y-%m-%dT%H:%M:%S.%fZ"
|
||||
),
|
||||
"timeZone": "Europe/Amsterdam",
|
||||
},
|
||||
"end": {
|
||||
"dateTime": slot.end.strftime(
|
||||
"%Y-%m-%dT%H:%M:%S.%fZ"
|
||||
),
|
||||
"timeZone": "Europe/Amsterdam",
|
||||
},
|
||||
},
|
||||
)
|
||||
available_slots = []
|
||||
for slot in slots:
|
||||
if slot.start < now:
|
||||
# If the slot has expired
|
||||
self.bot.send_message(
|
||||
chat_id,
|
||||
f"Slot {slot.start_stamp} is now available and has been booked for you.",
|
||||
f"Slot {slot.start_stamp} has expired and is no longer available.",
|
||||
)
|
||||
available_slots.append(slot) # Mark it to remove from watchlist
|
||||
except Exception as e:
|
||||
self.bot.send_message(
|
||||
chat_id, f"Error booking slot {slot.start_stamp}: {str(e)}"
|
||||
)
|
||||
elif xclient.check_booking_availability(slot):
|
||||
# If the slot becomes available
|
||||
try:
|
||||
xclient.make_booking(slot)
|
||||
if (token := self.get_calendar_token(chat_id)) is not None:
|
||||
self.calendar.create_event(
|
||||
token,
|
||||
{
|
||||
"summary": "Gym Booking",
|
||||
"start": {
|
||||
"dateTime": slot.start.strftime(
|
||||
"%Y-%m-%dT%H:%M:%S.%fZ"
|
||||
),
|
||||
"timeZone": "Europe/Amsterdam",
|
||||
},
|
||||
"end": {
|
||||
"dateTime": slot.end.strftime(
|
||||
"%Y-%m-%dT%H:%M:%S.%fZ"
|
||||
),
|
||||
"timeZone": "Europe/Amsterdam",
|
||||
},
|
||||
},
|
||||
)
|
||||
self.bot.send_message(
|
||||
chat_id,
|
||||
f"Slot {slot.start_stamp} is now available and has been booked for you.",
|
||||
)
|
||||
available_slots.append(
|
||||
slot
|
||||
) # Mark it to remove from watchlist
|
||||
except Exception as e:
|
||||
self.bot.send_message(
|
||||
chat_id,
|
||||
f"Error booking slot {slot.start_stamp}: {str(e)}",
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error polling watchlist: {str(e)}")
|
||||
|
||||
# Remove the expired or booked slots from the watchlist
|
||||
self.watchlist[chat_id] = [
|
||||
|
||||
38
xclient.py
38
xclient.py
@ -1,4 +1,5 @@
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.firefox.options import Options
|
||||
from models import Booking
|
||||
import json
|
||||
from selenium.webdriver.common.by import By
|
||||
@ -6,12 +7,12 @@ from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
import time
|
||||
import requests
|
||||
import config
|
||||
from selenium.webdriver.chrome.service import Service
|
||||
from webdriver_manager.chrome import ChromeDriverManager
|
||||
|
||||
|
||||
service = Service(ChromeDriverManager().install())
|
||||
class LoginFailedException(Exception):
|
||||
def __init__(self, *args: object) -> None:
|
||||
super().__init__(*args)
|
||||
|
||||
|
||||
class XClient:
|
||||
@ -31,20 +32,13 @@ class XClient:
|
||||
)
|
||||
|
||||
def fetch_access_token(self):
|
||||
# Set up the WebDriver (make sure to use the correct path for your WebDriver)
|
||||
print("starting chromedriver")
|
||||
options = webdriver.ChromeOptions()
|
||||
options.add_argument("--headless=new")
|
||||
options.add_argument("--no-sandbox")
|
||||
options = webdriver.FirefoxOptions()
|
||||
options.add_argument("--headless")
|
||||
|
||||
print("started chromedriver")
|
||||
|
||||
driver = webdriver.Chrome(options=options, service=service)
|
||||
driver = webdriver.Firefox(options=options)
|
||||
|
||||
driver.get("https://x.tudelft.nl")
|
||||
|
||||
print("Check1")
|
||||
|
||||
button = WebDriverWait(driver, 30).until(
|
||||
EC.element_to_be_clickable(
|
||||
(By.XPATH, "//span[contains(text(), 'TUDelft')]")
|
||||
@ -61,7 +55,6 @@ class XClient:
|
||||
)
|
||||
)
|
||||
|
||||
print("Check2")
|
||||
button.click()
|
||||
|
||||
# Input the username
|
||||
@ -73,13 +66,24 @@ class XClient:
|
||||
|
||||
password_input.submit()
|
||||
|
||||
time.sleep(1)
|
||||
time.sleep(2)
|
||||
|
||||
delcom_auth = json.loads(
|
||||
driver.execute_script("return window.localStorage.getItem('delcom_auth');")
|
||||
cookie = driver.execute_script(
|
||||
"return window.localStorage.getItem('delcom_auth');"
|
||||
)
|
||||
|
||||
if cookie is None:
|
||||
raise LoginFailedException("Logging in to X has failed")
|
||||
|
||||
delcom_auth = json.loads(cookie)
|
||||
|
||||
driver.quit()
|
||||
|
||||
if delcom_auth.get("tokenResponse") is None:
|
||||
raise LoginFailedException(
|
||||
"Logging in to X has failed, missing tokenResponse"
|
||||
)
|
||||
|
||||
access_token = delcom_auth["tokenResponse"].get("accessToken", None)
|
||||
return access_token
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user