Source code

Source code

Lee

Skip to content



 PaulSonOfLars / tgbot


Code Issues 0 Pull requests 11 Projects 0 Wiki Security Pulse Community

Branch: master 

Find file

Copy path

tgbot/tg_bot/modules/warns.py

@julianodorneles

 julianodorneles At warn and ban commands, add user ID info to log channel message (#39)

ef9761d on Aug 21, 2018

3 contributors

434 lines (350 sloc)  17.7 KB

Raw

Blame

History

 

import htmlimport refrom typing import Optional, List

import telegramfrom telegram import InlineKeyboardButton, InlineKeyboardMarkup, ParseMode, User, CallbackQueryfrom telegram import Message, Chat, Update, Botfrom telegram.error import BadRequestfrom telegram.ext import CommandHandler, run_async, DispatcherHandlerStop, MessageHandler, Filters, CallbackQueryHandlerfrom telegram.utils.helpers import mention_html

from tg_bot import dispatcher, BAN_STICKERfrom tg_bot.modules.disable import DisableAbleCommandHandlerfrom tg_bot.modules.helper_funcs.chat_status import is_user_admin, bot_admin, user_admin_no_reply, user_admin, \ can_restrictfrom tg_bot.modules.helper_funcs.extraction import extract_text, extract_user_and_text, extract_userfrom tg_bot.modules.helper_funcs.filters import CustomFiltersfrom tg_bot.modules.helper_funcs.misc import split_messagefrom tg_bot.modules.helper_funcs.string_handling import split_quotesfrom tg_bot.modules.log_channel import loggablefrom tg_bot.modules.sql import warns_sql as sql

# Dispatcher group used for the warn-filter message handler (see the
# add_handler call at the bottom of this module and add_warn_filter).
WARN_HANDLER_GROUP = 9

# Header that opens the reply listing a chat's warn filters; also used to
# detect whether any entry has been appended after it.
CURRENT_WARNING_FILTER_STRING = "<b>Current warning filters in this chat:</b>\n"


# Not async
def warn(user: User, chat: Chat, reason: str, message: Message, warner: User = None) -> str:
    """Record one warning against ``user`` in ``chat`` and reply with the outcome.

    Users for whom ``is_user_admin`` is true are never warned: a notice is
    sent and an empty string is returned. Otherwise the warning is stored
    through ``sql.warn_user``. Once the returned count reaches the chat's
    limit, the user's warns are reset and the user is removed from the chat
    with ``chat.unban_member`` when the chat's soft-warn setting is on (the
    "kicked" reply) or ``chat.kick_member`` otherwise (the "banned" reply).
    Below the limit the reply carries a "Remove warn" inline button.

    :param user: the user being warned.
    :param chat: the chat the warning belongs to.
    :param reason: free-text reason for the warning; may be empty.
    :param message: the message the outcome is sent in reply to.
    :param warner: whoever issued the warning; when falsy the log entry
        names the automated warn filter instead.
    :return: an HTML log entry, or ``""`` when the target is an admin.
    :raises BadRequest: re-raised from ``reply_text`` unless the error is
        "Reply message not found", in which case the reply is re-sent
        without quoting.
    """
    if is_user_admin(chat, user.id):
        message.reply_text("Damn admins, can't even be warned!")
        return ""

    if warner:
        warner_tag = mention_html(warner.id, warner.first_name)
    else:
        warner_tag = "Automated warn filter."

    limit, soft_warn = sql.get_warn_setting(chat.id)
    num_warns, reasons = sql.warn_user(user.id, chat.id, reason)
    user_tag = mention_html(user.id, user.first_name)

    if num_warns >= limit:
        sql.reset_warns(user.id, chat.id)
        if soft_warn:  # kick
            chat.unban_member(user.id)
            reply = "{} warnings, {} has been kicked!".format(limit, user_tag)
        else:  # ban
            chat.kick_member(user.id)
            reply = "{} warnings, {} has been banned!".format(limit, user_tag)

        for warn_reason in reasons:
            reply += "\n - {}".format(html.escape(warn_reason))

        message.bot.send_sticker(chat.id, BAN_STICKER)  # banhammer marie sticker
        keyboard = []
        log_tag = "#WARN_BAN"

    else:
        keyboard = InlineKeyboardMarkup(
            [[InlineKeyboardButton("Remove warn", callback_data="rm_warn({})".format(user.id))]])

        reply = "{} has {}/{} warnings... watch out!".format(user_tag, num_warns, limit)
        if reason:
            reply += "\nReason for last warn:\n{}".format(html.escape(reason))
        log_tag = "#WARN"

    # The log entry is HTML markup, so the free-text reason is escaped just
    # like the chat title (and like the reason in the reply above). Falsy
    # reasons are passed through untouched so the entry renders as before.
    logged_reason = html.escape(reason) if reason else reason
    log_reason = "<b>{}:</b>" \
                 "\n{}" \
                 "\n<b>Admin:</b> {}" \
                 "\n<b>User:</b> {} (<code>{}</code>)" \
                 "\n<b>Reason:</b> {}" \
                 "\n<b>Counts:</b> <code>{}/{}</code>".format(html.escape(chat.title),
                                                              log_tag,
                                                              warner_tag,
                                                              user_tag,
                                                              user.id,
                                                              logged_reason,
                                                              num_warns,
                                                              limit)

    try:
        message.reply_text(reply, reply_markup=keyboard, parse_mode=ParseMode.HTML)
    except BadRequest as excp:
        if excp.message == "Reply message not found":
            # Do not reply
            message.reply_text(reply, reply_markup=keyboard, parse_mode=ParseMode.HTML, quote=False)
        else:
            raise
    return log_reason


@run_async
@user_admin_no_reply
@bot_admin
@loggable
def button(bot: Bot, update: Update) -> str:
    """Handle a press on the "Remove warn" inline button.

    The target user id is read from callback data of the form
    ``rm_warn(<id>)`` and handed to ``sql.remove_warn``. When that call
    reports success the button's message is edited to name the admin who
    removed the warn; otherwise the message is edited to say there was
    nothing to remove.

    :param bot: the bot instance supplied by the dispatcher (unused here).
    :param update: the update carrying the callback query.
    :return: an HTML log entry when a warn was removed, ``""`` otherwise
        (including when the callback data does not match).
    """
    query = update.callback_query  # type: Optional[CallbackQuery]
    user = update.effective_user  # type: Optional[User]
    match = re.match(r"rm_warn\((.+?)\)", query.data)
    if not match:
        return ""

    user_id = match.group(1)
    chat = update.effective_chat  # type: Optional[Chat]
    if sql.remove_warn(user_id, chat.id):
        update.effective_message.edit_text(
            "Warn removed by {}.".format(mention_html(user.id, user.first_name)),
            parse_mode=ParseMode.HTML)
        user_member = chat.get_member(user_id)
        return "<b>{}:</b>" \
               "\n#UNWARN" \
               "\n<b>Admin:</b> {}" \
               "\n<b>User:</b> {} (<code>{}</code>)".format(html.escape(chat.title),
                                                            mention_html(user.id, user.first_name),
                                                            mention_html(user_member.user.id,
                                                                         user_member.user.first_name),
                                                            user_member.user.id)

    # Fixed wording ("has already has") and dropped a .format() call that
    # had no placeholders to fill.
    update.effective_message.edit_text("User already has no warns.", parse_mode=ParseMode.HTML)
    return ""


@run_async
@user_admin
@can_restrict
@loggable
def warn_user(bot: Bot, update: Update, args: List[str]) -> str:
    """Command callback for /warn: warn the user named in the command or reply.

    The target and reason come from ``extract_user_and_text``. If the
    command replies to a message written by the target, the warning outcome
    is attached to that replied-to message; otherwise it is attached to the
    command message itself.

    :return: whatever ``warn`` returns, or ``""`` when no user was found.
    """
    message = update.effective_message  # type: Optional[Message]
    chat = update.effective_chat  # type: Optional[Chat]
    warner = update.effective_user  # type: Optional[User]

    user_id, reason = extract_user_and_text(message, args)

    if not user_id:
        message.reply_text("No user was designated!")
        return ""

    replied = message.reply_to_message
    if replied and replied.from_user.id == user_id:
        return warn(replied.from_user, chat, reason, replied, warner)

    target = chat.get_member(user_id).user
    return warn(target, chat, reason, message, warner)


@run_async
@user_admin
@bot_admin
@loggable
def reset_warns(bot: Bot, update: Update, args: List[str]) -> str:
    """Command callback for /resetwarn(s): clear a user's warnings in this chat.

    :return: an HTML log entry naming the admin and the affected user, or
        ``""`` when ``extract_user`` found no target.
    """
    message = update.effective_message  # type: Optional[Message]
    chat = update.effective_chat  # type: Optional[Chat]
    user = update.effective_user  # type: Optional[User]

    user_id = extract_user(message, args)

    if not user_id:
        message.reply_text("No user has been designated!")
        return ""

    sql.reset_warns(user_id, chat.id)
    message.reply_text("Warnings have been reset!")
    warned = chat.get_member(user_id).user

    admin_tag = mention_html(user.id, user.first_name)
    warned_tag = mention_html(warned.id, warned.first_name)
    template = ("<b>{}:</b>"
                "\n#RESETWARNS"
                "\n<b>Admin:</b> {}"
                "\n<b>User:</b> {} (<code>{}</code>)")
    return template.format(html.escape(chat.title), admin_tag, warned_tag, warned.id)


@run_async
def warns(bot: Bot, update: Update, args: List[str]):
    """Command callback for /warns: report a user's warning count and reasons.

    The target is the user resolved by ``extract_user``, falling back to the
    sender of the command. The reason list is passed through
    ``split_message`` and each resulting part is sent as its own reply.
    """
    message = update.effective_message  # type: Optional[Message]
    chat = update.effective_chat  # type: Optional[Chat]
    user_id = extract_user(message, args) or update.effective_user.id
    result = sql.get_warns(user_id, chat.id)

    # Nothing stored, or a stored count of zero: same answer either way.
    if not result or result[0] == 0:
        message.reply_text("This user hasn't got any warnings!")
        return

    num_warns, reasons = result
    limit, soft_warn = sql.get_warn_setting(chat.id)

    if not reasons:
        message.reply_text(
            "User has {}/{} warnings, but no reasons for any of them.".format(num_warns, limit))
        return

    header = "This user has {}/{} warnings, for the following reasons:".format(num_warns, limit)
    text = header + "".join("\n - {}".format(reason) for reason in reasons)

    for part in split_message(text):
        message.reply_text(part)


# Dispatcher handler stop - do not async
@user_admin
def add_warn_filter(bot: Bot, update: Update):
    """Command callback for /addwarn: store a warn filter for this chat.

    The text after the command is split with ``split_quotes`` into a keyword
    and the content stored with it. The keyword is lowercased so the same
    trigger cannot be added twice under different capitalisation. The
    function returns silently when either part is missing.

    :raises DispatcherHandlerStop: always, once the filter has been stored
        and confirmed to the user.
    """
    chat = update.effective_chat  # type: Optional[Chat]
    msg = update.effective_message  # type: Optional[Message]

    # maxsplit=1 separates the command itself from everything after it.
    args = msg.text.split(None, 1)

    if len(args) < 2:
        return

    extracted = split_quotes(args[1])

    if len(extracted) < 2:
        return

    # set trigger -> lower, so as to avoid adding duplicate filters with different cases
    keyword = extracted[0].lower()
    content = extracted[1]

    # Note: perhaps handlers can be removed somehow using sql.get_chat_filters
    # Iterate over a snapshot: remove_handler may mutate the list for this
    # group, and removing from a list while looping over it skips entries.
    for handler in list(dispatcher.handlers.get(WARN_HANDLER_GROUP, [])):
        if handler.filters == (keyword, chat.id):
            dispatcher.remove_handler(handler, WARN_HANDLER_GROUP)

    sql.add_warn_filter(chat.id, keyword, content)

    update.effective_message.reply_text("Warn handler added for '{}'!".format(keyword))
    raise DispatcherHandlerStop


@user_admin
def remove_warn_filter(bot: Bot, update: Update):
    """Command callback for /nowarn and /stopwarn: delete a warn filter.

    The keyword is the first item ``split_quotes`` yields from the text
    after the command. It is compared case-insensitively against the chat's
    stored triggers, because ``add_warn_filter`` stores keywords lowercased;
    the stored spelling is what gets passed to ``sql.remove_warn_filter``.
    The function returns silently when no keyword was given.

    :raises DispatcherHandlerStop: after a matching filter has been removed.
    """
    chat = update.effective_chat  # type: Optional[Chat]
    msg = update.effective_message  # type: Optional[Message]

    # maxsplit=1 separates the command itself from everything after it.
    args = msg.text.split(None, 1)

    if len(args) < 2:
        return

    extracted = split_quotes(args[1])

    if len(extracted) < 1:
        return

    to_remove = extracted[0].lower()

    chat_filters = sql.get_chat_warn_triggers(chat.id)

    if not chat_filters:
        msg.reply_text("No warning filters are active here!")
        return

    for filt in chat_filters:
        if filt.lower() == to_remove:
            # Remove by the stored key, not by what the user typed.
            sql.remove_warn_filter(chat.id, filt)
            msg.reply_text("Yep, I'll stop warning people for that.")
            raise DispatcherHandlerStop

    msg.reply_text("That's not a current warning filter - run /warnlist for all active warning filters.")


@run_async
def list_warn_filters(bot: Bot, update: Update):
    """Command callback for /warnlist and /warnfilters: list this chat's warn triggers.

    Entries are accumulated after the header string; whenever adding the
    next entry would exceed ``telegram.MAX_MESSAGE_LENGTH`` the text built
    so far is sent and a new message is started with that entry.
    """
    chat = update.effective_chat  # type: Optional[Chat]
    triggers = sql.get_chat_warn_triggers(chat.id)
    message = update.effective_message  # type: Optional[Message]

    if not triggers:
        message.reply_text("No warning filters are active here!")
        return

    pending = CURRENT_WARNING_FILTER_STRING
    for keyword in triggers:
        entry = " - {}\n".format(html.escape(keyword))
        if len(entry) + len(pending) <= telegram.MAX_MESSAGE_LENGTH:
            pending += entry
            continue
        # Would overflow: flush what we have and start over with this entry.
        message.reply_text(pending, parse_mode=ParseMode.HTML)
        pending = entry

    # Skip the final send only if nothing but the bare header is left.
    if pending != CURRENT_WARNING_FILTER_STRING:
        message.reply_text(pending, parse_mode=ParseMode.HTML)


@run_async
@loggable
def reply_filter(bot: Bot, update: Update) -> str:
    """Message callback: warn the sender if the message contains a warn trigger.

    Each of the chat's triggers is searched for case-insensitively, and must
    be delimited on both sides by a space, a non-word character, or the
    start/end of the text. The first trigger that matches issues a warning
    with the stored filter's ``reply`` as the reason.

    :return: the result of ``warn`` for the first match, ``""`` when the
        message has no text or no trigger matches.
    """
    chat = update.effective_chat  # type: Optional[Chat]
    message = update.effective_message  # type: Optional[Message]

    chat_warn_filters = sql.get_chat_warn_triggers(chat.id)
    to_match = extract_text(message)
    if not to_match:
        return ""

    for keyword in chat_warn_filters:
        pattern = "".join((r"( |^|[^\w])", re.escape(keyword), r"( |$|[^\w])"))
        if not re.search(pattern, to_match, flags=re.IGNORECASE):
            continue

        user = update.effective_user  # type: Optional[User]
        warn_filter = sql.get_warn_filter(chat.id, keyword)
        return warn(user, chat, warn_filter.reply, message)

    return ""


@run_async
@user_admin
@loggable
def set_warn_limit(bot: Bot, update: Update, args: List[str]) -> str:
    """Command callback for /warnlimit: show or change the chat's warn limit.

    Without arguments the current limit is reported. With an argument, the
    first one must be a non-negative decimal integer of at least 3.

    :return: an HTML log entry when the limit was changed, ``""`` otherwise.
    """
    chat = update.effective_chat  # type: Optional[Chat]
    user = update.effective_user  # type: Optional[User]
    msg = update.effective_message  # type: Optional[Message]

    if not args:
        limit, soft_warn = sql.get_warn_setting(chat.id)
        msg.reply_text("The current warn limit is {}".format(limit))
        return ""

    # isdecimal(), not isdigit(): isdigit() also accepts characters such as
    # superscript digits, which int() rejects with a ValueError.
    if not args[0].isdecimal():
        msg.reply_text("Give me a number as an arg!")
        return ""

    new_limit = int(args[0])
    if new_limit < 3:
        msg.reply_text("The minimum warn limit is 3!")
        return ""

    sql.set_warn_limit(chat.id, new_limit)
    msg.reply_text("Updated the warn limit to {}".format(args[0]))
    return "<b>{}:</b>" \
           "\n#SET_WARN_LIMIT" \
           "\n<b>Admin:</b> {}" \
           "\nSet the warn limit to <code>{}</code>".format(html.escape(chat.title),
                                                            mention_html(user.id, user.first_name),
                                                            args[0])


@run_async
@user_admin
# NOTE(review): added so the log entries returned below are consumed in the
# same way as set_warn_limit's; without it the return value was unused.
# Confirm against the loggable decorator's contract.
@loggable
def set_warn_strength(bot: Bot, update: Update, args: List[str]) -> str:
    """Command callback for /strongwarn: choose ban or kick at the warn limit.

    "on"/"yes" stores a soft-warn value of ``False`` (users are banned);
    "off"/"no" stores ``True`` (users are only kicked). Without arguments
    the current setting is reported.

    :return: an HTML log entry when the setting was changed, ``""`` otherwise.
    """
    chat = update.effective_chat  # type: Optional[Chat]
    user = update.effective_user  # type: Optional[User]
    msg = update.effective_message  # type: Optional[Message]

    if args:
        choice = args[0].lower()
        if choice in ("on", "yes"):
            sql.set_warn_strength(chat.id, False)
            msg.reply_text("Too many warns will now result in a ban!")
            return "<b>{}:</b>\n" \
                   "<b>Admin:</b> {}\n" \
                   "Has enabled strong warns. Users will be banned.".format(html.escape(chat.title),
                                                                            mention_html(user.id, user.first_name))

        elif choice in ("off", "no"):
            sql.set_warn_strength(chat.id, True)
            msg.reply_text("Too many warns will now result in a kick! Users will be able to join again after.")
            return "<b>{}:</b>\n" \
                   "<b>Admin:</b> {}\n" \
                   "Has disabled strong warns. Users will only be kicked.".format(html.escape(chat.title),
                                                                                  mention_html(user.id,
                                                                                               user.first_name))

        else:
            msg.reply_text("I only understand on/yes/no/off!")
    else:
        limit, soft_warn = sql.get_warn_setting(chat.id)
        if soft_warn:
            msg.reply_text("Warns are currently set to *kick* users when they exceed the limits.",
                           parse_mode=ParseMode.MARKDOWN)
        else:
            msg.reply_text("Warns are currently set to *ban* users when they exceed the limits.",
                           parse_mode=ParseMode.MARKDOWN)
    return ""


def __stats__():
    """Return a two-line summary of the warn and warn-filter counters from ``sql``."""
    template = ("{} overall warns, across {} chats.\n"
                "{} warn filters, across {} chats.")
    return template.format(sql.num_warns(),
                           sql.num_warn_chats(),
                           sql.num_warn_filters(),
                           sql.num_warn_filter_chats())


def __import_data__(chat_id, data):
    """Replay imported warn counts into storage for ``chat_id``.

    ``data['warns']`` (if present) maps a user id to a count; ``sql.warn_user``
    is called that many times for each user.
    """
    warn_counts = data.get('warns', {})
    for user_id, count in warn_counts.items():
        for _ in range(int(count)):
            sql.warn_user(user_id, chat_id)


def __migrate__(old_chat_id, new_chat_id):
    """Forward a chat-id change to ``sql.migrate_chat``."""
    sql.migrate_chat(old_chat_id, new_chat_id)


def __chat_settings__(chat_id, user_id):
    """Describe the chat's warn configuration: filter count, limit and action.

    ``user_id`` is accepted but not used here.
    """
    num_warn_filters = sql.num_warn_chat_filters(chat_id)
    limit, soft_warn = sql.get_warn_setting(chat_id)

    if soft_warn:
        action = "kicked"
    else:
        action = "banned"

    template = ("This chat has `{}` warn filters. It takes `{}` warns "
                "before the user gets *{}*.")
    return template.format(num_warn_filters, limit, action)


# Help text for this module. Each command sits on its own line; the trailing
# backslash inside the /addwarn entry is a line continuation, so that entry
# stays a single line of output (written on one line it would become the
# "\b" backspace escape).
__help__ = """
 - /warns <userhandle>: get a user's number, and reason, of warnings.
 - /warnlist: list of all current warning filters

*Admin only:*
 - /warn <userhandle>: warn a user. After 3 warns, the user will be banned from the group. Can also be used as a reply.
 - /resetwarn <userhandle>: reset the warnings for a user. Can also be used as a reply.
 - /addwarn <keyword> <reply message>: set a warning filter on a certain keyword. If you want your keyword to \
be a sentence, encompass it with quotes, as such: `/addwarn "very angry" This is an angry user`.
 - /nowarn <keyword>: stop a warning filter
 - /warnlimit <num>: set the warning limit
 - /strongwarn <on/yes/off/no>: If set to on, exceeding the warn limit will result in a ban. Else, will just kick.
"""

# Display name of this module.
__mod_name__ = "Warnings"

# Handlers for the admin commands that act on a specific user.
WARN_HANDLER = CommandHandler("warn", warn_user, pass_args=True, filters=Filters.group)
RESET_WARN_HANDLER = CommandHandler(["resetwarn", "resetwarns"], reset_warns, pass_args=True,
                                    filters=Filters.group)
# Inline "Remove warn" button.
CALLBACK_QUERY_HANDLER = CallbackQueryHandler(button, pattern=r"rm_warn")
MYWARNS_HANDLER = DisableAbleCommandHandler("warns", warns, pass_args=True, filters=Filters.group)

# Handlers that manage and apply keyword-triggered warn filters.
ADD_WARN_HANDLER = CommandHandler("addwarn", add_warn_filter, filters=Filters.group)
RM_WARN_HANDLER = CommandHandler(["nowarn", "stopwarn"], remove_warn_filter, filters=Filters.group)
LIST_WARN_HANDLER = DisableAbleCommandHandler(["warnlist", "warnfilters"], list_warn_filters,
                                              filters=Filters.group, admin_ok=True)
WARN_FILTER_HANDLER = MessageHandler(CustomFilters.has_text & Filters.group, reply_filter)

# Handlers for the per-chat warn settings.
WARN_LIMIT_HANDLER = CommandHandler("warnlimit", set_warn_limit, pass_args=True, filters=Filters.group)
WARN_STRENGTH_HANDLER = CommandHandler("strongwarn", set_warn_strength, pass_args=True, filters=Filters.group)

# Registered in this exact order, without an explicit group argument.
for _handler in (
        WARN_HANDLER,
        CALLBACK_QUERY_HANDLER,
        RESET_WARN_HANDLER,
        MYWARNS_HANDLER,
        ADD_WARN_HANDLER,
        RM_WARN_HANDLER,
        LIST_WARN_HANDLER,
        WARN_LIMIT_HANDLER,
        WARN_STRENGTH_HANDLER,
):
    dispatcher.add_handler(_handler)

# The message filter is registered last, in its own handler group.
dispatcher.add_handler(WARN_FILTER_HANDLER, WARN_HANDLER_GROUP)



Report Page