Spaces:
Paused
Paused
import logging | |
from pyrogram.errors import InputUserDeactivated, UserNotParticipant, FloodWait, UserIsBlocked, PeerIdInvalid | |
from info import AUTH_CHANNEL, LONG_IMDB_DESCRIPTION, MAX_LIST_ELM, SHORTLINK_URL, SHORTLINK_API, IS_SHORTLINK, LOG_CHANNEL, TUTORIAL, GRP_LNK, CHNL_LNK, CUSTOM_FILE_CAPTION, SECOND_SHORTLINK_URL, SECOND_SHORTLINK_API, STREAM_URL, STREAM_BIN, IS_STREAM | |
from imdb import Cinemagoer | |
import asyncio | |
from pyrogram.types import Message, InlineKeyboardButton, InlineKeyboardMarkup | |
from pyrogram.errors import FloodWait, UserIsBlocked, MessageNotModified, PeerIdInvalid | |
from pyrogram import enums | |
from typing import Union | |
from Script import script | |
import pytz | |
import random | |
import re | |
import os | |
from datetime import datetime, date | |
import string | |
from typing import List | |
from database.users_chats_db import db | |
from bs4 import BeautifulSoup | |
import requests | |
import aiohttp | |
from shortzy import Shortzy | |
import http.client | |
import json | |
from urllib.parse import quote_plus | |
logger = logging.getLogger(__name__) | |
logger.setLevel(logging.INFO) | |
BTN_URL_REGEX = re.compile( | |
r"(\[([^\[]+?)\]\((buttonurl|buttonalert):(?:/{0,2})(.+?)(:same)?\))" | |
) | |
imdb = Cinemagoer() | |
TOKENS = {} | |
VERIFIED = {} | |
BANNED = {} | |
SECOND_SHORTENER = {} | |
SMART_OPEN = '“' | |
SMART_CLOSE = '”' | |
START_CHAR = ('\'', '"', SMART_OPEN) | |
# temp db for banned | |
class temp(object): | |
BANNED_USERS = [] | |
BANNED_CHATS = [] | |
ME = None | |
CURRENT=int(os.environ.get("SKIP", 2)) | |
CANCEL = False | |
MELCOW = {} | |
U_NAME = None | |
B_NAME = None | |
GETALL = {} | |
SHORT = {} | |
SETTINGS = {} | |
IMDB_CAP = {} | |
async def is_subscribed(bot, query): | |
try: | |
user = await bot.get_chat_member(AUTH_CHANNEL, query.from_user.id) | |
except UserNotParticipant: | |
pass | |
except Exception as e: | |
logger.exception(e) | |
else: | |
if user.status != enums.ChatMemberStatus.BANNED: | |
return True | |
return False | |
async def get_poster(query, bulk=False, id=False, file=None): | |
if not id: | |
# https://t.me/GetTGLink/4183 | |
query = (query.strip()).lower() | |
title = query | |
year = re.findall(r'[1-2]\d{3}$', query, re.IGNORECASE) | |
if year: | |
year = list_to_str(year[:1]) | |
title = (query.replace(year, "")).strip() | |
elif file is not None: | |
year = re.findall(r'[1-2]\d{3}', file, re.IGNORECASE) | |
if year: | |
year = list_to_str(year[:1]) | |
else: | |
year = None | |
movieid = imdb.search_movie(title.lower(), results=10) | |
if not movieid: | |
return None | |
if year: | |
filtered=list(filter(lambda k: str(k.get('year')) == str(year), movieid)) | |
if not filtered: | |
filtered = movieid | |
else: | |
filtered = movieid | |
movieid=list(filter(lambda k: k.get('kind') in ['movie', 'tv series'], filtered)) | |
if not movieid: | |
movieid = filtered | |
if bulk: | |
return movieid | |
movieid = movieid[0].movieID | |
else: | |
movieid = query | |
movie = imdb.get_movie(movieid) | |
if movie.get("original air date"): | |
date = movie["original air date"] | |
elif movie.get("year"): | |
date = movie.get("year") | |
else: | |
date = "N/A" | |
plot = "" | |
if not LONG_IMDB_DESCRIPTION: | |
plot = movie.get('plot') | |
if plot and len(plot) > 0: | |
plot = plot[0] | |
else: | |
plot = movie.get('plot outline') | |
if plot and len(plot) > 800: | |
plot = plot[0:800] + "..." | |
return { | |
'title': movie.get('title'), | |
'votes': movie.get('votes'), | |
"aka": list_to_str(movie.get("akas")), | |
"seasons": movie.get("number of seasons"), | |
"box_office": movie.get('box office'), | |
'localized_title': movie.get('localized title'), | |
'kind': movie.get("kind"), | |
"imdb_id": f"tt{movie.get('imdbID')}", | |
"cast": list_to_str(movie.get("cast")), | |
"runtime": list_to_str(movie.get("runtimes")), | |
"countries": list_to_str(movie.get("countries")), | |
"certificates": list_to_str(movie.get("certificates")), | |
"languages": list_to_str(movie.get("languages")), | |
"director": list_to_str(movie.get("director")), | |
"writer":list_to_str(movie.get("writer")), | |
"producer":list_to_str(movie.get("producer")), | |
"composer":list_to_str(movie.get("composer")) , | |
"cinematographer":list_to_str(movie.get("cinematographer")), | |
"music_team": list_to_str(movie.get("music department")), | |
"distributors": list_to_str(movie.get("distributors")), | |
'release_date': date, | |
'year': movie.get('year'), | |
'genres': list_to_str(movie.get("genres")), | |
'poster': movie.get('full-size cover url'), | |
'plot': plot, | |
'rating': str(movie.get("rating")), | |
'url':f'https://www.imdb.com/title/tt{movieid}' | |
} | |
# https://github.com/odysseusmax/animated-lamp/blob/2ef4730eb2b5f0596ed6d03e7b05243d93e3415b/bot/utils/broadcast.py#L37 | |
async def broadcast_messages(user_id, message): | |
try: | |
await message.copy(chat_id=user_id) | |
return True, "Success" | |
except FloodWait as e: | |
await asyncio.sleep(e.x) | |
return await broadcast_messages(user_id, message) | |
except InputUserDeactivated: | |
await db.delete_user(int(user_id)) | |
logging.info(f"{user_id}-Removed from Database, since deleted account.") | |
return False, "Deleted" | |
except UserIsBlocked: | |
logging.info(f"{user_id} -Blocked the bot.") | |
return False, "Blocked" | |
except PeerIdInvalid: | |
await db.delete_user(int(user_id)) | |
logging.info(f"{user_id} - PeerIdInvalid") | |
return False, "Error" | |
except Exception as e: | |
return False, "Error" | |
async def broadcast_messages_group(chat_id, message): | |
try: | |
kd = await message.copy(chat_id=chat_id) | |
try: | |
await kd.pin() | |
except: | |
pass | |
return True, "Success" | |
except FloodWait as e: | |
await asyncio.sleep(e.x) | |
return await broadcast_messages_group(chat_id, message) | |
except Exception as e: | |
return False, "Error" | |
async def search_gagala(text): | |
usr_agent = { | |
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' | |
'Chrome/61.0.3163.100 Safari/537.36' | |
} | |
text = text.replace(" ", '+') | |
url = f'https://www.google.com/search?q={text}' | |
response = requests.get(url, headers=usr_agent) | |
response.raise_for_status() | |
soup = BeautifulSoup(response.text, 'html.parser') | |
titles = soup.find_all( 'h3' ) | |
return [title.getText() for title in titles] | |
async def get_settings(group_id): | |
settings = temp.SETTINGS.get(group_id) | |
if not settings: | |
settings = await db.get_settings(group_id) | |
temp.SETTINGS[group_id] = settings | |
return settings | |
async def save_group_settings(group_id, key, value): | |
current = await get_settings(group_id) | |
current[key] = value | |
temp.SETTINGS[group_id] = current | |
await db.update_settings(group_id, current) | |
def get_size(size): | |
"""Get size in readable format""" | |
units = ["Bytes", "KB", "MB", "GB", "TB", "PB", "EB"] | |
size = float(size) | |
i = 0 | |
while size >= 1024.0 and i < len(units): | |
i += 1 | |
size /= 1024.0 | |
return "%.2f %s" % (size, units[i]) | |
def split_list(l, n): | |
for i in range(0, len(l), n): | |
yield l[i:i + n] | |
def get_file_id(msg: Message): | |
if msg.media: | |
for message_type in ( | |
"photo", | |
"animation", | |
"audio", | |
"document", | |
"video", | |
"video_note", | |
"voice", | |
"sticker" | |
): | |
obj = getattr(msg, message_type) | |
if obj: | |
setattr(obj, "message_type", message_type) | |
return obj | |
def extract_user(message: Message) -> Union[int, str]: | |
"""extracts the user from a message""" | |
# https://github.com/SpEcHiDe/PyroGramBot/blob/f30e2cca12002121bad1982f68cd0ff9814ce027/pyrobot/helper_functions/extract_user.py#L7 | |
user_id = None | |
user_first_name = None | |
if message.reply_to_message: | |
user_id = message.reply_to_message.from_user.id | |
user_first_name = message.reply_to_message.from_user.first_name | |
elif len(message.command) > 1: | |
if ( | |
len(message.entities) > 1 and | |
message.entities[1].type == enums.MessageEntityType.TEXT_MENTION | |
): | |
required_entity = message.entities[1] | |
user_id = required_entity.user.id | |
user_first_name = required_entity.user.first_name | |
else: | |
user_id = message.command[1] | |
# don't want to make a request -_- | |
user_first_name = user_id | |
try: | |
user_id = int(user_id) | |
except ValueError: | |
pass | |
else: | |
user_id = message.from_user.id | |
user_first_name = message.from_user.first_name | |
return (user_id, user_first_name) | |
def list_to_str(k): | |
if not k: | |
return "N/A" | |
elif len(k) == 1: | |
return str(k[0]) | |
elif MAX_LIST_ELM: | |
k = k[:int(MAX_LIST_ELM)] | |
return ' '.join(f'{elem}, ' for elem in k) | |
else: | |
return ' '.join(f'{elem}, ' for elem in k) | |
def last_online(from_user): | |
time = "" | |
if from_user.is_bot: | |
time += "🤖 Bot :(" | |
elif from_user.status == enums.UserStatus.RECENTLY: | |
time += "Recently" | |
elif from_user.status == enums.UserStatus.LAST_WEEK: | |
time += "Within the last week" | |
elif from_user.status == enums.UserStatus.LAST_MONTH: | |
time += "Within the last month" | |
elif from_user.status == enums.UserStatus.LONG_AGO: | |
time += "A long time ago :(" | |
elif from_user.status == enums.UserStatus.ONLINE: | |
time += "Currently Online" | |
elif from_user.status == enums.UserStatus.OFFLINE: | |
time += from_user.last_online_date.strftime("%a, %d %b %Y, %H:%M:%S") | |
return time | |
def split_quotes(text: str) -> List: | |
if not any(text.startswith(char) for char in START_CHAR): | |
return text.split(None, 1) | |
counter = 1 # ignore first char -> is some kind of quote | |
while counter < len(text): | |
if text[counter] == "\\": | |
counter += 1 | |
elif text[counter] == text[0] or (text[0] == SMART_OPEN and text[counter] == SMART_CLOSE): | |
break | |
counter += 1 | |
else: | |
return text.split(None, 1) | |
# 1 to avoid starting quote, and counter is exclusive so avoids ending | |
key = remove_escapes(text[1:counter].strip()) | |
# index will be in range, or `else` would have been executed and returned | |
rest = text[counter + 1:].strip() | |
if not key: | |
key = text[0] + text[0] | |
return list(filter(None, [key, rest])) | |
def gfilterparser(text, keyword): | |
if "buttonalert" in text: | |
text = (text.replace("\n", "\\n").replace("\t", "\\t")) | |
buttons = [] | |
note_data = "" | |
prev = 0 | |
i = 0 | |
alerts = [] | |
for match in BTN_URL_REGEX.finditer(text): | |
# Check if btnurl is escaped | |
n_escapes = 0 | |
to_check = match.start(1) - 1 | |
while to_check > 0 and text[to_check] == "\\": | |
n_escapes += 1 | |
to_check -= 1 | |
# if even, not escaped -> create button | |
if n_escapes % 2 == 0: | |
note_data += text[prev:match.start(1)] | |
prev = match.end(1) | |
if match.group(3) == "buttonalert": | |
# create a thruple with button label, url, and newline status | |
if bool(match.group(5)) and buttons: | |
buttons[-1].append(InlineKeyboardButton( | |
text=match.group(2), | |
callback_data=f"gfilteralert:{i}:{keyword}" | |
)) | |
else: | |
buttons.append([InlineKeyboardButton( | |
text=match.group(2), | |
callback_data=f"gfilteralert:{i}:{keyword}" | |
)]) | |
i += 1 | |
alerts.append(match.group(4)) | |
elif bool(match.group(5)) and buttons: | |
buttons[-1].append(InlineKeyboardButton( | |
text=match.group(2), | |
url=match.group(4).replace(" ", "") | |
)) | |
else: | |
buttons.append([InlineKeyboardButton( | |
text=match.group(2), | |
url=match.group(4).replace(" ", "") | |
)]) | |
else: | |
note_data += text[prev:to_check] | |
prev = match.start(1) - 1 | |
else: | |
note_data += text[prev:] | |
try: | |
return note_data, buttons, alerts | |
except: | |
return note_data, buttons, None | |
def parser(text, keyword): | |
if "buttonalert" in text: | |
text = (text.replace("\n", "\\n").replace("\t", "\\t")) | |
buttons = [] | |
note_data = "" | |
prev = 0 | |
i = 0 | |
alerts = [] | |
for match in BTN_URL_REGEX.finditer(text): | |
# Check if btnurl is escaped | |
n_escapes = 0 | |
to_check = match.start(1) - 1 | |
while to_check > 0 and text[to_check] == "\\": | |
n_escapes += 1 | |
to_check -= 1 | |
# if even, not escaped -> create button | |
if n_escapes % 2 == 0: | |
note_data += text[prev:match.start(1)] | |
prev = match.end(1) | |
if match.group(3) == "buttonalert": | |
# create a thruple with button label, url, and newline status | |
if bool(match.group(5)) and buttons: | |
buttons[-1].append(InlineKeyboardButton( | |
text=match.group(2), | |
callback_data=f"alertmessage:{i}:{keyword}" | |
)) | |
else: | |
buttons.append([InlineKeyboardButton( | |
text=match.group(2), | |
callback_data=f"alertmessage:{i}:{keyword}" | |
)]) | |
i += 1 | |
alerts.append(match.group(4)) | |
elif bool(match.group(5)) and buttons: | |
buttons[-1].append(InlineKeyboardButton( | |
text=match.group(2), | |
url=match.group(4).replace(" ", "") | |
)) | |
else: | |
buttons.append([InlineKeyboardButton( | |
text=match.group(2), | |
url=match.group(4).replace(" ", "") | |
)]) | |
else: | |
note_data += text[prev:to_check] | |
prev = match.start(1) - 1 | |
else: | |
note_data += text[prev:] | |
try: | |
return note_data, buttons, alerts | |
except: | |
return note_data, buttons, None | |
def remove_escapes(text: str) -> str: | |
res = "" | |
is_escaped = False | |
for counter in range(len(text)): | |
if is_escaped: | |
res += text[counter] | |
is_escaped = False | |
elif text[counter] == "\\": | |
is_escaped = True | |
else: | |
res += text[counter] | |
return res | |
def humanbytes(size): | |
if not size: | |
return "" | |
power = 2**10 | |
n = 0 | |
Dic_powerN = {0: ' ', 1: 'Ki', 2: 'Mi', 3: 'Gi', 4: 'Ti'} | |
while size > power: | |
size /= power | |
n += 1 | |
return str(round(size, 2)) + " " + Dic_powerN[n] + 'B' | |
async def get_cap(settings, remaining_seconds, files, query, total_results, search): | |
if settings["imdb"]: | |
IMDB_CAP = temp.IMDB_CAP.get(query.from_user.id) | |
if IMDB_CAP: | |
cap = IMDB_CAP | |
cap+="<b>\n\n<u>📚 Requested Files 👇</u></b>\n\n" | |
for file in files: | |
cap += f"<b>📁 <a href='https://telegram.me/{temp.U_NAME}?start=files_{file.file_id}'>[{get_size(file.file_size)}] {' '.join(filter(lambda x: not x.startswith('[') and not x.startswith('@') and not x.startswith('www.'), file.file_name.split()))}\n\n</a></b>" | |
else: | |
imdb = await get_poster(search, file=(files[0]).file_name) if settings["imdb"] else None | |
if imdb: | |
TEMPLATE = script.IMDB_TEMPLATE_TXT | |
cap = TEMPLATE.format( | |
qurey=search, | |
title=imdb['title'], | |
votes=imdb['votes'], | |
aka=imdb["aka"], | |
seasons=imdb["seasons"], | |
box_office=imdb['box_office'], | |
localized_title=imdb['localized_title'], | |
kind=imdb['kind'], | |
imdb_id=imdb["imdb_id"], | |
cast=imdb["cast"], | |
runtime=imdb["runtime"], | |
countries=imdb["countries"], | |
certificates=imdb["certificates"], | |
languages=imdb["languages"], | |
director=imdb["director"], | |
writer=imdb["writer"], | |
producer=imdb["producer"], | |
composer=imdb["composer"], | |
cinematographer=imdb["cinematographer"], | |
music_team=imdb["music_team"], | |
distributors=imdb["distributors"], | |
release_date=imdb['release_date'], | |
year=imdb['year'], | |
genres=imdb['genres'], | |
poster=imdb['poster'], | |
plot=imdb['plot'], | |
rating=imdb['rating'], | |
url=imdb['url'], | |
**locals() | |
) | |
cap+="<b>\n\n<u>📚 Requested Files 👇</u></b>\n\n" | |
for file in files: | |
cap += f"<b>📁 <a href='https://telegram.me/{temp.U_NAME}?start=files_{file.file_id}'>[{get_size(file.file_size)}] {' '.join(filter(lambda x: not x.startswith('[') and not x.startswith('@') and not x.startswith('www.'), file.file_name.split()))}\n\n</a></b>" | |
else: | |
cap = f"<b>Hᴇʏ {query.from_user.mention}, Fᴏᴜɴᴅ {total_results} Rᴇsᴜʟᴛs ғᴏʀ Yᴏᴜʀ Qᴜᴇʀʏ {search}\n\n</b>" | |
cap+="<b><u>📚 Requested Files 👇</u></b>\n\n" | |
for file in files: | |
cap += f"<b>📁 <a href='https://telegram.me/{temp.U_NAME}?start=files_{file.file_id}'>[{get_size(file.file_size)}] {' '.join(filter(lambda x: not x.startswith('[') and not x.startswith('@') and not x.startswith('www.'), file.file_name.split()))}\n\n</a></b>" | |
else: | |
cap = f"<b>Hᴇʏ {query.from_user.mention}, Fᴏᴜɴᴅ {total_results} Rᴇsᴜʟᴛs ғᴏʀ Yᴏᴜʀ Qᴜᴇʀʏ {search}\n\n</b>" | |
cap+="<b><u>📚 Requested Files 👇</u></b>\n\n" | |
for file in files: | |
cap += f"<b>📁 <a href='https://telegram.me/{temp.U_NAME}?start=files_{file.file_id}'>[{get_size(file.file_size)}] {' '.join(filter(lambda x: not x.startswith('[') and not x.startswith('@') and not x.startswith('www.'), file.file_name.split()))}\n\n</a></b>" | |
return cap | |
async def get_shortlink(chat_id, link, second=False): | |
settings = await get_settings(chat_id) #fetching settings for group | |
if 'shortlink' in settings.keys(): | |
URL = settings['shortlink'] | |
API = settings['shortlink_api'] | |
else: | |
URL = SHORTLINK_URL | |
API = SHORTLINK_API | |
if URL.startswith("shorturllink") or URL.startswith("terabox.in") or URL.startswith("urlshorten.in") or second: | |
URL = SECOND_SHORTLINK_URL | |
API = SECOND_SHORTLINK_API | |
if URL == "api.shareus.io": | |
# method 1: | |
# https = link.split(":")[0] #splitting https or http from link | |
# if "http" == https: #if https == "http": | |
# https = "https" | |
# link = link.replace("http", https) #replacing http to https | |
# conn = http.client.HTTPSConnection("api.shareus.io") | |
# payload = json.dumps({ | |
# "api_key": "4c1YTBacB6PTuwogBiEIFvZN5TI3", | |
# "monetization": True, | |
# "destination": link, | |
# "ad_page": 3, | |
# "category": "Entertainment", | |
# "tags": ["trendinglinks"], | |
# "monetize_with_money": False, | |
# "price": 0, | |
# "currency": "INR", | |
# "purchase_note":"" | |
# }) | |
# headers = { | |
# 'Keep-Alive': '', | |
# 'Content-Type': 'application/json' | |
# } | |
# conn.request("POST", "/generate_link", payload, headers) | |
# res = conn.getresponse() | |
# data = res.read().decode("utf-8") | |
# parsed_data = json.loads(data) | |
# if parsed_data["status"] == "success": | |
# return parsed_data["link"] | |
#method 2 | |
url = f'https://{URL}/easy_api' | |
params = { | |
"key": API, | |
"link": link, | |
} | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.get(url, params=params, raise_for_status=True, ssl=False) as response: | |
data = await response.text() | |
return data | |
except Exception as e: | |
logger.error(e) | |
return link | |
else: | |
shortzy = Shortzy(api_key=API, base_site=URL) | |
try: | |
link = await shortzy.convert(link) | |
except: | |
return await get_shortlink(chat_id, link, second=True) | |
return link | |
# async def get_shortlink(chat_id, link, second=False): | |
# if not second: | |
# settings = await get_settings(chat_id) #fetching settings for group | |
# if 'shortlink' in settings.keys(): | |
# URL = settings['shortlink'] | |
# API = settings['shortlink_api'] | |
# else: | |
# URL = SHORTLINK_URL | |
# API = SHORTLINK_API | |
# if URL.startswith("shorturllink"): | |
# URL = SECOND_SHORTLINK_URL | |
# API = SECOND_SHORTLINK_API | |
# # if 'shortlink_api' in settings.keys(): | |
# # API = settings['shortlink_api'] | |
# # elif URL.startswith("shorturllink"): | |
# # URL = SECOND_SHORTLINK_URL | |
# # else: | |
# # API = SHORTLINK_API | |
# https = link.split(":")[0] #splitting https or http from link | |
# if "http" == https: #if https == "http": | |
# https = "https" | |
# link = link.replace("http", https) #replacing http to https | |
# if URL == "api.shareus.in": | |
# url = f'https://{URL}/shortLink' | |
# params = { | |
# "token": API, | |
# "format": "json", | |
# "link": link, | |
# } | |
# try: | |
# async with aiohttp.ClientSession() as session: | |
# async with session.get(url, params=params, raise_for_status=True, ssl=False) as response: | |
# data = await response.json(content_type="text/html") | |
# if data["status"] == "success": | |
# return data["shortlink"] | |
# else: | |
# logger.error(f"Error: {data['message']}") | |
# return f'https://{URL}/shortLink?token={API}&format=json&link={link}' | |
# except Exception as e: | |
# logger.error(e) | |
# return f'https://{URL}/shortLink?token={API}&format=json&link={link}' | |
# else: | |
# url = f'https://{URL}/api' | |
# params = { | |
# "api": API, | |
# "url": link, | |
# } | |
# try: | |
# async with aiohttp.ClientSession() as session: | |
# async with session.get(url, params=params, raise_for_status=True, ssl=False) as response: | |
# data = await response.json() | |
# if data["status"] == "success": | |
# return data["shortenedUrl"] | |
# else: | |
# logger.error(f"Error: {data['message']}") | |
# if URL == 'clicksfly.com': | |
# return f'https://{URL}/api?api={API}&url={link}' | |
# else: | |
# return f'https://{URL}/api?api={API}&link={link}' | |
# except Exception as e: | |
# SECOND_SHORTENER[chat_id] = URL | |
# logger.error(e) | |
# await get_shortlink(chat_id, link, second=True) | |
# # return f'https://{URL}/api?api={API}&link={link}' | |
# else: | |
# if SECOND_SHORTENER.get(chat_id).startswith('shorturllink'): | |
# URL = SECOND_SHORTLINK_URL | |
# API = SECOND_SHORTLINK_API | |
# else: | |
# URL = SHORTLINK_URL | |
# API = SHORTLINK_API | |
# url = f'https://{URL}/api' | |
# params = { | |
# "api": API, | |
# "url": link, | |
# } | |
# try: | |
# async with aiohttp.ClientSession() as session: | |
# async with session.get(url, params=params, raise_for_status=True, ssl=False) as response: | |
# data = await response.json() | |
# if data["status"] == "success": | |
# return data["shortenedUrl"] | |
# else: | |
# logger.error(f"Error: {data['message']}") | |
# return f'https://{URL}/api?api={API}&link={link}' | |
# except Exception as e: | |
# logger.error(e) | |
# return f'https://{URL}/api?api={API}&link={link}' | |
# # settings = await get_settings(chat_id) #fetching settings for group | |
# # if 'shortlink' in settings.keys(): | |
# # URL = settings['shortlink'] | |
# # API = settings['shortlink_api'] | |
# # else: | |
# # URL = SHORTLINK_URL | |
# # API = SHORTLINK_API | |
# # if URL.startswith("shorturllink"): | |
# # URL = SECOND_SHORTLINK_URL | |
# # API = SECOND_SHORTLINK_API | |
# # # url = settings['url'] | |
# # # api = settings['api'] | |
# # shortzy = Shortzy(api_key=API, base_site=URL) | |
# # link = await shortzy.convert(link) | |
# # return link | |
async def second_shortlink(url, link): | |
if "shrs" in url: | |
URL = SECOND_SHORTLINK_URL | |
API = SECOND_SHORTLINK_API | |
else: | |
URL = SHORTLINK_URL | |
API = SHORTLINK_API | |
if URL == "api.shareus.io": | |
url = f'https://{URL}/easy_api' | |
params = { | |
"key": API, | |
"link": link, | |
} | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.get(url, params=params, raise_for_status=True, ssl=False) as response: | |
data = await response.text() | |
return data | |
except Exception as e: | |
logger.error(e) | |
else: | |
shortzy = Shortzy(api_key=API, base_site=URL) | |
try: | |
link = await shortzy.convert(link) | |
except: | |
return await second_shortlink("mplaylink", link) | |
return link | |
async def get_tutorial(chat_id): | |
settings = await get_settings(chat_id) #fetching settings for group | |
if 'tutorial' in settings.keys(): | |
if settings['is_tutorial']: | |
TUTORIAL_URL = settings['tutorial'] | |
else: | |
TUTORIAL_URL = TUTORIAL | |
else: | |
TUTORIAL_URL = TUTORIAL | |
return TUTORIAL_URL | |
async def get_verify_shorted_link(link): | |
API = SHORTLINK_API | |
URL = SHORTLINK_URL | |
https = link.split(":")[0] | |
if "http" == https: | |
https = "https" | |
link = link.replace("http", https) | |
if URL == "api.shareus.in": | |
url = f"https://{URL}/shortLink" | |
params = {"token": API, | |
"format": "json", | |
"link": link, | |
} | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.get(url, params=params, raise_for_status=True, ssl=False) as response: | |
data = await response.json(content_type="text/html") | |
if data["status"] == "success": | |
return data["shortlink"] | |
else: | |
logger.error(f"Error: {data['message']}") | |
return f'https://{URL}/shortLink?token={API}&format=json&link={link}' | |
except Exception as e: | |
logger.error(e) | |
return f'https://{URL}/shortLink?token={API}&format=json&link={link}' | |
else: | |
url = f'https://{URL}/api' | |
params = {'api': API, | |
'url': link, | |
} | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.get(url, params=params, raise_for_status=True, ssl=False) as response: | |
data = await response.json() | |
if data["status"] == "success": | |
return data['shortenedUrl'] | |
else: | |
logger.error(f"Error: {data['message']}") | |
return f'https://{URL}/api?api={API}&link={link}' | |
except Exception as e: | |
logger.error(e) | |
return f'{URL}/api?api={API}&link={link}' | |
async def check_token(bot, userid, token): | |
user = await bot.get_users(userid) | |
if not await db.is_user_exist(user.id): | |
await db.add_user(user.id, user.first_name) | |
await bot.send_message(LOG_CHANNEL, script.LOG_TEXT_P.format(user.id, user.mention)) | |
if user.id in TOKENS.keys(): | |
TKN = TOKENS[user.id] | |
if token in TKN.keys(): | |
is_used = TKN[token] | |
if is_used == True: | |
return False | |
else: | |
return True | |
else: | |
return False | |
async def get_token(bot, userid, link): | |
user = await bot.get_users(userid) | |
if not await db.is_user_exist(user.id): | |
await db.add_user(user.id, user.first_name) | |
await bot.send_message(LOG_CHANNEL, script.LOG_TEXT_P.format(user.id, user.mention)) | |
token = ''.join(random.choices(string.ascii_letters + string.digits, k=7)) | |
TOKENS[user.id] = {token: False} | |
link = f"{link}verify-{user.id}-{token}" | |
shortened_verify_url = await get_verify_shorted_link(link) | |
return str(shortened_verify_url) | |
async def verify_user(bot, userid, token): | |
user = await bot.get_users(userid) | |
if not await db.is_user_exist(user.id): | |
await db.add_user(user.id, user.first_name) | |
await bot.send_message(LOG_CHANNEL, script.LOG_TEXT_P.format(user.id, user.mention)) | |
TOKENS[user.id] = {token: True} | |
tz = pytz.timezone('Asia/Kolkata') | |
today = date.today() | |
VERIFIED[user.id] = str(today) | |
async def check_verification(bot, userid): | |
user = await bot.get_users(userid) | |
if not await db.is_user_exist(user.id): | |
await db.add_user(user.id, user.first_name) | |
await bot.send_message(LOG_CHANNEL, script.LOG_TEXT_P.format(user.id, user.mention)) | |
tz = pytz.timezone('Asia/Kolkata') | |
today = date.today() | |
if user.id in VERIFIED.keys(): | |
EXP = VERIFIED[user.id] | |
years, month, day = EXP.split('-') | |
comp = date(int(years), int(month), int(day)) | |
if comp<today: | |
return False | |
else: | |
return True | |
else: | |
return False | |
async def send_all(bot, userid, files, ident, chat_id, user_name, query): | |
settings = await get_settings(chat_id) | |
if 'is_shortlink' in settings.keys(): | |
ENABLE_SHORTLINK = settings['is_shortlink'] | |
else: | |
await save_group_settings(message.chat.id, 'is_shortlink', False) | |
ENABLE_SHORTLINK = False | |
try: | |
if ENABLE_SHORTLINK: | |
for file in files: | |
title = file.file_name | |
size = get_size(file.file_size) | |
await bot.send_message(chat_id=userid, text=f"<b>Hᴇʏ ᴛʜᴇʀᴇ {user_name} 👋🏽 \n\n✅ Sᴇᴄᴜʀᴇ ʟɪɴᴋ ᴛᴏ ʏᴏᴜʀ ғɪʟᴇ ʜᴀs sᴜᴄᴄᴇssғᴜʟʟʏ ʙᴇᴇɴ ɢᴇɴᴇʀᴀᴛᴇᴅ ᴘʟᴇᴀsᴇ ᴄʟɪᴄᴋ ᴅᴏᴡɴʟᴏᴀᴅ ʙᴜᴛᴛᴏɴ\n\n🗃️ Fɪʟᴇ Nᴀᴍᴇ : {title}\n🔖 Fɪʟᴇ Sɪᴢᴇ : {size}</b>", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("📤 Dᴏᴡɴʟᴏᴀᴅ 📥", url=await get_shortlink(chat_id, f"https://telegram.me/{temp.U_NAME}?start=files_{file.file_id}"))]])) | |
else: | |
for file in files: | |
f_caption = file.caption | |
title = file.file_name | |
size = get_size(file.file_size) | |
if CUSTOM_FILE_CAPTION: | |
try: | |
f_caption = CUSTOM_FILE_CAPTION.format(file_name='' if title is None else title, | |
file_size='' if size is None else size, | |
file_caption='' if f_caption is None else f_caption) | |
except Exception as e: | |
print(e) | |
f_caption = f_caption | |
if f_caption is None: | |
f_caption = f"{title}" | |
await bot.send_cached_media( | |
chat_id=userid, | |
file_id=file.file_id, | |
caption=f_caption, | |
protect_content=True if ident == "filep" else False, | |
reply_markup=InlineKeyboardMarkup( | |
[ | |
[ | |
InlineKeyboardButton('Sᴜᴘᴘᴏʀᴛ Gʀᴏᴜᴘ', url=GRP_LNK), | |
InlineKeyboardButton('Uᴘᴅᴀᴛᴇs Cʜᴀɴɴᴇʟ', url=CHNL_LNK) | |
],[ | |
InlineKeyboardButton("Bᴏᴛ Oᴡɴᴇʀ", url="t.me/Kgashok04") | |
] | |
] | |
) | |
) | |
except UserIsBlocked: | |
await query.answer('Uɴʙʟᴏᴄᴋ ᴛʜᴇ ʙᴏᴛ ᴍᴀʜɴ !', show_alert=True) | |
except PeerIdInvalid: | |
await query.answer('Hᴇʏ, Sᴛᴀʀᴛ Bᴏᴛ Fɪʀsᴛ Aɴᴅ Cʟɪᴄᴋ Sᴇɴᴅ Aʟʟ', show_alert=True) | |
except Exception as e: | |
await query.answer('Hᴇʏ, Sᴛᴀʀᴛ Bᴏᴛ Fɪʀsᴛ Aɴᴅ Cʟɪᴄᴋ Sᴇɴᴅ Aʟʟ', show_alert=True) | |
def get_media_from_message(message: "Message"): | |
media_types = ( | |
"audio", | |
"document", | |
"photo", | |
"sticker", | |
"animation", | |
"video", | |
"voice", | |
"video_note", | |
) | |
for attr in media_types: | |
if media := getattr(message, attr, None): | |
return media | |
def get_name(media_msg: Message) -> str: | |
media = get_media_from_message(media_msg) | |
return getattr(media, "file_name", "None") | |
def get_hash(media_msg: Message) -> str: | |
media = get_media_from_message(media_msg) | |
return getattr(media, "file_unique_id", "")[:6] | |
def addBracketInYear(name): | |
year = [] | |
for i in range(1900, 2025): | |
string = str(i); | |
year.append(string) | |
name_list = name.split(" ") | |
for x in range(1, len(name_list)): | |
if name_list[x] in year: | |
name_list[x] = "(" + name_list[x] + ")" | |
name = " " | |
return name.join(name_list) | |
def removeYearInName(name): | |
nyear = list(range(1900, 2025)) | |
years = ["(" + str(nyear) + ")" for nyear in nyear] | |
names = name.split(" ") | |
for i in range(len(names) - 1, -1, -1): # Iterate in reverse order | |
if names[i] in years and i!=0: | |
names.pop(i) # Remove the year from the list | |
break | |
return " ".join(names) |