Compare commits

..

3 Commits

Author SHA1 Message Date
Yigit Colakoglu
083ea29c21 Fix dockerfile, switch to firefox 2024-10-01 14:05:37 +02:00
Yigit Colakoglu
10fb02c90d Switch to firefox 2024-09-30 15:10:09 +02:00
Yigit Colakoglu
00f3bc4ffd Fixed dockerfile issues 2024-09-30 14:44:56 +02:00
4 changed files with 245 additions and 228 deletions

View File

@ -19,20 +19,13 @@ RUN apt-get update && \
build-essential \ build-essential \
libsqlite3-dev \ libsqlite3-dev \
sqlite3 \ sqlite3 \
chromium \ firefox-esr \
&& apt-get clean \ && apt-get clean \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
# Install the required Python packages # Install the required Python packages
RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir -r requirements.txt
# Set up Chrome WebDriver for Selenium
RUN wget -q -O /app/chromedriver.zip https://storage.googleapis.com/chrome-for-testing-public/129.0.6668.70/linux64/chromedriver-linux64.zip && \
unzip /app/chromedriver.zip && \
mv /app/chromedriver-linux64/chromedriver /app && \
chmod +x /app/chromedriver && \
rm -rf /app/chromedriver.zip /app/chromedriver-linux64
# Set environment variables for Chrome and ChromeDriver # Set environment variables for Chrome and ChromeDriver
ENV PATH=/app:$PATH ENV PATH=/app:$PATH
ENV DISPLAY=:99 ENV DISPLAY=:99

View File

@ -2,8 +2,6 @@ import os
TELEGRAM_TOKEN = os.environ.get("SECRETARX_TG_TOKEN", None) TELEGRAM_TOKEN = os.environ.get("SECRETARX_TG_TOKEN", None)
DB_PATH = "data.db" DB_PATH = "data.db"
CHROMEDRIVER_PATH = "chromedriver"
LISTEN_PORT = 9090 LISTEN_PORT = 9090
GOOGLE_CLIENT_SECRET_FILE = "credentials.json" GOOGLE_CLIENT_SECRET_FILE = "credentials.json"

424
tgbot.py
View File

