Compare commits

..

14 Commits

12 changed files with 272 additions and 87 deletions

50
main.py
View File

@ -1,4 +1,5 @@
from telegram.ext import Updater, MessageHandler, Filters
from telegram.constants import ParseMode
import datetime
import codecs
import time
@ -11,16 +12,18 @@ import importlib
# global variables
STOP_REQUESTED = False
# some functions that increase readability of the code
def readfile(filename):
try:
return codecs.open(filename, encoding = "utf-8").read()
return codecs.open(filename, encoding="utf-8").read()
except FileNotFoundError:
return False
except Exception as e:
print( "[ERROR] Unexpected error occured in readfile() ({0})".format(e) )
print(f"[ERROR] Unexpected error occurred in readfile() ({e})")
return False
# module object classes
class ModuleV1:
def __init__(self, path, code, enabled, alias, predefine):
@ -42,7 +45,8 @@ class ModuleV1:
try:
exec(self.predefine)
except Exception as e:
print("[ERROR] module v1: module \"{}\" ({}) raised exception \"{}\" during predefine stage, disabling it...".format(self.path, self.alias, e))
print(f"[ERROR] module v1: module \"{self.path}\" ({self.alias}) raised exception \"{e}\" "
f"during predefine stage, disabling it...")
# running the module
def process(self, msg):
@ -53,9 +57,10 @@ class ModuleV1:
exec(self.code)
return self.RESPONCE
except Exception as e:
print("[ERROR] module v1: module \"{}\" ({}) raised exception \"{}\"".format(self.path, self.alias, e))
print(f"[ERROR] module v1: module \"{self.path}\" ({self.alias}) raised exception \"{e}\"")
return ""
class ModuleV2:
def __init__(self, path, index_file, enabled, alias):
self.version = 2
@ -68,8 +73,8 @@ class ModuleV2:
# running the module
def process(self, msg):
try:
responce = self.obj.process(msg, self.path)
return responce
response = self.obj.process(msg, self.path)
return response
except Exception as e:
print(f"[ERROR] module v2: module \"{self.path}\" ({self.alias}) raised exception \"{e}\"")
return None
@ -88,7 +93,7 @@ class ModuleControlUnit:
try:
meta_raw = readfile("modules/{}/meta.json".format(folder))
if not meta_raw:
print("[WARN] module_loader: no meta.json found in module folder \"{}\"".format(folder))
print(f"[WARN] module_loader: no meta.json found in module folder \"{folder}\"")
continue
meta = json.loads( meta_raw )
@ -119,9 +124,10 @@ class ModuleControlUnit:
else:
predefine = False
self.modules.append( ModuleV1( "modules/{}/".format(folder), code, enabled, alias, predefine ) )
self.modules.append(ModuleV1(f"modules/{folder}/", code, enabled, alias, predefine))
print("[INFO] reload_modules: successfully loaded {} as {} (start_on_boot: {})".format(folder, alias, enabled))
print(f"[INFO] reload_modules: successfully loaded {folder} as {alias} "
f"(start_on_boot: {enabled})")
elif meta["version"] == 2:
if "index_file" in meta:
@ -141,19 +147,15 @@ class ModuleControlUnit:
self.modules.append(ModuleV2(f"modules/{folder}/", index_file, enabled, alias))
print(f"[INFO] reload_modules: successfully loaded {folder} as {alias} (start_on_boot: {enabled})")
print(f"[INFO] reload_modules: successfully loaded {folder} as {alias} "
f"(start_on_boot: {enabled})")
else:
print(f"[WARN] reload_modules: module {folder} requires unsupported version ({meta['version']} > 2), skipping...")
print(f"[WARN] reload_modules: module {folder} requires unsupported version "
f"({meta['version']} > 2), skipping...")
except Exception as e:
print("[ERROR] module_loader: error while loading module \"{}\" ({})".format(folder, e))
# message queue object to go back to synchronous message processing
#class MessageQueue:
# def __init__(self):
# print("[INFO] Initializing the message queue...")
# self.queue = []
print(f"[ERROR] module_loader: error while loading module \"{folder}\" ({e})")
# synchronous message processor
@ -190,7 +192,9 @@ def queue_processor():
if mod.version == 1 or mod.version == 2:
responce = mod.process(msg)
if responce:
updater.bot.send_message(chat_id = msg.chat.id, text = responce, disable_web_page_preview = True)
updater.bot.send_message(chat_id=msg.chat.id, text=responce,
disable_web_page_preview=True,
parse_mode=ParseMode.MARKDOWN_V2)
print(f"Responded using module {mod.path} ({mod.alias}) with text: {responce}")
break
@ -202,8 +206,8 @@ def queue_processor():
break
else:
time.sleep(1)
except Exception as e:
print("[ERROR] queue_processor: current message queue: {}".format(message_queue))
except Exception:
print(f"[ERROR] queue_processor: current message queue: {message_queue}")
print("[ERROR] queue_processor: error while processing message, trying to delete it...")
try:
del message_queue[0]
@ -229,7 +233,7 @@ message_queue = []
mcu = ModuleControlUnit()
processor_thread = threading.Thread( target = queue_processor, args = [] )
processor_thread = threading.Thread(target=queue_processor, args=[])
processor_thread.start()
@ -241,7 +245,7 @@ if not TOKEN:
sys.exit(1)
# connect to Telegram servers
updater = Updater(TOKEN, use_context = True)
updater = Updater(TOKEN, use_context=True)
dispatcher = updater.dispatcher
# assign the handler for messages

