# 😝

# 😝

# DarkzzAngel
""" Userge Voice-Call Plugin """

# Copyright (C) 2020-2021 by UsergeTeam@Github, < https://github.com/UsergeTeam >.
#
# This file is part of < https://github.com/UsergeTeam/Userge > project,
# and is released under the "GNU v3.0 License Agreement".
# Please see < https://github.com/UsergeTeam/Userge/blob/master/LICENSE >
#
# All rights reserved
#
# Author (C) - @Krishna_Singhal (https://github.com/Krishna-Singhal)

import asyncio
import json
import os
import re
import shlex
import shutil
from math import gcd
from traceback import format_exc
from typing import List, Tuple

from pyrogram.raw import functions
from pyrogram.types import (
    InlineKeyboardMarkup,
    InlineKeyboardButton,
    CallbackQuery,
    Message as RawMessage)
from pyrogram.types.messages_and_media.message import Str
from pytgcalls.methods.core import join_voice_call

from youtubesearchpython import VideosSearch

from userge import userge, Message, pool, filters, get_collection, Config
from userge.utils import time_formatter, import_ytdl, progress, runcmd
from userge.utils.exceptions import StopConversation

#  Reading requirements.txt to determine which library is to be used.
#  The first case-insensitive match of "pytgcall" / "py-tgcall" (followed by any
#  number of "s") is lower-cased and kept as the library name.
#  NOTE(review): the path is relative to the working directory; a missing file
#  or a file without a match (the [0] lookup) aborts the import of this module.
with open("requirements.txt", encoding="utf-8") as req:
    TGCALL_LIB = re.findall(
        r"py\-?tgcalls*",
        req.read(),
        re.IGNORECASE)[0].lower().strip()

# True when the matched name is the hyphenated "py-tgcalls" spelling.
LPYCALLS = TGCALL_LIB == "py-tgcalls"

# Both branches bind the module-level name ``call``; the rest of the plugin
# switches on LPYCALLS to pick the matching method names.
if LPYCALLS:
    from pytgcalls import PyTgCalls
    from pytgcalls.exceptions import (
        NodeJSNotInstalled,
        TooOldNodeJSVersion,
        NoActiveGroupCall,
        AlreadyJoinedError,
        NotInGroupCallError
    )
    from pytgcalls.types.input_stream import (
        AudioVideoPiped,
        AudioPiped,
        VideoParameters
    )
    from pytgcalls.types.stream import (
        StreamAudioEnded
    )
    from pytgcalls.types import Update
    from pytgcalls import StreamType

    # https://github.com/pytgcalls/pytgcalls/blob/master/pytgcalls/mtproto/mtproto_client.py#L18
    # NOTE(review): presumably the linked code inspects the client's module
    # name, hence the override below - confirm against that source.
    userge.__class__.__module__ = 'pyrogram.client'
    call = PyTgCalls(userge, overload_quiet_mode=True)

else:
    from pytgcalls import GroupCallFactory
    from pytgcalls.exceptions import GroupCallNotFoundError
    call = GroupCallFactory(userge).get_group_call()

# Channel logger of this plugin; _skip() logs playback tracebacks through it.
CHANNEL = userge.getCLogger(__name__)

# Collection that persists the "commands for everyone" toggle.
VC_DB = get_collection("VC_CMDS_TOGGLE")
# When True, users other than the owner may use the guarded commands/buttons.
CMDS_FOR_ALL = False

# NOTE(review): not referenced anywhere in the visible part of this file.
ADMINS = {}

# True while the queue is being played.
PLAYING = False

# Title and id of the chat whose voice call is joined ("" / 0 when not joined).
CHAT_NAME = ""
CHAT_ID = 0
# Additional chats from which commands are accepted (filled by joinvc -at).
CONTROL_CHAT_IDS: List[int] = []
# Messages waiting to be played, in play order.
QUEUE: List[Message] = []
# Client attached to queued Telegram media before it is downloaded.
CLIENT = userge

# Text of the current "Now playing" message; empty when nothing is playing.
BACK_BUTTON_TEXT = ""
# "Now playing" messages that were sent, kept so _skip() can delete them.
CQ_MSG: List[RawMessage] = []

ytdl = import_ytdl()

# Matches YouTube watch/embed/short links with an 11-character video id.
yt_regex = re.compile(
    r'(https?://)?(www\.)?'
    r'(youtube|youtu|youtube-nocookie)\.(com|be)/'
    r'(watch\?v=|embed/|v/|.+\?v=)?([^&=%?]{11})'
)
# Template of the reply sent when a song is queued behind the current one.
_SCHEDULED = "[{title}]({link}) Scheduled to QUEUE on #{position} position"


async def _init():
    """ restore the saved command toggle and start py-tgcalls when it is the backend """
    global CMDS_FOR_ALL  # pylint: disable=global-statement
    stored = await VC_DB.find_one({'_id': 'VC_CMD_TOGGLE'})
    if stored:
        CMDS_FOR_ALL = bool(stored['is_enable'])

    if not LPYCALLS:
        return
    await call.start()  # Initialising NodeJS


