# Code
import importlib
import time
import re
from sys import argv
from typing import Optional
from SaitamaRobot import (
ALLOW_EXCL,
CERT_PATH,
DONATION_LINK,
LOGGER,
OWNER_ID,
PORT,
TOKEN,
URL,
WEBHOOK,
SUPPORT_CHAT,
dispatcher,
StartTime,
telethn,
updater)
# needed to dynamically load modules
# NOTE: Module order is not guaranteed, specify that in the config file!
from SaitamaRobot.modules import ALL_MODULES
from SaitamaRobot.modules.helper_funcs.chat_status import is_user_admin
from SaitamaRobot.modules.helper_funcs.misc import paginate_modules
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, ParseMode, Update
from telegram.error import (
BadRequest,
ChatMigrated,
NetworkError,
TelegramError,
TimedOut,
Unauthorized,
)
from telegram.ext import (
CallbackContext,
CallbackQueryHandler,
CommandHandler,
Filters,
MessageHandler,
)
from telegram.ext.dispatcher import DispatcherHandlerStop, run_async
from telegram.utils.helpers import escape_markdown
def get_readable_time(seconds: int) -> str:
    """Format a duration in seconds as a compact human-readable string.

    Units are listed from the largest non-zero one down to seconds and
    joined with ``:`` (for example ``"1h:1m:1s"``). Once the duration
    reaches a full day, the day count is prefixed and separated with
    ``", "`` (for example ``"2days, 0h:0m:5s"``).

    Args:
        seconds: Duration in seconds. Fractional values are truncated and
            negative values are treated as zero.

    Returns:
        The formatted duration; ``"0s"`` for a zero-length duration.
    """
    total = max(int(seconds), 0)
    minutes, secs = divmod(total, 60)
    hours, minutes = divmod(minutes, 60)
    # Days are the largest unit and must not be wrapped again: reducing them
    # modulo 24 would under-report any duration of 24 days or more.
    days, hours = divmod(hours, 24)

    clock = [f"{hours}h", f"{minutes}m", f"{secs}s"]
    if days:
        return f"{days}days, " + ":".join(clock)

    # Drop leading zero-valued units, but always keep the seconds field.
    if not hours:
        clock = clock[2:] if not minutes else clock[1:]
    return ":".join(clock)
