import logging
from pyrogram.errors import InputUserDeactivated, UserNotParticipant, FloodWait, UserIsBlocked, PeerIdInvalid
from info import (
    AUTH_CHANNEL, LONG_IMDB_DESCRIPTION, MAX_LIST_ELM, SHORTLINK_URL,
    SHORTLINK_API, IS_SHORTLINK, LOG_CHANNEL, TUTORIAL, GRP_LNK, CHNL_LNK,
    CUSTOM_FILE_CAPTION, SECOND_SHORTLINK_URL, SECOND_SHORTLINK_API,
    STREAM_URL, STREAM_BIN, IS_STREAM,
)
from imdb import Cinemagoer
import asyncio
from pyrogram.types import Message, InlineKeyboardButton, InlineKeyboardMarkup
from pyrogram.errors import FloodWait, UserIsBlocked, MessageNotModified, PeerIdInvalid
from pyrogram import enums
from typing import Union
from Script import script
import pytz
import random
import re
import os
from datetime import datetime, date
import string
from typing import List
from database.users_chats_db import db
from bs4 import BeautifulSoup
import requests
import aiohttp
from shortzy import Shortzy
import http.client
import json
from urllib.parse import quote_plus

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Matches "[label](buttonurl:target)" / "[label](buttonalert:text)" with an
# optional ":same" suffix (group 5).
BTN_URL_REGEX = re.compile(
    r"(\[([^\[]+?)\]\((buttonurl|buttonalert):(?:/{0,2})(.+?)(:same)?\))"
)

imdb = Cinemagoer()
TOKENS = {}
VERIFIED = {}
BANNED = {}
SECOND_SHORTENER = {}
SMART_OPEN = '“'
SMART_CLOSE = '”'
START_CHAR = ('\'', '"', SMART_OPEN)


# temp db for banned
class temp(object):
    """Process-wide, in-memory state shared between handlers."""

    BANNED_USERS = []
    BANNED_CHATS = []
    ME = None
    CURRENT = int(os.environ.get("SKIP", 2))
    CANCEL = False
    MELCOW = {}
    U_NAME = None
    B_NAME = None
    GETALL = {}
    SHORT = {}
    SETTINGS = {}  # group_id -> settings, filled by get_settings()
    IMDB_CAP = {}  # user id -> cached caption, read by get_cap()


async def is_subscribed(bot, query):
    """Return True when the querying user is a non-banned member of AUTH_CHANNEL.

    Any lookup failure (including not being a participant) yields False;
    unexpected errors are logged with their traceback.
    """
    try:
        user = await bot.get_chat_member(AUTH_CHANNEL, query.from_user.id)
    except UserNotParticipant:
        pass
    except Exception as e:
        logger.exception(e)
    else:
        if user.status != enums.ChatMemberStatus.BANNED:
            return True
    return False