async def reply_text(
    msg: Message,
    text: str,
    markup=None,
    to_reply: bool = True,
    parse_mode: str = None,
    del_in: int = -1
) -> Message:
    """ send `text` to the chat of `msg`, optionally as a reply to it

    The parse mode is forwarded only when one is given, so the client default
    applies otherwise. When replying, the original message is attached to the
    sent message as ``reply_to_message``.
    """
    reply_to = msg.message_id if to_reply else None
    optional = {'parse_mode': parse_mode} if parse_mode else {}
    sent = await msg.client.send_message(
        chat_id=msg.chat.id,
        text=text,
        del_in=del_in,
        reply_to_message_id=reply_to,
        reply_markup=markup,
        disable_web_page_preview=True,
        **optional)
    # send_message may hand back a bool instead of a message object.
    if to_reply and not isinstance(sent, bool):
        sent.reply_to_message = msg
    return sent


def _get_scheduled_text(title: str, link: str) -> str:
    """ build the "scheduled" notice for a song that would be appended to QUEUE now """
    next_position = len(QUEUE) + 1
    return _SCHEDULED.format(title=title, link=link, position=next_position)


def vc_chat(func):
    """ decorator for Voice-Call chat """

    async def checker(msg: Message):
        # Commands are accepted from the joined chat and from its control chats.
        accepted = [CHAT_ID, *CONTROL_CHAT_IDS]
        if not (CHAT_ID and msg.chat.id in accepted):
            if msg.outgoing:
                await msg.edit("`Haven't join any Voice-Call...`")
            return
        await func(msg)

    checker.__doc__ = func.__doc__

    return checker


def check_enable_for_all(func):
    """ decorator to check cmd is_enable for others """

    async def checker(msg: Message):
        sender = msg.from_user
        from_owner = bool(sender) and sender.id == userge.id
        # Everybody else is served only while the toggle is switched on.
        if from_owner or CMDS_FOR_ALL:
            await func(msg)

    checker.__doc__ = func.__doc__

    return checker


def check_cq_for_all(func):
    """ decorator to check CallbackQuery users """

    async def checker(_, c_q: CallbackQuery):
        if not (c_q.from_user.id == userge.id or CMDS_FOR_ALL):
            # Neither the owner nor an open toggle: refuse with an alert.
            await c_q.answer(
                "⚠️ You don't have permission to use me", show_alert=True)
            return
        await func(c_q)

    checker.__doc__ = func.__doc__

    return checker


def default_markup():
    """ default markup for playing text

    Returns a single-row inline keyboard with a "Skip" and a "Queue" button
    whose callback data are ``skip`` and ``queue``.
    """

    return InlineKeyboardMarkup(
        [[
            InlineKeyboardButton(text="⏩ Skip", callback_data="skip"),
            # The label emoji had been mis-decoded in this copy of the file.
            InlineKeyboardButton(text="🗒 Queue", callback_data="queue")
        ]]
    )


def volume_button_markup():
    """ volume buttons markup

    Returns an inline keyboard with four preset volumes (callback data
    ``vol(50)`` .. ``vol(200)``) and a manual-entry button (``vol(custom)``).
    """

    # The label emoji had been mis-decoded in this copy of the file.
    buttons = [
        [
            InlineKeyboardButton(text="🔈 50", callback_data="vol(50)"),
            InlineKeyboardButton(text="🔉 100", callback_data="vol(100)")
        ],
        [
            InlineKeyboardButton(text="🔉 150", callback_data="vol(150)"),
            InlineKeyboardButton(text="🔊 200", callback_data="vol(200)")
        ],
        [
            InlineKeyboardButton(text="🖌 Enter Manually", callback_data="vol(custom)"),
        ]
    ]

    return InlineKeyboardMarkup(buttons)


def _reset_join_state():
    """ forget the joined chat so that a later join starts from a clean state """
    global CHAT_NAME, CHAT_ID  # pylint: disable=global-statement
    CHAT_ID, CHAT_NAME = 0, ''
    CONTROL_CHAT_IDS.clear()


@userge.on_cmd("joinvc", about={
    'header': "Join Voice-Call",
    'flags': {
        '-as': "Join as any of your public channel.",
        '-at': "Joins vc in a remote chat and control it from saved messages/linked chat"},
    'examples': [
        "{tr}joinvc -as @TheUserge -at @UsergeOT - Join VC of @UsergeOT as @TheUserge.",
        "{tr}joinvc -at -100123456789 - Join VC of any private channel / group."]},
    allow_bots=False)
