Compare commits

...

2 Commits

3 changed files with 252 additions and 8 deletions

240
module-testing.py Normal file
View File

@ -0,0 +1,240 @@
import datetime
import codecs
import time
import json
import sys
import os
import threading
# global variables
STOP_REQUESTED = False
# some functions that increase readability of the code
def readfile(filename):
try:
return codecs.open(filename, encoding = "utf-8").read()
except FileNotFoundError:
return False
except Exception as e:
print( "[ERROR] Unexpected error occured in readfile() ({0})".format(e) )
return False
# module object classes
class ModuleV1:
def __init__(self, path, code, enabled, alias, predefine):
self.version = 1
self.enabled = enabled
self.code = code
self.alias = alias
self.path = path
self.predefine = predefine
if self.predefine:
self.set_predefine()
# set environmental variables
def set_env(self):
self.RESPONCE = ""
def set_predefine(self):
try:
exec(self.predefine)
except Exception as e:
print("[ERROR] module: module \"{}\" ({}) raised exception \"{}\" during predefine stage, disabling it...".format(self.path, self.alias, e))
# running the module
def process(self, msg):
self.set_env()
self.MESSAGE = msg
try:
exec(self.code)
return self.RESPONCE
except Exception as e:
print("[ERROR] module: module \"{}\" ({}) raised exception \"{}\"".format(self.path, self.alias, e))
return ""
# module control unit
class ModuleControlUnit:
def __init__(self):
self.modules = []
self.reload_modules()
print("[INFO] ModuleControlUnit: initialized successfully")
def reload_modules(self):
for folder in os.listdir("modules/"):
try:
meta_raw = readfile("modules/{}/meta.json".format(folder))
if not meta_raw:
print("[WARN] module_loader: no meta.json found in module folder \"{}\"".format(folder))
continue
meta = json.loads( meta_raw )
if "version" in meta:
if meta["version"] == 1:
if "index_file" in meta:
index_file = meta["index_file"]
else:
index_file = "index.py"
code = readfile( "modules/{}/{}".format(folder, index_file) )
if not code: # False both when readfile() returns False and when the code string is empty
print("[WARN] reload_modules: module {} does not have any code, skipping...".format(folder))
continue
if "start_on_boot" in meta:
enabled = meta["start_on_boot"]
else:
enabled = False
if "alias" in meta:
alias = meta["alias"]
else:
alias = None
if "predefine" in meta:
predefine = readfile("modules/{}/{}".format(folder, meta["predefine"]))
else:
predefine = False
self.modules.append( ModuleV1( "modules/{}/".format(folder), code, enabled, alias, predefine ) )
print("[INFO] reload_modules: successfully loaded {} as {} (start_on_boot: {})".format(folder, alias, enabled))
else:
print("[WARN] reload_modules: module {} requires unsupported version ({} > 1), skipping...".format(folder, meta["version"]))
except Exception as e:
print("[ERROR] module_loader: error while loading module \"{}\" ({})".format(folder, e))
def reload_module(self, folder_name):
try:
meta_raw = readfile("modules/{}/meta.json".format(folder))
if not meta_raw:
print("[WARN] module_loader: no meta.json found in module folder \"{}\"".format(folder_name))
return 1
meta = json.loads( readfile("modules/{}/meta.json".format(folder_name)) )
if "version" in meta:
if meta["version"] == 1:
if "index_file" in meta:
index_file = meta["index_file"]
else:
index_file = "index.py"
code = readfile( "modules/{}/{}".format(folder_name, index_file) )
if not code: # False both when readfile() returns False and when the code string is empty
print("[WARN] reload_modules: module {} does not have any code, skipping...".format(folder_name))
if "start_on_boot" in meta:
enabled = meta["start_on_boot"]
else:
enabled = False
if "alias" in meta:
alias = meta["alias"]
else:
alias = None
self.modules.append( ModuleV1( code, enabled, alias ) )
print("[INFO] reload_modules: successfully loaded {} as {} (start_on_boot: {})".format(folder_name, alias, enabled))
else:
print("[WARN] reload_modules: module {} requires unsupported version ({} > 1), skipping...".format(folder_name, meta["version"]))
except Exception as e:
print("[ERROR] module_loader: error while loading module \"{}\" ({})".format(folder_name, e))
# message queue object to go back to synchronous message processing
#class MessageQueue:
# def __init__(self):
# print("[INFO] Initializing the message queue...")
# self.queue = []
# synchronous message processor
def queue_processor():
while True:
if len(message_queue) > 0:
msg = message_queue[0]
print("[DEBUG] queue_processor: {}".format(msg)) # debug
# check for control commands
if msg["chat"]["id"] == 575246355:
if msg["text"][0] == "$":
command = msg["text"][1:].split(" ")
if len(command) >= 2 and command[0] == "module":
if command[1] == "reload":
print("[INFO] Module reloading triggered by a command")
del mcu.modules[:]
mcu.reload_modules()
del message_queue[0]
continue
# modules are used in here
for mod in mcu.modules:
if mod.enabled:
if mod.version == 1:
responce = mod.process(msg)
if len(responce) > 0:
#updater.bot.send_message(chat_id = msg.chat.id, text = responce)
print("Responded using module {} ({}) with text: {}".format(mod.path, mod.alias, responce))
break
del message_queue[0]
else:
if STOP_REQUESTED:
break
else:
time.sleep(1)
print("[INFO] queue_processor thread stops successfully")
# telegram bot processor
def message_handler(update, context):
print("[DEBUG] Received new message") # just for testing
message_queue.append(update.message)
# --- Final stage ---
# initializing services and queues
message_queue = []
mcu = ModuleControlUnit()
processor_thread = threading.Thread( target = queue_processor, args = [] )
processor_thread.start()
# connecting to Telegram servers and listening for messages
'''
TOKEN = readfile("config/token")
if not TOKEN:
print("[CRIT] Token has not been defined, quitting")
sys.exit(1)
# connect to Telegram servers
updater = Updater(TOKEN, use_context = True)
dispatcher = updater.dispatcher
# assign the handler for messages
dispatcher.add_handler(MessageHandler(Filters.text, message_handler))
# run the bot
updater.start_polling()
updater.idle()
'''
print("Enter testing messages one by one, end with an empty line")
while True:
new_msg = input()
if len(new_msg) == 0:
break
message_queue.append({'text': new_msg, 'new_chat_photo': [], 'message_id': 74, 'new_chat_members': [], 'delete_chat_photo': False, 'group_chat_created': False, 'supergroup_chat_created': False, 'chat': {'first_name': 'Дмитро', 'username': 'dmytrofiot23', 'id': 575246355, 'type': 'private'}, 'caption_entities': [], 'channel_chat_created': False, 'date': 1667585646, 'photo': [], 'entities': [], 'from': {'first_name': 'Дмитро', 'id': 575246355, 'is_bot': False, 'username': 'dmytrofiot23', 'language_code': 'en'}})

