Merge updates from dev branch to master #10

Merged
dymik739 merged 12 commits from dev into master 2023-09-12 12:32:45 +03:00
15 changed files with 346 additions and 57 deletions

2
.gitignore vendored
View File

@ -1,2 +1,4 @@
config/*
modules/irc-bridge/error.log
__pycache__/
modules/auto-schedule-pro-v2/preference-db

36
main.py
View File

@ -7,6 +7,7 @@ import sys
import os
import threading
import importlib
import traceback
# global variables
STOP_REQUESTED = False
@ -39,6 +40,7 @@ class ModuleV1:
# set environmental variables
def set_env(self):
self.RESPONSE = ""
self.FORMAT = ""
def set_predefine(self):
try:
@ -54,10 +56,11 @@ class ModuleV1:
self.MESSAGE = msg
try:
exec(self.code)
return self.RESPONSE
return self.RESPONSE, self.FORMAT
except Exception as e:
print(f"[ERROR] module v1: module \"{self.path}\" ({self.alias}) raised exception \"{e}\"")
return ""
print(f"[ERROR] module v1: traceback:\ntraceback.format_exc()")
return "", None
class ModuleV2:
@ -75,7 +78,8 @@ class ModuleV2:
return self.obj.process(msg, self.path)
except Exception as e:
print(f"[ERROR] module v2: module \"{self.path}\" ({self.alias}) raised exception \"{e}\"")
return None
print(f"[ERROR] module v2: traceback:\ntraceback.format_exc()")
return None, None
# module control unit
@ -187,21 +191,22 @@ def queue_processor():
# modules are used in here
for mod in mcu.modules:
if mod.enabled:
if mod.version == 1 or mod.version == 2:
response = mod.process(msg)
if mod.version in [1, 2]:
response, formatting = mod.process(msg)
if response:
'''
# protecting output
symbols_to_escape = ['[', ']', '(', ')', '~', '`', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!']
for symbol in symbols_to_escape:
response = response.replace(symbol, f"\\{symbol}")
'''
if formatting == None:
updater.bot.send_message(chat_id=msg.chat.id, text=response,
disable_web_page_preview=True)
print(f"Responded using module {mod.path} ({mod.alias}) with text: {response}")
break
updater.bot.send_message(chat_id=msg.chat.id, text=response,
disable_web_page_preview=True)
print(f"Responded using module {mod.path} ({mod.alias}) with text: {response}")
break
elif formatting in ["Markdown", "MarkdownV2", "HTML"]:
updater.bot.send_message(chat_id=msg.chat.id, text=response,
disable_web_page_preview=True,
parse_mode=formatting)
print(f"Responded using module {mod.path} ({mod.alias}) with text (using {formatting}): {response}")
break
del message_queue[0]
@ -213,6 +218,7 @@ def queue_processor():
time.sleep(1)
except Exception as e:
print(f"[ERROR] queue_processor: current message queue: {message_queue}")
print(f"[ERROR] Traceback:\n{traceback.format_exc()}")
print(f"[ERROR] queue_processor: error while processing message ({e}), trying to delete it...")
try:
del message_queue[0]

View File

@ -6,6 +6,7 @@ import sys
import os
import threading
import importlib
import traceback
from telegram import Message, Chat
@ -40,6 +41,7 @@ class ModuleV1:
# set environmental variables
def set_env(self):
self.RESPONSE = ""
self.FORMAT = ""
def set_predefine(self):
try:
@ -55,10 +57,11 @@ class ModuleV1:
self.MESSAGE = msg
try:
exec(self.code)
return self.RESPONSE
return self.RESPONSE, self.FORMAT
except Exception as e:
print(f"[ERROR] module v1: module \"{self.path}\" ({self.alias}) raised exception \"{e}\"")
return ""
print(f"[ERROR] module v1: traceback:\n{traceback.format_exc()}")
return "", None
class ModuleV2:
@ -73,11 +76,11 @@ class ModuleV2:
# running the module
def process(self, msg):
try:
responce = self.obj.process(msg, self.path)
return responce
return self.obj.process(msg, self.path)
except Exception as e:
print(f"[ERROR] module v2: module \"{self.path}\" ({self.alias}) raised exception \"{e}\"")
return None
print(f"[ERROR] module v2: traceback:\n{traceback.format_exc()}")
return None, None
# module control unit
@ -188,11 +191,16 @@ def queue_processor():
# modules are used in here
for mod in mcu.modules:
if mod.enabled:
if mod.version == 1 or mod.version == 2:
response = mod.process(msg)
if mod.version in [1, 2]:
response, formatting = mod.process(msg)
if response:
print(f"Responded using module {mod.path} ({mod.alias}) with text: {response}")
break
if formatting == None:
print(f"Responded using module {mod.path} ({mod.alias}) with text: {response}")
break
elif formatting in ["Markdown", "MarkdownV2", "HTML"]:
print(f"Responded using module {mod.path} ({mod.alias}) with text (using {formatting}): {response}")
break
del message_queue[0]
else:
@ -203,6 +211,7 @@ def queue_processor():
except Exception:
print(f"[ERROR] queue_processor: current message queue: {message_queue}")
print(f"[ERROR] Traceback:\n{traceback.format_exc()}")
print("[ERROR] queue_processor: error while processing message, trying to delete it...")
try:
del message_queue[0]

View File

@ -1,15 +1,19 @@
from datetime import datetime
import json
import os
def readfile(filename):
with open(module_path + filename) as f:
return f.read()
def writefile(filename, data):
with open(module_path + filename, 'w') as f:
f.write(data)
# global constants
# Accusative - znahidnyj
WEEKDAYS_ACCUSATIVE = ["понеділок", "вівторок", "середу", "четвер", "п'ятницю", "суботу", "неділю"]
# Genitive - rodovyj
WEEKDAYS_GENITIVE_NEXT = ["наступного понеділка", "наступного вівторка", "наступної середи", "наступного четверга",
"наступної п'ятниці", "наступної суботи", "наступної неділі"]
@ -17,9 +21,119 @@ WEEKDAYS_GENITIVE_NEXT = ["наступного понеділка", "насту
WEEKDAYS_GENITIVE_THIS = ["цього понеділка", "цього вівторка", "цієї середи", "цього четверга", "цієї п'ятниці",
"цієї суботи", "цієї неділі"]
lesson_types_to_strings = {
"lec": "лекція",
"prac": "практика",
"lab": "лабораторна"
}
# global variables
module_path = ""
def get_preference_by_id(user_id, name):
if not os.path.exists(module_path + f"preference-db/{user_id}.json"):
return None
raw_prefs = readfile(f"preference-db/{user_id}.json")
try:
preferences = json.loads(raw_prefs)
except Exception as e:
return None
if not name in preferences:
return None
return preferences[name]
def get_all_preferences_by_id(user_id):
user_preferences = {
"output-style": "legacy-vibrant",
"output-style-lesson": "None",
"output-style-lookup": "None"
}
# label defaults as defaults and let custom settings override these labels
for i in user_preferences:
user_preferences[i] += " <i>(default)</i>"
for i in user_preferences:
override = get_preference_by_id(user_id, i)
if override != None:
user_preferences[i] = override
return user_preferences
def set_preference_by_id(user_id, name, value):
if not os.path.exists(module_path + "preference-db/"):
os.mkdir(module_path + "preference-db/")
preferences = {}
if os.path.exists(module_path + f"preference-db/{user_id}.json"):
try:
raw_prefs = readfile(f"preference-db/{user_id}.json")
preferences = json.loads(raw_prefs)
except Exception as e:
preferences = {}
else:
preferences = {}
preferences[name] = value
final_data = json.dumps(preferences)
writefile(f"preference-db/{user_id}.json", final_data)
def clear_preference_by_id(user_id, name):
if not os.path.exists(module_path + "preference-db/"):
os.mkdir(module_path + "preference-db/")
preferences = {}
if os.path.exists(module_path + f"preference-db/{user_id}.json"):
try:
raw_prefs = readfile(f"preference-db/{user_id}.json")
preferences = json.loads(raw_prefs)
except Exception as e:
preferences = {}
else:
preferences = {}
if name in preferences:
del preferences[name]
final_data = json.dumps(preferences)
writefile(f"preference-db/{user_id}.json", final_data)
def load_template(template, part):
return readfile(f"templates/{template}/{part}.msg")
def escaped_string_markdownV2(input_string):
result_string = input_string
symbols_to_escape = ['_', '*', '[', ']', '(', ')', '~', '`', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!']
for symbol in symbols_to_escape:
result_string = result_string.replace(symbol, f"\\{symbol}")
return result_string
def escaped_string_html(input_string):
result_string = input_string
symbols_to_escape = ['<', '>']
for symbol in symbols_to_escape:
result_string = result_string.replace(symbol, f"\\{symbol}")
return result_string
def get_human_readable_date(start_datetime, end_datetime,
current_day, current_week):
@ -43,27 +157,69 @@ def get_human_readable_date(start_datetime, end_datetime,
return human_readable_date
def generate_lesson_description(lesson, start_datetime, end_datetime, current_day, current_week, overrides={}):
output_settings = {"name": True, "date": True, "teacher": True, "link": True}
output_settings.update(overrides)
def get_name_of_lesson_type(lesson_type):
if lesson_type in lesson_types_to_strings:
return lesson_types_to_strings[lesson_type]
result = ""
if output_settings['name']:
result += f"{lesson['name']}\n"
def generate_lesson_description(lesson, start_datetime, end_datetime, current_day, current_week, overrides={},
custom_name_prefix="Назва", template="legacy-vibrant", force_date_at_top=False):
# temporarily not supported
#output_settings = {"name": True, "date": True, "teacher": True, "link": True, "comment": True}
#output_settings.update(overrides)
if output_settings['date']:
if lesson.__class__ == dict:
if force_date_at_top:
total_result = load_template(template, "date")
human_readable_date = get_human_readable_date(start_datetime, end_datetime,
current_day, current_week)
total_result = total_result.replace("%DATE%", human_readable_date)
total_result += load_template(template, "multiple")
for i in ['name', 'teacher', 'link']:
total_result = total_result.replace(f"%{i.upper()}%", lesson[i])
total_result = total_result.replace("%DATE%", human_readable_date)
total_result = total_result.replace("%TYPE%", get_name_of_lesson_type(lesson['type']))
total_result = total_result.replace("%NAME_PREFIX%", custom_name_prefix)
return total_result + "\n"
else:
active_template = load_template(template, "single")
for i in ['name', 'teacher', 'link']:
active_template = active_template.replace(f"%{i.upper()}%", lesson[i])
human_readable_date = get_human_readable_date(start_datetime, end_datetime,
current_day, current_week)
active_template = active_template.replace("%DATE%", human_readable_date)
active_template = active_template.replace("%TYPE%", get_name_of_lesson_type(lesson['type']))
active_template = active_template.replace("%NAME_PREFIX%", custom_name_prefix)
return active_template
elif lesson.__class__ == list:
total_result = load_template(template, "date")
human_readable_date = get_human_readable_date(start_datetime, end_datetime,
current_day, current_week)
result += f"*Дата*: {human_readable_date}\n"
total_result = total_result.replace("%DATE%", human_readable_date)
if output_settings['teacher']:
result += f"*Викладач*: {lesson['teacher']}\n"
for l in lesson:
active_template = load_template(template, "multiple")
if output_settings['link']:
result += f"*Посилання*: {lesson['link']}"
for i in ['name', 'teacher', 'link']:
active_template = active_template.replace(f"%{i.upper()}%", l[i])
return result
active_template = active_template.replace("%DATE%", human_readable_date)
active_template = active_template.replace("%TYPE%", get_name_of_lesson_type(l['type']))
active_template = active_template.replace("%NAME_PREFIX%", custom_name_prefix)
total_result += active_template + "\n"
return total_result
def get_schedule_data_from(filename):
@ -76,8 +232,15 @@ def get_schedule_data_from(filename):
timestamp = day_number * 86400 + int(lesson_time.split(":")[0]) * 3600 \
+ int(lesson_time.split(":")[1]) * 60
new_record = dict(raw_schedule[day_number][lesson_time])
new_record["source"] = filename.split(".json")[0]
new_record = raw_schedule[day_number][lesson_time]
item_source = filename.split(".json")[0]
if new_record.__class__ == list:
for item in new_record:
item["source"] = item_source
else:
new_record["source"] = item_source
baked_schedule[timestamp] = new_record
return baked_schedule
@ -105,14 +268,16 @@ def process_arguments(args, base_day):
return preferences, selected_day
def get_lesson_description(schedule, reference_time, lesson_time, current_day, current_week, overrides={}):
def get_lesson_description(schedule, reference_time, lesson_time, current_day, current_week, overrides={},
custom_name_prefix="Назва", template="legacy-vibrant", force_date_at_top=False):
lesson_record = schedule[lesson_time]
lesson_start_datetime = datetime.fromtimestamp(reference_time + lesson_time)
lesson_end_datetime = datetime.fromtimestamp(reference_time + lesson_time + 5400)
return generate_lesson_description(lesson_record, lesson_start_datetime, lesson_end_datetime, current_day,
current_week, overrides=overrides)
current_week, overrides=overrides, custom_name_prefix=custom_name_prefix, template=template,
force_date_at_top=force_date_at_top)
def process(message, path):
@ -124,14 +289,14 @@ def process(message, path):
# one printable symbol, so this is already protected
base_command = full_command[0].lower()
if base_command not in ["!пара", "!пари"]:
return ""
if base_command not in ["!пара", "!пари", "!schedule-ctl"]:
return None, None
global module_path
module_path = path
schedule = get_schedule_data_from("schedule.json")
schedule.update(get_schedule_data_from("additions.json"))
schedule = get_schedule_data_from("schedule-v2.json")
schedule.update(get_schedule_data_from("additions-v2.json"))
current_time = datetime.now()
@ -144,16 +309,18 @@ def process(message, path):
reference_time = int(current_time.strftime("%s")) - current_seconds
if base_command == "!пара":
# easter egg
study_begin_ts = int(datetime(year=2023, month=9, day=4).strftime("%s"))
current_ts = int(datetime.now().strftime("%s"))
if -3600*4 < study_begin_ts - current_ts < 0:
return "Навчання незабаром розпочнеться!"
return "Навчання от-от розпочнеться!", None
elif 0 <= study_begin_ts - current_ts < 1209600:
return f"До навчання залишилося {study_begin_ts - current_ts} секунд..."
return f"До навчання залишилося {study_begin_ts - current_ts} секунд...", None
elif study_begin_ts - current_ts >= 1209600:
return "Ви маєте законне право відпочити, пари почнуться не скоро"
return "Ви маєте законне право відпочити, пари почнуться не скоро", None
# actual lesson finding code
upcoming_lessons = [timestamp for timestamp in schedule if timestamp > current_seconds - 5400]
if len(upcoming_lessons) > 0:
@ -161,8 +328,34 @@ def process(message, path):
else:
closest_lesson_time = min(schedule)
return "*Актуальна пара*: " + get_lesson_description(schedule, reference_time, closest_lesson_time, current_day,
current_week)
# shifting lesson pointer if requested to do so
if len(full_command) >= 2:
possible_times = list(schedule.keys())
current_pointer_position = possible_times.index(closest_lesson_time)
total_list_length = len(possible_times)
if len(full_command[1]) > 1 and full_command[1][1:].isdigit():
if full_command[1][0] == "+":
current_pointer_position = (current_pointer_position + int(full_command[1][1:])) % total_list_length
else:
current_pointer_position = (current_pointer_position - int(full_command[1][1:])) % total_list_length
closest_lesson_time = possible_times[current_pointer_position]
# getting corrent style
output_style_preference = "legacy-vibrant"
general_output_style_preference = get_preference_by_id(message.from_user.id, "output-style")
if general_output_style_preference != None:
output_style_preference = general_output_style_preference
specific_output_style_preference = get_preference_by_id(message.from_user.id, "output-style-lesson")
if specific_output_style_preference != None:
output_style_preference = specific_output_style_preference
# returning generated pair description
return get_lesson_description(schedule, reference_time, closest_lesson_time, current_day,
current_week, custom_name_prefix="Актуальна пара", template=output_style_preference), "HTML"
elif base_command == "!пари":
base_day = current_week * 7 + current_day
@ -176,8 +369,52 @@ def process(message, path):
lesson_list = [i for i in schedule if selected_day * 86400 <= i < (selected_day + 1) * 86400]
lesson_descriptions_list = ["*Назва*: " + get_lesson_description(schedule, reference_time, lesson_time,
current_day, current_week, overrides=preferences)
output_style_preference = "legacy-vibrant"
general_output_style_preference = get_preference_by_id(message.from_user.id, "output-style")
if general_output_style_preference != None:
output_style_preference = general_output_style_preference
specific_output_style_preference = get_preference_by_id(message.from_user.id, "output-style-lookup")
if specific_output_style_preference != None:
output_style_preference = specific_output_style_preference
lesson_descriptions_list = [get_lesson_description(schedule, reference_time, lesson_time, current_day,
current_week, overrides=preferences, custom_name_prefix="Назва", template=output_style_preference, force_date_at_top=True)
for lesson_time in lesson_list]
return f"__Пари у {WEEKDAYS_ACCUSATIVE[selected_day % 7]}__:\n" + "\n\n".join(lesson_descriptions_list)
return f"<b><u>Пари у {WEEKDAYS_ACCUSATIVE[selected_day % 7]}</u></b>:\n\n\n" + "\n".join(lesson_descriptions_list), "HTML"
elif base_command == "!schedule-ctl" and len(full_command) >= 2:
if full_command[1] == "list":
prefs = get_all_preferences_by_id(message.from_user.id)
return "Ваші персональні налаштування:\n" + '\n'.join([f"- {k} = {v}" for k, v in prefs.items()]), "HTML"
elif full_command[1] == "set" and len(full_command) == 4:
prefs = get_all_preferences_by_id(message.from_user.id)
if full_command[2] in prefs:
if full_command[2] in ["output-style", "output-style-lesson", "output-style-lookup"]:
if full_command[3] not in os.listdir(module_path + "templates/"):
return f"Стилю {full_command[3]} не існує; доступні варіанти: " \
+ ', '.join(os.listdir(module_path + "templates/")), "HTML"
previous_value = prefs[full_command[2]]
prefs[full_command[2]] = full_command[3]
set_preference_by_id(message.from_user.id, full_command[2], full_command[3])
return f"Змінено значення {full_command[2]}: {previous_value} -> {full_command[3]}", "HTML"
else:
return f"Такого налаштування не існує; переглянути наявні налаштування можна за допомогою команди <u>!schedule-ctl list</u>", "HTML"
elif full_command[1] == "get" and len(full_command) == 3:
requested_preference = get_preference_by_id(message.from_user.id, full_command[2])
return f"Налаштування {full_command[2]} має значення {requested_preference}", "HTML"
elif full_command[1] == "clear" and len(full_command) == 3:
clear_preference_by_id(message.from_user.id, full_command[2])
return f"Очищено значення для налаштування {full_command[2]}, надалі для нього використовуватиметься стандартне значення", "HTML"
else:
return "Такої команди не існує (або був використаний помилковий синтаксис)", "HTML"

View File

@ -0,0 +1 @@
<b><u>%DATE%</u></b>:

View File

@ -0,0 +1,3 @@
<b>%NAME_PREFIX%</b>: %NAME% (%TYPE%)
<b>Викладач</b>: %TEACHER%
<b>Посилання</b>: %LINK%

View File

@ -0,0 +1,4 @@
<b>%NAME_PREFIX%</b>: %NAME% (%TYPE%)
<b>Дата</b>: %DATE%
<b>Викладач</b>: %TEACHER%
<b>Посилання</b>: %LINK%

View File

@ -0,0 +1 @@
<u>%DATE%</u>:

View File

@ -0,0 +1,3 @@
%NAME_PREFIX%: %NAME% (%TYPE%)
Викладач: %TEACHER%
Посилання: %LINK%

View File

@ -0,0 +1,4 @@
%NAME_PREFIX%: %NAME% (%TYPE%)
Дата: %DATE%
Викладач: %TEACHER%
Посилання: %LINK%

View File

@ -0,0 +1 @@
<b><u>%DATE%</u></b>:

View File

@ -0,0 +1,3 @@
<b>%NAME%</b> (%TYPE%)
<i>Викладач</i>: %TEACHER%
<i>Посилання</i>: %LINK%

View File

@ -0,0 +1,4 @@
<b>%NAME%</b> (%TYPE%)
<i>Дата</i>: %DATE%
<i>Викладач</i>: %TEACHER%
<i>Посилання</i>: %LINK%

View File

@ -13,4 +13,6 @@ def get_num():
def process(message, path):
if message.text == "!v2-testing":
return f"Testing successful - call number {get_num()}"
return f"Testing successful - call number {get_num()}", None
else:
return None, None

View File

@ -1,6 +1,14 @@
command = self.MESSAGE['text'].split(" ", 2)
command_length = len(command)
def escaped_string(unescaped_string):
result_string = str(unescaped_string)
for i in ["/", "<", ">"]:
result_string = result_string.replace(i, f"\\{i}")
return result_string
if (command[0] in self.aliases) and (1 <= command_length <= 3):
try:
models = json.loads(readfile(self.path + "translate_models.json"))
@ -25,7 +33,8 @@ if (command[0] in self.aliases) and (1 <= command_length <= 3):
decoded_text = decoded_text.replace(i[0].capitalize(), i[1].capitalize())
decoded_text = decoded_text.replace(i[0].upper(), i[1].upper())
self.RESPONSE = f"__Результат__\n{decoded_text}"
self.RESPONSE = f"<b><u>Результат</u></b>\n{escaped_string(decoded_text)}"
self.FORMAT = "HTML"
except Exception as e:
print(f"[translit-decoder] Got exception: {e}")