async def joinvc(msg: Message):
    """ join voice chat

    Resolves the target chat (current chat or ``-at``) and the optional
    ``-as`` identity, then joins the group call, creating it first when none
    is active. Every failure path resets the module state so that the command
    can be issued again.
    """
    global CHAT_NAME, CHAT_ID, CONTROL_CHAT_IDS  # pylint: disable=global-statement

    await msg.delete()

    if CHAT_NAME:
        return await reply_text(msg, f"`Already joined in {CHAT_NAME}`")

    # Every group is optional, so this pattern matches any input.
    regex = re.match(
        r'(\-as\s+(\-100\d+|[@a-zA-Z_0-9]+))?(\s+?\-at\s+(\-100\d+|[@a-zA-Z_0-9]+))?',
        msg.input_str or "")
    join_as = regex.group(2)
    chat = regex.group(4)
    if not chat and msg.chat.type == "private":
        return await msg.err("Invalid chat, either use in group / channel or use -at flag.")
    if chat:
        if chat.strip("-").isnumeric():
            chat = int(chat)
        try:
            _chat = await userge.get_chat(chat)
        except Exception as e:
            return await reply_text(msg, f'Invalid Join In Chat Specified\n{e}')
        CHAT_ID = _chat.id
        CHAT_NAME = _chat.title
        # Joins Voice_call in a remote chat and control it from Saved Messages
        # / Linked Chat
        CONTROL_CHAT_IDS.append(userge.id)
        if _chat.linked_chat:
            CONTROL_CHAT_IDS.append(_chat.linked_chat.id)
    else:
        CHAT_ID = msg.chat.id
        CHAT_NAME = msg.chat.title
    if join_as:
        if join_as.strip("-").isnumeric():
            join_as = int(join_as)
        try:
            join_as = (await userge.get_chat(join_as)).id
        except Exception as e:
            _reset_join_state()
            return await reply_text(msg, f'Invalid Join As Chat Specified\n{e}')
        join_as_peers = await userge.send(functions.phone.GetGroupCallJoinAs(
            peer=(
                await userge.resolve_peer(CHAT_ID)
            )
        ))
        # Compare bare ids: the peers carry ids without the "-100" prefix.
        raw_id = int(str(join_as).replace("-100", ""))
        if raw_id not in [
            getattr(peers, "user_id", None)
            or getattr(peers, "channel_id", None)
            for peers in join_as_peers.peers
        ]:
            _reset_join_state()
            return await reply_text(msg, "You cant join the voice chat as this channel.")

    if LPYCALLS:
        if join_as:
            peer = await userge.resolve_peer(join_as)
        else:
            peer = await userge.resolve_peer(userge.id)
        try:
            # Joining with a dummy audio, since py-tgcalls wont allow joining
            # without file.
            await call.join_group_call(
                CHAT_ID,
                AudioPiped(
                    'http://duramecho.com/Misc/SilentCd/Silence01s.mp3'
                ),
                join_as=peer,
                stream_type=StreamType().pulse_stream
            )
        except NoActiveGroupCall:
            try:
                peer = await userge.resolve_peer(CHAT_ID)
                await userge.send(
                    functions.phone.CreateGroupCall(
                        peer=peer, random_id=2
                    )
                )
                await asyncio.sleep(3)
                _reset_join_state()
                # Return the retry's outcome; falling through would announce
                # the join a second time (or after the retry had failed).
                return await joinvc(msg)
            except Exception as err:
                _reset_join_state()
                return await reply_text(msg, str(err))
        except (NodeJSNotInstalled, TooOldNodeJSVersion):
            # Without the reset the plugin would keep claiming to be joined.
            _reset_join_state()
            return await reply_text(msg, "NodeJs is not installed or installed version is too old.")
        except AlreadyJoinedError:
            await call.leave_group_call(CHAT_ID)
            await asyncio.sleep(3)
            _reset_join_state()
            return await joinvc(msg)
        except Exception as e:
            _reset_join_state()
            return await reply_text(msg, f'Error during Joining the Call\n`{e}`')
    else:
        try:
            await call.start(CHAT_ID, join_as=join_as, enable_action=False)
        except GroupCallNotFoundError:
            try:
                peer = await userge.resolve_peer(CHAT_ID)
                await userge.send(
                    functions.phone.CreateGroupCall(
                        peer=peer, random_id=2
                    )
                )
                await asyncio.sleep(3)
                await call.start(CHAT_ID, join_as=join_as, enable_action=False)
            except Exception as err:
                _reset_join_state()
                return await reply_text(msg, str(err))
        except Exception as e:
            _reset_join_state()
            return await msg.err(f'Error during Joining the Call {e}')
    await reply_text(msg, "`Joined VoiceChat Succesfully`", del_in=5)


@userge.on_cmd("leavevc", about={
    'header': "Leave Voice-Call",
    'usage': "{tr}leavevc"})
async def leavevc(msg: Message):
    """ leave voice chat

    Leaves the joined call (if any) and resets the chat, queue and playing
    state of the plugin.
    """
    global CHAT_NAME, CHAT_ID, PLAYING, BACK_BUTTON_TEXT  # pylint: disable=global-statement

    await msg.delete()

    if CHAT_NAME:
        if LPYCALLS:
            try:
                await call.leave_group_call(CHAT_ID)
            except (NotInGroupCallError, NoActiveGroupCall):
                # Already out of the call; only the local state needs a reset.
                pass
        else:
            asyncio.get_event_loop().create_task(call.stop())
        CHAT_NAME = ""
        CHAT_ID = 0
        CONTROL_CHAT_IDS.clear()
        QUEUE.clear()
        PLAYING = False
        BACK_BUTTON_TEXT = ""

        await reply_text(msg, "`Left Voicechat`", del_in=5)
    else:
        # The closing backtick was missing, which left the markdown span open.
        await reply_text(msg, "`I didn't find any Voice-Chat to leave`")


@userge.on_cmd("vcmode", about={
    'header': "Toggle to enable or disable play and queue commands for all users"})