View File

@ -12,16 +12,18 @@ from telegram import Message, Chat
# global variables
STOP_REQUESTED = False
# some functions that increase readability of the code
def readfile(filename):
try:
return codecs.open(filename, encoding = "utf-8").read()
return codecs.open(filename, encoding="utf-8").read()
except FileNotFoundError:
return False
except Exception as e:
print( "[ERROR] Unexpected error occured in readfile() ({0})".format(e) )
print(f"[ERROR] Unexpected error occurred in readfile() ({e})")
return False
# module object classes
class ModuleV1:
def __init__(self, path, code, enabled, alias, predefine):
@ -37,13 +39,14 @@ class ModuleV1:
# set environmental variables
def set_env(self):
self.RESPONCE = ""
self.RESPONSE = ""
def set_predefine(self):
try:
exec(self.predefine)
except Exception as e:
print("[ERROR] module v1: module \"{}\" ({}) raised exception \"{}\" during predefine stage, disabling it...".format(self.path, self.alias, e))
print(f"[ERROR] module v1: module \"{self.path}\" ({self.alias}) raised exception \"{e}\" "
f"during predefine stage, disabling it...")
# running the module
def process(self, msg):
@ -52,9 +55,9 @@ class ModuleV1:
self.MESSAGE = msg
try:
exec(self.code)
return self.RESPONCE
return self.RESPONSE
except Exception as e:
print("[ERROR] module v1: module \"{}\" ({}) raised exception \"{}\"".format(self.path, self.alias, e))
print(f"[ERROR] module v1: module \"{self.path}\" ({self.alias}) raised exception \"{e}\"")
return ""
@ -90,10 +93,10 @@ class ModuleControlUnit:
try:
meta_raw = readfile("modules/{}/meta.json".format(folder))
if not meta_raw:
print("[WARN] module_loader: no meta.json found in module folder \"{}\"".format(folder))
print(f"[WARN] module_loader: no meta.json found in module folder \"{folder}\"")
continue
meta = json.loads( meta_raw )
meta = json.loads(meta_raw)
if "version" in meta:
if meta["version"] == 1:
if "index_file" in meta:
@ -101,9 +104,9 @@ class ModuleControlUnit:
else:
index_file = "index.py"
code = readfile( "modules/{}/{}".format(folder, index_file) )
code = readfile("modules/{}/{}".format(folder, index_file))
if not code: # False both when readfile() returns False and when the code string is empty
print("[WARN] reload_modules: module {} does not have any code, skipping...".format(folder))
print(f"[WARN] reload_modules: module {folder} does not have any code, skipping...")
continue
if "start_on_boot" in meta:
@ -121,9 +124,10 @@ class ModuleControlUnit:
else:
predefine = False
self.modules.append( ModuleV1( "modules/{}/".format(folder), code, enabled, alias, predefine ) )
self.modules.append(ModuleV1(f"modules/{folder}/", code, enabled, alias, predefine))
print("[INFO] reload_modules: successfully loaded {} as {} (start_on_boot: {})".format(folder, alias, enabled))
print(f"[INFO] reload_modules: successfully loaded {folder} as {alias} "
f"(start_on_boot: {enabled})")
elif meta["version"] == 2:
if "index_file" in meta:
@ -143,21 +147,15 @@ class ModuleControlUnit:
self.modules.append(ModuleV2(f"modules/{folder}/", index_file, enabled, alias))
print(f"[INFO] reload_modules: successfully loaded {folder} as {alias} (start_on_boot: {enabled})")
print(f"[INFO] reload_modules: successfully loaded {folder} as {alias} "
f"(start_on_boot: {enabled})")
else:
print(f"[WARN] reload_modules: module {folder} requires unsupported version ({meta['version']} > 2), skipping...")
print(f"[WARN] reload_modules: module {folder} requires unsupported version "
f"({meta['version']} > 2), skipping...")
except Exception as e:
print("[ERROR] module_loader: error while loading module \"{}\" ({})".format(folder, e))
# message queue object to go back to synchronous message processing
#class MessageQueue:
# def __init__(self):
# print("[INFO] Initializing the message queue...")
# self.queue = []
print(f"[ERROR] module_loader: error while loading module \"{folder}\" ({e})")
# synchronous message processor
def queue_processor():
@ -191,9 +189,9 @@ def queue_processor():
for mod in mcu.modules:
if mod.enabled:
if mod.version == 1 or mod.version == 2:
responce = mod.process(msg)
if responce:
print(f"Responded using module {mod.path} ({mod.alias}) with text: {responce}")
response = mod.process(msg)
if response:
print(f"Responded using module {mod.path} ({mod.alias}) with text: {response}")
break
del message_queue[0]
@ -203,8 +201,8 @@ def queue_processor():
else:
time.sleep(1)
except Exception as e:
print("[ERROR] queue_processor: current message queue: {}".format(message_queue))
except Exception:
print(f"[ERROR] queue_processor: current message queue: {message_queue}")
print("[ERROR] queue_processor: error while processing message, trying to delete it...")
try:
del message_queue[0]
@ -224,10 +222,9 @@ message_queue = []
mcu = ModuleControlUnit()
processor_thread = threading.Thread( target = queue_processor, args = [] )
processor_thread = threading.Thread(target=queue_processor, args=[])
processor_thread.start()
print("Enter testing messages one by one, end with an empty line")
while True:
@ -235,4 +232,4 @@ while True:
if len(new_msg) == 0:
break
message_queue.append(Message(9, round(time.time()), Chat(575246355, 'supergroup'), text = new_msg))
message_queue.append(Message(9, round(time.time()), Chat(575246355, 'supergroup'), text=new_msg))

