Module pytgcalls.mtproto.pyrogram_bridge

Expand source code
#  tgcalls - a Python binding for C++ library by Telegram
#  pytgcalls - a library connecting the Python binding with MTProto
#  Copyright (C) 2020-2021 Il`ya (Marshal) <https://github.com/MarshalX>
#
#  This file is part of tgcalls and pytgcalls.
#
#  tgcalls and pytgcalls is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published
#  by the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  tgcalls and pytgcalls is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License v3
#  along with tgcalls. If not, see <http://www.gnu.org/licenses/>.

from asyncio import AbstractEventLoop
from typing import Callable

from pyrogram.errors import (
    BadRequest as PyrogramBadRequest,
    GroupcallSsrcDuplicateMuch as PyrogramGroupcallSsrcDuplicateMuch,
)
from pyrogram.handlers import RawUpdateHandler
from pyrogram.raw import functions, types
from pyrogram.raw.types import GroupCallDiscarded as PyrogramGroupCallDiscarded, InputPeerChannel, InputPeerChat
from pyrogram.utils import get_peer_id

from pytgcalls import PytgcallsError
from pytgcalls.mtproto.data import GroupCallDiscardedWrapper, GroupCallWrapper, GroupCallParticipantWrapper
from pytgcalls.mtproto.data.update import UpdateGroupCallWrapper, UpdateGroupCallParticipantsWrapper
from pytgcalls.mtproto.exceptions import BadRequest, GroupcallSsrcDuplicateMuch
from pytgcalls.utils import int_ssrc

from pyrogram import Client, ContinuePropagation

from pytgcalls.mtproto import MTProtoBridgeBase