async def toggle_vc(msg: Message):
    """ toggle enable/disable vc cmds """

    global CMDS_FOR_ALL  # pylint: disable=global-statement

    await msg.delete()

    # Flip the in-memory switch first, then persist the new value.
    CMDS_FOR_ALL = not CMDS_FOR_ALL
    await VC_DB.update_one(
        {'_id': 'VC_CMD_TOGGLE'},
        {"$set": {'is_enable': CMDS_FOR_ALL}},
        upsert=True
    )

    state = "**Enabled**" if CMDS_FOR_ALL else "**Disabled**"
    await reply_text(msg, state + " commands Successfully", del_in=5)


@userge.on_cmd("play", about={
    'header': "play or add songs to queue",
    'flags': {
        '-v': "Stream as video."}},
    trigger=Config.SUDO_TRIGGER, check_client=True,
    filter_me=False, allow_bots=False)
@vc_chat
@check_enable_for_all
async def play_music(msg: Message):
    """ play music in voice call

    Accepts a YouTube link, a search phrase, or a reply to an audio / video /
    video-document message. The resulting message is appended to QUEUE and
    playback is started when nothing is playing yet.
    """
    global CLIENT  # pylint: disable=global-statement

    input_str = msg.filtered_input_str
    flags = msg.flags
    is_video = "-v" in flags
    if input_str:
        if yt_regex.match(input_str):
            if PLAYING:
                msg = await reply_text(msg, _get_scheduled_text("Song", input_str))
            setattr(msg, '_flags', flags)
            QUEUE.append(msg)
        else:
            mesg = await reply_text(msg, f"Searching `{input_str}` on YouTube")
            title, link = await _get_song(input_str)
            if link:
                if PLAYING:
                    msg = await reply_text(msg, _get_scheduled_text(title, link))
                else:
                    msg = await msg.edit(f"[{title}]({link})", disable_web_page_preview=True)
                await mesg.delete()
                setattr(msg, '_flags', flags)
                QUEUE.append(msg)
            else:
                await mesg.edit("No results found.")
    elif msg.reply_to_message:
        replied = msg.reply_to_message
        replied_file = replied.audio or replied.video or replied.document
        if not replied_file:
            return await reply_text(msg, "Input not found")
        if replied.audio:
            setattr(
                replied.audio,
                'file_name',
                replied_file.title or replied_file.file_name or "Song")
            setattr(replied.audio, 'is_video', False)
        elif replied.video:
            setattr(replied.video, 'is_video', is_video)
        # A document may come without a mime type; treat that as "not video".
        elif replied.document and "video" in (replied.document.mime_type or ""):
            setattr(replied.document, 'is_video', is_video)
        else:
            return await reply_text(msg, "Replied media is invalid.")

        setattr(replied, '_client', msg.client)
        CLIENT = msg.client
        # Build the notice before appending so the reported position matches
        # the one used by the YouTube branches above.
        scheduled = None
        if PLAYING:
            scheduled = _get_scheduled_text(replied_file.file_name, replied.link)
        QUEUE.append(replied)
        if scheduled:
            await reply_text(msg, scheduled)
    else:
        return await reply_text(msg, "Input not found")

    if not PLAYING:
        await handle_queue()


@userge.on_cmd("helpvc",
               about={'header': "help for voice_call plugin"},
               trigger=Config.SUDO_TRIGGER,
               allow_private=False,
               check_client=True,
               filter_me=False,
               allow_bots=False)
@vc_chat
@check_enable_for_all
async def _help(msg: Message):
    """ help commands of this plugin for others """

    commands = userge.manager.enabled_plugins["voice_call"].enabled_commands
    cmds = []
    raw_cmds = []

    # Only the commands reachable through the sudo trigger are listed.
    for i in commands:
        if i.name.startswith(Config.SUDO_TRIGGER):
            cmds.append(i)
            raw_cmds.append(i.name.lstrip(Config.SUDO_TRIGGER))

    # NOTE(review): the replies use parse_mode "html" although no tags are
    # visible in these strings - markup may have been lost in this copy of the
    # file; confirm against upstream.
    if not msg.input_str:
        out_str = f"""⚔ ({len(cmds)}) Command(s) Available

🔧 Plugin: voice_call
📘 Doc: Userge Voice-Call Plugin\n\n"""
        for i, cmd in enumerate(cmds, start=1):
            out_str += (
                f" 🤖 cmd({i}): {cmd.name}\n"
                f" 📚 info: {cmd.doc}\n\n")
        await reply_text(msg, out_str, parse_mode="html")
    elif msg.input_str.lstrip(Config.SUDO_TRIGGER) in raw_cmds:
        key = msg.input_str.lstrip(Config.CMD_TRIGGER)
        key_ = Config.CMD_TRIGGER + key
        if key in commands:
            out_str = f"{key}\n\n{commands[key].about}"
            await reply_text(msg, out_str, parse_mode="html")
        elif key_ in commands:
            out_str = f"{key_}\n\n{commands[key_].about}"
            await reply_text(msg, out_str, parse_mode="html")


@userge.on_cmd("forceplay", about={
    'header': "Force play with skip the current song and "
              "Play your song on #1 Position",
    'flags': {
        '-v': "Stream as video."}})