async def get_poster(query, bulk=False, id=False, file=None):
    """Look up a title on IMDb and return its details as a dict.

    Args:
        query: Search text, or the IMDb movie id when ``id`` is truthy.
        bulk: When truthy, return the list of search matches instead of the
            details of the first one.
        id: Treat ``query`` as an IMDb id and skip searching.
        file: Optional file name; a year found in it is used to narrow the
            search when the query itself does not end in a year.

    Returns:
        ``None`` when the search finds nothing, the list of matches when
        ``bulk`` is set, otherwise a dict of caption fields.
    """
    # NOTE(review): imdb.search_movie / imdb.get_movie look like synchronous
    # calls made from a coroutine - confirm whether they should be moved to
    # an executor.
    if not id:
        # https://t.me/GetTGLink/4183
        query = query.strip().lower()
        title = query
        year = None
        trailing_year = re.findall(r'[1-2]\d{3}$', query)
        if trailing_year:
            year = trailing_year[0]
            title = query.replace(year, "").strip()
        elif file is not None:
            file_year = re.findall(r'[1-2]\d{3}', file)
            if file_year:
                year = file_year[0]
        found = imdb.search_movie(title.lower(), results=10)
        if not found:
            return None
        if year:
            same_year = [m for m in found if str(m.get('year')) == str(year)]
            # Fall back to every hit when the year matches nothing.
            filtered = same_year or found
        else:
            filtered = found
        preferred = [m for m in filtered if m.get('kind') in ('movie', 'tv series')]
        matches = preferred or filtered
        if bulk:
            return matches
        movieid = matches[0].movieID
    else:
        movieid = query

    movie = imdb.get_movie(movieid)
    # Named release_date so the imported ``date`` class is not shadowed.
    if movie.get("original air date"):
        release_date = movie["original air date"]
    elif movie.get("year"):
        release_date = movie.get("year")
    else:
        release_date = "N/A"

    if not LONG_IMDB_DESCRIPTION:
        plot = movie.get('plot')
        if plot and len(plot) > 0:
            plot = plot[0]
    else:
        plot = movie.get('plot outline')
    if plot and len(plot) > 800:
        plot = plot[0:800] + "..."

    return {
        'title': movie.get('title'),
        'votes': movie.get('votes'),
        "aka": list_to_str(movie.get("akas")),
        "seasons": movie.get("number of seasons"),
        "box_office": movie.get('box office'),
        'localized_title': movie.get('localized title'),
        'kind': movie.get("kind"),
        "imdb_id": f"tt{movie.get('imdbID')}",
        "cast": list_to_str(movie.get("cast")),
        "runtime": list_to_str(movie.get("runtimes")),
        "countries": list_to_str(movie.get("countries")),
        "certificates": list_to_str(movie.get("certificates")),
        "languages": list_to_str(movie.get("languages")),
        "director": list_to_str(movie.get("director")),
        "writer": list_to_str(movie.get("writer")),
        "producer": list_to_str(movie.get("producer")),
        "composer": list_to_str(movie.get("composer")),
        "cinematographer": list_to_str(movie.get("cinematographer")),
        "music_team": list_to_str(movie.get("music department")),
        "distributors": list_to_str(movie.get("distributors")),
        'release_date': release_date,
        'year': movie.get('year'),
        'genres': list_to_str(movie.get("genres")),
        'poster': movie.get('full-size cover url'),
        'plot': plot,
        'rating': str(movie.get("rating")),
        'url': f'https://www.imdb.com/title/tt{movieid}',
    }
def _flood_wait_seconds(error):
    """Return the wait time carried by a FloodWait (``value``, or legacy ``x``)."""
    delay = getattr(error, "value", None)
    if delay is None:
        delay = getattr(error, "x", 0)
    return delay


# https://github.com/odysseusmax/animated-lamp/blob/2ef4730eb2b5f0596ed6d03e7b05243d93e3415b/bot/utils/broadcast.py#L37
async def broadcast_messages(user_id, message):
    """Copy ``message`` to one user, retrying after flood waits.

    Returns:
        ``(True, "Success")`` on delivery, otherwise ``(False, reason)`` where
        reason is ``"Deleted"``, ``"Blocked"`` or ``"Error"``. Deleted accounts
        and invalid peers are also removed from the database.
    """
    while True:  # loop instead of recursing on every FloodWait
        try:
            await message.copy(chat_id=user_id)
            return True, "Success"
        except FloodWait as e:
            await asyncio.sleep(_flood_wait_seconds(e))
        except InputUserDeactivated:
            await db.delete_user(int(user_id))
            logger.info(f"{user_id}-Removed from Database, since deleted account.")
            return False, "Deleted"
        except UserIsBlocked:
            logger.info(f"{user_id} -Blocked the bot.")
            return False, "Blocked"
        except PeerIdInvalid:
            await db.delete_user(int(user_id))
            logger.info(f"{user_id} - PeerIdInvalid")
            return False, "Error"
        except Exception as e:
            logger.warning(f"{user_id} - broadcast failed: {e}")
            return False, "Error"