class PyrogramBridge(MTProtoBridgeBase):
    """MTProto bridge implementation backed by a Pyrogram ``Client``.

    Wraps raw Pyrogram group call updates into pytgcalls wrapper objects and
    sends the raw ``phone.*`` / ``messages.*`` requests used for a group call.
    """

    def __init__(self, client: Client):
        super().__init__(client)

        # Raw update type -> coroutine that wraps the update and forwards it to a callback.
        self._update_to_handler = {
            types.UpdateGroupCallParticipants: self._process_group_call_participants_update,
            types.UpdateGroupCall: self._process_group_call_update,
        }

        # Handler group the raw handler is registered in; None while unregistered.
        self._handler_group = None
        self._update_handler = RawUpdateHandler(self._process_update)

    async def _process_update(self, _, update, users, chats):
        """Dispatch a raw update that refers to the current group call.

        Raises:
            ContinuePropagation: if the update type has no handler here, or the
                update does not refer to the currently tracked group call.
        """
        if type(update) not in self._update_to_handler:
            raise ContinuePropagation

        if not self.group_call or not update.call or update.call.id != self.group_call.id:
            raise ContinuePropagation
        # Keep the stored call in sync with the one carried by the update.
        self.group_call = update.call

        await self._update_to_handler[type(update)](update)

    async def _process_group_call_participants_update(self, update):
        """Wrap a participants update and pass it to the participants callback."""
        participants = [GroupCallParticipantWrapper.create(p) for p in update.participants]
        wrapped_update = UpdateGroupCallParticipantsWrapper(participants)

        await self.group_call_participants_update_callback(wrapped_update)

    async def _process_group_call_update(self, update):
        """Wrap a group call update and pass it to the group call callback."""
        if isinstance(update.call, PyrogramGroupCallDiscarded):
            call = GroupCallDiscardedWrapper()  # no info needed
        else:
            call = GroupCallWrapper(update.call.id, update.call.params)

        wrapped_update = UpdateGroupCallWrapper(update.chat_id, call)

        await self.group_call_update_callback(wrapped_update)

    async def check_group_call(self) -> bool:
        """Check if client is in a voice chat.

        Returns:
            ``False`` when no group call or SSRC is set, or when the server answers
            ``GROUPCALL_JOIN_MISSING``; otherwise the result of ``phone.CheckGroupCall``.

        Raises:
            BadRequest: for any other Pyrogram ``BadRequest``.
        """
        if not self.group_call or not self.my_ssrc:
            return False

        try:
            in_group_call = await (
                self.client.send(functions.phone.CheckGroupCall(call=self.group_call, source=int_ssrc(self.my_ssrc)))
            )
        except PyrogramBadRequest as e:
            if e.x != '[400 GROUPCALL_JOIN_MISSING]':
                raise BadRequest(e.x) from e

            in_group_call = False

        return in_group_call

    async def leave_current_group_call(self):
        """Call phone.LeaveGroupCall and handle returned updates.

        Does nothing when there is no full chat, no call in it, or no SSRC.
        """
        if not self.full_chat or not self.full_chat.call or not self.my_ssrc:
            return

        response = await self.client.send(
            functions.phone.LeaveGroupCall(call=self.full_chat.call, source=int_ssrc(self.my_ssrc))
        )
        await self.client.handle_updates(response)

    async def edit_group_call_member(self, peer, volume: int = None, muted=False):
        """Call phone.EditGroupCallParticipant for ``peer`` and handle returned updates."""
        response = await self.client.send(
            functions.phone.EditGroupCallParticipant(
                call=self.full_chat.call,
                participant=peer,
                muted=muted,
                volume=volume,
            )
        )
        await self.client.handle_updates(response)

    async def get_and_set_self_peer(self):
        """Resolve the client's own peer, store it in ``my_peer`` and return it."""
        self.my_peer = await self.client.resolve_peer(await self.client.storage.user_id())

        return self.my_peer

    async def get_and_set_group_call(self, group):
        """Get group call input of chat.

        Args:
            group (`InputPeerChannel` | `InputPeerChat` | `str` | `int`): Chat ID in any form.

        Returns:
            `InputGroupCall`.

        Raises:
            PytgcallsError: if the full chat can't be obtained for ``group``.
        """

        self.chat_peer = group
        if not isinstance(group, (InputPeerChannel, InputPeerChat)):
            self.chat_peer = await self.client.resolve_peer(group)

        # Drop the result of any previous lookup so that a peer which is neither
        # a channel nor a chat is reported below instead of reusing stale data.
        self.full_chat = None

        if isinstance(self.chat_peer, InputPeerChannel):
            self.full_chat = (
                await (self.client.send(functions.channels.GetFullChannel(channel=self.chat_peer)))
            ).full_chat
        elif isinstance(self.chat_peer, InputPeerChat):
            self.full_chat = (
                await (self.client.send(functions.messages.GetFullChat(chat_id=self.chat_peer.chat_id)))
            ).full_chat

        if self.full_chat is None:
            raise PytgcallsError(f'Can\'t get full chat by {group}')

        self.group_call = self.full_chat.call

        return self.group_call

    def unregister_update_handlers(self):
        """Delete the registered raw update handler from the client, if any."""
        if self._handler_group:
            self.client.remove_handler(self._update_handler, self._handler_group)
            self._handler_group = None

    def register_update_handlers(self):
        """Register the raw update handler in a group derived from the group call id.

        A positive call id is negated so the resulting handler group is never positive.
        """
        call_id = self.group_call.id
        # The original assigned the negated id and then overwrote it unconditionally.
        self._handler_group = -call_id if call_id > 0 else call_id

        self.client.add_handler(self._update_handler, self._handler_group)

    async def resolve_and_set_join_as(self, join_as):
        """Resolve ``join_as`` and store it in the ``join_as`` field.

        ``None`` selects the chat's default join-as peer, falling back to the
        client's own peer; a ``str`` or ``int`` is resolved through the client;
        anything else is stored as given.
        """
        my_peer = await self.get_and_set_self_peer()

        if join_as is None:
            self.join_as = self.full_chat.groupcall_default_join_as
            if self.join_as:
                # convert Peer to InputPeer
                self.join_as = await self.client.resolve_peer(get_peer_id(self.join_as))
            else:
                self.join_as = my_peer
        elif isinstance(join_as, (str, int)):
            self.join_as = await self.client.resolve_peer(join_as)
        else:
            self.join_as = join_as

    async def send_speaking_group_call_action(self):
        """Call messages.SetTyping with SpeakingInGroupCallAction by chat_peer."""
        await self.client.send(
            functions.messages.SetTyping(peer=self.chat_peer, action=types.SpeakingInGroupCallAction())
        )

    async def join_group_call(self, invite_hash: str, params: str, muted: bool, pre_update_processing: Callable):
        """Call phone.JoinGroupCall with group_call, join_as, invite hash, muted and params.

        Args:
            invite_hash: Invite hash passed to the request.
            params: JSON string wrapped into ``DataJSON`` for the request.
            muted: Whether to join muted.
            pre_update_processing: Called without arguments after the request
                returns and before the returned updates are handled.

        Raises:
            GroupcallSsrcDuplicateMuch: when Pyrogram raises its counterpart error.
        """
        try:
            response = await self.client.send(
                functions.phone.JoinGroupCall(
                    call=self.group_call,
                    join_as=self.join_as,
                    invite_hash=invite_hash,
                    params=types.DataJSON(data=params),
                    muted=muted,
                )
            )

            pre_update_processing()

            await self.client.handle_updates(response)
        except PyrogramGroupcallSsrcDuplicateMuch as e:
            raise GroupcallSsrcDuplicateMuch(e.x) from e

    def get_event_loop(self) -> AbstractEventLoop:
        """Return MTProto client loop."""
        return self.client.loop