@vc_chat
async def force_play_music(msg: Message):
    """ Force play music in voice call """
    global CLIENT  # pylint: disable=global-statement

    # Nothing to interrupt: behave exactly like a normal play request.
    if not PLAYING:
        return await play_music(msg)

    input_str = msg.filtered_input_str
    flags = msg.flags
    is_video = "-v" in flags
    if input_str:
        if not yt_regex.match(input_str):
            mesg = await reply_text(msg, f"Searching `{input_str}` on YouTube")
            title, link = await _get_song(input_str)
            if not link:
                return await mesg.edit("No results found.")
            await mesg.delete()
            msg.text = f"[{title}]({link})"
        setattr(msg, '_flags', flags)
        QUEUE.insert(0, msg)
    elif msg.reply_to_message:
        replied = msg.reply_to_message
        replied_file = replied.audio or replied.video or replied.document
        if not replied_file:
            return await reply_text(msg, "Input not found")
        if replied.audio:
            setattr(
                replied.audio,
                'file_name',
                replied_file.title or replied_file.file_name or "Song")
            setattr(replied.audio, 'is_video', False)
        elif replied.video:
            setattr(replied.video, 'is_video', is_video)
        # A document may come without a mime type; treat that as "not video".
        elif replied.document and "video" in (replied.document.mime_type or ""):
            setattr(replied.document, 'is_video', is_video)
        else:
            return await reply_text(msg, "Replied media is invalid.")

        setattr(replied, '_client', msg.client)
        CLIENT = msg.client
        QUEUE.insert(0, replied)
    else:
        return await reply_text(msg, "Input not found")

    # The new entry sits at the head of the queue, so skipping starts it.
    await _skip()


@userge.on_cmd("current", about={
    'header': "View Current playing Song.",
    'usage': "{tr}current"},
    trigger=Config.SUDO_TRIGGER, check_client=True,
    filter_me=False, allow_bots=False)
@vc_chat
@check_enable_for_all
async def current(msg: Message):
    """ View current playing song """

    await msg.delete()

    if not BACK_BUTTON_TEXT:
        return await reply_text(msg, "No song is playing!")

    await reply_text(
        msg,
        BACK_BUTTON_TEXT,
        markup=default_markup() if userge.has_bot else None,
        to_reply=True
    )


@userge.on_cmd("queue", about={
    'header': "View Queue of Songs",
    'usage': "{tr}queue"},
    trigger=Config.SUDO_TRIGGER, check_client=True,
    filter_me=False, allow_bots=False)
@vc_chat
@check_enable_for_all
async def view_queue(msg: Message):
    """ View Queue """

    await msg.delete()

    if not QUEUE:
        out = "`Queue is empty`"
    else:
        out = f"**{len(QUEUE)} Songs in Queue:**\n"
        for m in QUEUE:
            file = m.audio or m.video or m.document or None
            if file:
                out += f"\n - [{file.file_name}]({m.link})"
            else:
                title, link = _get_yt_info(m)
                out += f"\n - [{title}]({link})"

    await reply_text(msg, out)


@userge.on_cmd("volume", about={
    'header': "Set volume",
    'usage': "{tr}volume\n{tr}volume 69"})
@vc_chat
async def set_volume(msg: Message):
    """ change volume """

    await msg.delete()

    if msg.input_str:
        if msg.input_str.isnumeric():
            if 200 >= int(msg.input_str) > 0:
                if LPYCALLS:
                    await call.change_volume_call(CHAT_ID, int(msg.input_str))
                else:
                    await call.set_my_volume(int(msg.input_str))
                await reply_text(msg, f"Successfully set volume to __{msg.input_str}__")
            else:
                await reply_text(msg, "Invalid Range!")
        else:
            await reply_text(msg, "Invalid Arguments!")
    else:
        # Without an argument the bot offers buttons; when that fails for any
        # reason the user is asked for an argument instead.
        try:
            await userge.bot.send_message(
                msg.chat.id,
                "**🎚 Volume Control**\n\n`Click on the button to change volume"
                " or Click last option to Enter volume manually.`",
                reply_markup=volume_button_markup()
            )
        except Exception:
            await reply_text(msg, "Input not found!")


@userge.on_cmd("skip", about={
    'header': "Skip Song",
    'usage': "{tr}skip"},
    trigger=Config.SUDO_TRIGGER, check_client=True,
    filter_me=False, allow_bots=False)
@vc_chat
async def skip_music(msg: Message):
    """ skip music in vc """

    await msg.delete()

    await _skip()
    await reply_text(msg, "`Skipped`")


@userge.on_cmd("pause", about={
    'header': "Pause Song.",
    'usage': "{tr}pause"},
    trigger=Config.SUDO_TRIGGER, check_client=True,
    filter_me=False, allow_bots=False)
@vc_chat
async def pause_music(msg: Message):
    """ pause music in vc """

    await msg.delete()

    if LPYCALLS:
        await call.pause_stream(CHAT_ID)
    else:
        await call.set_pause(True)
    await reply_text(msg, "⏸️ **Paused** Music Successfully")


@userge.on_cmd("resume", about={
    'header': "Resume Song.",
    'usage': "{tr}resume"},
    trigger=Config.SUDO_TRIGGER, check_client=True,
    filter_me=False, allow_bots=False)