async def broadcast_messages_group(chat_id, message):
    """Copy ``message`` to a group and try to pin it.

    Pinning is best-effort: a failure to pin does not change the result.

    Returns:
        ``(True, "Success")`` when the copy was sent, else ``(False, "Error")``.
    """
    while True:
        try:
            sent = await message.copy(chat_id=chat_id)
        except FloodWait as e:
            await asyncio.sleep(_flood_wait_seconds(e))
            continue
        except Exception as e:
            logger.warning(f"{chat_id} - group broadcast failed: {e}")
            return False, "Error"
        try:
            await sent.pin()
        except Exception as e:
            # The bot may lack pin rights; delivery itself succeeded.
            logger.debug(f"{chat_id} - could not pin broadcast: {e}")
        return True, "Success"
async def search_gagala(text):
    """Return the text of every ``<h3>`` heading on a Google results page for ``text``."""
    usr_agent = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/61.0.3163.100 Safari/537.36'
    }
    # quote_plus turns spaces into "+" and escapes "&", "#", ... as well.
    url = f'https://www.google.com/search?q={quote_plus(text)}'
    loop = asyncio.get_running_loop()
    # requests is blocking; run it in the default executor with a timeout so
    # a slow response cannot stall the event loop.
    response = await loop.run_in_executor(
        None, lambda: requests.get(url, headers=usr_agent, timeout=10)
    )
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')
    return [heading.getText() for heading in soup.find_all('h3')]


async def get_settings(group_id):
    """Return a group's settings, loading them from the database on a cache miss."""
    settings = temp.SETTINGS.get(group_id)
    if not settings:
        settings = await db.get_settings(group_id)
        temp.SETTINGS[group_id] = settings
    return settings


async def save_group_settings(group_id, key, value):
    """Set one settings key for a group in both the cache and the database."""
    current = await get_settings(group_id)
    current[key] = value
    temp.SETTINGS[group_id] = current
    await db.update_settings(group_id, current)


def get_size(size):
    """Get size in readable format, e.g. ``1536`` -> ``"1.50 KB"``."""
    units = ["Bytes", "KB", "MB", "GB", "TB", "PB", "EB"]
    size = float(size)
    i = 0
    # Stop at the last unit; the previous bound let ``i`` run past the list.
    while size >= 1024.0 and i < len(units) - 1:
        i += 1
        size /= 1024.0
    return "%.2f %s" % (size, units[i])


def split_list(l, n):
    """Yield consecutive slices of ``l`` holding at most ``n`` items each."""
    for i in range(0, len(l), n):
        yield l[i:i + n]


def get_file_id(msg: "Message"):
    """Return the first media object on ``msg``, tagged with ``message_type``.

    Returns ``None`` when the message carries no media.
    """
    if not msg.media:
        return None
    for message_type in (
        "photo",
        "animation",
        "audio",
        "document",
        "video",
        "video_note",
        "voice",
        "sticker",
    ):
        obj = getattr(msg, message_type)
        if obj:
            setattr(obj, "message_type", message_type)
            return obj
    return None


def extract_user(message: "Message") -> "Tuple[Union[int, str], str]":
    """extracts the user from a message

    Resolution order: the replied-to user, then a text-mention or raw
    argument of the command, then the sender.

    Returns:
        ``(user_id, first_name)``. For a raw command argument both values
        come from the argument; the id is an int when it parses as one.
    """
    # https://github.com/SpEcHiDe/PyroGramBot/blob/f30e2cca12002121bad1982f68cd0ff9814ce027/pyrobot/helper_functions/extract_user.py#L7
    command = message.command or []
    if message.reply_to_message:
        replied = message.reply_to_message.from_user
        return (replied.id, replied.first_name)
    if len(command) > 1:
        entities = message.entities or []  # may be None on some messages
        if (
            len(entities) > 1
            and entities[1].type == enums.MessageEntityType.TEXT_MENTION
        ):
            mentioned = entities[1].user
            return (mentioned.id, mentioned.first_name)
        user_id = command[1]
        # don't want to make a request -_-
        user_first_name = user_id
        try:
            user_id = int(user_id)
        except ValueError:
            pass
        return (user_id, user_first_name)
    return (message.from_user.id, message.from_user.first_name)