# Caption sent together with SAITAMA_IMG when /start is used in a private chat
# without arguments. The two placeholders are filled by `start` with the
# user's first name and the bot's first name, in that order.
PM_START_TEXT = """
Hey hi {}, I'm {}!
I am an Anime themed group management bot.
Built by weebs for weebs, I specialize in managing anime eccentric communities!
"""
# Top-level help text shown above the paginated module keyboard. It is
# formatted once, at import time, with the bot's first name and - only when
# ALLOW_EXCL is set - a note that commands may also be prefixed with "!".
HELP_STRINGS = """
Hey there! My name is *{}*.
I'm a Hero For Fun and help admins manage their groups with One Punch! Have a look at the following for an idea of some of \
the things I can help you with.
*Main* commands available:
• /help: PM's you this message.
• /help <module name>: PM's you info about that module.
• /donate: information on how to donate!
• /settings:
• in PM: will send you your settings for all supported modules.
• in a group: will redirect you to pm, with all that chat's settings.
{}
And the following:
""".format(
dispatcher.bot.first_name,
"" if not ALLOW_EXCL else "\nAll commands can either be used with / or !.\n",
)
# Photo that `start` passes to reply_photo for the private-chat welcome message.
SAITAMA_IMG = "https://telegra.ph/file/46e6d9dfcb3eb9eae95d9.jpg"
# Message sent by `donate` with Markdown parsing. The trailing backslashes are
# line continuations inside the literal, so the last three source lines form
# one line of text.
DONATE_STRING = """Heya, glad to hear you want to donate!
You can support the project via [Paypal](ko-fi.com/sawada) or by contacting @Sawada \
Supporting isnt always financial! \
Those who cannot provide monetary support are welcome to help us develop the bot at @OnePunchDev."""
# Registries filled by the module-loading loop below. Dict registries are keyed
# by the module's lower-cased __mod_name__; list registries hold module objects.
# Every loaded module, used to detect duplicate names and to look modules up.
IMPORTED = {}
# Modules defining __migrate__; `migrate_chats` calls each with (old, new) ids.
MIGRATEABLE = []
# Modules with a non-empty __help__ string; drives the help menu.
HELPABLE = {}
# Modules defining __stats__, __user_info__, __import_data__ and
# __export_data__ respectively. Nothing in this file reads these lists;
# presumably other modules import them - verify before removing.
STATS = []
USER_INFO = []
DATA_IMPORT = []
DATA_EXPORT = []
# Modules defining __chat_settings__ / __user_settings__; used by
# `send_settings` and `settings_button`.
CHAT_SETTINGS = {}
USER_SETTINGS = {}
def _register_module(module):
    """Index one feature module in every registry whose hook it defines.

    Raises:
        Exception: if another module already registered the same
            (case-insensitive) ``__mod_name__``.
    """
    # Modules that do not declare a display name fall back to their import name.
    if not hasattr(module, "__mod_name__"):
        module.__mod_name__ = module.__name__
    key = module.__mod_name__.lower()

    if key in IMPORTED:
        raise Exception("Can't have two modules with the same name! Please change one")
    IMPORTED[key] = module

    # Only modules with non-empty help text appear in the help menu.
    if getattr(module, "__help__", None):
        HELPABLE[key] = module

    # List registries: one entry per module that exposes the hook.
    # (__migrate__ marks chats to migrate on chat_migrated events.)
    list_hooks = (
        ("__migrate__", MIGRATEABLE),
        ("__stats__", STATS),
        ("__user_info__", USER_INFO),
        ("__import_data__", DATA_IMPORT),
        ("__export_data__", DATA_EXPORT),
    )
    for hook, registry in list_hooks:
        if hasattr(module, hook):
            registry.append(module)

    # Dict registries: keyed by the same lower-cased module name.
    keyed_hooks = (
        ("__chat_settings__", CHAT_SETTINGS),
        ("__user_settings__", USER_SETTINGS),
    )
    for hook, registry in keyed_hooks:
        if hasattr(module, hook):
            registry[key] = module


for module_name in ALL_MODULES:
    imported_module = importlib.import_module("SaitamaRobot.modules." + module_name)
    _register_module(imported_module)
# do not async
def send_help(chat_id, text, keyboard=None):
    """Send ``text`` to ``chat_id`` as Markdown with an inline keyboard.

    When ``keyboard`` is falsy, the first page of the paginated module list
    built from HELPABLE is attached instead.
    """
    markup = keyboard or InlineKeyboardMarkup(paginate_modules(0, HELPABLE, "help"))
    dispatcher.bot.send_message(
        chat_id=chat_id,
        text=text,
        parse_mode=ParseMode.MARKDOWN,
        disable_web_page_preview=True,
        reply_markup=markup,
    )
@run_async
def test(update: Update, context: CallbackContext):
    """Debug handler: reply with a fixed notice and print the message."""
    # pprint(eval(str(update)))
    # update.effective_message.reply_text("Hola tester! _I_ *have* `markdown`", parse_mode=ParseMode.MARKDOWN)
    message = update.effective_message
    message.reply_text("This person edited a message")
    print(message)
