modular-bot-framework-for-t.../main.py

380 lines
16 KiB
Python

from telegram.ext import Updater, MessageHandler, Filters
import datetime
import codecs
import time
import json
import sys
import os
import threading
import importlib
import traceback
# global variables
# Shutdown flag: queue_processor() checks it whenever the message queue is
# empty and leaves its loop once it is true.
STOP_REQUESTED = False
# When true, ModuleV1/ModuleV2 print a trace line before running module code.
# Switched at runtime by the "$debug on" / "$debug <anything else>" command.
DEBUG_MODE = False
# Seconds queue_processor() sleeps right after a module response was sent.
DELAY_AFTER_RESPONSE = 3
# Seconds queue_processor() sleeps after a message went through the modules.
DELAY_AFTER_MESSAGE = 0.1
# Seconds queue_processor() sleeps when it finds the queue empty.
DELAY_AFTER_IDLE = 1.0
# Held by message_handler() while it adds an incoming message to message_queue.
lock = threading.Lock()
# some functions that increase readability of the code
def readfile(filename):
    """Return the UTF-8 decoded contents of *filename*.

    Returns ``False`` when the file does not exist or cannot be read; any
    error other than a missing file is also reported on stdout. Note that an
    existing but empty file yields ``""``, which is falsy as well.
    """
    try:
        # The context manager closes the handle even if read() raises;
        # previously the file object was left for the garbage collector.
        with codecs.open(filename, encoding="utf-8") as handle:
            return handle.read()
    except FileNotFoundError:
        return False
    except Exception as e:
        print(f"[ERROR] Unexpected error occurred in readfile() ({e})")
        return False
# module object classes
class ModuleV1:
    """Version 1 module: a source string that is exec()'d for every message.

    The module code runs inside process() and therefore sees ``self``; it
    reads the incoming message from ``self.MESSAGE`` and answers by assigning
    ``self.RESPONSE`` (text) and ``self.FORMAT`` (parse mode, may stay empty).

    NOTE(review): both the predefine script and the module code are run with
    exec(); they must come from a trusted modules/ directory only.
    """

    def __init__(self, path, code, enabled, alias, predefine):
        """Store the module description and run the predefine script, if any.

        path      -- module folder, e.g. "modules/<folder>/"
        code      -- source text executed for every message
        enabled   -- whether the module takes part in message processing
        alias     -- short name used by the control commands (may be None)
        predefine -- source text executed once at construction, or a falsy
                     value when the module has none
        """
        self.version = 1
        self.enabled = enabled
        self.code = code
        self.alias = alias
        self.path = path
        self.predefine = predefine
        if self.predefine:
            self.set_predefine()

    # set environmental variables
    def set_env(self):
        """Reset the output slots the module code is expected to fill."""
        self.RESPONSE = ""
        self.FORMAT = ""

    def set_predefine(self):
        """Run the predefine script once; disable the module if it fails."""
        try:
            if DEBUG_MODE:
                print(f"Predefine on module v1 {self.alias} ({self.path})")
            exec(self.predefine)
        except Exception as e:
            print(f"[ERROR] module v1: module \"{self.path}\" ({self.alias}) raised exception \"{e}\" "
                  f"during predefine stage, disabling it...")
            # The message above always announced this, but the flag was never
            # cleared, so a half-initialised module kept receiving messages.
            self.enabled = False

    # running the module
    def process(self, msg):
        """Run the module code for *msg*.

        Returns ``(RESPONSE, FORMAT)`` as left behind by the module code, or
        ``("", None)`` when the code raised; the error is printed, not raised.
        """
        self.set_env()
        self.MESSAGE = msg
        try:
            if DEBUG_MODE:
                print(f"Calling module v1 {self.alias} ({self.path})")
            exec(self.code)
            return self.RESPONSE, self.FORMAT
        except Exception as e:
            print(f"[ERROR] module v1: module \"{self.path}\" ({self.alias}) raised exception \"{e}\"")
            print(f"[ERROR] module v1: traceback:\n{traceback.format_exc()}")
            return "", None