Classes

class PyrogramBridge (client: pyrogram.client.Client)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class PyrogramBridge(MTProtoBridgeBase):
    """MTProto bridge implementation backed by a Pyrogram ``Client``.

    Wraps raw Pyrogram group call updates into pytgcalls wrapper objects and
    sends the raw ``phone.*`` / ``messages.*`` requests used for a group call.
    """

    def __init__(self, client: Client):
        super().__init__(client)

        # Raw update type -> coroutine that wraps the update and forwards it to a callback.
        self._update_to_handler = {
            types.UpdateGroupCallParticipants: self._process_group_call_participants_update,
            types.UpdateGroupCall: self._process_group_call_update,
        }

        # Handler group the raw handler is registered in; None while unregistered.
        self._handler_group = None
        self._update_handler = RawUpdateHandler(self._process_update)

    async def _process_update(self, _, update, users, chats):
        """Dispatch a raw update that refers to the current group call.

        Raises:
            ContinuePropagation: if the update type has no handler here, or the
                update does not refer to the currently tracked group call.
        """
        if type(update) not in self._update_to_handler:
            raise ContinuePropagation

        if not self.group_call or not update.call or update.call.id != self.group_call.id:
            raise ContinuePropagation
        # Keep the stored call in sync with the one carried by the update.
        self.group_call = update.call

        await self._update_to_handler[type(update)](update)

    async def _process_group_call_participants_update(self, update):
        """Wrap a participants update and pass it to the participants callback."""
        participants = [GroupCallParticipantWrapper.create(p) for p in update.participants]
        wrapped_update = UpdateGroupCallParticipantsWrapper(participants)

        await self.group_call_participants_update_callback(wrapped_update)

    async def _process_group_call_update(self, update):
        """Wrap a group call update and pass it to the group call callback."""
        if isinstance(update.call, PyrogramGroupCallDiscarded):
            call = GroupCallDiscardedWrapper()  # no info needed
        else:
            call = GroupCallWrapper(update.call.id, update.call.params)

        wrapped_update = UpdateGroupCallWrapper(update.chat_id, call)

        await self.group_call_update_callback(wrapped_update)

    async def check_group_call(self) -> bool:
        """Check if client is in a voice chat.

        Returns:
            ``False`` when no group call or SSRC is set, or when the server answers
            ``GROUPCALL_JOIN_MISSING``; otherwise the result of ``phone.CheckGroupCall``.

        Raises:
            BadRequest: for any other Pyrogram ``BadRequest``.
        """
        if not self.group_call or not self.my_ssrc:
            return False

        try:
            in_group_call = await (
                self.client.send(functions.phone.CheckGroupCall(call=self.group_call, source=int_ssrc(self.my_ssrc)))
            )
        except PyrogramBadRequest as e:
            if e.x != '[400 GROUPCALL_JOIN_MISSING]':
                raise BadRequest(e.x) from e

            in_group_call = False

        return in_group_call

    async def leave_current_group_call(self):
        """Call phone.LeaveGroupCall and handle returned updates.

        Does nothing when there is no full chat, no call in it, or no SSRC.
        """
        if not self.full_chat or not self.full_chat.call or not self.my_ssrc:
            return

        response = await self.client.send(
            functions.phone.LeaveGroupCall(call=self.full_chat.call, source=int_ssrc(self.my_ssrc))
        )
        await self.client.handle_updates(response)

    async def edit_group_call_member(self, peer, volume: int = None, muted=False):
        """Call phone.EditGroupCallParticipant for ``peer`` and handle returned updates."""
        response = await self.client.send(
            functions.phone.EditGroupCallParticipant(
                call=self.full_chat.call,
                participant=peer,
                muted=muted,
                volume=volume,
            )
        )
        await self.client.handle_updates(response)

    async def get_and_set_self_peer(self):
        """Resolve the client's own peer, store it in ``my_peer`` and return it."""
        self.my_peer = await self.client.resolve_peer(await self.client.storage.user_id())

        return self.my_peer

    async def get_and_set_group_call(self, group):
        """Get group call input of chat.

        Args:
            group (`InputPeerChannel` | `InputPeerChat` | `str` | `int`): Chat ID in any form.

        Returns:
            `InputGroupCall`.

        Raises:
            PytgcallsError: if the full chat can't be obtained for ``group``.
        """

        self.chat_peer = group
        if not isinstance(group, (InputPeerChannel, InputPeerChat)):
            self.chat_peer = await self.client.resolve_peer(group)

        # Drop the result of any previous lookup so that a peer which is neither
        # a channel nor a chat is reported below instead of reusing stale data.
        self.full_chat = None

        if isinstance(self.chat_peer, InputPeerChannel):
            self.full_chat = (
                await (self.client.send(functions.channels.GetFullChannel(channel=self.chat_peer)))
            ).full_chat
        elif isinstance(self.chat_peer, InputPeerChat):
            self.full_chat = (
                await (self.client.send(functions.messages.GetFullChat(chat_id=self.chat_peer.chat_id)))
            ).full_chat

        if self.full_chat is None:
            raise PytgcallsError(f'Can\'t get full chat by {group}')

        self.group_call = self.full_chat.call

        return self.group_call

    def unregister_update_handlers(self):
        """Delete the registered raw update handler from the client, if any."""
        if self._handler_group:
            self.client.remove_handler(self._update_handler, self._handler_group)
            self._handler_group = None

    def register_update_handlers(self):
        """Register the raw update handler in a group derived from the group call id.

        A positive call id is negated so the resulting handler group is never positive.
        """
        call_id = self.group_call.id
        # The original assigned the negated id and then overwrote it unconditionally.
        self._handler_group = -call_id if call_id > 0 else call_id

        self.client.add_handler(self._update_handler, self._handler_group)

    async def resolve_and_set_join_as(self, join_as):
        """Resolve ``join_as`` and store it in the ``join_as`` field.

        ``None`` selects the chat's default join-as peer, falling back to the
        client's own peer; a ``str`` or ``int`` is resolved through the client;
        anything else is stored as given.
        """
        my_peer = await self.get_and_set_self_peer()

        if join_as is None:
            self.join_as = self.full_chat.groupcall_default_join_as
            if self.join_as:
                # convert Peer to InputPeer
                self.join_as = await self.client.resolve_peer(get_peer_id(self.join_as))
            else:
                self.join_as = my_peer
        elif isinstance(join_as, (str, int)):
            self.join_as = await self.client.resolve_peer(join_as)
        else:
            self.join_as = join_as

    async def send_speaking_group_call_action(self):
        """Call messages.SetTyping with SpeakingInGroupCallAction by chat_peer."""
        await self.client.send(
            functions.messages.SetTyping(peer=self.chat_peer, action=types.SpeakingInGroupCallAction())
        )

    async def join_group_call(self, invite_hash: str, params: str, muted: bool, pre_update_processing: Callable):
        """Call phone.JoinGroupCall with group_call, join_as, invite hash, muted and params.

        Args:
            invite_hash: Invite hash passed to the request.
            params: JSON string wrapped into ``DataJSON`` for the request.
            muted: Whether to join muted.
            pre_update_processing: Called without arguments after the request
                returns and before the returned updates are handled.

        Raises:
            GroupcallSsrcDuplicateMuch: when Pyrogram raises its counterpart error.
        """
        try:
            response = await self.client.send(
                functions.phone.JoinGroupCall(
                    call=self.group_call,
                    join_as=self.join_as,
                    invite_hash=invite_hash,
                    params=types.DataJSON(data=params),
                    muted=muted,
                )
            )

            pre_update_processing()

            await self.client.handle_updates(response)
        except PyrogramGroupcallSsrcDuplicateMuch as e:
            raise GroupcallSsrcDuplicateMuch(e.x) from e

    def get_event_loop(self) -> AbstractEventLoop:
        """Return MTProto client loop."""
        return self.client.loop

