forked from dymik739/modular-bot-framework-for-telegram
		
	added the ability to test modules locally without using dedicated telegram bot
This commit is contained in:
		
							parent
							
								
									4a3f46f2a2
								
							
						
					
					
						commit
						a1d76d3bad
					
				
							
								
								
									
										240
									
								
								module-testing.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										240
									
								
								module-testing.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,240 @@
 | 
			
		||||
import datetime
 | 
			
		||||
import codecs
 | 
			
		||||
import time
 | 
			
		||||
import json
 | 
			
		||||
import sys
 | 
			
		||||
import os
 | 
			
		||||
import threading
 | 
			
		||||
 | 
			
		||||
# global variables
 | 
			
		||||
STOP_REQUESTED = False
 | 
			
		||||
 | 
			
		||||
# some functions that increase readability of the code
 | 
			
		||||
def readfile(filename):
 | 
			
		||||
    try:
 | 
			
		||||
        return codecs.open(filename, encoding = "utf-8").read()
 | 
			
		||||
    except FileNotFoundError:
 | 
			
		||||
        return False
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        print( "[ERROR] Unexpected error occured in readfile() ({0})".format(e) )
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
# module object classes
 | 
			
		||||
class ModuleV1:
 | 
			
		||||
    def __init__(self, path, code, enabled, alias, predefine):
 | 
			
		||||
        self.version = 1
 | 
			
		||||
        self.enabled = enabled
 | 
			
		||||
        self.code = code
 | 
			
		||||
        self.alias = alias
 | 
			
		||||
        self.path = path
 | 
			
		||||
        self.predefine = predefine
 | 
			
		||||
 | 
			
		||||
        if self.predefine:
 | 
			
		||||
            self.set_predefine()
 | 
			
		||||
 | 
			
		||||
    # set environmental variables
 | 
			
		||||
    def set_env(self):
 | 
			
		||||
        self.RESPONCE = ""
 | 
			
		||||
 | 
			
		||||
    def set_predefine(self):
 | 
			
		||||
        try:
 | 
			
		||||
            exec(self.predefine)
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            print("[ERROR] module: module \"{}\" ({}) raised exception \"{}\" during predefine stage, disabling it...".format(self.path, self.alias, e))
 | 
			
		||||
 | 
			
		||||
    # running the module
 | 
			
		||||
    def process(self, msg):
 | 
			
		||||
        self.set_env()
 | 
			
		||||
 | 
			
		||||
        self.MESSAGE = msg
 | 
			
		||||
        try:
 | 
			
		||||
            exec(self.code)
 | 
			
		||||
            return self.RESPONCE
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            print("[ERROR] module: module \"{}\" ({}) raised exception \"{}\"".format(self.path, self.alias, e))
 | 
			
		||||
            return ""
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# module control unit
 | 
			
		||||