@vc_chat
async def resume_music(msg: Message):
    """ resume music in vc """

    await msg.delete()

    if LPYCALLS:
        await call.resume_stream(CHAT_ID)
    else:
        # Was set_pause(True), which paused again instead of resuming.
        await call.set_pause(False)
    await reply_text(msg, "◀️ **Resumed** Music Successfully")


@userge.on_cmd("stopvc", about={
    'header': "Stop vc and clear Queue.",
    'usage': "{tr}stopvc"})
@vc_chat
async def stop_music(msg: Message):
    """ stop music in vc """

    await msg.delete()
    await _skip(True)

    await reply_text(msg, "`Stopped Userge-Music.`", del_in=5)


# Advance the queue whenever the backend reports the end of a stream.
if LPYCALLS:
    @call.on_stream_end()
    async def handler(_: PyTgCalls, update: Update):
        if isinstance(update, StreamAudioEnded):
            await handle_queue()
else:
    @call.on_playout_ended
    async def skip_handler(_, __, ___):
        await handle_queue()


async def handle_queue():
    """ mark the player as playing and start the next queued entry """
    global PLAYING  # pylint: disable=global-statement

    PLAYING = True
    await _skip()


async def _skip(clear_queue: bool = False):
    """ stop the current song and play the next queued one

    With `clear_queue` the queue is emptied first, which stops playback. When
    an entry fails to start, the error is reported in the call chat and the
    following entry is tried.
    """
    global PLAYING  # pylint: disable=global-statement

    # Delete the stale "Now playing" messages. The list is emptied first so
    # that it does not grow forever and no message is deleted twice.
    stale = list(CQ_MSG)
    CQ_MSG.clear()
    for old in stale:
        await old.delete()

    if clear_queue:
        QUEUE.clear()

    if not QUEUE:
        PLAYING = False
        if LPYCALLS:
            # Swap in the silent dummy stream used when joining the call.
            await call.change_stream(
                CHAT_ID,
                AudioPiped(
                    'http://duramecho.com/Misc/SilentCd/Silence01s.mp3'
                )
            )
        else:
            await call.stop_media()
        return

    shutil.rmtree("temp_music_dir", ignore_errors=True)
    msg = QUEUE.pop(0)

    try:
        if msg.audio or msg.video or msg.document:
            await tg_down(msg)
        else:
            await yt_down(msg)
    except Exception as err:
        PLAYING = False
        out = f'**ERROR:** `{err}`'
        await CHANNEL.log(f"`{format_exc().strip()}`")
        if QUEUE:
            out += "\n\n`Playing next Song.`"
        await userge.send_message(
            CHAT_ID,
            out,
            disable_web_page_preview=True
        )
        await handle_queue()


async def yt_down(msg: Message):
    """ youtube downloader """
    global BACK_BUTTON_TEXT  # pylint: disable=global-statement

    title, url = _get_yt_info(msg)
    message = await reply_text(msg, f"`Downloading {title}`")
    song_details = await _get_song_info(url.strip())
    if not song_details:
        # _get_song_info returns False for songs above Config.MAX_DURATION.
        await message.delete()
        return await _skip()

    title, duration = song_details
    stream_link = await get_stream_link(url)
    if not stream_link:
        raise Exception("Song not Downloaded, add again in Queue [your wish]")

    flags = msg.flags
    is_video = "-v" in flags

    if is_video:
        await play_video(stream_link)
    else:
        await play_audio(stream_link)

    await message.delete()

    def requester():
        if not msg.from_user:
            return None
        replied = msg.reply_to_message
        # A message sent by the client itself is the "scheduled" reply, so the
        # requester is the author of the message it replies to.
        if replied and msg.client.id == msg.from_user.id:
            if not replied.from_user:
                return None
            return replied.from_user.mention
        return msg.from_user.mention

    BACK_BUTTON_TEXT = (
        f"🎶 **Now playing:** [{title}]({url})\n"
        f"⏳ **Duration:** `{duration}`\n"
        f"🎧 **Requested By:** {requester()}"
    )

    raw_msg = await reply_text(
        msg,
        BACK_BUTTON_TEXT,
        markup=default_markup() if userge.has_bot else None,
        to_reply=False
    )
    CQ_MSG.append(raw_msg)

    # from_user can be missing (requester() copes with it as well).
    if msg.from_user and msg.client.id == msg.from_user.id:
        await msg.delete()