View File

@ -5,10 +5,11 @@ if self.MESSAGE["text"].lower() == "!пара":
current_time = datetime.datetime.now()
current_week = current_time.isocalendar()[1] % 2
current_day = current_time.weekday() - 1
current_day = current_time.weekday()
current_seconds = current_time.hour * 3600 + current_time.minute * 60 + current_time.second
if current_day > 4:
print("[DEBUG] Current day is {}({})".format(type(current_day), current_day))
if current_day > 4 or current_day < 0:
next_week = int(not bool(current_week))
day = -1
@ -18,19 +19,19 @@ if self.MESSAGE["text"].lower() == "!пара":
for i in schedule[next_week]:
if not pair_found:
day += 1
for j in schedule[next_week][i]:
next_pair = schedule[next_week][i][j]
for j in schedule[next_week][day]:
next_pair = schedule[next_week][day][j]
pair_found = True
break
self.RESPONCE = "Сьогодні вихідний, тому пар немає)\nНаступна пара - {} ({}) у {}\nПосилання (якщо воно чомусь треба): {}".format(next_pair["subject"], next_pair["lector"], self.days[day], next_pair["link"])
self.RESPONCE = "Сьогодні вихідний, тому пар немає)\nНаступна пара - {} ({}) о {} у {}\nПосилання (якщо воно чомусь треба): {}".format(next_pair["subject"], next_pair["lector"], self.reverse_timetable[int(j)], self.days_rod[day], next_pair["link"])
else:
for i in self.timetable:
if current_seconds < i:
print("[DEBUG] Looking up a relevant pair...")
try:
relevant_pair = schedule[current_week][current_day + 1][str(self.timetable[i])]
self.RESPONCE = "Актуальна пара: {} ({})\nПосилання: {}".format(relevant_pair["subject"], relevant_pair["lector"], relevant_pair["link"])
relevant_pair = schedule[current_week][current_day][str(self.timetable[i])]
self.RESPONCE = "Актуальна пара: {} ({}), початок о {}\nПосилання: {}".format(relevant_pair["subject"], relevant_pair["lector"], self.reverse_timetable[self.timetable[i]], relevant_pair["link"])
break
except Exception as e:
print("[WARN] module: auto-schedule: exception {} while looking up the pair".format(e))

View File

@ -1 +1,4 @@
self.timetable = {36300: 0, 43200: 1, 50100: 2, 57000: 3, 63900: 4, 72300: 5, 78900: 6}
self.days_rod = ["понеділок", "вівторок", "середу", "четвер", "п'ятницю"]
self.reverse_timetable = ["8:30", "10:25", "12:20", "14:15", "16:10", "18:30", "20:20"]