@run_async
def start(update: Update, context: CallbackContext):
    """Handle the ``/start`` command.

    Outside private chats the bot only reports how long it has been running.
    In a private chat without arguments it sends the welcome photo and menu.
    With an argument (the deep-link payload) it dispatches on that payload:

    * ``help`` - send the general help menu.
    * ``ghelp_<module>`` - send that module's help, if the module is helpable.
    * ``markdownhelp`` / ``disasters`` - delegate to the ``extras`` /
      ``disasters`` module, when that module is loaded.
    * ``stngs_<chat_id>`` - send the chat's settings menu to admins of that
      chat, or the user's own settings to everyone else.
    * one character followed by digits - delegate to the ``rules`` module,
      when it is loaded.

    Any other payload is ignored.
    """
    message = update.effective_message
    chat = update.effective_chat

    if chat.type != "private":
        uptime = get_readable_time(time.time() - StartTime)
        message.reply_text(
            "I'm awake already!\n<b>Haven't slept since:</b> <code>{}</code>".format(
                uptime,
            ),
            parse_mode=ParseMode.HTML,
        )
        return

    args = context.args
    if not args:
        first_name = update.effective_user.first_name
        message.reply_photo(
            SAITAMA_IMG,
            PM_START_TEXT.format(
                escape_markdown(first_name), escape_markdown(context.bot.first_name),
            ),
            parse_mode=ParseMode.MARKDOWN,
            disable_web_page_preview=True,
            reply_markup=_start_keyboard(context.bot.username),
        )
        return

    payload = args[0].lower()
    if payload == "help":
        send_help(chat.id, HELP_STRINGS)
    elif payload.startswith("ghelp_"):
        mod = payload.split("_", 1)[1]
        if not HELPABLE.get(mod, False):
            return
        send_help(
            chat.id,
            HELPABLE[mod].__help__,
            InlineKeyboardMarkup(
                [[InlineKeyboardButton(text="Back", callback_data="help_back")]],
            ),
        )
    # Optional modules may be missing from IMPORTED; guard the lookups the
    # same way the "rules" branch does instead of raising KeyError.
    elif payload == "markdownhelp" and "extras" in IMPORTED:
        IMPORTED["extras"].markdown_help_sender(update)
    elif payload == "disasters" and "disasters" in IMPORTED:
        IMPORTED["disasters"].send_disasters(update)
    elif payload.startswith("stngs_"):
        target_chat_id = payload[len("stngs_"):]
        target_chat = dispatcher.bot.getChat(target_chat_id)
        user_id = update.effective_user.id
        # Admins get the chat's settings menu; everyone else only sees
        # their own user settings.
        user_only = not is_user_admin(target_chat, user_id)
        send_settings(target_chat_id, user_id, user_only)
    elif args[0][1:].isdigit() and "rules" in IMPORTED:
        IMPORTED["rules"].send_rules(update, args[0], from_pm=True)


def _start_keyboard(bot_username):
    """Build the inline keyboard attached to the private-chat welcome message."""
    return InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton(
                    text="☑️ Add me",
                    url="t.me/{}?startgroup=true".format(bot_username),
                ),
            ],
            [
                InlineKeyboardButton(
                    text="🚑 Support",
                    url=f"https://t.me/{SUPPORT_CHAT}",
                ),
                InlineKeyboardButton(
                    text="🔔 Updates",
                    url="https://t.me/OnePunchUpdates",
                ),
            ],
            [
                InlineKeyboardButton(
                    text="🧾 Getting Started",
                    url="https://t.me/OnePunchUpdates/29",
                ),
                InlineKeyboardButton(
                    text="🗄 Source code",
                    url="https://github.com/AnimeKaizoku/SaitamaRobot",
                ),
            ],
            [
                InlineKeyboardButton(
                    text="☠️ Kaizoku Network",
                    url="https://t.me/Kaizoku/4",
                ),
            ],
        ],
    )
# for test purposes
def error_callback(update: Update, context: CallbackContext):
    """Print a short diagnostic for Telegram errors raised by handlers.

    The error types are checked in a fixed order and the first match decides
    what is printed. An error that is not a TelegramError is re-raised.
    """
    error = context.error
    if isinstance(error, Unauthorized):
        # remove update.message.chat_id from conversation list
        print("no nono1")
        print(error)
    elif isinstance(error, BadRequest):
        # handle malformed requests - read more below!
        print("no nono2")
        print("BadRequest caught")
        print(error)
    elif isinstance(error, TimedOut):
        # handle slow connection problems
        print("no nono3")
    elif isinstance(error, NetworkError):
        # handle other connection problems
        print("no nono4")
    elif isinstance(error, ChatMigrated):
        # the chat_id of a group has changed, use e.new_chat_id instead
        print("no nono5")
        print(error)
    elif isinstance(error, TelegramError):
        # handle all other telegram related errors
        print(error)
    else:
        raise error