def list_to_str(k):
    """Join a list into ``"a, b, c"``; ``"N/A"`` when it is empty or None.

    When MAX_LIST_ELM is set, only that many leading elements are kept.
    """
    if not k:
        return "N/A"
    if len(k) == 1:
        return str(k[0])
    if MAX_LIST_ELM:
        k = k[:int(MAX_LIST_ELM)]
    # ", ".join avoids the doubled spaces and dangling ", " of the old form.
    return ", ".join(str(elem) for elem in k)


def last_online(from_user):
    """Describe a user's last-seen status as display text ("" when unknown)."""
    if from_user.is_bot:
        return "🤖 Bot :("
    status = from_user.status
    if status == enums.UserStatus.OFFLINE:
        seen = from_user.last_online_date
        # The date can be missing; fall back to the empty default.
        return seen.strftime("%a, %d %b %Y, %H:%M:%S") if seen else ""
    labels = {
        enums.UserStatus.RECENTLY: "Recently",
        enums.UserStatus.LAST_WEEK: "Within the last week",
        enums.UserStatus.LAST_MONTH: "Within the last month",
        enums.UserStatus.LONG_AGO: "A long time ago :(",
        enums.UserStatus.ONLINE: "Currently Online",
    }
    return labels.get(status, "")


def split_quotes(text: str) -> List:
    """Split ``text`` into a leading (possibly quoted) key and the remainder.

    Unquoted text, or text whose opening quote is never closed, is split on
    the first run of whitespace. Otherwise the key is the unescaped quoted
    part; an empty quoted key becomes the doubled quote character.
    """
    if not text.startswith(START_CHAR):
        return text.split(None, 1)
    opener = text[0]
    counter = 1  # ignore first char -> is some kind of quote
    while counter < len(text):
        char = text[counter]
        if char == "\\":
            counter += 1  # skip the escaped character
        elif char == opener or (opener == SMART_OPEN and char == SMART_CLOSE):
            break
        counter += 1
    else:
        return text.split(None, 1)

    # 1 to avoid starting quote, and counter is exclusive so avoids ending
    key = remove_escapes(text[1:counter].strip())
    # index will be in range, or `else` would have been executed and returned
    rest = text[counter + 1:].strip()
    if not key:
        key = opener + opener
    return list(filter(None, [key, rest]))
def _parse_buttons(text, keyword, callback_prefix):
    """Shared implementation of gfilterparser() and parser().

    Scans ``text`` for button markup matched by BTN_URL_REGEX, removes each
    unescaped button from the text and turns it into an InlineKeyboardButton.
    A ``:same`` suffix appends the button to the previous row.

    Returns:
        ``(note_data, buttons, alerts)``: the remaining text, the rows of
        buttons, and the alert texts indexed by the number embedded in each
        alert button's callback data.
    """
    if "buttonalert" in text:
        text = text.replace("\n", "\\n").replace("\t", "\\t")
    buttons = []
    alerts = []
    note_data = ""
    prev = 0
    alert_index = 0
    for match in BTN_URL_REGEX.finditer(text):
        start = match.start(1)
        # Check if btnurl is escaped: count the backslashes right before it,
        # including one at index 0.
        n_escapes = 0
        to_check = start - 1
        while to_check >= 0 and text[to_check] == "\\":
            n_escapes += 1
            to_check -= 1
        if n_escapes % 2:
            # Escaped markup stays in the text untouched; ``prev`` is left
            # alone so nothing before the backslashes gets dropped.
            continue
        # if even, not escaped -> create button
        note_data += text[prev:start]
        prev = match.end(1)
        label = match.group(2)
        if match.group(3) == "buttonalert":
            button = InlineKeyboardButton(
                text=label,
                callback_data=f"{callback_prefix}:{alert_index}:{keyword}",
            )
            alert_index += 1
            alerts.append(match.group(4))
        else:
            button = InlineKeyboardButton(
                text=label,
                url=match.group(4).replace(" ", ""),
            )
        if match.group(5) and buttons:
            buttons[-1].append(button)
        else:
            buttons.append([button])
    note_data += text[prev:]
    return note_data, buttons, alerts