View File

@ -0,0 +1 @@
../auto-schedule-pro/additions.json

View File

@ -0,0 +1,174 @@
from datetime import datetime
import json
def readfile(filename):
with open(module_path + filename) as f:
return f.read()
# global constants
# Accusative - znahidnyj
WEEKDAYS_ACCUSATIVE = ["в понеділок", "у вівторок", "в середу", "в четвер", "в п'ятницю", "в суботу", "в неділю"]
# Genitive - rodovyj
WEEKDAYS_GENITIVE_NEXT = ["наступного понеділка", "наступного вівторка", "наступної середи", "наступного четверга",
"наступної п'ятниці", "наступної суботи", "наступної неділі"]
WEEKDAYS_GENITIVE_THIS = ["цього понеділка", "цього вівторка", "цієї середи", "цього четверга", "цієї п'ятниці",
"цієї суботи", "цієї неділі"]
# global variables
module_path = ""
def get_human_readable_date(start_datetime, end_datetime,
current_day, current_week):
human_readable_date = ""
if ((current_day + 2) == int(start_datetime.strftime("%u"))) or \
((current_day == 6) and (start_datetime.strftime("%u") == "1")):
human_readable_date += "завтра "
elif current_week != int(start_datetime.strftime("%W")) % 2:
human_readable_date += f"{WEEKDAYS_GENITIVE_NEXT[int(start_datetime.strftime('%u')) - 1]} "
elif current_day != (int(start_datetime.strftime("%u")) - 1):
human_readable_date += f"{WEEKDAYS_GENITIVE_THIS[int(start_datetime.strftime('%u')) - 1]} "
else:
human_readable_date += "сьогодні "
human_readable_date += "з "
human_readable_date += start_datetime.strftime("%H:%M")
human_readable_date += " до "
human_readable_date += end_datetime.strftime("%H:%M")
return human_readable_date
def generate_lesson_description(lesson, start_datetime, end_datetime, current_day, current_week, overrides={}):
output_settings = {"name": True, "date": True, "teacher": True, "link": True}
output_settings.update(overrides)
result = ""
if output_settings['name']:
result += f"{lesson['name']}\n"
if output_settings['date']:
human_readable_date = get_human_readable_date(start_datetime, end_datetime,
current_day, current_week)
result += f"*Дата*: {human_readable_date}\n"
if output_settings['teacher']:
result += f"*Викладач*: {lesson['teacher']}\n"
if output_settings['link']:
result += f"*Посилання*: {lesson['link']}"
return result
def get_schedule_data_from(filename):
raw_schedule = json.loads(readfile(filename))
baked_schedule = {}
for day_number, lesson_times in enumerate(raw_schedule):
for lesson_time in lesson_times:
timestamp = day_number * 86400 + int(lesson_time.split(":")[0]) * 3600 \
+ int(lesson_time.split(":")[1]) * 60
new_record = dict(raw_schedule[day_number][lesson_time])
new_record["source"] = filename.split(".json")[0]
baked_schedule[timestamp] = new_record
return baked_schedule
def process_arguments(args, base_day):
selected_day = int(base_day)
preferences = {}
for arg in args:
if arg[0] == "-":
if arg[1:].isdigit():
selected_day -= int(arg[1:])
else:
preferences[arg[1:]] = False
elif arg[0] == "+":
if arg[1:].isdigit():
selected_day += int(arg[1:])
else:
preferences[arg[1:]] = True
selected_day = selected_day % 14
return preferences, selected_day
def get_lesson_description(schedule, reference_time, lesson_time, current_day, current_week, overrides={}):
lesson_record = schedule[lesson_time]
lesson_start_datetime = datetime.fromtimestamp(reference_time + lesson_time)
lesson_end_datetime = datetime.fromtimestamp(reference_time + lesson_time + 5400)
return generate_lesson_description(lesson_record, lesson_start_datetime, lesson_end_datetime, current_day,
current_week, overrides=overrides)
def process(message, path):
message_text = message["text"]
full_command = message_text.split()
# there is no need to check if the full_command list if empty as it
# never will be - Telegram requires all messages to have at least
# one printable symbol, so this is already protected
base_command = full_command[0].lower()
if base_command not in ["!пара", "!пари"]:
return ""
global module_path
module_path = path
schedule = get_schedule_data_from("schedule.json")
schedule.update(get_schedule_data_from("additions.json"))
current_time = datetime.now()
current_week = current_time.isocalendar()[1] % 2
current_day = current_time.weekday()
current_seconds = current_week * 604800 + current_day * 86400 + current_time.hour * 3600 + current_time.minute \
* 60 + current_time.second
reference_time = int(current_time.strftime("%s")) - current_seconds
if base_command == "!пара":
upcoming_lessons = [timestamp for timestamp in schedule if timestamp > current_seconds - 5400]
if len(upcoming_lessons) > 0:
closest_lesson_time = min(upcoming_lessons)
else:
closest_lesson_time = min(schedule)
return "*Актуальна пара*: " + get_lesson_description(schedule, reference_time, closest_lesson_time, current_day,
current_week)
elif base_command == "!пари":
base_day = current_week * 7 + current_day
if len(full_command) >= 2:
args = [i for i in full_command[1:] if len(i) > 1]
preferences, selected_day = process_arguments(args, base_day)
else:
preferences = {}
selected_day = base_day
lesson_list = [i for i in schedule if selected_day * 86400 <= i < (selected_day + 1) * 86400]
lesson_descriptions_list = ["*Назва*: " + get_lesson_description(schedule, reference_time, lesson_time,
current_day, current_week,
overrides=preferences)
for lesson_time in lesson_list]
return f"__Пари {WEEKDAYS_ACCUSATIVE[selected_day % 7]}__\n" + "\n\n".join(lesson_descriptions_list)