class ModuleV2:
    """Version 2 module: a regular Python module imported from its folder.

    The imported module must expose ``process(msg, path)``; its return value
    is handed back to the caller unchanged.
    """

    def __init__(self, path, index_file, enabled, alias):
        """Import the module's entry file.

        path       -- module folder, e.g. "modules/<folder>/"
        index_file -- entry file name inside that folder, e.g. "index.py"
        enabled    -- whether the module takes part in message processing
        alias      -- short name used by the control commands (may be None)

        Raises whatever importlib.import_module() raises when the entry file
        cannot be imported.
        """
        self.version = 2
        self.enabled = enabled
        self.alias = alias
        self.path = path
        # Drop the extension only when it really is ".py"; blindly cutting the
        # last three characters mangled names given without an extension.
        self.index_file = index_file[:-3] if index_file.endswith(".py") else index_file
        # "modules/<folder>/" + "index" -> "modules.<folder>.index"
        self.obj = importlib.import_module((path + self.index_file).replace("/", "."))

    # running the module
    def process(self, msg):
        """Forward *msg* to the module's ``process(msg, path)``.

        Returns the module's result, or ``(None, None)`` when it raised; the
        error and its traceback are printed, not raised.
        """
        try:
            if DEBUG_MODE:
                print(f"Calling module v2 {self.alias} ({self.path})")
            return self.obj.process(msg, self.path)
        except Exception as e:
            print(f"[ERROR] module v2: module \"{self.path}\" ({self.alias}) raised exception \"{e}\"")
            # The braces were missing here, so the literal text
            # "traceback.format_exc()" was printed instead of the traceback.
            print(f"[ERROR] module v2: traceback:\n{traceback.format_exc()}")
            return None, None
# module control unit
class ModuleControlUnit:
    """Discovers the modules under ``modules/`` and keeps them in ``self.modules``.

    Every sub-folder is described by a ``meta.json`` with these keys:
      version        -- 1 (exec'd source) or 2 (imported Python module); required
      index_file     -- entry file name, defaults to "index.py"
      start_on_boot  -- initial enabled state, defaults to False
      alias          -- short name for control commands, defaults to None
      predefine      -- (version 1 only) file executed once at load time
    """

    def __init__(self):
        self.modules = []
        self.reload_modules()
        print("[INFO] ModuleControlUnit: initialized successfully")

    def reload_modules(self):
        """Load every folder found in ``modules/`` and append it to self.modules.

        The list is NOT cleared first; callers that want a fresh set empty it
        themselves. A module that fails to load is reported and skipped.
        """
        for folder in os.listdir("modules/"):
            try:
                self._load_module(folder)
            except Exception as e:
                print(f"[ERROR] module_loader: error while loading module \"{folder}\" ({e})")

    def _load_module(self, folder):
        """Load one module folder; print a warning and return if it is unusable."""
        meta_raw = readfile("modules/{}/meta.json".format(folder))
        if not meta_raw:
            print(f"[WARN] module_loader: no meta.json found in module folder \"{folder}\"")
            return
        meta = json.loads(meta_raw)

        if "version" not in meta:
            # Such modules used to vanish without any trace in the log.
            print(f"[WARN] reload_modules: module {folder} does not declare a version "
                  f"in meta.json, skipping...")
            return
        version = meta["version"]
        if version not in (1, 2):
            print(f"[WARN] reload_modules: module {folder} requires unsupported version "
                  f"({meta['version']} > 2), skipping...")
            return

        # options shared by both module versions
        index_file = meta.get("index_file", "index.py")
        enabled = meta.get("start_on_boot", False)
        alias = meta.get("alias")

        if version == 1:
            code = readfile("modules/{}/{}".format(folder, index_file))
            if not code:  # False both when readfile() returns False and when the code string is empty
                print("[WARN] reload_modules: module {} does not have any code, skipping...".format(folder))
                return
            if "predefine" in meta:
                predefine = readfile("modules/{}/{}".format(folder, meta["predefine"]))
            else:
                predefine = False
            module = ModuleV1(f"modules/{folder}/", code, enabled, alias, predefine)
        else:
            module = ModuleV2(f"modules/{folder}/", index_file, enabled, alias)

        self.modules.append(module)
        print(f"[INFO] reload_modules: successfully loaded {folder} as {alias} "
              f"(start_on_boot: {enabled})")