def gfilterparser(text, keyword):
    """Parse button markup; alert buttons use ``gfilteralert`` callback data."""
    return _parse_buttons(text, keyword, "gfilteralert")


def parser(text, keyword):
    """Parse button markup; alert buttons use ``alertmessage`` callback data."""
    return _parse_buttons(text, keyword, "alertmessage")
def remove_escapes(text: str) -> str:
    """Drop each escaping backslash and keep the character it escaped.

    A lone trailing backslash is removed.
    """
    kept = []
    is_escaped = False
    for char in text:
        if is_escaped:
            kept.append(char)
            is_escaped = False
        elif char == "\\":
            is_escaped = True
        else:
            kept.append(char)
    return "".join(kept)


def humanbytes(size):
    """Format a byte count with binary prefixes; "" for a falsy size."""
    if not size:
        return ""
    power = 2**10
    n = 0
    Dic_powerN = {0: ' ', 1: 'Ki', 2: 'Mi', 3: 'Gi', 4: 'Ti'}
    largest = max(Dic_powerN)
    # Stop at the largest known prefix instead of raising KeyError.
    while size > power and n < largest:
        size /= power
        n += 1
    return str(round(size, 2)) + " " + Dic_powerN[n] + 'B'


def _requested_files_lines(files):
    """Return one "📁 [size] name" entry per file for a results caption."""
    entries = []
    for file in files:
        words = (file.file_name or "").split()
        shown = ' '.join(
            word for word in words
            if not word.startswith(('[', '@', 'www.'))
        )
        entries.append(f"📁 [{get_size(file.file_size)}] {shown}\n\n")
    return "".join(entries)


async def get_cap(settings, remaining_seconds, files, query, total_results, search):
    """Build the caption placed above a list of search results.

    With ``settings["imdb"]`` enabled the caption starts with the user's
    cached IMDb caption or, failing that, a freshly rendered
    ``script.IMDB_TEMPLATE_TXT``. Otherwise, or when no IMDb data is found,
    it starts with a plain greeting. The file list is always appended.
    """
    cap = None
    if settings["imdb"]:
        IMDB_CAP = temp.IMDB_CAP.get(query.from_user.id)
        if IMDB_CAP:
            cap = IMDB_CAP
        else:
            first_name = files[0].file_name if files else None
            imdb_info = await get_poster(search, file=first_name)
            if imdb_info:
                TEMPLATE = script.IMDB_TEMPLATE_TXT
                # The named extras are what ``**locals()`` used to expose to
                # the template.
                cap = TEMPLATE.format(
                    qurey=search,
                    **imdb_info,
                    settings=settings,
                    remaining_seconds=remaining_seconds,
                    files=files,
                    query=query,
                    total_results=total_results,
                    search=search,
                    IMDB_CAP=IMDB_CAP,
                    imdb=imdb_info,
                    TEMPLATE=TEMPLATE,
                )
    if cap is not None:
        cap += "\n\n📚 Requested Files 👇\n\n"
    else:
        cap = f"Hᴇʏ {query.from_user.mention}, Fᴏᴜɴᴅ {total_results} Rᴇsᴜʟᴛs ғᴏʀ Yᴏᴜʀ Qᴜᴇʀʏ {search}\n\n"
        cap += "📚 Requested Files 👇\n\n"
    return cap + _requested_files_lines(files)