View File

@ -0,0 +1,6 @@
{
"version": 2,
"index_file": "main.py",
"start_on_boot": true,
"alias": "auto-schedule-pro-v2"
}

View File

@ -0,0 +1 @@
../auto-schedule-pro/schedule.json

View File

@ -1,5 +1,5 @@
## code ##
if (self.MESSAGE["text"].lower() == "!пара" or self.MESSAGE["text"].lower().split()[0] == "!пари"):
if (self.MESSAGE["text"].lower() == "!пара-old2" or self.MESSAGE["text"].lower().split()[0] == "!пари-old2"):
#getting current time
current_time = datetime.datetime.now()
@ -31,7 +31,7 @@ if (self.MESSAGE["text"].lower() == "!пара" or self.MESSAGE["text"].lower().
full_schedule = dict(list(schedule.items()) + list(additions.items()))
if self.MESSAGE["text"].lower() == "!пара":
if self.MESSAGE["text"].lower() == "!пара-old2":
print("test1")
print(f"Full schedule printout: {full_schedule}")
print(f"Current delta_time: {current_seconds}")
@ -79,10 +79,10 @@ if self.MESSAGE["text"].lower() == "!пара":
human_readable_date += " до "
human_readable_date += dt_lesson_finish.strftime("%H:%M")
self.RESPONCE = "Актуальна пара: {}\nДата: {}\nВикладач: {}\nПосилання на пару: {}".format(p['name'], human_readable_date, p['teacher'], p['link'])
self.RESPONSE = "Актуальна пара: {}\nДата: {}\nВикладач: {}\nПосилання на пару: {}".format(p['name'], human_readable_date, p['teacher'], p['link'])
print("test3.1.5")
else:
self.RESPONCE = "Пар немає взагалі. Ми вільні!"
self.RESPONSE = "Пар немає взагалі. Ми вільні!"
else:
print("test3.2")
@ -106,9 +106,9 @@ if self.MESSAGE["text"].lower() == "!пара":
human_readable_date += " до "
human_readable_date += dt_lesson_finish.strftime("%H:%M")
self.RESPONCE = "Актуальна пара: {}\nДата: {}\nВикладач: {}\nПосилання на пару: {}".format(p['name'], human_readable_date, p['teacher'], p['link'])
self.RESPONSE = "Актуальна пара: {}\nДата: {}\nВикладач: {}\nПосилання на пару: {}".format(p['name'], human_readable_date, p['teacher'], p['link'])
if self.MESSAGE["text"].lower().split()[0] == "!пари":
if self.MESSAGE["text"].lower().split()[0] == "!пари-old2":
command = self.MESSAGE["text"].lower().split()
preferences = {"name": True, "date": True, "teacher": True, "link": True}
@ -183,4 +183,4 @@ if self.MESSAGE["text"].lower().split()[0] == "!пари":
result_text += "\n"
self.RESPONCE = result_text
self.RESPONSE = result_text

View File

@ -24,7 +24,7 @@ if self.MESSAGE["text"].lower() == "!пара-old":
pair_found = True
break
self.RESPONCE = f"Сьогодні вихідний, тому пар немає)\n"\
self.RESPONSE = f"Сьогодні вихідний, тому пар немає)\n"\
f"Наступна пара - {next_pair['subject']} ({next_pair['lector']}) о {self.reverse_timetable[int(j)]} у {self.days_rod[day]}\n"\
f"Посилання (якщо воно чомусь треба): {next_pair['link']}"
else:
@ -33,13 +33,13 @@ if self.MESSAGE["text"].lower() == "!пара-old":
print("[DEBUG] Looking up a relevant pair...")
try:
relevant_pair = schedule[current_week][current_day][str(self.timetable[i])]
self.RESPONCE = f"Актуальна пара: {relevant_pair['subject']} ({relevant_pair['lector']}), початок о {self.reverse_timetable[self.timetable[i]]}\n"\
self.RESPONSE = f"Актуальна пара: {relevant_pair['subject']} ({relevant_pair['lector']}), початок о {self.reverse_timetable[self.timetable[i]]}\n"\
f"Посилання: {relevant_pair['link']}"
break
except Exception as e:
print(f"[WARN] module: auto-schedule: exception {e} while looking up the pair")
else:
self.RESPONCE = "Сьогодні більше немає пар"
self.RESPONSE = "Сьогодні більше немає пар"
except Exception as e:
print(f"[WARN] module: auto-schedule: failed to process schedule.json ({e})")

View File

@ -1,2 +1,2 @@
if msg.chat["type"] == "private":
self.RESPONCE = self.MESSAGE["text"]
self.RESPONSE = self.MESSAGE["text"]

View File

@ -9,4 +9,5 @@ if "%" in self.MESSAGE["text"]:
tagged_users |= self.tag_sets[i]
if tagging_issued:
self.RESPONCE = "Користувач використав масовий тег з повідомленням: {}\n\n{}".format(self.MESSAGE["text"], " ".join(tagged_users))
self.RESPONSE = f"Користувач використав масовий тег з повідомленням: {self.MESSAGE['text']}\n\n" \
f"{' '.join(tagged_users)}"

View File

@ -1,24 +1,24 @@
msg = self.MESSAGE["text"].lower()
responce_given = False
response_given = False
for file in os.listdir(self.path + "db/"):
if responce_given:
if response_given:
break
try:
criteria = json.loads( readfile(self.path + "db/" + file) )
criteria = json.loads(readfile(self.path + "db/" + file))
for wordset in criteria["trigger_lists"]:
for word_set in criteria["trigger_lists"]:
all_words_in = True
for word in wordset:
for word in word_set:
if word not in msg:
all_words_in = False
break
if all_words_in:
self.RESPONCE = criteria["responce_text"]
responce_given = True
self.RESPONSE = criteria["response_text"]
response_given = True
break
except Exception as e:

View File

@ -17,14 +17,15 @@ if (command[0] in self.aliases) and (1 <= command_length <= 3):
decoded_text = text_to_decode
if chosen_model not in models:
self.RESPONCE = f"Такого варіанту транслітерації не існує. Доступні варіанти: {', '.join(list(models.keys()))}"
self.RESPONSE = f"Такого варіанту транслітерації не існує. Доступні варіанти: " \
f"{', '.join(list(models.keys()))}"
else:
for i in models[chosen_model]:
decoded_text = decoded_text.replace(i[0], i[1])
decoded_text = decoded_text.replace(i[0].capitalize(), i[1].capitalize())
decoded_text = decoded_text.replace(i[0].upper(), i[1].upper())
self.RESPONCE = f"Результат: {decoded_text}"
self.RESPONSE = f"__Результат__\n{decoded_text}"
except Exception as e:
print(f"[translit-decoder] Got exception: {e}")