# synchronous message processor
# Chat whose "$..." messages are interpreted as control commands.
_CONTROL_CHAT_ID = 575246355
# "$delay <key> [value]" -> name of the module-level setting it reads/writes.
_DELAY_SETTINGS = {
    "response": "DELAY_AFTER_RESPONSE",
    "message": "DELAY_AFTER_MESSAGE",
    "idle": "DELAY_AFTER_IDLE",
}
# parse modes that are passed through to send_message()
_PARSE_MODES = ("Markdown", "MarkdownV2", "HTML")


def _handle_module_command(msg, command):
    """Handle "$module reload" and "$module enable|disable|status <alias|all>"."""
    action = command[1]
    if action == "reload":
        if len(command) == 2:
            print("[INFO] Full module reloading triggered by a command")
            updater.bot.send_message(msg.chat.id, "Reloading all modules...")
            # properly reload all v2 modules
            for mod in mcu.modules:
                if mod.version == 2:
                    importlib.reload(mod.obj)
            del mcu.modules[:]
            mcu.reload_modules()
        # TO DO: make it possible to reload individual modules by their
        # alias or containing folder
        return
    if len(command) != 3:
        return
    target = command[2]
    selected = [mod for mod in mcu.modules if target == "all" or mod.alias == target]
    if action in ("enable", "disable"):
        for mod in selected:
            mod.enabled = action == "enable"
            print(f"[INFO] module {mod.alias} was {action}d")
    elif action == "status":
        for mod in selected:
            print(f"[INFO] module {mod.alias} is {mod.enabled}")


def _handle_delay_command(msg, command):
    """Handle "$delay response|message|idle [seconds]" (show or set a delay)."""
    name = _DELAY_SETTINGS.get(command[1])
    if name is None:
        return
    if len(command) == 2:
        report = f"[INFO]: {name} = {globals()[name]}"
        print(report)
        updater.bot.send_message(msg.chat.id, report)
        return
    # Only the conversion is guarded: a failing send_message() must not be
    # reported as a "non-float value".
    try:
        new_value = float(command[2])
    except ValueError:
        warning = f"[WARN]: Cannot set {name} to non-float value of {command[2]}"
        print(warning)
        updater.bot.send_message(msg.chat.id, warning)
        return
    # time.sleep() raises on negative/NaN/inf values, which would make the
    # processing loop fail on every iteration, so refuse them here.
    if not 0 <= new_value < float("inf"):
        warning = f"[WARN]: Cannot set {name} to negative or non-finite value of {command[2]}"
        print(warning)
        updater.bot.send_message(msg.chat.id, warning)
        return
    # the settings are module-level names read by queue_processor()
    globals()[name] = new_value
    updater.bot.send_message(msg.chat.id, f"Set {name} to {command[2]}")


def _handle_control_command(msg, command):
    """Dispatch one control command (the message text after "$", split on whitespace)."""
    global DEBUG_MODE
    if len(command) >= 2 and command[0] == "module":
        _handle_module_command(msg, command)
    elif (2 <= len(command) <= 3) and command[0] == "delay":
        _handle_delay_command(msg, command)
    elif len(command) == 2 and command[0] == "queue":
        if command[1] == "size":
            report = f"[INFO]: Queue length is {len(message_queue)}"
            print(report)
            updater.bot.send_message(msg.chat.id, report)
    elif len(command) == 2 and command[0] == "debug":
        DEBUG_MODE = command[1] == "on"


