BenfilterbotV2 / utils.py
JamesBond531's picture
Update utils.py
1c55a4d verified
import logging
from pyrogram.errors import InputUserDeactivated, UserNotParticipant, FloodWait, UserIsBlocked, PeerIdInvalid
from info import AUTH_CHANNEL, LONG_IMDB_DESCRIPTION, MAX_LIST_ELM, SHORTLINK_URL, SHORTLINK_API, IS_SHORTLINK, LOG_CHANNEL, TUTORIAL, GRP_LNK, CHNL_LNK, CUSTOM_FILE_CAPTION, SECOND_SHORTLINK_URL, SECOND_SHORTLINK_API, STREAM_URL, STREAM_BIN, IS_STREAM
from imdb import Cinemagoer
import asyncio
from pyrogram.types import Message, InlineKeyboardButton, InlineKeyboardMarkup
from pyrogram.errors import FloodWait, UserIsBlocked, MessageNotModified, PeerIdInvalid
from pyrogram import enums
from typing import Union
from Script import script
import pytz
import random
import re
import os
from datetime import datetime, date
import string
from typing import List
from database.users_chats_db import db
from bs4 import BeautifulSoup
import requests
import aiohttp
from shortzy import Shortzy
import http.client
import json
from urllib.parse import quote_plus
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
BTN_URL_REGEX = re.compile(
r"(\[([^\[]+?)\]\((buttonurl|buttonalert):(?:/{0,2})(.+?)(:same)?\))"
)
imdb = Cinemagoer()
TOKENS = {}
VERIFIED = {}
BANNED = {}
SECOND_SHORTENER = {}
SMART_OPEN = '“'
SMART_CLOSE = '”'
START_CHAR = ('\'', '"', SMART_OPEN)
# temp db for banned
class temp(object):
BANNED_USERS = []
BANNED_CHATS = []
ME = None
CURRENT=int(os.environ.get("SKIP", 2))
CANCEL = False
MELCOW = {}
U_NAME = None
B_NAME = None
GETALL = {}
SHORT = {}
SETTINGS = {}
IMDB_CAP = {}
async def is_subscribed(bot, query):
try:
user = await bot.get_chat_member(AUTH_CHANNEL, query.from_user.id)
except UserNotParticipant:
pass
except Exception as e:
logger.exception(e)
else:
if user.status != enums.ChatMemberStatus.BANNED:
return True
return False
async def get_poster(query, bulk=False, id=False, file=None):
if not id:
# https://t.me/GetTGLink/4183
query = (query.strip()).lower()
title = query
year = re.findall(r'[1-2]\d{3}$', query, re.IGNORECASE)
if year:
year = list_to_str(year[:1])
title = (query.replace(year, "")).strip()
elif file is not None:
year = re.findall(r'[1-2]\d{3}', file, re.IGNORECASE)
if year:
year = list_to_str(year[:1])
else:
year = None
movieid = imdb.search_movie(title.lower(), results=10)
if not movieid:
return None
if year:
filtered=list(filter(lambda k: str(k.get('year')) == str(year), movieid))
if not filtered:
filtered = movieid
else:
filtered = movieid
movieid=list(filter(lambda k: k.get('kind') in ['movie', 'tv series'], filtered))
if not movieid:
movieid = filtered
if bulk:
return movieid
movieid = movieid[0].movieID
else:
movieid = query
movie = imdb.get_movie(movieid)
if movie.get("original air date"):
date = movie["original air date"]
elif movie.get("year"):
date = movie.get("year")
else:
date = "N/A"
plot = ""
if not LONG_IMDB_DESCRIPTION:
plot = movie.get('plot')
if plot and len(plot) > 0:
plot = plot[0]
else:
plot = movie.get('plot outline')
if plot and len(plot) > 800:
plot = plot[0:800] + "..."
return {
'title': movie.get('title'),
'votes': movie.get('votes'),
"aka": list_to_str(movie.get("akas")),
"seasons": movie.get("number of seasons"),
"box_office": movie.get('box office'),
'localized_title': movie.get('localized title'),
'kind': movie.get("kind"),
"imdb_id": f"tt{movie.get('imdbID')}",
"cast": list_to_str(movie.get("cast")),
"runtime": list_to_str(movie.get("runtimes")),
"countries": list_to_str(movie.get("countries")),
"certificates": list_to_str(movie.get("certificates")),
"languages": list_to_str(movie.get("languages")),
"director": list_to_str(movie.get("director")),
"writer":list_to_str(movie.get("writer")),
"producer":list_to_str(movie.get("producer")),
"composer":list_to_str(movie.get("composer")) ,
"cinematographer":list_to_str(movie.get("cinematographer")),
"music_team": list_to_str(movie.get("music department")),
"distributors": list_to_str(movie.get("distributors")),
'release_date': date,
'year': movie.get('year'),
'genres': list_to_str(movie.get("genres")),
'poster': movie.get('full-size cover url'),
'plot': plot,
'rating': str(movie.get("rating")),
'url':f'https://www.imdb.com/title/tt{movieid}'
}
# https://github.com/odysseusmax/animated-lamp/blob/2ef4730eb2b5f0596ed6d03e7b05243d93e3415b/bot/utils/broadcast.py#L37
async def broadcast_messages(user_id, message):
try:
await message.copy(chat_id=user_id)
return True, "Success"
except FloodWait as e:
await asyncio.sleep(e.x)
return await broadcast_messages(user_id, message)
except InputUserDeactivated:
await db.delete_user(int(user_id))
logging.info(f"{user_id}-Removed from Database, since deleted account.")
return False, "Deleted"
except UserIsBlocked:
logging.info(f"{user_id} -Blocked the bot.")
return False, "Blocked"
except PeerIdInvalid:
await db.delete_user(int(user_id))
logging.info(f"{user_id} - PeerIdInvalid")
return False, "Error"
except Exception as e:
return False, "Error"
async def broadcast_messages_group(chat_id, message):
try:
kd = await message.copy(chat_id=chat_id)
try:
await kd.pin()
except:
pass
return True, "Success"
except FloodWait as e:
await asyncio.sleep(e.x)
return await broadcast_messages_group(chat_id, message)
except Exception as e:
return False, "Error"
async def search_gagala(text):
usr_agent = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/61.0.3163.100 Safari/537.36'
}
text = text.replace(" ", '+')
url = f'https://www.google.com/search?q={text}'
response = requests.get(url, headers=usr_agent)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
titles = soup.find_all( 'h3' )
return [title.getText() for title in titles]
async def get_settings(group_id):
settings = temp.SETTINGS.get(group_id)
if not settings:
settings = await db.get_settings(group_id)
temp.SETTINGS[group_id] = settings
return settings
async def save_group_settings(group_id, key, value):
current = await get_settings(group_id)
current[key] = value
temp.SETTINGS[group_id] = current
await db.update_settings(group_id, current)
def get_size(size):
"""Get size in readable format"""
units = ["Bytes", "KB", "MB", "GB", "TB", "PB", "EB"]
size = float(size)
i = 0
while size >= 1024.0 and i < len(units):
i += 1
size /= 1024.0
return "%.2f %s" % (size, units[i])
def split_list(l, n):
for i in range(0, len(l), n):
yield l[i:i + n]
def get_file_id(msg: Message):
if msg.media:
for message_type in (
"photo",
"animation",
"audio",
"document",
"video",
"video_note",
"voice",
"sticker"
):
obj = getattr(msg, message_type)
if obj:
setattr(obj, "message_type", message_type)
return obj
def extract_user(message: Message) -> Union[int, str]:
"""extracts the user from a message"""
# https://github.com/SpEcHiDe/PyroGramBot/blob/f30e2cca12002121bad1982f68cd0ff9814ce027/pyrobot/helper_functions/extract_user.py#L7
user_id = None
user_first_name = None
if message.reply_to_message:
user_id = message.reply_to_message.from_user.id
user_first_name = message.reply_to_message.from_user.first_name
elif len(message.command) > 1:
if (
len(message.entities) > 1 and
message.entities[1].type == enums.MessageEntityType.TEXT_MENTION
):
required_entity = message.entities[1]
user_id = required_entity.user.id
user_first_name = required_entity.user.first_name
else:
user_id = message.command[1]
# don't want to make a request -_-
user_first_name = user_id
try:
user_id = int(user_id)
except ValueError:
pass
else:
user_id = message.from_user.id
user_first_name = message.from_user.first_name
return (user_id, user_first_name)
def list_to_str(k):
if not k:
return "N/A"
elif len(k) == 1:
return str(k[0])
elif MAX_LIST_ELM:
k = k[:int(MAX_LIST_ELM)]
return ' '.join(f'{elem}, ' for elem in k)
else:
return ' '.join(f'{elem}, ' for elem in k)
def last_online(from_user):
time = ""
if from_user.is_bot:
time += "🤖 Bot :("
elif from_user.status == enums.UserStatus.RECENTLY:
time += "Recently"
elif from_user.status == enums.UserStatus.LAST_WEEK:
time += "Within the last week"
elif from_user.status == enums.UserStatus.LAST_MONTH:
time += "Within the last month"
elif from_user.status == enums.UserStatus.LONG_AGO:
time += "A long time ago :("
elif from_user.status == enums.UserStatus.ONLINE:
time += "Currently Online"
elif from_user.status == enums.UserStatus.OFFLINE:
time += from_user.last_online_date.strftime("%a, %d %b %Y, %H:%M:%S")
return time
def split_quotes(text: str) -> List:
if not any(text.startswith(char) for char in START_CHAR):
return text.split(None, 1)
counter = 1 # ignore first char -> is some kind of quote
while counter < len(text):
if text[counter] == "\\":
counter += 1
elif text[counter] == text[0] or (text[0] == SMART_OPEN and text[counter] == SMART_CLOSE):
break
counter += 1
else:
return text.split(None, 1)
# 1 to avoid starting quote, and counter is exclusive so avoids ending
key = remove_escapes(text[1:counter].strip())
# index will be in range, or `else` would have been executed and returned
rest = text[counter + 1:].strip()
if not key:
key = text[0] + text[0]
return list(filter(None, [key, rest]))
def gfilterparser(text, keyword):
if "buttonalert" in text:
text = (text.replace("\n", "\\n").replace("\t", "\\t"))
buttons = []
note_data = ""
prev = 0
i = 0
alerts = []
for match in BTN_URL_REGEX.finditer(text):
# Check if btnurl is escaped
n_escapes = 0
to_check = match.start(1) - 1
while to_check > 0 and text[to_check] == "\\":
n_escapes += 1
to_check -= 1
# if even, not escaped -> create button
if n_escapes % 2 == 0:
note_data += text[prev:match.start(1)]
prev = match.end(1)
if match.group(3) == "buttonalert":
# create a thruple with button label, url, and newline status
if bool(match.group(5)) and buttons:
buttons[-1].append(InlineKeyboardButton(
text=match.group(2),
callback_data=f"gfilteralert:{i}:{keyword}"
))
else:
buttons.append([InlineKeyboardButton(
text=match.group(2),
callback_data=f"gfilteralert:{i}:{keyword}"
)])
i += 1
alerts.append(match.group(4))
elif bool(match.group(5)) and buttons:
buttons[-1].append(InlineKeyboardButton(
text=match.group(2),
url=match.group(4).replace(" ", "")
))
else:
buttons.append([InlineKeyboardButton(
text=match.group(2),
url=match.group(4).replace(" ", "")
)])
else:
note_data += text[prev:to_check]
prev = match.start(1) - 1
else:
note_data += text[prev:]
try:
return note_data, buttons, alerts
except:
return note_data, buttons, None
def parser(text, keyword):
if "buttonalert" in text:
text = (text.replace("\n", "\\n").replace("\t", "\\t"))
buttons = []
note_data = ""
prev = 0
i = 0
alerts = []
for match in BTN_URL_REGEX.finditer(text):
# Check if btnurl is escaped
n_escapes = 0
to_check = match.start(1) - 1
while to_check > 0 and text[to_check] == "\\":
n_escapes += 1
to_check -= 1
# if even, not escaped -> create button
if n_escapes % 2 == 0:
note_data += text[prev:match.start(1)]
prev = match.end(1)
if match.group(3) == "buttonalert":
# create a thruple with button label, url, and newline status
if bool(match.group(5)) and buttons:
buttons[-1].append(InlineKeyboardButton(
text=match.group(2),
callback_data=f"alertmessage:{i}:{keyword}"
))
else:
buttons.append([InlineKeyboardButton(
text=match.group(2),
callback_data=f"alertmessage:{i}:{keyword}"
)])
i += 1
alerts.append(match.group(4))
elif bool(match.group(5)) and buttons:
buttons[-1].append(InlineKeyboardButton(
text=match.group(2),
url=match.group(4).replace(" ", "")
))
else:
buttons.append([InlineKeyboardButton(
text=match.group(2),
url=match.group(4).replace(" ", "")
)])
else:
note_data += text[prev:to_check]
prev = match.start(1) - 1
else:
note_data += text[prev:]
try:
return note_data, buttons, alerts
except:
return note_data, buttons, None
def remove_escapes(text: str) -> str:
res = ""
is_escaped = False
for counter in range(len(text)):
if is_escaped:
res += text[counter]
is_escaped = False
elif text[counter] == "\\":
is_escaped = True
else:
res += text[counter]
return res
def humanbytes(size):
if not size:
return ""
power = 2**10
n = 0
Dic_powerN = {0: ' ', 1: 'Ki', 2: 'Mi', 3: 'Gi', 4: 'Ti'}
while size > power:
size /= power
n += 1
return str(round(size, 2)) + " " + Dic_powerN[n] + 'B'
async def get_cap(settings, remaining_seconds, files, query, total_results, search):
if settings["imdb"]:
IMDB_CAP = temp.IMDB_CAP.get(query.from_user.id)
if IMDB_CAP:
cap = IMDB_CAP
cap+="<b>\n\n<u>📚 Requested Files 👇</u></b>\n\n"
for file in files:
cap += f"<b>📁 <a href='https://telegram.me/{temp.U_NAME}?start=files_{file.file_id}'>[{get_size(file.file_size)}] {' '.join(filter(lambda x: not x.startswith('[') and not x.startswith('@') and not x.startswith('www.'), file.file_name.split()))}\n\n</a></b>"
else:
imdb = await get_poster(search, file=(files[0]).file_name) if settings["imdb"] else None
if imdb:
TEMPLATE = script.IMDB_TEMPLATE_TXT
cap = TEMPLATE.format(
qurey=search,
title=imdb['title'],
votes=imdb['votes'],
aka=imdb["aka"],
seasons=imdb["seasons"],
box_office=imdb['box_office'],
localized_title=imdb['localized_title'],
kind=imdb['kind'],
imdb_id=imdb["imdb_id"],
cast=imdb["cast"],
runtime=imdb["runtime"],
countries=imdb["countries"],
certificates=imdb["certificates"],
languages=imdb["languages"],
director=imdb["director"],
writer=imdb["writer"],
producer=imdb["producer"],
composer=imdb["composer"],
cinematographer=imdb["cinematographer"],
music_team=imdb["music_team"],
distributors=imdb["distributors"],
release_date=imdb['release_date'],
year=imdb['year'],
genres=imdb['genres'],
poster=imdb['poster'],
plot=imdb['plot'],
rating=imdb['rating'],
url=imdb['url'],
**locals()
)
cap+="<b>\n\n<u>📚 Requested Files 👇</u></b>\n\n"
for file in files:
cap += f"<b>📁 <a href='https://telegram.me/{temp.U_NAME}?start=files_{file.file_id}'>[{get_size(file.file_size)}] {' '.join(filter(lambda x: not x.startswith('[') and not x.startswith('@') and not x.startswith('www.'), file.file_name.split()))}\n\n</a></b>"
else:
cap = f"<b>Hᴇʏ {query.from_user.mention}, Fᴏᴜɴᴅ {total_results} Rᴇsᴜʟᴛs ғᴏʀ Yᴏᴜʀ Qᴜᴇʀʏ {search}\n\n</b>"
cap+="<b><u>📚 Requested Files 👇</u></b>\n\n"
for file in files:
cap += f"<b>📁 <a href='https://telegram.me/{temp.U_NAME}?start=files_{file.file_id}'>[{get_size(file.file_size)}] {' '.join(filter(lambda x: not x.startswith('[') and not x.startswith('@') and not x.startswith('www.'), file.file_name.split()))}\n\n</a></b>"
else:
cap = f"<b>Hᴇʏ {query.from_user.mention}, Fᴏᴜɴᴅ {total_results} Rᴇsᴜʟᴛs ғᴏʀ Yᴏᴜʀ Qᴜᴇʀʏ {search}\n\n</b>"
cap+="<b><u>📚 Requested Files 👇</u></b>\n\n"
for file in files:
cap += f"<b>📁 <a href='https://telegram.me/{temp.U_NAME}?start=files_{file.file_id}'>[{get_size(file.file_size)}] {' '.join(filter(lambda x: not x.startswith('[') and not x.startswith('@') and not x.startswith('www.'), file.file_name.split()))}\n\n</a></b>"
return cap
async def get_shortlink(chat_id, link, second=False):
settings = await get_settings(chat_id) #fetching settings for group
if 'shortlink' in settings.keys():
URL = settings['shortlink']
API = settings['shortlink_api']
else:
URL = SHORTLINK_URL
API = SHORTLINK_API
if URL.startswith("shorturllink") or URL.startswith("terabox.in") or URL.startswith("urlshorten.in") or second:
URL = SECOND_SHORTLINK_URL
API = SECOND_SHORTLINK_API
if URL == "api.shareus.io":
# method 1:
# https = link.split(":")[0] #splitting https or http from link
# if "http" == https: #if https == "http":
# https = "https"
# link = link.replace("http", https) #replacing http to https
# conn = http.client.HTTPSConnection("api.shareus.io")
# payload = json.dumps({
# "api_key": "4c1YTBacB6PTuwogBiEIFvZN5TI3",
# "monetization": True,
# "destination": link,
# "ad_page": 3,
# "category": "Entertainment",
# "tags": ["trendinglinks"],
# "monetize_with_money": False,
# "price": 0,
# "currency": "INR",
# "purchase_note":""
# })
# headers = {
# 'Keep-Alive': '',
# 'Content-Type': 'application/json'
# }
# conn.request("POST", "/generate_link", payload, headers)
# res = conn.getresponse()
# data = res.read().decode("utf-8")
# parsed_data = json.loads(data)
# if parsed_data["status"] == "success":
# return parsed_data["link"]
#method 2
url = f'https://{URL}/easy_api'
params = {
"key": API,
"link": link,
}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params, raise_for_status=True, ssl=False) as response:
data = await response.text()
return data
except Exception as e:
logger.error(e)
return link
else:
shortzy = Shortzy(api_key=API, base_site=URL)
try:
link = await shortzy.convert(link)
except:
return await get_shortlink(chat_id, link, second=True)
return link
# async def get_shortlink(chat_id, link, second=False):
# if not second:
# settings = await get_settings(chat_id) #fetching settings for group
# if 'shortlink' in settings.keys():
# URL = settings['shortlink']
# API = settings['shortlink_api']
# else:
# URL = SHORTLINK_URL
# API = SHORTLINK_API
# if URL.startswith("shorturllink"):
# URL = SECOND_SHORTLINK_URL
# API = SECOND_SHORTLINK_API
# # if 'shortlink_api' in settings.keys():
# # API = settings['shortlink_api']
# # elif URL.startswith("shorturllink"):
# # URL = SECOND_SHORTLINK_URL
# # else:
# # API = SHORTLINK_API
# https = link.split(":")[0] #splitting https or http from link
# if "http" == https: #if https == "http":
# https = "https"
# link = link.replace("http", https) #replacing http to https
# if URL == "api.shareus.in":
# url = f'https://{URL}/shortLink'
# params = {
# "token": API,
# "format": "json",
# "link": link,
# }
# try:
# async with aiohttp.ClientSession() as session:
# async with session.get(url, params=params, raise_for_status=True, ssl=False) as response:
# data = await response.json(content_type="text/html")
# if data["status"] == "success":
# return data["shortlink"]
# else:
# logger.error(f"Error: {data['message']}")
# return f'https://{URL}/shortLink?token={API}&format=json&link={link}'
# except Exception as e:
# logger.error(e)
# return f'https://{URL}/shortLink?token={API}&format=json&link={link}'
# else:
# url = f'https://{URL}/api'
# params = {
# "api": API,
# "url": link,
# }
# try:
# async with aiohttp.ClientSession() as session:
# async with session.get(url, params=params, raise_for_status=True, ssl=False) as response:
# data = await response.json()
# if data["status"] == "success":
# return data["shortenedUrl"]
# else:
# logger.error(f"Error: {data['message']}")
# if URL == 'clicksfly.com':
# return f'https://{URL}/api?api={API}&url={link}'
# else:
# return f'https://{URL}/api?api={API}&link={link}'
# except Exception as e:
# SECOND_SHORTENER[chat_id] = URL
# logger.error(e)
# await get_shortlink(chat_id, link, second=True)
# # return f'https://{URL}/api?api={API}&link={link}'
# else:
# if SECOND_SHORTENER.get(chat_id).startswith('shorturllink'):
# URL = SECOND_SHORTLINK_URL
# API = SECOND_SHORTLINK_API
# else:
# URL = SHORTLINK_URL
# API = SHORTLINK_API
# url = f'https://{URL}/api'
# params = {
# "api": API,
# "url": link,
# }
# try:
# async with aiohttp.ClientSession() as session:
# async with session.get(url, params=params, raise_for_status=True, ssl=False) as response:
# data = await response.json()
# if data["status"] == "success":
# return data["shortenedUrl"]
# else:
# logger.error(f"Error: {data['message']}")
# return f'https://{URL}/api?api={API}&link={link}'
# except Exception as e:
# logger.error(e)
# return f'https://{URL}/api?api={API}&link={link}'
# # settings = await get_settings(chat_id) #fetching settings for group
# # if 'shortlink' in settings.keys():
# # URL = settings['shortlink']
# # API = settings['shortlink_api']
# # else:
# # URL = SHORTLINK_URL
# # API = SHORTLINK_API
# # if URL.startswith("shorturllink"):
# # URL = SECOND_SHORTLINK_URL
# # API = SECOND_SHORTLINK_API
# # # url = settings['url']
# # # api = settings['api']
# # shortzy = Shortzy(api_key=API, base_site=URL)
# # link = await shortzy.convert(link)
# # return link
async def second_shortlink(url, link):
if "shrs" in url:
URL = SECOND_SHORTLINK_URL
API = SECOND_SHORTLINK_API
else:
URL = SHORTLINK_URL
API = SHORTLINK_API
if URL == "api.shareus.io":
url = f'https://{URL}/easy_api'
params = {
"key": API,
"link": link,
}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params, raise_for_status=True, ssl=False) as response:
data = await response.text()
return data
except Exception as e:
logger.error(e)
else:
shortzy = Shortzy(api_key=API, base_site=URL)
try:
link = await shortzy.convert(link)
except:
return await second_shortlink("mplaylink", link)
return link
async def get_tutorial(chat_id):
settings = await get_settings(chat_id) #fetching settings for group
if 'tutorial' in settings.keys():
if settings['is_tutorial']:
TUTORIAL_URL = settings['tutorial']
else:
TUTORIAL_URL = TUTORIAL
else:
TUTORIAL_URL = TUTORIAL
return TUTORIAL_URL
async def get_verify_shorted_link(link):
API = SHORTLINK_API
URL = SHORTLINK_URL
https = link.split(":")[0]
if "http" == https:
https = "https"
link = link.replace("http", https)
if URL == "api.shareus.in":
url = f"https://{URL}/shortLink"
params = {"token": API,
"format": "json",
"link": link,
}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params, raise_for_status=True, ssl=False) as response:
data = await response.json(content_type="text/html")
if data["status"] == "success":
return data["shortlink"]
else:
logger.error(f"Error: {data['message']}")
return f'https://{URL}/shortLink?token={API}&format=json&link={link}'
except Exception as e:
logger.error(e)
return f'https://{URL}/shortLink?token={API}&format=json&link={link}'
else:
url = f'https://{URL}/api'
params = {'api': API,
'url': link,
}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params, raise_for_status=True, ssl=False) as response:
data = await response.json()
if data["status"] == "success":
return data['shortenedUrl']
else:
logger.error(f"Error: {data['message']}")
return f'https://{URL}/api?api={API}&link={link}'
except Exception as e:
logger.error(e)
return f'{URL}/api?api={API}&link={link}'
async def check_token(bot, userid, token):
user = await bot.get_users(userid)
if not await db.is_user_exist(user.id):
await db.add_user(user.id, user.first_name)
await bot.send_message(LOG_CHANNEL, script.LOG_TEXT_P.format(user.id, user.mention))
if user.id in TOKENS.keys():
TKN = TOKENS[user.id]
if token in TKN.keys():
is_used = TKN[token]
if is_used == True:
return False
else:
return True
else:
return False
async def get_token(bot, userid, link):
user = await bot.get_users(userid)
if not await db.is_user_exist(user.id):
await db.add_user(user.id, user.first_name)
await bot.send_message(LOG_CHANNEL, script.LOG_TEXT_P.format(user.id, user.mention))
token = ''.join(random.choices(string.ascii_letters + string.digits, k=7))
TOKENS[user.id] = {token: False}
link = f"{link}verify-{user.id}-{token}"
shortened_verify_url = await get_verify_shorted_link(link)
return str(shortened_verify_url)
async def verify_user(bot, userid, token):
user = await bot.get_users(userid)
if not await db.is_user_exist(user.id):
await db.add_user(user.id, user.first_name)
await bot.send_message(LOG_CHANNEL, script.LOG_TEXT_P.format(user.id, user.mention))
TOKENS[user.id] = {token: True}
tz = pytz.timezone('Asia/Kolkata')
today = date.today()
VERIFIED[user.id] = str(today)
async def check_verification(bot, userid):
user = await bot.get_users(userid)
if not await db.is_user_exist(user.id):
await db.add_user(user.id, user.first_name)
await bot.send_message(LOG_CHANNEL, script.LOG_TEXT_P.format(user.id, user.mention))
tz = pytz.timezone('Asia/Kolkata')
today = date.today()
if user.id in VERIFIED.keys():
EXP = VERIFIED[user.id]
years, month, day = EXP.split('-')
comp = date(int(years), int(month), int(day))
if comp<today:
return False
else:
return True
else:
return False
async def send_all(bot, userid, files, ident, chat_id, user_name, query):
settings = await get_settings(chat_id)
if 'is_shortlink' in settings.keys():
ENABLE_SHORTLINK = settings['is_shortlink']
else:
await save_group_settings(message.chat.id, 'is_shortlink', False)
ENABLE_SHORTLINK = False
try:
if ENABLE_SHORTLINK:
for file in files:
title = file.file_name
size = get_size(file.file_size)
await bot.send_message(chat_id=userid, text=f"<b>Hᴇʏ ᴛʜᴇʀᴇ {user_name} 👋🏽 \n\n✅ Sᴇᴄᴜʀᴇ ʟɪɴᴋ ᴛᴏ ʏᴏᴜʀ ғɪʟᴇ ʜᴀs sᴜᴄᴄᴇssғᴜʟʟʏ ʙᴇᴇɴ ɢᴇɴᴇʀᴀᴛᴇᴅ ᴘʟᴇᴀsᴇ ᴄʟɪᴄᴋ ᴅᴏᴡɴʟᴏᴀᴅ ʙᴜᴛᴛᴏɴ\n\n🗃️ Fɪʟᴇ Nᴀᴍᴇ : {title}\n🔖 Fɪʟᴇ Sɪᴢᴇ : {size}</b>", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("📤 Dᴏᴡɴʟᴏᴀᴅ 📥", url=await get_shortlink(chat_id, f"https://telegram.me/{temp.U_NAME}?start=files_{file.file_id}"))]]))
else:
for file in files:
f_caption = file.caption
title = file.file_name
size = get_size(file.file_size)
if CUSTOM_FILE_CAPTION:
try:
f_caption = CUSTOM_FILE_CAPTION.format(file_name='' if title is None else title,
file_size='' if size is None else size,
file_caption='' if f_caption is None else f_caption)
except Exception as e:
print(e)
f_caption = f_caption
if f_caption is None:
f_caption = f"{title}"
await bot.send_cached_media(
chat_id=userid,
file_id=file.file_id,
caption=f_caption,
protect_content=True if ident == "filep" else False,
reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton('Sᴜᴘᴘᴏʀᴛ Gʀᴏᴜᴘ', url=GRP_LNK),
InlineKeyboardButton('Uᴘᴅᴀᴛᴇs Cʜᴀɴɴᴇʟ', url=CHNL_LNK)
],[
InlineKeyboardButton("Bᴏᴛ Oᴡɴᴇʀ", url="t.me/Kgashok04")
]
]
)
)
except UserIsBlocked:
await query.answer('Uɴʙʟᴏᴄᴋ ᴛʜᴇ ʙᴏᴛ ᴍᴀʜɴ !', show_alert=True)
except PeerIdInvalid:
await query.answer('Hᴇʏ, Sᴛᴀʀᴛ Bᴏᴛ Fɪʀsᴛ Aɴᴅ Cʟɪᴄᴋ Sᴇɴᴅ Aʟʟ', show_alert=True)
except Exception as e:
await query.answer('Hᴇʏ, Sᴛᴀʀᴛ Bᴏᴛ Fɪʀsᴛ Aɴᴅ Cʟɪᴄᴋ Sᴇɴᴅ Aʟʟ', show_alert=True)
def get_media_from_message(message: "Message"):
media_types = (
"audio",
"document",
"photo",
"sticker",
"animation",
"video",
"voice",
"video_note",
)
for attr in media_types:
if media := getattr(message, attr, None):
return media
def get_name(media_msg: Message) -> str:
media = get_media_from_message(media_msg)
return getattr(media, "file_name", "None")
def get_hash(media_msg: Message) -> str:
media = get_media_from_message(media_msg)
return getattr(media, "file_unique_id", "")[:6]
def addBracketInYear(name):
year = []
for i in range(1900, 2025):
string = str(i);
year.append(string)
name_list = name.split(" ")
for x in range(1, len(name_list)):
if name_list[x] in year:
name_list[x] = "(" + name_list[x] + ")"
name = " "
return name.join(name_list)
def removeYearInName(name):
nyear = list(range(1900, 2025))
years = ["(" + str(nyear) + ")" for nyear in nyear]
names = name.split(" ")
for i in range(len(names) - 1, -1, -1): # Iterate in reverse order
if names[i] in years and i!=0:
names.pop(i) # Remove the year from the list
break
return " ".join(names)