@run_async
def help_button(update, context):
    """Handle presses on the inline buttons of the help menu.

    The callback data selects the action: ``help_module(<name>)`` shows that
    module's help text, ``help_prev(<page>)`` / ``help_next(<page>)`` move
    one page back / forward in the module list, and ``help_back`` returns to
    its first page. The callback query is answered afterwards so the client
    stops showing its loading indicator.

    BadRequest errors with one of the known benign messages are ignored;
    any other BadRequest is logged instead of being silently dropped.
    """
    query = update.callback_query
    mod_match = re.match(r"help_module\((.+?)\)", query.data)
    prev_match = re.match(r"help_prev\((.+?)\)", query.data)
    next_match = re.match(r"help_next\((.+?)\)", query.data)
    back_match = re.match(r"help_back", query.data)
    print(query.message.chat.id)
    try:
        if mod_match:
            module = mod_match.group(1)
            text = (
                "Here is the help for the *{}* module:\n".format(
                    HELPABLE[module].__mod_name__,
                )
                + HELPABLE[module].__help__
            )
            query.message.edit_text(
                text=text,
                parse_mode=ParseMode.MARKDOWN,
                disable_web_page_preview=True,
                reply_markup=InlineKeyboardMarkup(
                    [[InlineKeyboardButton(text="Back", callback_data="help_back")]],
                ),
            )
        else:
            # The three navigation actions differ only in the page to show.
            page = None
            if prev_match:
                page = int(prev_match.group(1)) - 1
            elif next_match:
                page = int(next_match.group(1)) + 1
            elif back_match:
                page = 0
            if page is not None:
                query.message.edit_text(
                    text=HELP_STRINGS,
                    parse_mode=ParseMode.MARKDOWN,
                    reply_markup=InlineKeyboardMarkup(
                        paginate_modules(page, HELPABLE, "help"),
                    ),
                )
        # ensure no spinny white circle
        context.bot.answer_callback_query(query.id)
        # query.message.delete()
    except BadRequest as excp:
        # Same benign-message filter as settings_button; anything else is
        # unexpected and worth a log entry.
        if excp.message not in [
            "Message is not modified",
            "Query_id_invalid",
            "Message can't be deleted",
        ]:
            LOGGER.exception("Exception in help buttons. %s", str(query.data))
@run_async
def get_help(update: Update, context: CallbackContext):
    """Handle ``/help [module]``.

    Help is only delivered in private chats. Elsewhere the bot replies with a
    button that deep-links into a private chat, targeting the requested
    module when it is a known helpable one. In a private chat it sends either
    that module's help text or the general help menu.
    """
    chat = update.effective_chat  # type: Optional[Chat]
    message = update.effective_message
    parts = message.text.split(None, 1)

    # Name of the requested module, or None when absent or not helpable.
    module = parts[1].lower() if len(parts) >= 2 else None
    if module not in HELPABLE:
        module = None

    # ONLY send help in PM
    if chat.type != chat.PRIVATE:
        if module is not None:
            prompt = f"Contact me in PM to get help of {module.capitalize()}"
            link = "t.me/{}?start=ghelp_{}".format(context.bot.username, module)
        else:
            prompt = "Contact me in PM to get the list of possible commands."
            link = "t.me/{}?start=help".format(context.bot.username)
        message.reply_text(
            prompt,
            reply_markup=InlineKeyboardMarkup(
                [[InlineKeyboardButton(text="Help", url=link)]],
            ),
        )
        return

    if module is None:
        send_help(chat.id, HELP_STRINGS)
        return

    header = "Here is the available help for the *{}* module:\n".format(
        HELPABLE[module].__mod_name__,
    )
    back_keyboard = InlineKeyboardMarkup(
        [[InlineKeyboardButton(text="Back", callback_data="help_back")]],
    )
    send_help(chat.id, header + HELPABLE[module].__help__, back_keyboard)