class ModuleControlUnit:
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        self.modules = []
 | 
			
		||||
        self.reload_modules()
 | 
			
		||||
 | 
			
		||||
        print("[INFO] ModuleControlUnit: initialized successfully")
 | 
			
		||||
 | 
			
		||||
    def reload_modules(self):
 | 
			
		||||
        for folder in os.listdir("modules/"):
 | 
			
		||||
            try:
 | 
			
		||||
                meta_raw = readfile("modules/{}/meta.json".format(folder))
 | 
			
		||||
                if not meta_raw:
 | 
			
		||||
                    print("[WARN] module_loader: no meta.json found in module folder \"{}\"".format(folder))
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
                meta = json.loads( meta_raw )
 | 
			
		||||
                if "version" in meta:
 | 
			
		||||
                    if meta["version"] == 1:
 | 
			
		||||
                        if "index_file" in meta:
 | 
			
		||||
                            index_file = meta["index_file"]
 | 
			
		||||
                        else:
 | 
			
		||||
                            index_file = "index.py"
 | 
			
		||||
 | 
			
		||||
                        code = readfile( "modules/{}/{}".format(folder, index_file) )
 | 
			
		||||
                        if not code: # False both when readfile() returns False and when the code string is empty
 | 
			
		||||
                            print("[WARN] reload_modules: module {} does not have any code, skipping...".format(folder))
 | 
			
		||||
                            continue
 | 
			
		||||
                        
 | 
			
		||||
                        if "start_on_boot" in meta:
 | 
			
		||||
                            enabled = meta["start_on_boot"]
 | 
			
		||||
                        else:
 | 
			
		||||
                            enabled = False
 | 
			
		||||
 | 
			
		||||
                        if "alias" in meta:
 | 
			
		||||
                            alias = meta["alias"]
 | 
			
		||||
                        else:
 | 
			
		||||
                            alias = None
 | 
			
		||||
 | 
			
		||||
                        if "predefine" in meta:
 | 
			
		||||
                            predefine = readfile("modules/{}/{}".format(folder, meta["predefine"]))
 | 
			
		||||
                        else:
 | 
			
		||||
                            predefine = False
 | 
			
		||||
 | 
			
		||||
                        self.modules.append( ModuleV1( "modules/{}/".format(folder), code, enabled, alias, predefine ) )
 | 
			
		||||
 | 
			
		||||
                        print("[INFO] reload_modules: successfully loaded {} as {} (start_on_boot: {})".format(folder, alias, enabled))
 | 
			
		||||
                    else:
 | 
			
		||||
                        print("[WARN] reload_modules: module {} requires unsupported version ({} > 1), skipping...".format(folder, meta["version"]))
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                print("[ERROR] module_loader: error while loading module \"{}\" ({})".format(folder, e))
 | 
			
		||||
 | 
			
		||||
    def reload_module(self, folder_name):
 | 
			
		||||
        try:
 | 
			
		||||
            meta_raw = readfile("modules/{}/meta.json".format(folder))
 | 
			
		||||
            if not meta_raw:
 | 
			
		||||
                print("[WARN] module_loader: no meta.json found in module folder \"{}\"".format(folder_name))
 | 
			
		||||
                return 1
 | 
			
		||||
 | 
			
		||||
            meta = json.loads( readfile("modules/{}/meta.json".format(folder_name)) )
 | 
			
		||||
            if "version" in meta:
 | 
			
		||||
                if meta["version"] == 1:
 | 
			
		||||
                    if "index_file" in meta:
 | 
			
		||||
                        index_file = meta["index_file"]
 | 
			
		||||
                    else:
 | 
			
		||||
                        index_file = "index.py"
 | 
			
		||||
 | 
			
		||||
                    code = readfile( "modules/{}/{}".format(folder_name, index_file) )
 | 
			
		||||
                    if not code: # False both when readfile() returns False and when the code string is empty
 | 
			
		||||
                        print("[WARN] reload_modules: module {} does not have any code, skipping...".format(folder_name))
 | 
			
		||||
                
 | 
			
		||||
                    if "start_on_boot" in meta:
 | 
			
		||||
                        enabled = meta["start_on_boot"]
 | 
			
		||||
                    else:
 | 
			
		||||
                        enabled = False
 | 
			
		||||
 | 
			
		||||
                    if "alias" in meta:
 | 
			
		||||
                        alias = meta["alias"]
 | 
			
		||||
                    else:
 | 
			
		||||
                        alias = None
 | 
			
		||||
 | 
			
		||||
                    self.modules.append( ModuleV1( code, enabled, alias ) )
 | 
			
		||||
 | 
			
		||||
                    print("[INFO] reload_modules: successfully loaded {} as {} (start_on_boot: {})".format(folder_name, alias, enabled))
 | 
			
		||||
                else:
 | 
			
		||||
                    print("[WARN] reload_modules: module {} requires unsupported version ({} > 1), skipping...".format(folder_name, meta["version"]))
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            print("[ERROR] module_loader: error while loading module \"{}\" ({})".format(folder_name, e))
 | 
			
		||||
 | 
			
		||||
# message queue object to go back to synchronous message processing
 | 
			
		||||
#class MessageQueue:
 | 
			
		||||
#    def __init__(self):
 | 
			
		||||
#        print("[INFO] Initializing the message queue...")
 | 
			
		||||
#        self.queue = []
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# synchronous message processor
 | 
			
		||||