@ -71,6 +71,10 @@ class TelegramBot:
def calendar_auth(message): def calendar_auth(message):
self.calendar_auth(message) self.calendar_auth(message)
@self.bot.message_handler(commands=["login"])
def cancel_booking(message):
self.process_login(message)
def init_db(self): def init_db(self):
with sqlite3.connect(self.db_path) as conn: with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor() cursor = conn.cursor()
@ -229,210 +233,224 @@ class TelegramBot:
def make_booking(self, message): def make_booking(self, message):
try: try:
xclient = self.get_xclient(message.chat.id) xclient = self.get_xclient(message.chat.id)
if not xclient:
self.bot.reply_to(
message,
"You are not logged in. Please use `/login username password` to log in.",
)
return
start = datetime.now()
end = start + timedelta(days=1)
slots = xclient.list_slots(start, end)
if slots:
markup = InlineKeyboardMarkup()
for i, slot in enumerate(slots):
# Create a button for each slot
status = "Available" if slot.available else "FULL"
button_text = f"Slot {i + 1}: {slot.start_stamp} ({status})"
markup.add(
InlineKeyboardButton(button_text, callback_data=f"book_{i}")
)
self.bot.reply_to(
message, "Select a slot to book or watch:", reply_markup=markup
)
self.user_selected_slot[message.chat.id] = slots
else:
self.bot.reply_to(message, "No slots found.")
except Exception as e: except Exception as e:
self.bot.reply_to(message, f"Error: {str(e)}") self.bot.reply_to(message, f"Error: {str(e)}")
return return
if not xclient:
self.bot.reply_to(
message,
"You are not logged in. Please use `/login username password` to log in.",
)
return
start = datetime.now()
end = start + timedelta(days=1)
slots = xclient.list_slots(start, end)
if slots:
markup = InlineKeyboardMarkup()
for i, slot in enumerate(slots):
# Create a button for each slot
status = "Available" if slot.available else "FULL"
button_text = f"Slot {i + 1}: {slot.start_stamp} ({status})"
markup.add(InlineKeyboardButton(button_text, callback_data=f"book_{i}"))
self.bot.reply_to(
message, "Select a slot to book or watch:", reply_markup=markup
)
self.user_selected_slot[message.chat.id] = slots
else:
self.bot.reply_to(message, "No slots found.")
def callback_booking(self, call): def callback_booking(self, call):
try: try:
xclient = self.get_xclient(call.message.chat.id) xclient = self.get_xclient(call.message.chat.id)
except Exception as e:
self.bot.reply_to(call.message, f"Error: {str(e)}")
return
if not xclient: if not xclient:
self.bot.reply_to( self.bot.reply_to(
call.message, call.message,
"You are not logged in. Please use `/login username password` to log in.", "You are not logged in. Please use `/login username password` to log in.",
) )
return return
try: try:
# Extract slot index from callback data # Extract slot index from callback data
slot_index = int(call.data.split("_")[1]) slot_index = int(call.data.split("_")[1])
slots = self.user_selected_slot.get(call.message.chat.id, []) slots = self.user_selected_slot.get(call.message.chat.id, [])
if 0 <= slot_index < len(slots): if 0 <= slot_index < len(slots):
selected_slot = slots[slot_index] selected_slot = slots[slot_index]
if selected_slot.available: if selected_slot.available:
# Attempt to book the available slot # Attempt to book the available slot
try: try:
xclient.make_booking(selected_slot) xclient.make_booking(selected_slot)
if ( if (
token := self.get_calendar_token(call.message.chat.id) token := self.get_calendar_token(call.message.chat.id)
) is not None: ) is not None:
self.calendar.create_event( self.calendar.create_event(
token, token,
{ {
"summary": "Gym Booking", "summary": "Gym Booking",
"start": { "start": {
"dateTime": selected_slot.start.strftime( "dateTime": selected_slot.start.strftime(
"%Y-%m-%dT%H:%M:%S.%fZ" "%Y-%m-%dT%H:%M:%S.%fZ"
), ),
"timeZone": "Europe/Amsterdam", "timeZone": "Europe/Amsterdam",
},
"end": {
"dateTime": selected_slot.end.strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
),
"timeZone": "Europe/Amsterdam",
},
}, },
"end": { )
"dateTime": selected_slot.end.strftime(
"%Y-%m-%dT%H:%M:%S.%fZ" self.bot.answer_callback_query(
), call.id, "Slot booked successfully!"
"timeZone": "Europe/Amsterdam",
},
},
) )
self.bot.send_message(
call.message.chat.id,
f"Booking confirmed for: {selected_slot.start_stamp}",
)
except Exception as e:
self.bot.answer_callback_query(
call.id, "Failed to book slot."
)
self.bot.send_message(
call.message.chat.id, f"Error: {str(e)}"
)
else:
# Slot is full, add to watchlist
self.add_to_watchlist(call.message.chat.id, selected_slot)
self.bot.answer_callback_query( self.bot.answer_callback_query(
call.id, "Slot booked successfully!" call.id, "Slot is full. Added to watchlist."
) )
self.bot.send_message( self.bot.send_message(
call.message.chat.id, call.message.chat.id,
f"Booking confirmed for: {selected_slot.start_stamp}", f"Slot {selected_slot.start_stamp} is full. You will be notified when it becomes available.",
) )
except Exception as e:
self.bot.answer_callback_query(call.id, "Failed to book slot.")
self.bot.send_message(call.message.chat.id, f"Error: {str(e)}")
else: else:
# Slot is full, add to watchlist self.bot.answer_callback_query(call.id, "Invalid slot selection.")
self.add_to_watchlist(call.message.chat.id, selected_slot) except (IndexError, ValueError):
self.bot.answer_callback_query( self.bot.answer_callback_query(call.id, "Invalid data.")
call.id, "Slot is full. Added to watchlist."
)
self.bot.send_message(
call.message.chat.id,
f"Slot {selected_slot.start_stamp} is full. You will be notified when it becomes available.",
)
else:
self.bot.answer_callback_query(call.id, "Invalid slot selection.")
except (IndexError, ValueError):
self.bot.answer_callback_query(call.id, "Invalid data.")
## Remove the buttons from the previous message ## Remove the buttons from the previous message
self.bot.edit_message_reply_markup( self.bot.edit_message_reply_markup(
call.message.chat.id, call.message.message_id call.message.chat.id, call.message.message_id
) )
except Exception as e:
self.bot.reply_to(call.message, f"Error: {str(e)}")
return
def cancel_booking(self, message): def cancel_booking(self, message):
try: try:
xclient = self.get_xclient(message.chat.id) xclient = self.get_xclient(message.chat.id)
if not xclient:
self.bot.reply_to(
message,
"You are not logged in. Please use `/login username password` to log in.",
)
return
# Fetch user's current bookings
bookings = xclient.my_bookings(
datetime.now(), datetime.now() + timedelta(days=31)
)
if bookings:
markup = InlineKeyboardMarkup()
for i, booking in enumerate(bookings):
# Create a button for each booking
button_text = f"Booking {i + 1}: {booking.start_stamp}"
markup.add(
InlineKeyboardButton(button_text, callback_data=f"cancel_{i}")
)
self.user_bookings[message.chat.id] = bookings
self.bot.reply_to(
message, "Select a booking to cancel:", reply_markup=markup
)
else:
self.bot.reply_to(message, "You have no bookings to cancel.")
except Exception as e: except Exception as e:
self.bot.reply_to(message, f"Error: {str(e)}") self.bot.reply_to(message, f"Error: {str(e)}")
return return
if not xclient:
self.bot.reply_to(
message,
"You are not logged in. Please use `/login username password` to log in.",
)
return
# Fetch user's current bookings
bookings = xclient.my_bookings(
datetime.now(), datetime.now() + timedelta(days=31)
)
if bookings:
markup = InlineKeyboardMarkup()
for i, booking in enumerate(bookings):
# Create a button for each booking
button_text = f"Booking {i + 1}: {booking.start_stamp}"
markup.add(
InlineKeyboardButton(button_text, callback_data=f"cancel_{i}")
)
self.user_bookings[message.chat.id] = bookings
self.bot.reply_to(
message, "Select a booking to cancel:", reply_markup=markup
)
else:
self.bot.reply_to(message, "You have no bookings to cancel.")
def callback_cancel_booking(self, call): def callback_cancel_booking(self, call):
try: try:
xclient = self.get_xclient(call.message.chat.id) xclient = self.get_xclient(call.message.chat.id)
except Exception as e:
self.bot.reply_to(call.message, f"Error: {str(e)}")
return
if not xclient: if not xclient:
self.bot.reply_to( self.bot.reply_to(
call.message, call.message,
"You are not logged in. Please use `/login username password` to log in.", "You are not logged in. Please use `/login username password` to log in.",
) )
return return
try: try:
# Extract booking index from callback data # Extract booking index from callback data
booking_index = int(call.data.split("_")[1]) booking_index = int(call.data.split("_")[1])
bookings = self.user_bookings.get(call.message.chat.id, []) bookings = self.user_bookings.get(call.message.chat.id, [])
if 0 <= booking_index < len(bookings): if 0 <= booking_index < len(bookings):
selected_booking = bookings[booking_index] selected_booking = bookings[booking_index]
# Attempt to cancel the selected booking # Attempt to cancel the selected booking
try: try:
if xclient.cancel_booking(selected_booking): if xclient.cancel_booking(selected_booking):
self.bot.answer_callback_query( self.bot.answer_callback_query(
call.id, "Booking canceled successfully!" call.id, "Booking canceled successfully!"
) )
self.bot.send_message( self.bot.send_message(
call.message.chat.id, call.message.chat.id,
f"Booking for {selected_booking.start_stamp} has been canceled.", f"Booking for {selected_booking.start_stamp} has been canceled.",
)
if (
token := self.get_calendar_token(call.message.chat.id)
) is not None:
gmt_plus_2 = timezone(timedelta(hours=2))
event = self.calendar.get_first_event(
token, selected_booking.start.astimezone(gmt_plus_2)
) )
if event: if (
self.calendar.delete_event(token, event["id"]) token := self.get_calendar_token(call.message.chat.id)
) is not None:
gmt_plus_2 = timezone(timedelta(hours=2))
else: event = self.calendar.get_first_event(
token, selected_booking.start.astimezone(gmt_plus_2)
)
if event:
self.calendar.delete_event(token, event["id"])
else:
self.bot.answer_callback_query(
call.id, "Failed to cancel booking."
)
except Exception as e:
self.bot.answer_callback_query( self.bot.answer_callback_query(
call.id, "Failed to cancel booking." call.id, "Failed to cancel booking."
) )
except Exception as e: self.bot.send_message(call.message.chat.id, f"Error: {str(e)}")
self.bot.answer_callback_query(call.id, "Failed to cancel booking.") else:
self.bot.send_message(call.message.chat.id, f"Error: {str(e)}") self.bot.answer_callback_query(
else: call.id, "Invalid booking selection."
self.bot.answer_callback_query(call.id, "Invalid booking selection.") )
except (IndexError, ValueError): except (IndexError, ValueError):
self.bot.answer_callback_query(call.id, "Invalid data.") self.bot.answer_callback_query(call.id, "Invalid data.")
## Remove the buttons from the previous message ## Remove the buttons from the previous message
self.bot.edit_message_reply_markup( self.bot.edit_message_reply_markup(
call.message.chat.id, call.message.message_id call.message.chat.id, call.message.message_id
) )
except Exception as e:
self.bot.reply_to(call.message, f"Error: {str(e)}")
return
def manage_watchlist(self, message): def manage_watchlist(self, message):
chat_id = message.chat.id chat_id = message.chat.id
@ -495,50 +513,54 @@ class TelegramBot:
for chat_id, slots in list(self.watchlist.items()): for chat_id, slots in list(self.watchlist.items()):
try: try:
xclient = self.get_xclient(chat_id) xclient = self.get_xclient(chat_id)
except Exception as e:
print(f"Error polling watchlist: {str(e)}")
available_slots = [] available_slots = []
for slot in slots: for slot in slots:
if slot.start < now: if slot.start < now:
# If the slot has expired # If the slot has expired
self.bot.send_message(
chat_id,
f"Slot {slot.start_stamp} has expired and is no longer available.",
)
available_slots.append(slot) # Mark it to remove from watchlist
elif xclient.check_booking_availability(slot):
# If the slot becomes available
try:
xclient.make_booking(slot)
if (token := self.get_calendar_token(chat_id)) is not None:
self.calendar.create_event(
token,
{
"summary": "Gym Booking",
"start": {
"dateTime": slot.start.strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
),
"timeZone": "Europe/Amsterdam",
},
"end": {
"dateTime": slot.end.strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
),
"timeZone": "Europe/Amsterdam",
},
},
)
self.bot.send_message( self.bot.send_message(
chat_id, chat_id,
f"Slot {slot.start_stamp} is now available and has been booked for you.", f"Slot {slot.start_stamp} has expired and is no longer available.",
) )
available_slots.append(slot) # Mark it to remove from watchlist available_slots.append(slot) # Mark it to remove from watchlist
except Exception as e: elif xclient.check_booking_availability(slot):
self.bot.send_message( # If the slot becomes available
chat_id, f"Error booking slot {slot.start_stamp}: {str(e)}" try:
) xclient.make_booking(slot)
if (token := self.get_calendar_token(chat_id)) is not None:
self.calendar.create_event(
token,
{
"summary": "Gym Booking",
"start": {
"dateTime": slot.start.strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
),
"timeZone": "Europe/Amsterdam",
},
"end": {
"dateTime": slot.end.strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
),
"timeZone": "Europe/Amsterdam",
},
},
)
self.bot.send_message(
chat_id,
f"Slot {slot.start_stamp} is now available and has been booked for you.",
)
available_slots.append(
slot
) # Mark it to remove from watchlist
except Exception as e:
self.bot.send_message(
chat_id,
f"Error booking slot {slot.start_stamp}: {str(e)}",
)
except Exception as e:
print(f"Error polling watchlist: {str(e)}")
# Remove the expired or booked slots from the watchlist # Remove the expired or booked slots from the watchlist
self.watchlist[chat_id] = [ self.watchlist[chat_id] = [

View File

@ -1,4 +1,5 @@
from selenium import webdriver from selenium import webdriver
from selenium.webdriver.firefox.options import Options
from models import Booking from models import Booking
import json import json
from selenium.webdriver.common.by import By from selenium.webdriver.common.by import By
@ -6,12 +7,12 @@ from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support import expected_conditions as EC
import time import time
import requests import requests
import config
from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
service = Service(ChromeDriverManager().install()) class LoginFailedException(Exception):
def __init__(self, *args: object) -> None:
super().__init__(*args)
class XClient: class XClient:
@ -31,20 +32,13 @@ class XClient:
) )
def fetch_access_token(self): def fetch_access_token(self):
# Set up the WebDriver (make sure to use the correct path for your WebDriver) options = webdriver.FirefoxOptions()
print("starting chromedriver") options.add_argument("--headless")
options = webdriver.ChromeOptions()
options.add_argument("--headless=new")
options.add_argument("--no-sandbox")
print("started chromedriver") driver = webdriver.Firefox(options=options)
driver = webdriver.Chrome(options=options, service=service)
driver.get("https://x.tudelft.nl") driver.get("https://x.tudelft.nl")
print("Check1")
button = WebDriverWait(driver, 30).until( button = WebDriverWait(driver, 30).until(
EC.element_to_be_clickable( EC.element_to_be_clickable(
(By.XPATH, "//span[contains(text(), 'TUDelft')]") (By.XPATH, "//span[contains(text(), 'TUDelft')]")
@ -61,7 +55,6 @@ class XClient:
) )
) )
print("Check2")
button.click() button.click()
# Input the username # Input the username
@ -73,13 +66,24 @@ class XClient:
password_input.submit() password_input.submit()
time.sleep(1) time.sleep(2)
delcom_auth = json.loads( cookie = driver.execute_script(
driver.execute_script("return window.localStorage.getItem('delcom_auth');") "return window.localStorage.getItem('delcom_auth');"
) )
if cookie is None:
raise LoginFailedException("Logging in to X has failed")
delcom_auth = json.loads(cookie)
driver.quit() driver.quit()
if delcom_auth.get("tokenResponse") is None:
raise LoginFailedException(
"Logging in to X has failed, missing tokenResponse"
)
access_token = delcom_auth["tokenResponse"].get("accessToken", None) access_token = delcom_auth["tokenResponse"].get("accessToken", None)
return access_token return access_token