def send_settings(chat_id, user_id, user=False):
    """Send settings information to ``user_id`` in a private message.

    With ``user`` true, the user-specific settings of every module in
    USER_SETTINGS are sent as one message. Otherwise a keyboard listing the
    modules in CHAT_SETTINGS is sent so the user can pick one for the chat
    ``chat_id``. Either way a fallback notice is sent when the relevant
    registry is empty.
    """
    bot = dispatcher.bot

    if user:
        if not USER_SETTINGS:
            bot.send_message(
                user_id,
                "Seems like there aren't any user specific settings available :'(",
                parse_mode=ParseMode.MARKDOWN,
            )
            return
        sections = [
            "*{}*:\n{}".format(mod.__mod_name__, mod.__user_settings__(user_id))
            for mod in USER_SETTINGS.values()
        ]
        settings = "\n\n".join(sections)
        bot.send_message(
            user_id,
            "These are your current settings:" + "\n\n" + settings,
            parse_mode=ParseMode.MARKDOWN,
        )
        return

    if not CHAT_SETTINGS:
        bot.send_message(
            user_id,
            "Seems like there aren't any chat settings available :'(\nSend this "
            "in a group chat you're admin in to find its current settings!",
            parse_mode=ParseMode.MARKDOWN,
        )
        return

    chat_name = bot.getChat(chat_id).title
    module_keyboard = InlineKeyboardMarkup(
        paginate_modules(0, CHAT_SETTINGS, "stngs", chat=chat_id),
    )
    bot.send_message(
        user_id,
        text="Which module would you like to check {}'s settings for?".format(
            chat_name,
        ),
        reply_markup=module_keyboard,
    )
@run_async
def settings_button(update: Update, context: CallbackContext):
    """Handle presses on the inline buttons of the chat-settings menu.

    The callback data selects the action: ``stngs_module(<chat>,<module>)``
    shows that module's settings for the chat, ``stngs_prev(<chat>,<page>)``
    and ``stngs_next(<chat>,<page>)`` page through the module list, and
    ``stngs_back(<chat>)`` returns to its first page. Each action sends a new
    message; afterwards the callback query is answered and the message that
    carried the pressed button is deleted.

    BadRequest errors with one of the known benign messages are ignored;
    any other BadRequest is logged.
    """
    query = update.callback_query
    user = update.effective_user
    bot = context.bot
    data = query.data
    mod_match = re.match(r"stngs_module\((.+?),(.+?)\)", data)
    prev_match = re.match(r"stngs_prev\((.+?),(.+?)\)", data)
    next_match = re.match(r"stngs_next\((.+?),(.+?)\)", data)
    back_match = re.match(r"stngs_back\((.+?)\)", data)
    # Shared prompt for every view that shows the module list.
    pick_prompt = (
        "Hi there! There are quite a few settings for {} - go ahead and pick what "
        "you're interested in."
    )
    try:
        if mod_match:
            chat_id, module = mod_match.group(1), mod_match.group(2)
            chat = bot.get_chat(chat_id)
            header = "*{}* has the following settings for the *{}* module:\n\n".format(
                escape_markdown(chat.title), CHAT_SETTINGS[module].__mod_name__,
            )
            details = CHAT_SETTINGS[module].__chat_settings__(chat_id, user.id)
            back_keyboard = InlineKeyboardMarkup(
                [
                    [
                        InlineKeyboardButton(
                            text="Back",
                            callback_data="stngs_back({})".format(chat_id),
                        ),
                    ],
                ],
            )
            query.message.reply_text(
                text=header + details,
                parse_mode=ParseMode.MARKDOWN,
                reply_markup=back_keyboard,
            )
        elif prev_match or next_match:
            # Both directions show the same prompt; only the page differs.
            if prev_match:
                chat_id = prev_match.group(1)
                page = int(prev_match.group(2)) - 1
            else:
                chat_id = next_match.group(1)
                page = int(next_match.group(2)) + 1
            chat = bot.get_chat(chat_id)
            query.message.reply_text(
                pick_prompt.format(chat.title),
                reply_markup=InlineKeyboardMarkup(
                    paginate_modules(page, CHAT_SETTINGS, "stngs", chat=chat_id),
                ),
            )
        elif back_match:
            chat_id = back_match.group(1)
            chat = bot.get_chat(chat_id)
            query.message.reply_text(
                text=pick_prompt.format(escape_markdown(chat.title)),
                parse_mode=ParseMode.MARKDOWN,
                reply_markup=InlineKeyboardMarkup(
                    paginate_modules(0, CHAT_SETTINGS, "stngs", chat=chat_id),
                ),
            )
        # ensure no spinny white circle
        bot.answer_callback_query(query.id)
        query.message.delete()
    except BadRequest as excp:
        benign = (
            "Message is not modified",
            "Query_id_invalid",
            "Message can't be deleted",
        )
        if excp.message not in benign:
            LOGGER.exception("Exception in settings buttons. %s", str(query.data))