async def get_shortlink(chat_id, link, second=False):
    """Shorten ``link`` with the shortener configured for ``chat_id``.

    Args:
        chat_id: Group whose ``shortlink`` / ``shortlink_api`` settings are
            used; the global SHORTLINK_URL / SHORTLINK_API apply otherwise.
        link: URL to shorten.
        second: Force the secondary shortener.

    Returns:
        The shortened URL, or ``link`` unchanged when shortening fails.
    """
    # Dead commented-out implementations (one embedding an API key) were
    # removed from this function; recover them from version control if needed.
    settings = await get_settings(chat_id)  # fetching settings for group
    if 'shortlink' in settings:
        URL = settings['shortlink']
        API = settings['shortlink_api']
    else:
        URL = SHORTLINK_URL
        API = SHORTLINK_API
    if second or URL.startswith(("shorturllink", "terabox.in", "urlshorten.in")):
        URL = SECOND_SHORTLINK_URL
        API = SECOND_SHORTLINK_API

    if URL == "api.shareus.io":
        url = f'https://{URL}/easy_api'
        params = {
            "key": API,
            "link": link,
        }
        try:
            async with aiohttp.ClientSession() as session:
                # NOTE(review): ssl=False turns off certificate verification
                # - confirm this is intended.
                async with session.get(url, params=params, raise_for_status=True, ssl=False) as response:
                    return await response.text()
        except Exception as e:
            logger.error(e)
            return link

    shortzy = Shortzy(api_key=API, base_site=URL)
    try:
        return await shortzy.convert(link)
    except Exception as e:
        logger.error(e)
        if second:
            # The secondary shortener failed as well; retrying it again would
            # recurse without end, so hand back the original link.
            return link
        return await get_shortlink(chat_id, link, second=True)
async def second_shortlink(url, link):
    """Shorten ``link`` with the shortener selected by ``url``.

    The secondary shortener is used when ``url`` contains ``"shrs"``,
    otherwise the primary one. A failing secondary shortener falls back to
    the primary exactly once.

    Returns:
        The shortened URL, or ``link`` unchanged when shortening fails.
    """
    use_secondary = "shrs" in url
    if use_secondary:
        URL = SECOND_SHORTLINK_URL
        API = SECOND_SHORTLINK_API
    else:
        URL = SHORTLINK_URL
        API = SHORTLINK_API

    if URL == "api.shareus.io":
        endpoint = f'https://{URL}/easy_api'
        params = {
            "key": API,
            "link": link,
        }
        try:
            async with aiohttp.ClientSession() as session:
                # NOTE(review): ssl=False turns off certificate verification
                # - confirm this is intended.
                async with session.get(endpoint, params=params, raise_for_status=True, ssl=False) as response:
                    return await response.text()
        except Exception as e:
            logger.error(e)
            return link

    shortzy = Shortzy(api_key=API, base_site=URL)
    try:
        return await shortzy.convert(link)
    except Exception as e:
        logger.error(e)
        if use_secondary:
            return await second_shortlink("mplaylink", link)
        # The primary shortener already failed; retrying it with the same
        # arguments would recurse without end.
        return link


async def get_tutorial(chat_id):
    """Return the group's tutorial URL when set and enabled, else TUTORIAL."""
    settings = await get_settings(chat_id)  # fetching settings for group
    if 'tutorial' in settings and settings.get('is_tutorial'):
        return settings['tutorial']
    return TUTORIAL


async def get_verify_shorted_link(link):
    """Shorten a verification link with the primary shortener.

    Returns:
        The shortened URL, or the shortener's raw API URL for ``link`` when
        the request fails or reports an error.
    """
    API = SHORTLINK_API
    URL = SHORTLINK_URL
    if link.split(":")[0] == "http":
        # Upgrade only the scheme; a blanket replace() would also rewrite
        # any "http" further inside the link.
        link = "https" + link[len("http"):]

    if URL == "api.shareus.in":
        endpoint = f"https://{URL}/shortLink"
        params = {"token": API, "format": "json", "link": link}
        fallback = f'https://{URL}/shortLink?token={API}&format=json&link={link}'
        json_kwargs = {"content_type": "text/html"}
        result_key = "shortlink"
    else:
        endpoint = f'https://{URL}/api'
        params = {'api': API, 'url': link}
        fallback = f'https://{URL}/api?api={API}&link={link}'
        json_kwargs = {}
        result_key = 'shortenedUrl'

    try:
        async with aiohttp.ClientSession() as session:
            # NOTE(review): ssl=False turns off certificate verification
            # - confirm this is intended.
            async with session.get(endpoint, params=params, raise_for_status=True, ssl=False) as response:
                data = await response.json(**json_kwargs)
        if data["status"] == "success":
            return data[result_key]
        logger.error(f"Error: {data['message']}")
    except Exception as e:
        logger.error(e)
    return fallback