def _dispatch_to_modules(msg):
    """Offer *msg* to the enabled modules in order; send the first usable response."""
    for mod in mcu.modules:
        if not mod.enabled or mod.version not in (1, 2):
            continue
        try:
            response, formatting = mod.process(msg)
        except Exception:
            print(f"Module {mod.alias} ({mod.path}) failed to do a proper return, skipping...")
            continue
        if not response:
            continue
        if not formatting:
            print(f"Responding using module {mod.path} ({mod.alias}) with text: {response}")
            updater.bot.send_message(chat_id=msg.chat.id, text=response,
                                     disable_web_page_preview=True)
            time.sleep(DELAY_AFTER_RESPONSE)
            return
        if formatting in _PARSE_MODES:
            print(f"Responding using module {mod.path} ({mod.alias}) with text (using {formatting}): {response}")
            updater.bot.send_message(chat_id=msg.chat.id, text=response,
                                     disable_web_page_preview=True,
                                     parse_mode=formatting)
            time.sleep(DELAY_AFTER_RESPONSE)
            return
        # a response with an unknown parse mode is dropped; the next module gets a chance


def queue_processor():
    """Worker loop: take messages from message_queue one by one and answer them.

    Messages from the control chat that start with "$" are treated as control
    commands (module/delay/queue/debug) and are not forwarded to the modules.
    Every other message is offered to the enabled modules; the first module
    that yields a response answers it. The loop exits once STOP_REQUESTED is
    true and the queue is empty. Errors are printed and the message skipped.
    """
    while True:
        try:
            # Take the head under the same lock message_handler() uses, so a
            # priority insert at index 0 cannot slip in between reading and
            # deleting the element.
            with lock:
                msg = message_queue.pop(0) if message_queue else None
            if msg is None:
                if STOP_REQUESTED:
                    break
                time.sleep(DELAY_AFTER_IDLE)
                continue
            print("[DEBUG] queue_processor: {}".format(msg))  # debug
            # check for control commands
            text = msg.text or ""
            if msg.chat.id == _CONTROL_CHAT_ID and text.startswith("$"):
                _handle_control_command(msg, text[1:].split())
                continue
            # modules are used in here
            _dispatch_to_modules(msg)
            time.sleep(DELAY_AFTER_MESSAGE)
        except Exception as e:
            print(f"[ERROR] queue_processor: current message queue: {message_queue}")
            print(f"[ERROR] Traceback:\n{traceback.format_exc()}")
            print(f"[ERROR] queue_processor: error while processing message ({e})...")
            print("[INFO] queue_processor: skipped broken message")
    print("[INFO] queue_processor thread stops successfully")
# telegram bot processor
def message_handler(update, context):
    """Put the message carried by *update* into message_queue.

    Text starting with "$" goes to the front of the queue, everything else to
    the back. Updates without a message are only logged. *context* is unused;
    it is part of the handler callback signature.
    """
    with lock:
        incoming = update.message
        if not incoming:
            print(f"[DEBUG] Received {update.message} instead of a message")
            return
        high_priority = incoming.text.__class__ == str and incoming.text.startswith("$")
        if high_priority:
            print("[DEBUG] Received new message with high priority")  # just for testing
            message_queue.insert(0, incoming)
        else:
            print("[DEBUG] Received new message")  # just for testing
            message_queue.append(incoming)
# --- Final stage ---
# initializing services and queues
message_queue = []
mcu = ModuleControlUnit()

# Read the token before any thread exists: the worker thread is not a daemon,
# so calling sys.exit() while it is running would leave the process hanging.
TOKEN = readfile("config/token")
if TOKEN:
    # a token file usually ends with a newline, which is not part of the token
    TOKEN = TOKEN.strip()
if not TOKEN:
    print("[CRIT] Token has not been defined, quitting")
    sys.exit(1)

# connect to Telegram servers
updater = Updater(TOKEN, use_context=True)
dispatcher = updater.dispatcher
# assign the handler for messages
dispatcher.add_handler(MessageHandler(Filters.text, message_handler))

# start the worker only now, when everything it uses (updater, mcu) exists
processor_thread = threading.Thread(target=queue_processor, args=[])
processor_thread.start()

# run the bot
try:
    updater.start_polling()
    updater.idle()
finally:
    # Ask the worker to finish (it exits once the queue is empty) and wait for
    # it; without this the process never terminated after idle() returned.
    STOP_REQUESTED = True
    processor_thread.join()