@run_async
def get_settings(update: Update, context: CallbackContext):
    """Handle ``/settings``.

    In a private chat the user's own settings are sent directly. Anywhere
    else the bot replies with a button that deep-links into a private chat
    (``stngs_<chat id>``); the wording depends on whether the sender is an
    admin of the chat. The deep link is resolved by the ``/start`` handler.
    """
    chat = update.effective_chat  # type: Optional[Chat]
    user = update.effective_user  # type: Optional[User]
    msg = update.effective_message  # type: Optional[Message]

    # ONLY send settings in PM
    if chat.type == chat.PRIVATE:
        send_settings(chat.id, user.id, True)
        return

    if is_user_admin(chat, user.id):
        text = "Click here to get this chat's settings, as well as yours."
    else:
        text = "Click here to check your settings."

    # Reply on both paths: the non-admin text used to be assigned but never
    # sent, so non-admins got no response. The same link serves both cases
    # because /start re-checks admin status before choosing what to send.
    msg.reply_text(
        text,
        reply_markup=InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton(
                        text="Settings",
                        url="t.me/{}?start=stngs_{}".format(
                            context.bot.username, chat.id,
                        ),
                    ),
                ],
            ],
        ),
    )
@run_async
def donate(update: Update, context: CallbackContext):
    """Handle ``/donate``.

    In a private chat the donation text is sent as a reply, followed by the
    operator's own donation link when one is configured. Elsewhere the text
    is sent to the user privately and the outcome is acknowledged in the
    chat; if the bot is not allowed to message the user, they are asked to
    contact it in PM first.
    """
    message = update.effective_message
    user = message.from_user
    chat = update.effective_chat  # type: Optional[Chat]
    bot = context.bot

    if chat.type != "private":
        try:
            bot.send_message(
                user.id,
                DONATE_STRING,
                parse_mode=ParseMode.MARKDOWN,
                disable_web_page_preview=True,
            )
            message.reply_text(
                "I've PM'ed you about donating to my creator!",
            )
        except Unauthorized:
            message.reply_text(
                "Contact me in PM first to get donation information.",
            )
        return

    message.reply_text(
        DONATE_STRING, parse_mode=ParseMode.MARKDOWN, disable_web_page_preview=True,
    )
    # NOTE(review): 254318997 is presumably the upstream author's user id, so
    # the extra link is only shown on instances run by someone else - confirm.
    if OWNER_ID != 254318997 and DONATION_LINK:
        operator_note = (
            "You can also donate to the person currently running me "
            "[here]({})".format(DONATION_LINK)
        )
        message.reply_text(operator_note, parse_mode=ParseMode.MARKDOWN)
