modular-bot-framework-for-t.../module-testing.py

244 lines
8.1 KiB
Python
Raw Normal View History

import datetime
import codecs
import time
import json
import sys
import os
import threading
2023-05-03 21:06:05 +03:00
import importlib
from telegram import Message, Chat
# global variables
STOP_REQUESTED = False
# some functions that increase readability of the code
def readfile(filename):
try:
return codecs.open(filename, encoding="utf-8").read()
except FileNotFoundError:
return False
except Exception as e:
print(f"[ERROR] Unexpected error occurred in readfile() ({e})")
return False
# module object classes
class ModuleV1:
def __init__(self, path, code, enabled, alias, predefine):
self.version = 1
self.enabled = enabled
self.code = code
self.alias = alias
self.path = path
self.predefine = predefine
if self.predefine:
self.set_predefine()
# set environmental variables
def set_env(self):
2023-05-08 10:14:28 +03:00
self.RESPONSE = ""
def set_predefine(self):
try:
exec(self.predefine)
except Exception as e:
print(f"[ERROR] module v1: module \"{self.path}\" ({self.alias}) raised exception \"{e}\""
f" during predefine stage, disabling it...")
# running the module
def process(self, msg):
self.set_env()
self.MESSAGE = msg
try:
exec(self.code)
2023-05-08 10:14:28 +03:00
return self.RESPONSE
except Exception as e:
print(f"[ERROR] module v1: module \"{self.path}\" ({self.alias}) raised exception \"{e}\"")
return ""
2023-05-03 21:06:05 +03:00
class ModuleV2:
def __init__(self, path, index_file, enabled, alias):
self.version = 2
self.enabled = enabled
self.alias = alias
self.path = path
self.index_file = index_file[:-3]
self.obj = importlib.import_module((path + self.index_file).replace("/", "."))
# running the module
def process(self, msg):
try:
responce = self.obj.process(msg, self.path)
return responce
except Exception as e:
print(f"[ERROR] module v2: module \"{self.path}\" ({self.alias}) raised exception \"{e}\"")
return None
# module control unit
class ModuleControlUnit:
def __init__(self):
self.modules = []
self.reload_modules()
print("[INFO] ModuleControlUnit: initialized successfully")
def reload_modules(self):
for folder in os.listdir("modules/"):
try:
meta_raw = readfile("modules/{}/meta.json".format(folder))
if not meta_raw:
print(f"[WARN] module_loader: no meta.json found in module folder \"{folder}\"")
continue
meta = json.loads(meta_raw)
if "version" in meta:
if meta["version"] == 1:
if "index_file" in meta:
index_file = meta["index_file"]
else:
index_file = "index.py"
code = readfile("modules/{}/{}".format(folder, index_file))
if not code: # False both when readfile() returns False and when the code string is empty
print(f"[WARN] reload_modules: module {folder} does not have any code, skipping...")
continue
if "start_on_boot" in meta:
enabled = meta["start_on_boot"]
else:
enabled = False
if "alias" in meta:
alias = meta["alias"]
else:
alias = None
if "predefine" in meta:
predefine = readfile("modules/{}/{}".format(folder, meta["predefine"]))
else:
predefine = False
self.modules.append(ModuleV1("modules/{}/".format(folder), code, enabled, alias, predefine))
print(f"[INFO] reload_modules: successfully loaded {folder} as {alias} "
f"(start_on_boot: {enabled})")
2023-05-03 21:06:05 +03:00
elif meta["version"] == 2:
if "index_file" in meta:
index_file = meta["index_file"]
else:
index_file = "index.py"
if "start_on_boot" in meta:
enabled = meta["start_on_boot"]
else:
enabled = False
if "alias" in meta:
alias = meta["alias"]
else:
alias = None
self.modules.append(ModuleV2(f"modules/{folder}/", index_file, enabled, alias))
print(f"[INFO] reload_modules: successfully loaded {folder} as {alias} "
f"(start_on_boot: {enabled})")
else:
print(f"[WARN] reload_modules: module {folder} requires unsupported version "
f"({meta['version']} > 2), skipping...")
2023-05-03 21:06:05 +03:00
except Exception as e:
print(f"[ERROR] module_loader: error while loading module \"{folder}\" ({e})")
# message queue object to go back to synchronous message processing
# class MessageQueue:
# def __init__(self):
# print("[INFO] Initializing the message queue...")
# self.queue = []
# synchronous message processor
def queue_processor():
while True:
try:
if len(message_queue) > 0:
msg = message_queue[0]
print("[DEBUG] queue_processor: {}".format(msg)) # debug
2023-05-03 21:06:05 +03:00
# check for control commands
if msg["chat"]["id"] == 575246355:
if msg["text"][0] == "$":
command = msg["text"][1:].split(" ")
if len(command) >= 2 and command[0] == "module":
if command[1] == "reload":
print("[INFO] Module reloading triggered by a command")
2023-05-03 21:44:46 +03:00
# properly reload all v2 modules
for mod in mcu.modules:
if mod.version == 2:
importlib.reload(mod.obj)
2023-05-03 21:44:46 +03:00
del mcu.modules[:]
mcu.reload_modules()
del message_queue[0]
continue
2023-05-03 21:06:05 +03:00
# modules are used in here
for mod in mcu.modules:
if mod.enabled:
2023-05-03 21:06:05 +03:00
if mod.version == 1 or mod.version == 2:
2023-05-08 10:14:28 +03:00
response = mod.process(msg)
if response:
print(f"Responded using module {mod.path} ({mod.alias}) with text: {response}")
break
2023-05-03 21:06:05 +03:00
del message_queue[0]
else:
if STOP_REQUESTED:
break
else:
time.sleep(1)
2023-05-03 21:06:05 +03:00
2023-05-08 10:14:28 +03:00
except Exception:
print(f"[ERROR] queue_processor: current message queue: {message_queue}")
print("[ERROR] queue_processor: error while processing message, trying to delete it...")
try:
del message_queue[0]
print("[INFO] queue_processor: deleted broken message from the queue")
except:
print("[WARN] queue_processor: message seems absent, whatever")
print("[INFO] queue_processor thread stops successfully")
# --- Final stage ---
# initializing services and queues
message_queue = []
mcu = ModuleControlUnit()
processor_thread = threading.Thread(target=queue_processor, args=[])
processor_thread.start()
print("Enter testing messages one by one, end with an empty line")
while True:
new_msg = input()
if len(new_msg) == 0:
break
message_queue.append(Message(9, round(time.time()), Chat(575246355, 'supergroup'), text=new_msg))