async def tg_down(msg: Message):
    """ TG downloader """
    global BACK_BUTTON_TEXT  # pylint: disable=global-statement

    file = msg.audio or msg.video or msg.document
    title = file.file_name
    setattr(msg, '_client', CLIENT)
    message = await reply_text(msg, f"`Downloading {title}`")
    path = await msg.client.download_media(
        message=msg,
        file_name="temp_music_dir/",
        progress=progress,
        progress_args=(message, "Downloading..."))
    filename = os.path.join("temp_music_dir", os.path.basename(path))

    if msg.audio:
        duration = msg.audio.duration
    else:
        # Video or document: the duration has to be probed from the file.
        duration = await get_duration(filename)

    if duration > Config.MAX_DURATION:
        return await _skip()

    if not (msg.audio or await is_having_audio(filename)):
        out = "Invalid media found in queue, and skipped"
        if QUEUE:
            out += "\n\n`Playing next Song.`"
        await userge.send_message(
            CHAT_ID,
            out,
            disable_web_page_preview=True
        )
        return await _skip()

    is_video = file.is_video
    if is_video:
        await play_video(filename)
    else:
        await play_audio(filename)

    await message.delete()

    # from_user can be missing; mirror yt_down and show None in that case.
    requested_by = msg.from_user.mention if msg.from_user else None
    BACK_BUTTON_TEXT = (
        f"🎶 **Now playing:** [{title}]({msg.link})\n"
        f"⏳ **Duration:** `{time_formatter(duration)}`\n"
        f"🎧 **Requested By:** {requested_by}"
    )

    raw_msg = await reply_text(
        msg,
        BACK_BUTTON_TEXT,
        markup=default_markup() if userge.has_bot else None,
        to_reply=False
    )
    CQ_MSG.append(raw_msg)


async def play_video(file: str):
    """ stream `file` as video, falling back to audio when no size is found """
    if LPYCALLS:
        width, height = await get_height_and_width(file)
        if not height or not width:
            return await play_audio(file)
        try:
            await call.change_stream(
                CHAT_ID,
                AudioVideoPiped(
                    file,
                    video_parameters=VideoParameters(
                        width,
                        height,
                        25
                    )
                )
            )
        except NotInGroupCallError:
            await call.join_group_call(
                CHAT_ID,
                AudioVideoPiped(
                    file,
                    video_parameters=VideoParameters(
                        width,
                        height,
                        25
                    )
                )
            )
    else:
        await call.stop_media()
        await call.start_video(file, repeat=False)


async def play_audio(file: str):
    """ stream `file` as audio, joining the call first when necessary """
    if LPYCALLS:
        try:
            await call.change_stream(
                CHAT_ID,
                AudioPiped(
                    file
                )
            )
        except NotInGroupCallError:
            await call.join_group_call(
                CHAT_ID,
                AudioPiped(
                    file
                ),
                stream_type=StreamType().pulse_stream,
            )
    else:
        await call.stop_media()
        await call.start_audio(file, repeat=False)


async def get_stream_link(link: str) -> str:
    """ ask youtube-dl for a direct stream url of `link`

    Returns the command output, or an empty string when the command wrote
    anything to stderr.
    """
    # Named differently from the module-level ``ytdl`` to avoid shadowing it.
    ytdl_bin = (os.environ.get("YOUTUBE_DL_PATH", "youtube_dl")).replace("_", "-")
    # The link is quoted because it originates from user messages.
    # NOTE(review): assumes runcmd tokenises the string with shell-like
    # quoting rules - confirm.
    cmd = ytdl_bin + \
        " --geo-bypass -g -f best[height<=?720][width<=?1280]/best " + shlex.quote(link)
    out, err, _, _ = await runcmd(cmd)
    if err:
        return ""
    return out


async def is_having_audio(file: str) -> bool:
    """ tell whether ffprobe reports at least one audio stream in `file` """
    have_audio = False
    # Quoted so that file names containing spaces stay a single argument.
    ffprobe_cmd = f"ffprobe -i {shlex.quote(file)} -v quiet -of json -show_streams"
    out, err, _, _ = await runcmd(ffprobe_cmd)
    if err:
        return have_audio
    out = json.loads(out)
    streams = out.get("streams")
    if not streams:
        return have_audio
    for stream in streams:
        codec = stream.get("codec_type")
        if codec and codec == "audio":
            have_audio = True
            break
    return have_audio


async def get_duration(file: str) -> int:
    """ return the duration of `file` in whole seconds, 0 when it is unknown """
    dur = 0
    cmd = (f"ffprobe -i {shlex.quote(file)} -v error "
           "-show_entries format=duration -of json -select_streams v:0")
    out, _, _, _ = await runcmd(cmd)
    try:
        out = json.loads(out)
        if out.get("format") and (out.get("format")).get("duration"):
            dur = int(float((out.get("format")).get("duration")))
        else:
            dur = 0
    except Exception:
        dur = 0
    return dur


async def get_height_and_width(file: str):
    """ return the scaled (width, height) of the first video stream, or (0, 0) """
    cmd = ("ffprobe -v error -select_streams v "
           f"-show_entries stream=width,height -of json {shlex.quote(file)}")
    out, _, _, _ = await runcmd(cmd)
    try:
        out = json.loads(out)
        streams = out.get("streams")
        if streams:
            return resize_ratio(int(streams[0].get("width", 640)), int(
                streams[0].get("height", 360)))
        return 0, 0
    # Exception, not BaseException: task cancellation must not be swallowed.
    except Exception:
        return 0, 0


def _get_yt_link(msg: Message) -> str:
    """ return the first YouTube link found in the text of `msg`, or "" """
    text = msg.text
    if isinstance(text, Str):
        text = text.markdown
    for _ in yt_regex.finditer(text):
        return _.group(0)
    return ""


def _get_yt_info(msg: Message) -> Tuple[str, str]:
    """ return (title, url) taken from the first url entity, else ("Song", link) """
    if msg.entities:
        for e in msg.entities:
            if e.url:
                # The entity spans [offset, offset + length); the previous
                # slice used `length` as the end index.
                return msg.text[e.offset:e.offset + e.length], e.url
    return "Song", _get_yt_link(msg)