def migrate_chats(update: Update, context: CallbackContext):
    """Propagate a chat-id migration to every module in MIGRATEABLE.

    The old and new chat ids are taken from the message's migration fields
    together with the chat the update arrived in. Messages that carry
    neither field are ignored.

    Raises:
        DispatcherHandlerStop: always, once the migration has been applied.
    """
    message = update.effective_message  # type: Optional[Message]

    if message.migrate_to_chat_id:
        # The update arrived in the old chat and names its successor.
        source_id, target_id = update.effective_chat.id, message.migrate_to_chat_id
    elif message.migrate_from_chat_id:
        # The update arrived in the new chat and names its predecessor.
        source_id, target_id = message.migrate_from_chat_id, update.effective_chat.id
    else:
        return

    LOGGER.info("Migrating from %s, to %s", str(source_id), str(target_id))
    for module in MIGRATEABLE:
        module.__migrate__(source_id, target_id)
    LOGGER.info("Successfully migrated!")
    raise DispatcherHandlerStop
def main():
    """Register the core handlers and run the bot until it is stopped.

    Steps, in order:

    1. Announce startup in the support chat, if one is configured; failures
       to do so are logged as warnings and do not abort startup.
    2. Register the start, help, settings, donate and migration handlers and
       the error callback on the dispatcher.
    3. Start receiving updates, via webhook when WEBHOOK is set and via long
       polling otherwise.
    4. Depending on the number of command-line arguments, either disconnect
       the Telethon client or block until it disconnects, then idle the
       updater.
    """
    if isinstance(SUPPORT_CHAT, str):
        try:
            dispatcher.bot.sendMessage(f"@{SUPPORT_CHAT}", "I am now online!")
        except Unauthorized:
            LOGGER.warning(
                "Bot isnt able to send message to support_chat, go and check!",
            )
        except BadRequest as e:
            LOGGER.warning(e.message)

    test_handler = CommandHandler("test", test)
    start_handler = CommandHandler("start", start)
    help_handler = CommandHandler("help", get_help)
    help_callback_handler = CallbackQueryHandler(help_button, pattern=r"help_.*")
    settings_handler = CommandHandler("settings", get_settings)
    settings_callback_handler = CallbackQueryHandler(settings_button, pattern=r"stngs_")
    donate_handler = CommandHandler("donate", donate)
    migrate_handler = MessageHandler(Filters.status_update.migrate, migrate_chats)

    # The test handler is built but deliberately left unregistered.
    # dispatcher.add_handler(test_handler)
    dispatcher.add_handler(start_handler)
    dispatcher.add_handler(help_handler)
    dispatcher.add_handler(settings_handler)
    dispatcher.add_handler(help_callback_handler)
    dispatcher.add_handler(settings_callback_handler)
    dispatcher.add_handler(migrate_handler)
    dispatcher.add_handler(donate_handler)
    dispatcher.add_error_handler(error_callback)

    if WEBHOOK:
        LOGGER.info("Using webhooks.")
        updater.start_webhook(listen="0.0.0.0", port=PORT, url_path=TOKEN)
        if CERT_PATH:
            # Close the certificate file once the webhook has been set
            # instead of leaking the open handle.
            with open(CERT_PATH, "rb") as certificate:
                updater.bot.set_webhook(url=URL + TOKEN, certificate=certificate)
        else:
            updater.bot.set_webhook(url=URL + TOKEN)
    else:
        LOGGER.info("Using long polling.")
        updater.start_polling(timeout=15, read_latency=4, clean=True)

    if len(argv) not in (1, 3, 4):
        telethn.disconnect()
    else:
        telethn.run_until_disconnected()
    updater.idle()
if __name__ == "__main__":
    # Report which modules were loaded, start the Telethon client with the
    # bot token, then hand control to main().
    loaded_modules = str(ALL_MODULES)
    LOGGER.info("Successfully loaded modules: " + loaded_modules)
    telethn.start(bot_token=TOKEN)
    main()