Spaces:
Paused
Paused
#ia_filterdb.py | |
import asyncio | |
from pyrogram import enums | |
from imdb import Cinemagoer | |
from pymongo.errors import DuplicateKeyError | |
from pyrogram.errors import UserNotParticipant | |
from motor.motor_asyncio import AsyncIOMotorClient | |
from pyrogram.types import ChatPermissions, InlineKeyboardMarkup, InlineKeyboardButton | |
import logging | |
from struct import pack | |
import re | |
import base64 | |
from pyrogram.file_id import FileId | |
from imdb import Cinemagoer | |
from umongo import Instance, Document, fields | |
from motor.motor_asyncio import AsyncIOMotorClient | |
from marshmallow.exceptions import ValidationError | |
from info import DATABASE_URI, DATABASE_NAME, COLLECTION_NAME, USE_CAPTION_FILTER, MAX_B_TN | |
from utils import get_settings, save_group_settings | |
logger = logging.getLogger(__name__) | |
logger.setLevel(logging.INFO) | |
client = AsyncIOMotorClient(DATABASE_URI) | |
db = client[DATABASE_NAME] | |
db1 = client["F_SUB_DB"] | |
grp_col = db1["GROUPS"] | |
user_col = db1["USERS"] | |
dlt_col = db1["Auto-Delete"] | |
instance = Instance.from_db(db) | |
ia = Cinemagoer() | |
class Media(Document): | |
file_id = fields.StrField(attribute='_id') | |
file_ref = fields.StrField(allow_none=True) | |
file_name = fields.StrField(required=True) | |
file_size = fields.IntField(required=True) | |
file_type = fields.StrField(allow_none=True) | |
mime_type = fields.StrField(allow_none=True) | |
caption = fields.StrField(allow_none=True) | |
class Meta: | |
indexes = ('$file_name', ) | |
collection_name = COLLECTION_NAME | |
async def save_file(media): | |
"""Save file in database""" | |
# TODO: Find better way to get same file_id for same media to avoid duplicates | |
file_id, file_ref = unpack_new_file_id(media.file_id) | |
file_name = re.sub(r"(_|\-|\.|\+)", " ", str(media.file_name)) | |
try: | |
file = Media( | |
file_id=file_id, | |
file_ref=file_ref, | |
file_name=file_name, | |
file_size=media.file_size, | |
file_type=media.file_type, | |
mime_type=media.mime_type, | |
caption=media.caption.html if media.caption else None, | |
) | |
except ValidationError: | |
logger.exception('Error occurred while saving file in database') | |
return False, 2 | |
else: | |
try: | |
await file.commit() | |
except DuplicateKeyError: | |
logger.warning( | |
f'{getattr(media, "file_name", "NO_FILE")} is already saved in database' | |
) | |
return False, 0 | |
else: | |
logger.info(f'{getattr(media, "file_name", "NO_FILE")} is saved to database') | |
return True, 1 | |
async def get_search_results(chat_id, query, file_type=None, max_results=10, offset=0, filter=False): | |
"""For given query return (results, next_offset)""" | |
if chat_id is not None: | |
settings = await get_settings(int(chat_id)) | |
try: | |
if settings['max_btn']: | |
max_results = 10 | |
else: | |
max_results = int(MAX_B_TN) | |
except KeyError: | |
await save_group_settings(int(chat_id), 'max_btn', False) | |
settings = await get_settings(int(chat_id)) | |
if settings['max_btn']: | |
max_results = 10 | |
else: | |
max_results = int(MAX_B_TN) | |
query = query.strip() | |
#if filter: | |
#better ? | |
#query = query.replace(' ', r'(\s|\.|\+|\-|_)') | |
#raw_pattern = r'(\s|_|\-|\.|\+)' + query + r'(\s|_|\-|\.|\+)' | |
if not query: | |
raw_pattern = '.' | |
elif ' ' not in query: | |
raw_pattern = r'(\b|[\.\+\-_])' + query + r'(\b|[\.\+\-_])' | |
else: | |
raw_pattern = query.replace(' ', r'.*[\s\.\+\-_]') | |
try: | |
regex = re.compile(raw_pattern, flags=re.IGNORECASE) | |
except: | |
return [] | |
if USE_CAPTION_FILTER: | |
filter = {'$or': [{'file_name': regex}, {'caption': regex}]} | |
else: | |
filter = {'file_name': regex} | |
if file_type: | |
filter['file_type'] = file_type | |
total_results = await Media.count_documents(filter) | |
next_offset = offset + max_results | |
if next_offset > total_results: | |
next_offset = '' | |
cursor = Media.find(filter) | |
# Sort by recent | |
cursor.sort('$natural', -1) | |
# Slice files according to offset and max results | |
cursor.skip(offset).limit(max_results) | |
# Get list of files | |
files = await cursor.to_list(length=max_results) | |
return files, next_offset, total_results | |
async def get_bad_files(query, file_type=None, filter=False): | |
"""For given query return (results, next_offset)""" | |
query = query.strip() | |
#if filter: | |
#better ? | |
#query = query.replace(' ', r'(\s|\.|\+|\-|_)') | |
#raw_pattern = r'(\s|_|\-|\.|\+)' + query + r'(\s|_|\-|\.|\+)' | |
if not query: | |
raw_pattern = '.' | |
elif ' ' not in query: | |
raw_pattern = r'(\b|[\.\+\-_])' + query + r'(\b|[\.\+\-_])' | |
else: | |
raw_pattern = query.replace(' ', r'.*[\s\.\+\-_]') | |
try: | |
regex = re.compile(raw_pattern, flags=re.IGNORECASE) | |
except: | |
return [] | |
if USE_CAPTION_FILTER: | |
filter = {'$or': [{'file_name': regex}, {'caption': regex}]} | |
else: | |
filter = {'file_name': regex} | |
if file_type: | |
filter['file_type'] = file_type | |
total_results = await Media.count_documents(filter) | |
cursor = Media.find(filter) | |
# Sort by recent | |
cursor.sort('$natural', -1) | |
# Get list of files | |
files = await cursor.to_list(length=total_results) | |
return files, total_results | |
async def get_file_details(query): | |
filter = {'file_id': query} | |
cursor = Media.find(filter) | |
filedetails = await cursor.to_list(length=1) | |
return filedetails | |
def encode_file_id(s: bytes) -> str: | |
r = b"" | |
n = 0 | |
for i in s + bytes([22]) + bytes([4]): | |
if i == 0: | |
n += 1 | |
else: | |
if n: | |
r += b"\x00" + bytes([n]) | |
n = 0 | |
r += bytes([i]) | |
return base64.urlsafe_b64encode(r).decode().rstrip("=") | |
def encode_file_ref(file_ref: bytes) -> str: | |
return base64.urlsafe_b64encode(file_ref).decode().rstrip("=") | |
def unpack_new_file_id(new_file_id): | |
"""Return file_id, file_ref""" | |
decoded = FileId.decode(new_file_id) | |
file_id = encode_file_id( | |
pack( | |
"<iiqq", | |
int(decoded.file_type), | |
decoded.dc_id, | |
decoded.media_id, | |
decoded.access_hash | |
) | |
) | |
file_ref = encode_file_ref(decoded.file_reference) | |
return file_id, file_ref | |
async def add_group(group_id, group_name, user_name, user_id, channels, f_sub, verified): | |
data = {"_id": group_id, "name":group_name, | |
"user_id":user_id, "user_name":user_name, | |
"channels":channels, "f_sub":f_sub, "verified":verified} | |
try: | |
await grp_col.insert_one(data) | |
except DuplicateKeyError: | |
pass | |
async def get_group(id): | |
data = {'_id':id} | |
group = await grp_col.find_one(data) | |
return dict(group) | |
async def update_group(id, new_data): | |
data = {"_id":id} | |
new_value = {"$set": new_data} | |
await grp_col.update_one(data, new_value) | |
async def delete_group(id): | |
data = {"_id":id} | |
await grp_col.delete_one(data) | |
async def get_groups(): | |
count = await grp_col.count_documents({}) | |
cursor = grp_col.find({}) | |
list = await cursor.to_list(length=int(count)) | |
return count, list | |
async def add_user(id, name): | |
data = {"_id":id, "name":name} | |
try: | |
await user_col.insert_one(data) | |
except DuplicateKeyError: | |
pass | |
async def get_users(): | |
count = await user_col.count_documents({}) | |
cursor = user_col.find({}) | |
list = await cursor.to_list(length=int(count)) | |
return count, list | |
async def save_dlt_message(message, time): | |
data = {"chat_id": message.chat.id, | |
"message_id": message.id, | |
"time": time} | |
await dlt_col.insert_one(data) | |
async def get_all_dlt_data(time): | |
data = {"time":{"$lte":time}} | |
count = await dlt_col.count_documents(data) | |
cursor = dlt_col.find(data) | |
all_data = await cursor.to_list(length=int(count)) | |
return all_data | |
async def delete_all_dlt_data(time): | |
data = {"time":{"$lte":time}} | |
await dlt_col.delete_many(data) | |
async def search_imdb(query): | |
try: | |
int(query) | |
movie = ia.get_movie(query) | |
return movie["title"] | |
except: | |
movies = ia.search_movie(query, results=10) | |
list = [] | |
for movie in movies: | |
title = movie["title"] | |
try: year = f" - {movie['year']}" | |
except: year = "" | |
list.append({"title":title, "year":year, "id":movie.movieID}) | |
return list | |
async def force_sub(bot, message): | |
group = await get_group(message.chat.id) | |
f_sub = group["f_sub"] | |
admin = group["user_id"] | |
if f_sub==False: | |
return True | |
if message.from_user is None: | |
return True | |
try: | |
f_link = (await bot.get_chat(f_sub)).invite_link | |
member = await bot.get_chat_member(f_sub, message.from_user.id) | |
if member.status==enums.ChatMemberStatus.BANNED: | |
await message.reply(f"Sorry {message.from_user.mention}!\n You are banned in our channel, you will be banned from here within 10 seconds") | |
await asyncio.sleep(10) | |
await bot.ban_chat_member(message.chat.id, message.from_user.id) | |
return False | |
except UserNotParticipant: | |
try: | |
await bot.restrict_chat_member(chat_id=message.chat.id, | |
user_id=message.from_user.id, | |
permissions=ChatPermissions(can_send_messages=False, | |
can_invite_users=True) | |
) | |
k = await message.reply(f"<b>⚠ Dear User {message.from_user.mention}!\n\nTo Send message here, You have to join our channel </b>", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Join Channel", url=f_link)], [InlineKeyboardButton("Try Again", callback_data=f"checksub_{message.from_user.id}")]])) | |
await message.delete() | |
await asyncio.sleep(60) | |
await bot.restrict_chat_member(chat_id=message.chat.id, | |
user_id=message.from_user.id, | |
permissions=ChatPermissions(can_send_messages=True, | |
can_invite_users=True) | |
) | |
await k.delete() | |
except: | |
k = await message.reply(f"<b>⚠ Dear User {message.from_user.mention}!\n\nTo Send message here, You have to join our channel </b>", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Join Channel", url=f_link)], [InlineKeyboardButton("Try Again", callback_data=f"checksub_{message.from_user.id}")]])) | |
await message.delete() | |
await asyncio.sleep(60) | |
await k.delete() | |
return False | |
except Exception as e: | |
await update_group(message.chat.id, {"f_sub":False}) | |
await bot.send_message(chat_id=admin, text=f"❌ Error in Fsub: `{str(e)}`\n\nYour Fsub to Channel is Removed, Make sure I'm admin in that channel & this group with all permissions\n\nAfter doing this, you need to Add Fsub again!") | |
return False | |
else: | |
return True | |