BenfilterbotV2 / plugins /filters.py
JamesBond531's picture
First Commit to Add Files
e7f5d94
import io
from pyrogram import filters, Client, enums
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from database.filters_mdb import(
add_filter,
get_filters,
delete_filter,
count_filters
)
from database.connections_mdb import active_connection
from utils import get_file_id, parser, split_quotes
from info import ADMINS
@Client.on_message(filters.command(['filter', 'add']) & filters.incoming)
async def addfilter(client, message):
userid = message.from_user.id if message.from_user else None
if not userid:
return await message.reply(f"You are anonymous admin. Use /connect {message.chat.id} in PM")
chat_type = message.chat.type
args = message.text.html.split(None, 1)
if chat_type == enums.ChatType.PRIVATE:
grpid = await active_connection(str(userid))
if grpid is not None:
grp_id = grpid
try:
chat = await client.get_chat(grpid)
title = chat.title
except:
await message.reply_text("Make sure I'm present in your group!!", quote=True)
return
else:
await message.reply_text("I'm not connected to any groups!", quote=True)
return
elif chat_type in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]:
grp_id = message.chat.id
title = message.chat.title
else:
return
st = await client.get_chat_member(grp_id, userid)
if (
st.status != enums.ChatMemberStatus.ADMINISTRATOR
and st.status != enums.ChatMemberStatus.OWNER
and str(userid) not in ADMINS
):
return
if len(args) < 2:
await message.reply_text("Command Incomplete :(", quote=True)
return
extracted = split_quotes(args[1])
text = extracted[0].lower()
if not message.reply_to_message and len(extracted) < 2:
await message.reply_text("Add some content to save your filter!", quote=True)
return
if (len(extracted) >= 2) and not message.reply_to_message:
reply_text, btn, alert = parser(extracted[1], text)
fileid = None
if not reply_text:
await message.reply_text("You cannot have buttons alone, give some text to go with it!", quote=True)
return
elif message.reply_to_message and message.reply_to_message.reply_markup:
try:
rm = message.reply_to_message.reply_markup
btn = rm.inline_keyboard
msg = get_file_id(message.reply_to_message)
if msg:
fileid = msg.file_id
reply_text = message.reply_to_message.caption.html
else:
reply_text = message.reply_to_message.text.html
fileid = None
alert = None
except:
reply_text = ""
btn = "[]"
fileid = None
alert = None
elif message.reply_to_message and message.reply_to_message.media:
try:
msg = get_file_id(message.reply_to_message)
fileid = msg.file_id if msg else None
reply_text, btn, alert = parser(extracted[1], text) if message.reply_to_message.sticker else parser(message.reply_to_message.caption.html, text)
except:
reply_text = ""
btn = "[]"
alert = None
elif message.reply_to_message and message.reply_to_message.text:
try:
fileid = None
reply_text, btn, alert = parser(message.reply_to_message.text.html, text)
except:
reply_text = ""
btn = "[]"
alert = None
else:
return
await add_filter(grp_id, text, reply_text, btn, fileid, alert)
await message.reply_text(
f"Filter for `{text}` added in **{title}**",
quote=True,
parse_mode=enums.ParseMode.MARKDOWN
)
@Client.on_message(filters.command(['viewfilters', 'filters']) & filters.incoming)
async def get_all(client, message):
chat_type = message.chat.type
userid = message.from_user.id if message.from_user else None
if not userid:
return await message.reply(f"You are anonymous admin. Use /connect {message.chat.id} in PM")
if chat_type == enums.ChatType.PRIVATE:
userid = message.from_user.id
grpid = await active_connection(str(userid))
if grpid is not None:
grp_id = grpid
try:
chat = await client.get_chat(grpid)
title = chat.title
except:
await message.reply_text("Make sure I'm present in your group!!", quote=True)
return
else:
await message.reply_text("I'm not connected to any groups!", quote=True)
return
elif chat_type in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]:
grp_id = message.chat.id
title = message.chat.title
else:
return
st = await client.get_chat_member(grp_id, userid)
if (
st.status != enums.ChatMemberStatus.ADMINISTRATOR
and st.status != enums.ChatMemberStatus.OWNER
and str(userid) not in ADMINS
):
return
texts = await get_filters(grp_id)
count = await count_filters(grp_id)
if count:
filterlist = f"Total number of filters in **{title}** : {count}\n\n"
for text in texts:
keywords = " × `{}`\n".format(text)
filterlist += keywords
if len(filterlist) > 4096:
with io.BytesIO(str.encode(filterlist.replace("`", ""))) as keyword_file:
keyword_file.name = "keywords.txt"
await message.reply_document(
document=keyword_file,
quote=True
)
return
else:
filterlist = f"There are no active filters in **{title}**"
await message.reply_text(
text=filterlist,
quote=True,
parse_mode=enums.ParseMode.MARKDOWN
)
@Client.on_message(filters.command('del') & filters.incoming)
async def deletefilter(client, message):
userid = message.from_user.id if message.from_user else None
if not userid:
return await message.reply(f"You are anonymous admin. Use /connect {message.chat.id} in PM")
chat_type = message.chat.type
if chat_type == enums.ChatType.PRIVATE:
grpid = await active_connection(str(userid))
if grpid is not None:
grp_id = grpid
try:
chat = await client.get_chat(grpid)
title = chat.title
except:
await message.reply_text("Make sure I'm present in your group!!", quote=True)
return
else:
await message.reply_text("I'm not connected to any groups!", quote=True)
elif chat_type in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]:
grp_id = message.chat.id
title = message.chat.title
else:
return
st = await client.get_chat_member(grp_id, userid)
if (
st.status != enums.ChatMemberStatus.ADMINISTRATOR
and st.status != enums.ChatMemberStatus.OWNER
and str(userid) not in ADMINS
):
return
try:
cmd, text = message.text.split(" ", 1)
except:
await message.reply_text(
"<i>Mention the filtername which you wanna delete!</i>\n\n"
"<code>/del filtername</code>\n\n"
"Use /viewfilters to view all available filters",
quote=True
)
return
query = text.lower()
await delete_filter(message, query, grp_id)
@Client.on_message(filters.command('delall') & filters.incoming)
async def delallconfirm(client, message):
userid = message.from_user.id if message.from_user else None
if not userid:
return await message.reply(f"You are anonymous admin. Use /connect {message.chat.id} in PM")
chat_type = message.chat.type
if chat_type == enums.ChatType.PRIVATE:
grpid = await active_connection(str(userid))
if grpid is not None:
grp_id = grpid
try:
chat = await client.get_chat(grpid)
title = chat.title
except:
await message.reply_text("Make sure I'm present in your group!!", quote=True)
return
else:
await message.reply_text("I'm not connected to any groups!", quote=True)
return
elif chat_type in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]:
grp_id = message.chat.id
title = message.chat.title
else:
return
st = await client.get_chat_member(grp_id, userid)
if (st.status == enums.ChatMemberStatus.OWNER) or (str(userid) in ADMINS):
await message.reply_text(
f"This will delete all filters from '{title}'.\nDo you want to continue??",
reply_markup=InlineKeyboardMarkup([
[InlineKeyboardButton(text="YES",callback_data="delallconfirm")],
[InlineKeyboardButton(text="CANCEL",callback_data="delallcancel")]
]),
quote=True
)