Spaces:
Paused
Paused
import os | |
from pyrogram import Client, filters, enums | |
from pyrogram.errors.exceptions.bad_request_400 import UserNotParticipant, MediaEmpty, PhotoInvalidDimensions, WebpageMediaEmpty | |
from info import IMDB_TEMPLATE | |
from utils import extract_user, get_file_id, get_poster, last_online | |
import time | |
from datetime import datetime | |
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery | |
import logging | |
logger = logging.getLogger(__name__) | |
logger.setLevel(logging.ERROR) | |
async def showid(client, message): | |
chat_type = message.chat.type | |
if chat_type == enums.ChatType.PRIVATE: | |
user_id = message.chat.id | |
first = message.from_user.first_name | |
last = message.from_user.last_name or "" | |
username = message.from_user.username | |
dc_id = message.from_user.dc_id or "" | |
await message.reply_text( | |
f"<b>➲ First Name:</b> {first}\n<b>➲ Last Name:</b> {last}\n<b>➲ Username:</b> {username}\n<b>➲ Telegram ID:</b> <code>{user_id}</code>\n<b>➲ Data Centre:</b> <code>{dc_id}</code>", | |
quote=True | |
) | |
elif chat_type in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]: | |
_id = "" | |
_id += ( | |
"<b>➲ Chat ID</b>: " | |
f"<code>{message.chat.id}</code>\n" | |
) | |
if message.reply_to_message: | |
_id += ( | |
"<b>➲ User ID</b>: " | |
f"<code>{message.from_user.id if message.from_user else 'Anonymous'}</code>\n" | |
"<b>➲ Replied User ID</b>: " | |
f"<code>{message.reply_to_message.from_user.id if message.reply_to_message.from_user else 'Anonymous'}</code>\n" | |
) | |
file_info = get_file_id(message.reply_to_message) | |
else: | |
_id += ( | |
"<b>➲ User ID</b>: " | |
f"<code>{message.from_user.id if message.from_user else 'Anonymous'}</code>\n" | |
) | |
file_info = get_file_id(message) | |
if file_info: | |
_id += ( | |
f"<b>{file_info.message_type}</b>: " | |
f"<code>{file_info.file_id}</code>\n" | |
) | |
await message.reply_text( | |
_id, | |
quote=True | |
) | |
async def who_is(client, message): | |
# https://github.com/SpEcHiDe/PyroGramBot/blob/master/pyrobot/plugins/admemes/whois.py#L19 | |
status_message = await message.reply_text( | |
"`Fetching user info...`" | |
) | |
await status_message.edit( | |
"`Processing user info...`" | |
) | |
from_user = None | |
from_user_id, _ = extract_user(message) | |
try: | |
from_user = await client.get_users(from_user_id) | |
except Exception as error: | |
await status_message.edit(str(error)) | |
return | |
if from_user is None: | |
return await status_message.edit("no valid user_id / message specified") | |
message_out_str = "" | |
message_out_str += f"<b>➲First Name:</b> {from_user.first_name}\n" | |
last_name = from_user.last_name or "<b>None</b>" | |
message_out_str += f"<b>➲Last Name:</b> {last_name}\n" | |
message_out_str += f"<b>➲Telegram ID:</b> <code>{from_user.id}</code>\n" | |
username = from_user.username or "<b>None</b>" | |
dc_id = from_user.dc_id or "[User Doesn't Have A Valid DP]" | |
message_out_str += f"<b>➲Data Centre:</b> <code>{dc_id}</code>\n" | |
message_out_str += f"<b>➲User Name:</b> @{username}\n" | |
message_out_str += f"<b>➲User 𝖫𝗂𝗇𝗄:</b> <a href='tg://user?id={from_user.id}'><b>Click Here</b></a>\n" | |
if message.chat.type in ((enums.ChatType.SUPERGROUP, enums.ChatType.CHANNEL)): | |
try: | |
chat_member_p = await message.chat.get_member(from_user.id) | |
joined_date = ( | |
chat_member_p.joined_date or datetime.now() | |
).strftime("%Y.%m.%d %H:%M:%S") | |
message_out_str += ( | |
"<b>➲Joined this Chat on:</b> <code>" | |
f"{joined_date}" | |
"</code>\n" | |
) | |
except UserNotParticipant: | |
pass | |
chat_photo = from_user.photo | |
if chat_photo: | |
local_user_photo = await client.download_media( | |
message=chat_photo.big_file_id | |
) | |
buttons = [[ | |
InlineKeyboardButton('🔐 Close', callback_data='close_data') | |
]] | |
reply_markup = InlineKeyboardMarkup(buttons) | |
await message.reply_photo( | |
photo=local_user_photo, | |
quote=True, | |
reply_markup=reply_markup, | |
caption=message_out_str, | |
parse_mode=enums.ParseMode.HTML, | |
disable_notification=True | |
) | |
os.remove(local_user_photo) | |
else: | |
buttons = [[ | |
InlineKeyboardButton('🔐 Close', callback_data='close_data') | |
]] | |
reply_markup = InlineKeyboardMarkup(buttons) | |
await message.reply_text( | |
text=message_out_str, | |
reply_markup=reply_markup, | |
quote=True, | |
parse_mode=enums.ParseMode.HTML, | |
disable_notification=True | |
) | |
await status_message.delete() | |
async def imdb_search(client, message): | |
if ' ' in message.text: | |
k = await message.reply('Searching ImDB') | |
r, title = message.text.split(None, 1) | |
movies = await get_poster(title, bulk=True) | |
if not movies: | |
return await message.reply("No results Found") | |
btn = [ | |
[ | |
InlineKeyboardButton( | |
text=f"{movie.get('title')} - {movie.get('year')}", | |
callback_data=f"imdb#{movie.movieID}", | |
) | |
] | |
for movie in movies | |
] | |
await k.edit('Here is what i found on IMDb', reply_markup=InlineKeyboardMarkup(btn)) | |
else: | |
await message.reply('Give me a movie / series Name') | |
async def imdb_callback(bot: Client, quer_y: CallbackQuery): | |
i, movie = quer_y.data.split('#') | |
imdb = await get_poster(query=movie, id=True) | |
btn = [ | |
[ | |
InlineKeyboardButton( | |
text=f"{imdb.get('title')}", | |
url=imdb['url'], | |
) | |
] | |
] | |
message = quer_y.message.reply_to_message or quer_y.message | |
if imdb: | |
caption = IMDB_TEMPLATE.format( | |
query = imdb['title'], | |
title = imdb['title'], | |
votes = imdb['votes'], | |
aka = imdb["aka"], | |
seasons = imdb["seasons"], | |
box_office = imdb['box_office'], | |
localized_title = imdb['localized_title'], | |
kind = imdb['kind'], | |
imdb_id = imdb["imdb_id"], | |
cast = imdb["cast"], | |
runtime = imdb["runtime"], | |
countries = imdb["countries"], | |
certificates = imdb["certificates"], | |
languages = imdb["languages"], | |
director = imdb["director"], | |
writer = imdb["writer"], | |
producer = imdb["producer"], | |
composer = imdb["composer"], | |
cinematographer = imdb["cinematographer"], | |
music_team = imdb["music_team"], | |
distributors = imdb["distributors"], | |
release_date = imdb['release_date'], | |
year = imdb['year'], | |
genres = imdb['genres'], | |
poster = imdb['poster'], | |
plot = imdb['plot'], | |
rating = imdb['rating'], | |
url = imdb['url'], | |
**locals() | |
) | |
else: | |
caption = "No Results" | |
if imdb.get('poster'): | |
try: | |
await quer_y.message.reply_photo(photo=imdb['poster'], caption=caption, reply_markup=InlineKeyboardMarkup(btn)) | |
except (MediaEmpty, PhotoInvalidDimensions, WebpageMediaEmpty): | |
pic = imdb.get('poster') | |
poster = pic.replace('.jpg', "._V1_UX360.jpg") | |
await quer_y.message.reply_photo(photo=poster, caption=caption, reply_markup=InlineKeyboardMarkup(btn)) | |
except Exception as e: | |
logger.exception(e) | |
await quer_y.message.reply(caption, reply_markup=InlineKeyboardMarkup(btn), disable_web_page_preview=False) | |
await quer_y.message.delete() | |
else: | |
await quer_y.message.edit(caption, reply_markup=InlineKeyboardMarkup(btn), disable_web_page_preview=False) | |
await quer_y.answer() | |