modular-bot-framework-for-t.../main.py

257 lines
8.6 KiB
Python

from telegram.ext import Updater, MessageHandler, Filters
from telegram.constants import ParseMode
import datetime
import codecs
import time
import json
import sys
import os
import threading
import importlib
# global variables
STOP_REQUESTED = False
# some functions that increase readability of the code
def readfile(filename):
try:
return codecs.open(filename, encoding="utf-8").read()
except FileNotFoundError:
return False
except Exception as e:
print(f"[ERROR] Unexpected error occurred in readfile() ({e})")
return False
# module object classes
class ModuleV1:
def __init__(self, path, code, enabled, alias, predefine):
self.version = 1
self.enabled = enabled
self.code = code
self.alias = alias
self.path = path
self.predefine = predefine
if self.predefine:
self.set_predefine()
# set environmental variables
def set_env(self):
self.RESPONCE = ""
def set_predefine(self):
try:
exec(self.predefine)
except Exception as e:
print(f"[ERROR] module v1: module \"{self.path}\" ({self.alias}) raised exception \"{e}\" "
f"during predefine stage, disabling it...")
# running the module
def process(self, msg):
self.set_env()
self.MESSAGE = msg
try:
exec(self.code)
return self.RESPONCE
except Exception as e:
print(f"[ERROR] module v1: module \"{self.path}\" ({self.alias}) raised exception \"{e}\"")
return ""
class ModuleV2:
def __init__(self, path, index_file, enabled, alias):
self.version = 2
self.enabled = enabled
self.alias = alias
self.path = path
self.index_file = index_file[:-3]
self.obj = importlib.import_module((path + self.index_file).replace("/", "."))
# running the module
def process(self, msg):
try:
response = self.obj.process(msg, self.path)
return response
except Exception as e:
print(f"[ERROR] module v2: module \"{self.path}\" ({self.alias}) raised exception \"{e}\"")
return None
# module control unit
class ModuleControlUnit:
def __init__(self):
self.modules = []
self.reload_modules()
print("[INFO] ModuleControlUnit: initialized successfully")
def reload_modules(self):
for folder in os.listdir("modules/"):
try:
meta_raw = readfile("modules/{}/meta.json".format(folder))
if not meta_raw:
print(f"[WARN] module_loader: no meta.json found in module folder \"{folder}\"")
continue
meta = json.loads( meta_raw )
if "version" in meta:
if meta["version"] == 1:
if "index_file" in meta:
index_file = meta["index_file"]
else:
index_file = "index.py"
code = readfile( "modules/{}/{}".format(folder, index_file) )
if not code: # False both when readfile() returns False and when the code string is empty
print("[WARN] reload_modules: module {} does not have any code, skipping...".format(folder))
continue
if "start_on_boot" in meta:
enabled = meta["start_on_boot"]
else:
enabled = False
if "alias" in meta:
alias = meta["alias"]
else:
alias = None
if "predefine" in meta:
predefine = readfile("modules/{}/{}".format(folder, meta["predefine"]))
else:
predefine = False
self.modules.append(ModuleV1(f"modules/{folder}/", code, enabled, alias, predefine))
print(f"[INFO] reload_modules: successfully loaded {folder} as {alias} "
f"(start_on_boot: {enabled})")
elif meta["version"] == 2:
if "index_file" in meta:
index_file = meta["index_file"]
else:
index_file = "index.py"
if "start_on_boot" in meta:
enabled = meta["start_on_boot"]
else:
enabled = False
if "alias" in meta:
alias = meta["alias"]
else:
alias = None
self.modules.append(ModuleV2(f"modules/{folder}/", index_file, enabled, alias))
print(f"[INFO] reload_modules: successfully loaded {folder} as {alias} "
f"(start_on_boot: {enabled})")
else:
print(f"[WARN] reload_modules: module {folder} requires unsupported version "
f"({meta['version']} > 2), skipping...")
except Exception as e:
print(f"[ERROR] module_loader: error while loading module \"{folder}\" ({e})")
# synchronous message processor
def queue_processor():
while True:
try:
if len(message_queue) > 0:
msg = message_queue[0]
print("[DEBUG] queue_processor: {}".format(msg)) # debug
# check for control commands
if msg["chat"]["id"] == 575246355:
if msg["text"][0] == "$":
command = msg["text"][1:].split(" ")
if len(command) >= 2 and command[0] == "module":
if command[1] == "reload":
print("[INFO] Module reloading triggered by a command")
# properly reload all v2 modules
for mod in mcu.modules:
if mod.version == 2:
importlib.reload(mod.obj)
del mcu.modules[:]
mcu.reload_modules()
del message_queue[0]
continue
# modules are used in here
for mod in mcu.modules:
if mod.enabled:
if mod.version == 1 or mod.version == 2:
responce = mod.process(msg)
if responce:
updater.bot.send_message(chat_id=msg.chat.id, text=responce,
disable_web_page_preview=True,
parse_mode=ParseMode.MARKDOWN_V2)
print(f"Responded using module {mod.path} ({mod.alias}) with text: {responce}")
break
del message_queue[0]
time.sleep(0.1)
else:
if STOP_REQUESTED:
break
else:
time.sleep(1)
except Exception:
print(f"[ERROR] queue_processor: current message queue: {message_queue}")
print("[ERROR] queue_processor: error while processing message, trying to delete it...")
try:
del message_queue[0]
print("[INFO] queue_processor: deleted broken message from the queue")
except:
print("[WARN] queue_processor: message seems absent, whatever")
print("[INFO] queue_processor thread stops successfully")
# telegram bot processor
def message_handler(update, context):
print("[DEBUG] Received new message") # just for testing
message_queue.append(update.message)
# --- Final stage ---
# initializing services and queues
message_queue = []
mcu = ModuleControlUnit()
processor_thread = threading.Thread(target=queue_processor, args=[])
processor_thread.start()
# connecting to Telegram servers and listening for messages
TOKEN = readfile("config/token")
if not TOKEN:
print("[CRIT] Token has not been defined, quitting")
sys.exit(1)
# connect to Telegram servers
updater = Updater(TOKEN, use_context=True)
dispatcher = updater.dispatcher
# assign the handler for messages
dispatcher.add_handler(MessageHandler(Filters.text, message_handler))
# run the bot
updater.start_polling()
updater.idle()