Ancestors

Instance variables

var chat_peer

Inherited from: MTProtoBridgeBase.chat_peer

Chat peer of the chat the bot is currently in

var client

Inherited from: MTProtoBridgeBase.client

Any MTProto client (Pyrogram, Telethon, and so on)

var full_chat

Inherited from: MTProtoBridgeBase.full_chat

Full chat information

var group_call

Inherited from: MTProtoBridgeBase.group_call

Instance of MTProto's group call

var group_call_participants_update_callback

Inherited from: MTProtoBridgeBase.group_call_participants_update_callback

Native handler of wrapped group call participants update

var group_call_update_callback

Inherited from: MTProtoBridgeBase.group_call_update_callback

Native handler of wrapped group call update

var join_as

Inherited from: MTProtoBridgeBase.join_as

How to present yourself in participants list

var my_peer

Inherited from: MTProtoBridgeBase.my_peer

Client user peer

var my_ssrc

Inherited from: MTProtoBridgeBase.my_ssrc

Client SSRC (Synchronization Source)

Methods

async def check_group_call(self) ‑> bool

Inherited from: MTProtoBridgeBase.check_group_call

Check if client is in a voice chat …

Expand source code
async def check_group_call(self) -> bool:
    """Check if client is in a voice chat.

    Returns ``False`` without a request when the group call or SSRC is unset,
    and also when the server replies with ``GROUPCALL_JOIN_MISSING``. Any other
    Pyrogram ``BadRequest`` is re-raised as the library's ``BadRequest``.
    """
    if not (self.group_call and self.my_ssrc):
        return False

    try:
        request = functions.phone.CheckGroupCall(call=self.group_call, source=int_ssrc(self.my_ssrc))
        return await self.client.send(request)
    except PyrogramBadRequest as e:
        # A missing join simply means "not in the call"; everything else is an error.
        if e.x == '[400 GROUPCALL_JOIN_MISSING]':
            return False
        raise BadRequest(e.x)
