from telegram.ext import Updater, MessageHandler, Filters import datetime import codecs import time import json import sys import os import threading import importlib import traceback # global variables STOP_REQUESTED = False # some functions that increase readability of the code def readfile(filename): try: return codecs.open(filename, encoding="utf-8").read() except FileNotFoundError: return False except Exception as e: print(f"[ERROR] Unexpected error occurred in readfile() ({e})") return False # module object classes class ModuleV1: def __init__(self, path, code, enabled, alias, predefine): self.version = 1 self.enabled = enabled self.code = code self.alias = alias self.path = path self.predefine = predefine if self.predefine: self.set_predefine() # set environmental variables def set_env(self): self.RESPONSE = "" self.FORMAT = "" def set_predefine(self): try: exec(self.predefine) except Exception as e: print(f"[ERROR] module v1: module \"{self.path}\" ({self.alias}) raised exception \"{e}\" " f"during predefine stage, disabling it...") # running the module def process(self, msg): self.set_env() self.MESSAGE = msg try: exec(self.code) return self.RESPONSE, self.FORMAT except Exception as e: print(f"[ERROR] module v1: module \"{self.path}\" ({self.alias}) raised exception \"{e}\"") print(f"[ERROR] module v1: traceback:\ntraceback.format_exc()") return "", None class ModuleV2: def __init__(self, path, index_file, enabled, alias): self.version = 2 self.enabled = enabled self.alias = alias self.path = path self.index_file = index_file[:-3] self.obj = importlib.import_module((path + self.index_file).replace("/", ".")) # running the module def process(self, msg): try: return self.obj.process(msg, self.path) except Exception as e: print(f"[ERROR] module v2: module \"{self.path}\" ({self.alias}) raised exception \"{e}\"") print(f"[ERROR] module v2: traceback:\ntraceback.format_exc()") return None, None # module control unit class ModuleControlUnit: def __init__(self): self.modules = [] self.reload_modules() print("[INFO] ModuleControlUnit: initialized successfully") def reload_modules(self): for folder in os.listdir("modules/"): try: meta_raw = readfile("modules/{}/meta.json".format(folder)) if not meta_raw: print(f"[WARN] module_loader: no meta.json found in module folder \"{folder}\"") continue meta = json.loads( meta_raw ) if "version" in meta: if meta["version"] == 1: if "index_file" in meta: index_file = meta["index_file"] else: index_file = "index.py" code = readfile( "modules/{}/{}".format(folder, index_file) ) if not code: # False both when readfile() returns False and when the code string is empty print("[WARN] reload_modules: module {} does not have any code, skipping...".format(folder)) continue if "start_on_boot" in meta: enabled = meta["start_on_boot"] else: enabled = False if "alias" in meta: alias = meta["alias"] else: alias = None if "predefine" in meta: predefine = readfile("modules/{}/{}".format(folder, meta["predefine"])) else: predefine = False self.modules.append(ModuleV1(f"modules/{folder}/", code, enabled, alias, predefine)) print(f"[INFO] reload_modules: successfully loaded {folder} as {alias} " f"(start_on_boot: {enabled})") elif meta["version"] == 2: if "index_file" in meta: index_file = meta["index_file"] else: index_file = "index.py" if "start_on_boot" in meta: enabled = meta["start_on_boot"] else: enabled = False if "alias" in meta: alias = meta["alias"] else: alias = None self.modules.append(ModuleV2(f"modules/{folder}/", index_file, enabled, alias)) print(f"[INFO] reload_modules: successfully loaded {folder} as {alias} " f"(start_on_boot: {enabled})") else: print(f"[WARN] reload_modules: module {folder} requires unsupported version " f"({meta['version']} > 2), skipping...") except Exception as e: print(f"[ERROR] module_loader: error while loading module \"{folder}\" ({e})") # synchronous message processor def queue_processor(): while True: try: if len(message_queue) > 0: msg = message_queue[0] print("[DEBUG] queue_processor: {}".format(msg)) # debug # check for control commands if msg["chat"]["id"] == 575246355: if msg["text"][0] == "$": command = msg["text"][1:].split(" ") if len(command) >= 2 and command[0] == "module": if command[1] == "reload": print("[INFO] Module reloading triggered by a command") # properly reload all v2 modules for mod in mcu.modules: if mod.version == 2: importlib.reload(mod.obj) del mcu.modules[:] mcu.reload_modules() del message_queue[0] continue # modules are used in here for mod in mcu.modules: if mod.enabled: if mod.version in [1, 2]: response, formatting = mod.process(msg) if response: if formatting == None: updater.bot.send_message(chat_id=msg.chat.id, text=response, disable_web_page_preview=True) print(f"Responded using module {mod.path} ({mod.alias}) with text: {response}") break elif formatting in ["Markdown", "MarkdownV2", "HTML"]: updater.bot.send_message(chat_id=msg.chat.id, text=response, disable_web_page_preview=True, parse_mode=formatting) print(f"Responded using module {mod.path} ({mod.alias}) with text (using {formatting}): {response}") break del message_queue[0] time.sleep(0.1) else: if STOP_REQUESTED: break else: time.sleep(1) except Exception as e: print(f"[ERROR] queue_processor: current message queue: {message_queue}") print(f"[ERROR] Traceback:\n{traceback.format_exc()}") print(f"[ERROR] queue_processor: error while processing message ({e}), trying to delete it...") try: del message_queue[0] print("[INFO] queue_processor: deleted broken message from the queue") except: print("[WARN] queue_processor: message seems absent, whatever") print("[INFO] queue_processor thread stops successfully") # telegram bot processor def message_handler(update, context): print("[DEBUG] Received new message") # just for testing message_queue.append(update.message) # --- Final stage --- # initializing services and queues message_queue = [] mcu = ModuleControlUnit() processor_thread = threading.Thread(target=queue_processor, args=[]) processor_thread.start() # connecting to Telegram servers and listening for messages TOKEN = readfile("config/token") if not TOKEN: print("[CRIT] Token has not been defined, quitting") sys.exit(1) # connect to Telegram servers updater = Updater(TOKEN, use_context=True) dispatcher = updater.dispatcher # assign the handler for messages dispatcher.add_handler(MessageHandler(Filters.text, message_handler)) # run the bot updater.start_polling() updater.idle()