Compare commits
5 Commits
51307b5234
...
e830bae282
Author | SHA1 | Date |
---|---|---|
dymik739 | e830bae282 | |
dymik739 | 2a9bd28d65 | |
dymik739 | 2b8105d2a8 | |
dymik739 | a7bbb1c1cc | |
dymik739 | e002dc46a2 |
22
main.py
22
main.py
|
@ -11,6 +11,7 @@ import traceback
|
|||
|
||||
# global variables
|
||||
STOP_REQUESTED = False
|
||||
DEBUG_MODE = False
|
||||
DELAY_AFTER_RESPONSE = 3
|
||||
DELAY_AFTER_MESSAGE = 0.1
|
||||
DELAY_AFTER_IDLE = 1.0
|
||||
|
@ -48,6 +49,8 @@ class ModuleV1:
|
|||
|
||||
def set_predefine(self):
|
||||
try:
|
||||
if DEBUG_MODE:
|
||||
print(f"Predefine on module v1 {self.alias} ({self.path})")
|
||||
exec(self.predefine)
|
||||
except Exception as e:
|
||||
print(f"[ERROR] module v1: module \"{self.path}\" ({self.alias}) raised exception \"{e}\" "
|
||||
|
@ -59,6 +62,8 @@ class ModuleV1:
|
|||
|
||||
self.MESSAGE = msg
|
||||
try:
|
||||
if DEBUG_MODE:
|
||||
print(f"Calling module v1 {self.alias} ({self.path})")
|
||||
exec(self.code)
|
||||
return self.RESPONSE, self.FORMAT
|
||||
except Exception as e:
|
||||
|
@ -79,6 +84,8 @@ class ModuleV2:
|
|||
# running the module
|
||||
def process(self, msg):
|
||||
try:
|
||||
if DEBUG_MODE:
|
||||
print(f"Calling module v2 {self.alias} ({self.path})")
|
||||
return self.obj.process(msg, self.path)
|
||||
except Exception as e:
|
||||
print(f"[ERROR] module v2: module \"{self.path}\" ({self.alias}) raised exception \"{e}\"")
|
||||
|
@ -276,27 +283,38 @@ def queue_processor():
|
|||
print(f"[INFO]: Queue length is {len(message_queue)}")
|
||||
updater.bot.send_message(msg.chat.id, f"[INFO]: Queue length is {len(message_queue)}")
|
||||
|
||||
elif len(command) == 2 and command[0] == "debug":
|
||||
global DEBUG_MODE
|
||||
if command[1] == "on":
|
||||
DEBUG_MODE = True
|
||||
else:
|
||||
DEBUG_MODE = False
|
||||
|
||||
continue
|
||||
|
||||
# modules are used in here
|
||||
for mod in mcu.modules:
|
||||
if mod.enabled:
|
||||
if mod.version in [1, 2]:
|
||||
try:
|
||||
response, formatting = mod.process(msg)
|
||||
except Exception as e:
|
||||
print(f"Module {mod.alias} ({mod.path}) failed to do a proper return, skipping...")
|
||||
continue
|
||||
|
||||
if response:
|
||||
if not formatting:
|
||||
print(f"Responding using module {mod.path} ({mod.alias}) with text: {response}")
|
||||
updater.bot.send_message(chat_id=msg.chat.id, text=response,
|
||||
disable_web_page_preview=True)
|
||||
print(f"Responded using module {mod.path} ({mod.alias}) with text: {response}")
|
||||
time.sleep(DELAY_AFTER_RESPONSE)
|
||||
break
|
||||
|
||||
elif formatting in ["Markdown", "MarkdownV2", "HTML"]:
|
||||
print(f"Responding using module {mod.path} ({mod.alias}) with text (using {formatting}): {response}")
|
||||
updater.bot.send_message(chat_id=msg.chat.id, text=response,
|
||||
disable_web_page_preview=True,
|
||||
parse_mode=formatting)
|
||||
print(f"Responded using module {mod.path} ({mod.alias}) with text (using {formatting}): {response}")
|
||||
time.sleep(DELAY_AFTER_RESPONSE)
|
||||
break
|
||||
|
||||
|
|
|
@ -158,7 +158,7 @@
|
|||
{
|
||||
"name": "Політична наука: конфліктологічний підхід",
|
||||
"teacher": "Северинчик О. П.",
|
||||
"link": "(посилання відсутнє!)",
|
||||
"link": "https://us04web.zoom.us/j/2279372490?pwd=bHR5QmpCT0tvQXJMLzRzaldHbFZ3dz09",
|
||||
"type": "prac",
|
||||
"selectable": true
|
||||
},
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
Usage: !hardsub [event_name]
|
||||
|
||||
Permanently subscribes to [event_name]. If blank, uses "default".
|
||||
To unsubscribe, use !hardunsub
|
||||
For one-time subscriptions, use !sub
|
|
@ -0,0 +1,5 @@
|
|||
Usage: !hardunsub [event_name]
|
||||
|
||||
Remove permanent subscription to [event_name]. If blank, uses "default".
|
||||
To remove one-time subscription, use !unsub
|
||||
To add permanent subscription, use !hardsub
|
|
@ -0,0 +1,5 @@
|
|||
Usage:
|
||||
!schedule-ctl list - see all settings
|
||||
!schedule-ctl get <setting> - see specific personal setting
|
||||
!schedule-ctl set <setting> <value> - change <setting> to <value>
|
||||
!schedule-ctl clear <setting> - restore <setting> to default value
|
|
@ -0,0 +1,7 @@
|
|||
Usage: !sub [event_name]
|
||||
|
||||
Soft-subscribes to [event_name]. If black, uses "default".
|
||||
If you want to remove soft substription, use !unsub
|
||||
|
||||
Soft subscription is automatically removed after the event is triggered once.
|
||||
If you want to subscribe permanently, use !hardsub
|
|
@ -0,0 +1,5 @@
|
|||
Usage: !unsub [event_name]
|
||||
|
||||
Removes one-time subscription to [event_name]. If blank, uses "default".
|
||||
Does not affect hard subscriptions. To remove hard subscription, use !hardunsub
|
||||
To add one-time supscription, use !sub
|
|
@ -0,0 +1,54 @@
|
|||
import json
|
||||
import os
|
||||
|
||||
module_path = ""
|
||||
|
||||
def readfile(filename):
|
||||
if os.path.exists(module_path + filename):
|
||||
return open(module_path + filename).read()
|
||||
else:
|
||||
return False
|
||||
|
||||
def list_entries(search_query):
|
||||
files = []
|
||||
for p, d, f in os.walk(module_path + "db/"):
|
||||
for i in f:
|
||||
entry_name = f"{p.split('/', 3)[3]}/{i}"
|
||||
if search_query in entry_name:
|
||||
files.append(f"{p.split('/', 3)[3]}/{i}".rsplit(".", 1)[0].replace("/", ".").lstrip("."))
|
||||
|
||||
files.sort()
|
||||
return files
|
||||
|
||||
def process(message, path):
|
||||
if message.text[:5] != "!help":
|
||||
return None, None
|
||||
|
||||
global module_path
|
||||
module_path = path
|
||||
|
||||
cmd = message.text[1:].split()
|
||||
|
||||
if len(cmd) == 1:
|
||||
return "Usage:\n```\n!help show <entry> - displays specified manual entry\n!help list [query] - lists out available entries (optionally, filtered by [query])```", "Markdown"
|
||||
|
||||
elif cmd[1] == "list":
|
||||
if len(cmd) == 2:
|
||||
return "Available entries:\n" + "\n".join(list_entries("")), None
|
||||
else:
|
||||
return f"Found entries for {cmd[2]}:\n" + "\n".join(list_entries(cmd[2])), None
|
||||
|
||||
elif cmd[1] == "show":
|
||||
if len(cmd) >= 3:
|
||||
result = ""
|
||||
for entry_name in cmd[2:]:
|
||||
data = readfile("db/" + entry_name.replace(".", "/") + ".txt")
|
||||
|
||||
if data:
|
||||
result += f"Manual page for {entry_name}:\n```\n{data}```\n"
|
||||
else:
|
||||
result += f"No manual found for {entry_name}\n\n"
|
||||
|
||||
return result, "Markdown"
|
||||
else:
|
||||
return "Usage: !help show <entry> - displays specified manual entry", None
|
|
@ -0,0 +1,6 @@
|
|||
{
|
||||
"start_on_boot": true,
|
||||
"alias": "help",
|
||||
"version": 2,
|
||||
"index_file": "index.py"
|
||||
}
|
|
@ -0,0 +1,133 @@
|
|||
import os
|
||||
import json
|
||||
|
||||
def setup():
|
||||
for i in ["sub-storage"]:
|
||||
if not os.path.exists(module_path + i):
|
||||
os.mkdir(module_path + i)
|
||||
|
||||
def load_group(group_name):
|
||||
if os.path.exists(module_path + f"sub-storage/{group_name}.json"):
|
||||
return json.loads(open(module_path + f"sub-storage/{group_name}.json").read())
|
||||
else:
|
||||
return {"hard": [], "soft": []}
|
||||
|
||||
def save_group(group_name, existing_group):
|
||||
with open(module_path + f"sub-storage/{group_name}.json", "w") as f:
|
||||
f.write(json.dumps(existing_group))
|
||||
|
||||
def sub_to_group(group_name, tag, persistence = "soft"):
|
||||
for i in ["..", "/", "\\"]:
|
||||
if i in group_name:
|
||||
return False
|
||||
|
||||
existing_group = load_group(group_name)
|
||||
|
||||
if tag not in existing_group[persistence]:
|
||||
existing_group[persistence].append(tag)
|
||||
|
||||
save_group(group_name, existing_group)
|
||||
|
||||
return True
|
||||
|
||||
def unsub_from_group(group_name, tag, persistence = "soft"):
|
||||
for i in ["..", "/", "\\"]:
|
||||
if i in group_name:
|
||||
return False
|
||||
|
||||
existing_group = load_group(group_name)
|
||||
|
||||
if tag in existing_group[persistence]:
|
||||
existing_group[persistence].remove(tag)
|
||||
|
||||
save_group(group_name, existing_group)
|
||||
|
||||
return True
|
||||
|
||||
def get_group(cmd):
|
||||
if len(cmd) == 1:
|
||||
return "default"
|
||||
else:
|
||||
return cmd[1]
|
||||
|
||||
def clear_subs_in(group_name, category = "soft"):
|
||||
existing_group = load_group(group_name)
|
||||
existing_group[category] = []
|
||||
save_group(group_name, existing_group)
|
||||
|
||||
module_path = ""
|
||||
setup()
|
||||
|
||||
def process(message, path):
|
||||
global module_path
|
||||
module_path = path
|
||||
|
||||
if not message.text[0] == "!":
|
||||
return "", None
|
||||
|
||||
msg = message.text.lower()
|
||||
cmd = [i for i in msg[1:].split(" ", 2) if i]
|
||||
|
||||
chosen_group_name = get_group(cmd)
|
||||
|
||||
if cmd[0] == "sub":
|
||||
if message.from_user.username:
|
||||
if sub_to_group(chosen_group_name, message.from_user.username):
|
||||
return f"Subscribed {message.from_user.username} to the {chosen_group_name} event", None
|
||||
else:
|
||||
return f"Failed to subscribe {message.from_user.username} to the \"{chosen_group_name}\" event (event name violation)", None
|
||||
else:
|
||||
return f"Failed to subscribe {message.from_user.id} to the \"{chosen_group_name}\" event (no username available)", None
|
||||
|
||||
if cmd[0] == "hardsub":
|
||||
if message.from_user.username:
|
||||
if sub_to_group(chosen_group_name, message.from_user.username, persistence = "hard"):
|
||||
return f"Hard-subscribed {message.from_user.username} to the {chosen_group_name} event", None
|
||||
else:
|
||||
return f"Failed to hard-subscribe {message.from_user.username} to the \"{chosen_group_name}\" event (event name violation)", None
|
||||
else:
|
||||
return f"Failed to hard-subscribe {message.from_user.id} to the \"{chosen_group_name}\" event (no username available)", None
|
||||
|
||||
elif cmd[0] == "unsub":
|
||||
if message.from_user.username:
|
||||
if unsub_from_group(chosen_group_name, message.from_user.username):
|
||||
return f"Unsubscribed {message.from_user.username} from the {chosen_group_name} event", None
|
||||
else:
|
||||
return f"Failed to unsubscribe {message.from_user.username} from the \"{chosen_group_name}\" event (event name violation)", None
|
||||
else:
|
||||
return f"Failed to unsubscribe {message.from_user.id} from the \"{chosen_group_name}\" event (no username available)", None
|
||||
|
||||
elif cmd[0] == "hardunsub":
|
||||
if message.from_user.username:
|
||||
if unsub_from_group(chosen_group_name, message.from_user.username, persistence = "hard"):
|
||||
return f"Unsubscribed {message.from_user.username} from the {chosen_group_name} event", None
|
||||
else:
|
||||
return f"Failed to unsubscribe {message.from_user.username} from the \"{chosen_group_name}\" event (event name violation)", None
|
||||
else:
|
||||
return f"Failed to unsubscribe {message.from_user.id} from the \"{chosen_group_name}\" event (no username available)", None
|
||||
|
||||
elif cmd[0] == "call":
|
||||
group_data = load_group(chosen_group_name)
|
||||
tags = set()
|
||||
|
||||
for subs in group_data.values():
|
||||
tags |= set([f"@{i}" for i in subs if i])
|
||||
|
||||
ping_string = " ".join(tags)
|
||||
clear_subs_in(chosen_group_name)
|
||||
|
||||
if len(cmd) == 3:
|
||||
return f"{message.from_user.username} calls \"{chosen_group_name}\": {cmd[2]}\n{ping_string}", None
|
||||
else:
|
||||
return f"{message.from_user.username} calls \"{chosen_group_name}\"\n{ping_string}", None
|
||||
|
||||
|
||||
elif cmd[0] == "sublist":
|
||||
group_data = load_group(chosen_group_name)
|
||||
responce = f"Subscribers for group \"{chosen_group_name}\":\n"
|
||||
|
||||
for name, subs in group_data.items():
|
||||
responce += f"{name}: {', '.join(subs)}\n"
|
||||
|
||||
return responce, None
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
{
|
||||
"start_on_boot": true,
|
||||
"alias": "tag-sub",
|
||||
"version": 2,
|
||||
"index_file": "index.py"
|
||||
}
|
Loading…
Reference in New Issue