def resize_ratio(w: int, h: int) -> Tuple[int, int]:
    """ scale (w, h) down to at most 1280 wide / 720 high and make both even """
    # The longer side decides the scaling percentage; it never scales up.
    rescaling = min(w, 1280) * 100 / w if w > h else min(h, 720) * 100 / h
    h = round((h * rescaling) / 100)
    w = round((w * rescaling) / 100)
    divisor = gcd(w, h)
    ratio_w = w / divisor
    ratio_h = h / divisor
    factor = (divisor * 100) / 100
    width = round(ratio_w * factor)
    height = round(ratio_h * factor)
    return width - 1 if width % 2 else width, height - 1 if height % 2 else height


@pool.run_in_thread
def _get_song(name: str) -> Tuple[str, str]:
    """ search YouTube for `name`; return (title, link), link is "" on no hit """
    results: List[dict] = VideosSearch(name, limit=1).result()['result']
    if results:
        return results[0].get('title', name), results[0].get('link')
    return name, ""


@pool.run_in_thread
def _get_song_info(url: str):
    """ return (title, formatted duration) of `url`, or False when it is too long """
    ydl_opts = {}

    with ytdl.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
        duration = info.get("duration") or 0

        if duration > Config.MAX_DURATION:
            return False
    duration = info.get("duration")
    # Entries without a duration are labelled "Live".
    return info.get("title"), time_formatter(duration) if duration else "Live"


if userge.has_bot:
    @userge.bot.on_callback_query(filters.regex("(skip|queue|back)"))
    @check_cq_for_all
    async def vc_callback(cq: CallbackQuery):
        """ handle the Skip / Queue / Back buttons of the playing message """
        if not CHAT_NAME:
            await cq.edit_message_text("`Already Left Voice-Call`")
            return

        if "skip" in cq.data:
            text = f"{cq.from_user.mention} Skipped the Song."
            pattern = re.compile(r'\[(.*)\]')
            name = None
            for match in pattern.finditer(BACK_BUTTON_TEXT):
                name = match.group(1)
                break
            if name:
                text = f"{cq.from_user.mention} Skipped `{name}`."

            # Keep the clicked message out of the clean-up done by _skip(),
            # since it is edited into the "Skipped" notice below.
            if CQ_MSG:
                for i, msg in enumerate(CQ_MSG):
                    if msg.message_id == cq.message.message_id:
                        CQ_MSG.pop(i)
                        break

            await cq.edit_message_text(text, disable_web_page_preview=True)
            await handle_queue()

        elif "queue" in cq.data:
            if not QUEUE:
                out = "`Queue is empty.`"
            else:
                out = f"**{len(QUEUE)} Song"
                out += f"{'s' if len(QUEUE) > 1 else ''} in Queue:**\n"
                for m in QUEUE:
                    file = m.audio or m.video or m.document or None
                    if file:
                        out += f"\n - [{file.file_name}]({m.link})"
                    else:
                        title, link = _get_yt_info(m)
                        out += f"\n - [{title}]({link})"

            out += f"\n\n**Clicked by:** {cq.from_user.mention}"
            button = InlineKeyboardMarkup(
                [[InlineKeyboardButton(text="Back", callback_data="back")]]
            )

            await cq.edit_message_text(
                out,
                disable_web_page_preview=True,
                reply_markup=button
            )

        elif "back" in cq.data:
            if BACK_BUTTON_TEXT:
                await cq.edit_message_text(
                    BACK_BUTTON_TEXT,
                    disable_web_page_preview=True,
                    reply_markup=default_markup()
                )
            else:
                await cq.message.delete()

    @userge.bot.on_callback_query(filters.regex(r"vol\((.+)\)"))
    @check_cq_for_all
    async def vol_callback(cq: CallbackQuery):
        """ handle the volume buttons, including manual entry """
        arg = cq.matches[0].group(1)
        volume = 0

        if arg.isnumeric():
            volume = int(arg)

        elif arg == "custom":

            try:
                async with userge.conversation(cq.message.chat.id,
                                               user_id=cq.from_user.id) as conv:
                    await cq.edit_message_text("`Now Input Volume`")

                    # Only a reply to the button message counts as the answer.
                    def _filter(_, __, m: RawMessage) -> bool:
                        r = m.reply_to_message
                        return r and r.message_id == cq.message.message_id

                    response = await conv.get_response(mark_read=True,
                                                       filters=filters.create(_filter))
            except StopConversation:
                await cq.edit_message_text("No arguments passed!")
                return

            # The answer may be a non-text message, whose text is None.
            answer = response.text or ""
            if answer.isnumeric():
                volume = int(answer)
            else:
                await cq.edit_message_text("`Invalid Arguments!`")
                return

        if 200 >= volume > 0:
            if LPYCALLS:
                await call.change_volume_call(CHAT_ID, volume)
            else:
                await call.set_my_volume(volume)
            await cq.edit_message_text(f"Successfully set volume to {volume}")
        else:
            await cq.edit_message_text("`Invalid Range!`")

# Report Page