def queue_processor():
 | 
			
		||||
    while True:
 | 
			
		||||
        if len(message_queue) > 0:
 | 
			
		||||
            msg = message_queue[0]
 | 
			
		||||
            print("[DEBUG] queue_processor: {}".format(msg)) # debug
 | 
			
		||||
 | 
			
		||||
            # check for control commands
 | 
			
		||||
            if msg["chat"]["id"] == 575246355:
 | 
			
		||||
                if msg["text"][0] == "$":
 | 
			
		||||
                    command = msg["text"][1:].split(" ")
 | 
			
		||||
                    
 | 
			
		||||
                    if len(command) >= 2 and command[0] == "module":
 | 
			
		||||
                        if command[1] == "reload":
 | 
			
		||||
                            print("[INFO] Module reloading triggered by a command")
 | 
			
		||||
                            del mcu.modules[:]
 | 
			
		||||
                            mcu.reload_modules()
 | 
			
		||||
                    
 | 
			
		||||
                    del message_queue[0]
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
            # modules are used in here
 | 
			
		||||
            for mod in mcu.modules:
 | 
			
		||||
                if mod.enabled:
 | 
			
		||||
                    if mod.version == 1:
 | 
			
		||||
                        responce = mod.process(msg)
 | 
			
		||||
                        if len(responce) > 0:
 | 
			
		||||
                            #updater.bot.send_message(chat_id = msg.chat.id, text = responce)
 | 
			
		||||
                            print("Responded using module {} ({}) with text: {}".format(mod.path, mod.alias, responce))
 | 
			
		||||
                            break
 | 
			
		||||
 | 
			
		||||
            del message_queue[0]
 | 
			
		||||
        else:
 | 
			
		||||
            if STOP_REQUESTED:
 | 
			
		||||
                break
 | 
			
		||||
            else:
 | 
			
		||||
                time.sleep(1)
 | 
			
		||||
 | 
			
		||||
    print("[INFO] queue_processor thread stops successfully")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# telegram bot processor
 | 
			
		||||
def message_handler(update, context):
 | 
			
		||||
    print("[DEBUG] Received new message") # just for testing
 | 
			
		||||
    message_queue.append(update.message)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# --- Final stage ---
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# initializing services and queues
 | 
			
		||||
 | 
			
		||||
message_queue = []
 | 
			
		||||
 | 
			
		||||
mcu = ModuleControlUnit()
 | 
			
		||||
 | 
			
		||||
processor_thread = threading.Thread( target = queue_processor, args = [] )
 | 
			
		||||
processor_thread.start()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# connecting to Telegram servers and listening for messages
 | 
			
		||||
'''
 | 
			
		||||
TOKEN = readfile("config/token")
 | 
			
		||||
if not TOKEN:
 | 
			
		||||
    print("[CRIT] Token has not been defined, quitting")
 | 
			
		||||
    sys.exit(1)
 | 
			
		||||
 | 
			
		||||
# connect to Telegram servers
 | 
			
		||||
updater = Updater(TOKEN, use_context = True)
 | 
			
		||||
dispatcher = updater.dispatcher
 | 
			
		||||
 | 
			
		||||
# assign the handler for messages
 | 
			
		||||
dispatcher.add_handler(MessageHandler(Filters.text, message_handler))
 | 
			
		||||
 | 
			
		||||
# run the bot
 | 
			
		||||
updater.start_polling()
 | 
			
		||||
updater.idle()
 | 
			
		||||
'''
 | 
			
		||||
 | 
			
		||||
print("Enter testing messages one by one, end with an empty line")
 | 
			
		||||
 | 
			
		||||
while True:
 | 
			
		||||
    new_msg = input()
 | 
			
		||||
    if len(new_msg) == 0:
 | 
			
		||||
        break
 | 
			
		||||
 | 
			
		||||
    message_queue.append({'text': new_msg, 'new_chat_photo': [], 'message_id': 74, 'new_chat_members': [], 'delete_chat_photo': False, 'group_chat_created': False, 'supergroup_chat_created': False, 'chat': {'first_name': 'Дмитро', 'username': 'dmytrofiot23', 'id': 575246355, 'type': 'private'}, 'caption_entities': [], 'channel_chat_created': False, 'date': 1667585646, 'photo': [], 'entities': [], 'from': {'first_name': 'Дмитро', 'id': 575246355, 'is_bot': False, 'username': 'dmytrofiot23', 'language_code': 'en'}})
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user