async def edit_group_call_member(self, peer, volume: int = None, muted=False)

Inherited from: MTProtoBridgeBase.edit_group_call_member

call phone.EditGroupCallParticipant

Expand source code
async def edit_group_call_member(self, peer, volume: int = None, muted=False):
    """Call phone.EditGroupCallParticipant for ``peer`` and handle the returned updates."""
    updates = await self.client.send(
        functions.phone.EditGroupCallParticipant(
            call=self.full_chat.call, participant=peer, muted=muted, volume=volume
        )
    )
    # Feed the server's reply back through the client's update handling.
    await self.client.handle_updates(updates)
async def get_and_set_group_call(self, group)

Get group call input of chat.

Args

group (InputPeerChannel | InputPeerChat | str | int): Chat ID in any form.

Returns

InputGroupCall.

Expand source code
async def get_and_set_group_call(self, group):
    """Get group call input of chat.

    Args:
        group (`InputPeerChannel` | `InputPeerChat` | `str` | `int`): Chat ID in any form.

    Returns:
        `InputGroupCall`.

    Raises:
        PytgcallsError: if the full chat can't be obtained for ``group``.
    """

    self.chat_peer = group
    if not isinstance(group, (InputPeerChannel, InputPeerChat)):
        self.chat_peer = await self.client.resolve_peer(group)

    # Drop the result of any previous lookup so that a peer which is neither
    # a channel nor a chat is reported below instead of reusing stale data.
    self.full_chat = None

    if isinstance(self.chat_peer, InputPeerChannel):
        self.full_chat = (
            await (self.client.send(functions.channels.GetFullChannel(channel=self.chat_peer)))
        ).full_chat
    elif isinstance(self.chat_peer, InputPeerChat):
        self.full_chat = (
            await (self.client.send(functions.messages.GetFullChat(chat_id=self.chat_peer.chat_id)))
        ).full_chat

    if self.full_chat is None:
        raise PytgcallsError(f'Can\'t get full chat by {group}')

    self.group_call = self.full_chat.call

    return self.group_call
async def get_and_set_self_peer(self)

Inherited from: MTProtoBridgeBase.get_and_set_self_peer

resolve self peer and set to obj field

Expand source code
async def get_and_set_self_peer(self):
    """Resolve the client's own peer, store it in ``my_peer`` and return it."""
    own_user_id = await self.client.storage.user_id()
    resolved = await self.client.resolve_peer(own_user_id)
    self.my_peer = resolved
    return resolved
def get_event_loop(self) ‑> asyncio.events.AbstractEventLoop

Inherited from: MTProtoBridgeBase.get_event_loop

return MTProto client loop

Expand source code
def get_event_loop(self) -> AbstractEventLoop:
    """Return MTProto client loop."""
    client = self.client
    return client.loop
async def join_group_call(self, invite_hash: str, params: str, muted: bool, pre_update_processing: Callable)