async def _get_registered_user(bot, userid):
    """Fetch the user, adding them to the database (and log channel) if new."""
    user = await bot.get_users(userid)
    if not await db.is_user_exist(user.id):
        await db.add_user(user.id, user.first_name)
        await bot.send_message(LOG_CHANNEL, script.LOG_TEXT_P.format(user.id, user.mention))
    return user


def _today_in_kolkata():
    """Return today's date in the Asia/Kolkata timezone."""
    return datetime.now(pytz.timezone('Asia/Kolkata')).date()


async def check_token(bot, userid, token):
    """Return True when ``token`` was issued to the user and is still unused."""
    user = await _get_registered_user(bot, userid)
    issued = TOKENS.get(user.id, {})
    if token not in issued:
        return False
    return not issued[token]


async def get_token(bot, userid, link):
    """Issue a fresh verification token and return the shortened verify URL.

    Any token previously issued to the user is replaced.
    """
    import secrets  # tokens gate access, so use a CSPRNG rather than random

    user = await _get_registered_user(bot, userid)
    alphabet = string.ascii_letters + string.digits
    token = ''.join(secrets.choice(alphabet) for _ in range(7))
    TOKENS[user.id] = {token: False}
    link = f"{link}verify-{user.id}-{token}"
    shortened_verify_url = await get_verify_shorted_link(link)
    return str(shortened_verify_url)


async def verify_user(bot, userid, token):
    """Mark ``token`` as used and record today as the user's verification date."""
    user = await _get_registered_user(bot, userid)
    TOKENS[user.id] = {token: True}
    VERIFIED[user.id] = str(_today_in_kolkata())


async def check_verification(bot, userid):
    """Return whether the user's recorded verification is still valid."""
    user = await _get_registered_user(bot, userid)
    today = _today_in_kolkata()
    if user.id not in VERIFIED:
        return False
    comp = date.fromisoformat(VERIFIED[user.id])
    # NOTE(review): the original text breaks off at "if comp" - everything
    # from the following "<" was stripped during extraction. The lines below
    # assume the lost code was "comp < today -> False, otherwise True", and
    # False for unverified users; confirm against version control.
    return comp >= today


