from telegram.ext import Updater, MessageHandler, Filters import datetime import codecs import time import json import sys import os import threading import importlib import traceback # global variables STOP_REQUESTED = False DEBUG_MODE = False DELAY_AFTER_RESPONSE = 3 DELAY_AFTER_MESSAGE = 0.1 DELAY_AFTER_IDLE = 1.0 lock = threading.Lock() # some functions that increase readability of the code def readfile(filename): try: return codecs.open(filename, encoding="utf-8").read() except FileNotFoundError: return False except Exception as e: print(f"[ERROR] Unexpected error occurred in readfile() ({e})") return False # module object classes class ModuleV1: def __init__(self, path, code, enabled, alias, predefine): self.version = 1 self.enabled = enabled self.code = code self.alias = alias self.path = path self.predefine = predefine if self.predefine: self.set_predefine() # set environmental variables def set_env(self): self.RESPONSE = "" self.FORMAT = "" def set_predefine(self): try: if DEBUG_MODE: print(f"Predefine on module v1 {self.alias} ({self.path})") exec(self.predefine) except Exception as e: print(f"[ERROR] module v1: module \"{self.path}\" ({self.alias}) raised exception \"{e}\" " f"during predefine stage, disabling it...") # running the module def process(self, msg): self.set_env() self.MESSAGE = msg try: if DEBUG_MODE: print(f"Calling module v1 {self.alias} ({self.path})") exec(self.code) return self.RESPONSE, self.FORMAT except Exception as e: print(f"[ERROR] module v1: module \"{self.path}\" ({self.alias}) raised exception \"{e}\"") print(f"[ERROR] module v1: traceback:\n{traceback.format_exc()}") return "", None class ModuleV2: def __init__(self, path, index_file, enabled, alias): self.version = 2 self.enabled = enabled self.alias = alias self.path = path self.index_file = index_file[:-3] self.obj = importlib.import_module((path + self.index_file).replace("/", ".")) # running the module def process(self, msg): try: if DEBUG_MODE: print(f"Calling module v2 {self.alias} ({self.path})") return self.obj.process(msg, self.path) except Exception as e: print(f"[ERROR] module v2: module \"{self.path}\" ({self.alias}) raised exception \"{e}\"") print(f"[ERROR] module v2: traceback:\ntraceback.format_exc()") return None, None # module control unit class ModuleControlUnit: def __init__(self): self.modules = [] self.reload_modules() print("[INFO] ModuleControlUnit: initialized successfully") def reload_modules(self): for folder in os.listdir("modules/"): try: meta_raw = readfile(f"modules/{folder}/meta.json") if not meta_raw: print(f"[WARN] module_loader: no meta.json found in module folder \"{folder}\"") continue meta = json.loads( meta_raw ) if "version" in meta: if meta["version"] == 1: if "index_file" in meta: index_file = meta["index_file"] else: index_file = "index.py" code = readfile(f"modules/{folder}/{index_file}") if not code: # False both when readfile() returns False and when the code string is empty print(f"[WARN] reload_modules: module {folder} does not have any code, skipping...") continue if "start_on_boot" in meta: enabled = meta["start_on_boot"] else: enabled = False if "alias" in meta: alias = meta["alias"] else: alias = None if "predefine" in meta: predefine = readfile(f"modules/{folder}/{meta['predefine']}") else: predefine = False self.modules.append(ModuleV1(f"modules/{folder}/", code, enabled, alias, predefine)) print(f"[INFO] reload_modules: successfully loaded {folder} as {alias} " f"(start_on_boot: {enabled})") elif meta["version"] == 2: if "index_file" in meta: index_file = meta["index_file"] else: index_file = "index.py" if "start_on_boot" in meta: enabled = meta["start_on_boot"] else: enabled = False if "alias" in meta: alias = meta["alias"] else: alias = None self.modules.append(ModuleV2(f"modules/{folder}/", index_file, enabled, alias)) print(f"[INFO] reload_modules: successfully loaded {folder} as {alias} " f"(start_on_boot: {enabled})") else: print(f"[WARN] reload_modules: module {folder} requires unsupported version " f"({meta['version']} > 2), skipping...") except Exception as e: print(f"[ERROR] module_loader: error while loading module \"{folder}\" ({e})") # synchronous message processor def queue_processor(): while True: try: if len(message_queue) > 0: msg = message_queue[0] del message_queue[0] print(f"[DEBUG] queue_processor: {msg}") # debug # check for control commands if msg.chat.id == 575246355: if msg["text"][0] == "$": command = msg["text"][1:].split() if len(command) >= 2 and command[0] == "module": if command[1] == "reload": if len(command) == 2: print("[INFO] Full module reloading triggered by a command") updater.bot.send_message(msg.chat.id, f"Reloading all modules...") # properly reload all v2 modules for mod in mcu.modules: if mod.version == 2: importlib.reload(mod.obj) del mcu.modules[:] mcu.reload_modules() else: # TO DO: make it possible to reload individual modules by their # alias or containing folder pass elif command[1] == "enable" and len(command) == 3: if command[2] == "all": for mod in mcu.modules: mod.enabled = True print(f"[INFO] module {mod.alias} was enabled") else: for mod in mcu.modules: if mod.alias == command[2]: mod.enabled = True print(f"[INFO] module {mod.alias} was enabled") elif command[1] == "disable" and len(command) == 3: if command[2] == "all": for mod in mcu.modules: mod.enabled = False print(f"[INFO] module {mod.alias} was disabled") else: for mod in mcu.modules: if mod.alias == command[2]: mod.enabled = False print(f"[INFO] module {mod.alias} was disabled") elif command[1] == "status" and len(command) == 3: if command[2] == "all": for mod in mcu.modules: print(f"[INFO] module {mod.alias} is {mod.enabled}") else: for mod in mcu.modules: if mod.alias == command[2]: print(f"[INFO] module {mod.alias} is {mod.enabled}") elif (2 <= len(command) <= 3) and command[0] == "delay": l = len(command) if command[1] == "response": if l == 3: try: new_value = float(command[2]) global DELAY_AFTER_RESPONSE DELAY_AFTER_RESPONSE = new_value updater.bot.send_message(msg.chat.id, f"Set DELAY_AFTER_RESPONSE to {command[2]}") except: print(f"[WARN]: Cannot set DELAY_AFTER_RESPONSE to non-float value of {command[2]}") updater.bot.send_message(msg.chat.id, f"[WARN]: Cannot set DELAY_AFTER_RESPONSE to non-float value of {command[2]}") elif l == 2: print(f"[INFO]: DELAY_AFTER_RESPONSE = {DELAY_AFTER_RESPONSE}") updater.bot.send_message(msg.chat.id, f"[INFO]: DELAY_AFTER_RESPONSE = {DELAY_AFTER_RESPONSE}") elif command[1] == "message": if l == 3: try: new_value = float(command[2]) global DELAY_AFTER_MESSAGE DELAY_AFTER_MESSAGE = new_value updater.bot.send_message(msg.chat.id, f"Set DELAY_AFTER_MESSAGE to {command[2]}") except: print("[WARN]: Cannot set DELAY_AFTER_MESSAGE to non-float value of {command[2]}") updater.bot.send_message(msg.chat.id, f"[WARN]: Cannot set DELAY_AFTER_MESSAGE to non-float value of {command[2]}") elif l == 2: print(f"[INFO]: DELAY_AFTER_MESSAGE = {DELAY_AFTER_MESSAGE}") updater.bot.send_message(msg.chat.id, f"[INFO]: DELAY_AFTER_MESSAGE = {DELAY_AFTER_MESSAGE}") elif command[1] == "idle": if l == 3: try: new_value = float(command[2]) global DELAY_AFTER_IDLE DELAY_AFTER_IDLE = new_value except: print("[WARN]: Cannot set DELAY_AFTER_IDLE to non-float value of {command[2]}") updater.bot.send_message(msg.chat.id, f"[WARN]: Cannot set DELAY_AFTER_MESSAGE to non-float value of {command[2]}") elif l == 2: print(f"[INFO]: DELAY_AFTER_IDLE = {DELAY_AFTER_IDLE}") updater.bot.send_message(msg.chat.id, f"[INFO]: DELAY_AFTER_IDLE = {DELAY_AFTER_IDLE}") elif len(command) == 2 and command[0] == "queue": if command[1] == "size": print(f"[INFO]: Queue length is {len(message_queue)}") updater.bot.send_message(msg.chat.id, f"[INFO]: Queue length is {len(message_queue)}") elif len(command) == 2 and command[0] == "debug": global DEBUG_MODE if command[1] == "on": DEBUG_MODE = True else: DEBUG_MODE = False continue # modules are used in here for mod in mcu.modules: if mod.enabled: if mod.version in [1, 2]: try: response, formatting = mod.process(msg) except Exception as e: print(f"Module {mod.alias} ({mod.path}) failed to do a proper return, skipping...") continue if response: if not formatting: print(f"Responding using module {mod.path} ({mod.alias}) with text: {response}") updater.bot.send_message(chat_id=msg.chat.id, text=response, disable_web_page_preview=True) time.sleep(DELAY_AFTER_RESPONSE) break elif formatting in ["Markdown", "MarkdownV2", "HTML"]: print(f"Responding using module {mod.path} ({mod.alias}) with text (using {formatting}): {response}") updater.bot.send_message(chat_id=msg.chat.id, text=response, disable_web_page_preview=True, parse_mode=formatting) time.sleep(DELAY_AFTER_RESPONSE) break time.sleep(DELAY_AFTER_MESSAGE) else: if STOP_REQUESTED: break else: time.sleep(DELAY_AFTER_IDLE) except Exception as e: print(f"[ERROR] queue_processor: current message queue: {message_queue}") print(f"[ERROR] Traceback:\n{traceback.format_exc()}") print(f"[ERROR] queue_processor: error while processing message ({e})...") print("[INFO] queue_processor: skipped broken message") print("[INFO] queue_processor thread stops successfully") # telegram bot processor def message_handler(update, context): global lock with lock: if update.message and update.message.text.__class__ == str and update.message.text.startswith("$"): print("[DEBUG] Received new message with high priority") # just for testing message_queue.insert(0, update.message) elif update.message: print("[DEBUG] Received new message") # just for testing message_queue.append(update.message) else: print(f"[DEBUG] Received {update.message} instead of a message") # --- Final stage --- # initializing services and queues message_queue = [] mcu = ModuleControlUnit() processor_thread = threading.Thread(target=queue_processor, args=[]) processor_thread.start() # connecting to Telegram servers and listening for messages TOKEN = readfile("config/token") if not TOKEN: print("[CRIT] Token has not been defined, quitting") sys.exit(1) # connect to Telegram servers updater = Updater(TOKEN, use_context=True) dispatcher = updater.dispatcher # assign the handler for messages dispatcher.add_handler(MessageHandler(Filters.text, message_handler)) # run the bot updater.start_polling() updater.idle()