Inherited from: MTProtoBridgeBase.join_group_call

call phone.JoinGroupCall with group_call, join_as, invite hash, muted and params …

Expand source code
async def join_group_call(self, invite_hash: str, params: str, muted: bool, pre_update_processing: Callable):
    """Call phone.JoinGroupCall with group_call, join_as, invite hash, muted and params.

    ``pre_update_processing`` is called without arguments once the request has
    returned and before the returned updates are handled. Pyrogram's
    SSRC-duplicate error is re-raised as the library's ``GroupcallSsrcDuplicateMuch``.
    """
    try:
        request = functions.phone.JoinGroupCall(
            call=self.group_call,
            join_as=self.join_as,
            invite_hash=invite_hash,
            params=types.DataJSON(data=params),
            muted=muted,
        )
        updates = await self.client.send(request)

        # Let the caller prepare its state before the join updates arrive.
        pre_update_processing()

        await self.client.handle_updates(updates)
    except PyrogramGroupcallSsrcDuplicateMuch as e:
        raise GroupcallSsrcDuplicateMuch(e.x)
async def leave_current_group_call(self)

Inherited from: MTProtoBridgeBase.leave_current_group_call

call phone.LeaveGroupCall and handle returned updates

Expand source code
async def leave_current_group_call(self):
    """Call phone.LeaveGroupCall and handle returned updates.

    Returns without sending anything when the full chat, its call, or the
    client's SSRC is missing.
    """
    if not (self.full_chat and self.full_chat.call and self.my_ssrc):
        return

    request = functions.phone.LeaveGroupCall(call=self.full_chat.call, source=int_ssrc(self.my_ssrc))
    updates = await self.client.send(request)
    await self.client.handle_updates(updates)
def re_register_update_handlers(self)

Inherited from: MTProtoBridgeBase.re_register_update_handlers

Delete and add pytgcalls handler in MTProto client.

def register_update_handlers(self)

Inherited from: MTProtoBridgeBase.register_update_handlers

register handlers

Expand source code
def register_update_handlers(self):
    """Register the raw update handler in a group derived from the group call id.

    A positive call id is negated so the resulting handler group is never positive.
    """
    call_id = self.group_call.id
    # The original assigned the negated id and then overwrote it unconditionally,
    # which made the sign check dead code.
    self._handler_group = -call_id if call_id > 0 else call_id

    self.client.add_handler(self._update_handler, self._handler_group)
async def resolve_and_set_join_as(self, join_as)

Inherited from: MTProtoBridgeBase.resolve_and_set_join_as

The join_as argument can be a str or a peer. If it is a str, it is resolved to a peer first; the result is saved to the join_as class field.

Expand source code
async def resolve_and_set_join_as(self, join_as):
    """Resolve ``join_as`` and store the result in the ``join_as`` field.

    ``None`` selects the chat's default join-as peer, falling back to the
    client's own peer. A ``str`` or ``int`` is resolved through the client,
    and any other value is stored unchanged.
    """
    own_peer = await self.get_and_set_self_peer()

    if join_as is not None:
        if isinstance(join_as, (str, int)):
            join_as = await self.client.resolve_peer(join_as)
        self.join_as = join_as
        return

    self.join_as = self.full_chat.groupcall_default_join_as
    if not self.join_as:
        self.join_as = own_peer
        return

    # The chat default is a Peer; the request needs an InputPeer.
    self.join_as = await self.client.resolve_peer(get_peer_id(self.join_as))
async def send_speaking_group_call_action(self)

Inherited from: MTProtoBridgeBase.send_speaking_group_call_action

call messages.SetTyping with SpeakingInGroupCallAction by chat_peer

Expand source code
async def send_speaking_group_call_action(self):
    """Call messages.SetTyping with SpeakingInGroupCallAction by chat_peer."""
    speaking = types.SpeakingInGroupCallAction()
    request = functions.messages.SetTyping(peer=self.chat_peer, action=speaking)
    await self.client.send(request)
def unregister_update_handlers(self)

Inherited from: MTProtoBridgeBase.unregister_update_handlers

delete all registered handlers from MTProto client

Expand source code
def unregister_update_handlers(self):
    """Delete the registered raw update handler from the client, if any.

    Nothing happens while no handler group is recorded; after removal the
    recorded group is reset to ``None``.
    """
    group = self._handler_group
    if not group:
        return

    self.client.remove_handler(self._update_handler, group)
    self._handler_group = None