# NOTE(review): the same extraction damage removed the ``def`` line and the
# opening statements of the coroutine that followed check_verification
# (judging by its messages, the "send all files" helper). Its head cannot be
# rebuilt from this file, so the surviving tail is kept below as comments,
# token for token, until the original is restored. The indentation shown is a
# guess.
#
#   ...Hᴇʏ ᴛʜᴇʀᴇ {user_name} 👋🏽 \n\n✅ Sᴇᴄᴜʀᴇ ʟɪɴᴋ ᴛᴏ ʏᴏᴜʀ ғɪʟᴇ ʜᴀs sᴜᴄᴄᴇssғᴜʟʟʏ ʙᴇᴇɴ ɢᴇɴᴇʀᴀᴛᴇᴅ ᴘʟᴇᴀsᴇ ᴄʟɪᴄᴋ ᴅᴏᴡɴʟᴏᴀᴅ ʙᴜᴛᴛᴏɴ\n\n🗃️ Fɪʟᴇ Nᴀᴍᴇ : {title}\n🔖 Fɪʟᴇ Sɪᴢᴇ : {size}", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("📤 Dᴏᴡɴʟᴏᴀᴅ 📥", url=await get_shortlink(chat_id, f"https://telegram.me/{temp.U_NAME}?start=files_{file.file_id}"))]]))
#         else:
#             for file in files:
#                 f_caption = file.caption
#                 title = file.file_name
#                 size = get_size(file.file_size)
#                 if CUSTOM_FILE_CAPTION:
#                     try:
#                         f_caption = CUSTOM_FILE_CAPTION.format(file_name='' if title is None else title, file_size='' if size is None else size, file_caption='' if f_caption is None else f_caption)
#                     except Exception as e:
#                         print(e)
#                         f_caption = f_caption
#                 if f_caption is None:
#                     f_caption = f"{title}"
#                 await bot.send_cached_media(
#                     chat_id=userid,
#                     file_id=file.file_id,
#                     caption=f_caption,
#                     protect_content=True if ident == "filep" else False,
#                     reply_markup=InlineKeyboardMarkup(
#                         [
#                             [
#                                 InlineKeyboardButton('Sᴜᴘᴘᴏʀᴛ Gʀᴏᴜᴘ', url=GRP_LNK),
#                                 InlineKeyboardButton('Uᴘᴅᴀᴛᴇs Cʜᴀɴɴᴇʟ', url=CHNL_LNK)
#                             ],[
#                                 InlineKeyboardButton("Bᴏᴛ Oᴡɴᴇʀ", url="t.me/Kgashok04")
#                             ]
#                         ]
#                     )
#                 )
#     except UserIsBlocked:
#         await query.answer('Uɴʙʟᴏᴄᴋ ᴛʜᴇ ʙᴏᴛ ᴍᴀʜɴ !', show_alert=True)
#     except PeerIdInvalid:
#         await query.answer('Hᴇʏ, Sᴛᴀʀᴛ Bᴏᴛ Fɪʀsᴛ Aɴᴅ Cʟɪᴄᴋ Sᴇɴᴅ Aʟʟ', show_alert=True)
#     except Exception as e:
#         await query.answer('Hᴇʏ, Sᴛᴀʀᴛ Bᴏᴛ Fɪʀsᴛ Aɴᴅ Cʟɪᴄᴋ Sᴇɴᴅ Aʟʟ', show_alert=True)


def get_media_from_message(message: "Message"):
    """Return the first media attribute present on ``message``, or None."""
    media_types = (
        "audio",
        "document",
        "photo",
        "sticker",
        "animation",
        "video",
        "voice",
        "video_note",
    )
    for attr in media_types:
        if media := getattr(message, attr, None):
            return media
    return None


def get_name(media_msg: "Message") -> str:
    """Return the media's ``file_name``; the string "None" when it has none."""
    media = get_media_from_message(media_msg)
    return getattr(media, "file_name", "None")
def get_hash(media_msg: "Message") -> str:
    """Return the first six characters of the media's ``file_unique_id``."""
    media = get_media_from_message(media_msg)
    return getattr(media, "file_unique_id", "")[:6]


def _looks_like_year(token):
    """Return True for a 4-digit token between 1900 and next year inclusive."""
    if len(token) != 4 or not (token.isascii() and token.isdigit()):
        return False
    # The upper bound follows the calendar instead of a fixed cut-off.
    return 1900 <= int(token) <= date.today().year + 1


def addBracketInYear(name):
    """Wrap every bare year word in ``name`` in parentheses.

    The first word is never changed, so a title that starts with a year
    keeps it.
    """
    words = name.split(" ")
    for index in range(1, len(words)):
        if _looks_like_year(words[index]):
            words[index] = "(" + words[index] + ")"
    return " ".join(words)


def removeYearInName(name):
    """Remove the last ``(YYYY)`` word from ``name``.

    Only one occurrence is removed, and never the first word.
    """
    names = name.split(" ")
    for i in range(len(names) - 1, 0, -1):  # Iterate in reverse order
        word = names[i]
        if (
            len(word) == 6
            and word[0] == "("
            and word[-1] == ")"
            and _looks_like_year(word[1:-1])
        ):
            names.pop(i)  # Remove the year from the list
            break
    return " ".join(names)