Module pytgcalls.mtproto.telethon_bridge

Expand source code
#  tgcalls - a Python binding for C++ library by Telegram
#  pytgcalls - a library connecting the Python binding with MTProto
#  Copyright (C) 2020-2021 Il`ya (Marshal) <https://github.com/MarshalX>
#
#  This file is part of tgcalls and pytgcalls.
#
#  tgcalls and pytgcalls is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published
#  by the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  tgcalls and pytgcalls is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License v3
#  along with tgcalls. If not, see <http://www.gnu.org/licenses/>.

from asyncio import AbstractEventLoop
from typing import Callable

from telethon.errors import (
    BadRequestError as TelethonBadRequestError,
    GroupcallJoinMissingError as TelethonGroupcallJoinMissingError,
    GroupcallSsrcDuplicateMuchError as TelethonGroupcallSsrcDuplicateMuchError,
)
from telethon.events import Raw, StopPropagation
from telethon.tl import functions
from telethon.tl.types import (
    DataJSON,
    GroupCallDiscarded as TelethonGroupCallDiscarded,
    SpeakingInGroupCallAction,
    UpdateGroupCall,
    UpdateGroupCallConnection,
    UpdateGroupCallParticipants,
    InputPeerChat,
    InputPeerChannel,
)

from pytgcalls import PytgcallsError
from pytgcalls.mtproto import MTProtoBridgeBase
from pytgcalls.mtproto.data import GroupCallDiscardedWrapper, GroupCallParticipantWrapper, GroupCallWrapper
from pytgcalls.mtproto.data.update import UpdateGroupCallParticipantsWrapper, UpdateGroupCallWrapper
from pytgcalls.mtproto.exceptions import BadRequest, GroupcallSsrcDuplicateMuch
from pytgcalls.utils import int_ssrc


class TelethonBridge(MTProtoBridgeBase):
    """Bridge between pytgcalls and a Telethon client.

    Sends the group call related TL requests through the client and wraps incoming
    group call updates before handing them to the callbacks held on the bridge.
    """

    def __init__(self, client):
        """Remember the client's event loop and build the update dispatch table.

        Args:
            client: Telethon client instance; its ``loop`` attribute is stored.
        """
        super().__init__(client)

        self._loop = client.loop

        # Maps the exact update class to the coroutine that wraps and forwards it.
        self._update_to_handler = {
            UpdateGroupCallParticipants: self._process_group_call_participants_update,
            UpdateGroupCall: self._process_group_call_update,
        }

    async def _process_update(self, update):
        """Dispatch a raw update that belongs to the currently tracked group call.

        Updates of an unregistered type, updates without a call, and updates whose
        call id differs from ``self.group_call.id`` are ignored.

        Raises:
            StopPropagation: always, once a matching update has been handled.
        """
        handler = self._update_to_handler.get(type(update))
        if handler is None:
            return

        if not self.group_call or not update.call or update.call.id != self.group_call.id:
            return
        # Keep the stored call in sync with the one carried by the update.
        self.group_call = update.call

        await handler(update)
        raise StopPropagation

    async def _process_group_call_participants_update(self, update):
        """Wrap every participant of the update and pass the result to the participants callback."""
        participants = [GroupCallParticipantWrapper.create(p) for p in update.participants]
        wrapped_update = UpdateGroupCallParticipantsWrapper(participants)

        await self.group_call_participants_update_callback(wrapped_update)

    async def _process_group_call_update(self, update):
        """Forward a group call update to the callback, but only when the call was discarded."""
        if not isinstance(update.call, TelethonGroupCallDiscarded):
            return

        call = GroupCallDiscardedWrapper()  # no info needed
        wrapped_update = UpdateGroupCallWrapper(update.chat_id, call)

        await self.group_call_update_callback(wrapped_update)

    async def _process_group_call_connection(self, update):
        """Wrap the connection params of the update and pass them to the group call update callback.

        The call id and chat id are not taken from the update; literal ``'placeholder'``
        strings are used instead.
        """
        # TODO update to new layer when pyrogram will release new stable version on pypi
        call = GroupCallWrapper('placeholder', update.params)
        wrapped_update = UpdateGroupCallWrapper('placeholder', call)

        await self.group_call_update_callback(wrapped_update)

    async def check_group_call(self) -> bool:
        """Check if the client is in the voice chat.

        Returns:
            ``False`` when no group call or SSRC is set, or when the request fails with
            Telethon's ``GroupcallJoinMissingError``; otherwise whether our SSRC is
            contained in the response of ``phone.CheckGroupCall``.

        Raises:
            BadRequest: for any other Telethon ``BadRequestError``.
        """
        if not self.group_call or not self.my_ssrc:
            return False

        # Converted once; used both in the request and in the membership test below.
        ssrc = int_ssrc(self.my_ssrc)

        try:
            ssrcs_in_group_call = await self.client(
                functions.phone.CheckGroupCallRequest(call=self.group_call, sources=[ssrc])
            )
        except TelethonBadRequestError as e:
            if isinstance(e, TelethonGroupcallJoinMissingError):
                return False
            # Chain explicitly so the original Telethon error is kept as the cause.
            raise BadRequest(e) from e

        return ssrc in ssrcs_in_group_call

    async def leave_current_group_call(self):
        """Call ``phone.LeaveGroupCall`` and hand the returned updates to the client.

        Does nothing when the full chat, its call, or our SSRC is missing.
        """
        if not self.full_chat or not self.full_chat.call or not self.my_ssrc:
            return

        response = await self.client(
            functions.phone.LeaveGroupCallRequest(call=self.full_chat.call, source=int_ssrc(self.my_ssrc))
        )

        self.client._handle_update(response)

    async def edit_group_call_member(self, peer, volume: int = None, muted=False):
        """Call ``phone.EditGroupCallParticipant`` and hand the returned updates to the client.

        Args:
            peer: participant to edit.
            volume: forwarded as the request's ``volume``.
            muted: forwarded as the request's ``muted``.
        """
        # NOTE(review): unlike leave_current_group_call there is no guard for a missing full_chat here.
        response = await self.client(
            functions.phone.EditGroupCallParticipantRequest(
                call=self.full_chat.call,
                participant=peer,
                muted=muted,
                volume=volume,
            )
        )

        self.client._handle_update(response)

    async def get_and_set_self_peer(self):
        """Resolve the client's own input peer, store it in ``my_peer`` and return it."""
        self.my_peer = await self.client.get_me(input_peer=True)

        return self.my_peer

    async def get_and_set_group_call(self, group):
        """Resolve ``group``, load its full chat and store the chat's group call.

        Args:
            group: an ``InputPeerChannel``/``InputPeerChat``, or anything else that the
                client's ``get_input_entity`` can resolve.

        Returns:
            The ``call`` attribute of the fetched full chat (also stored in ``group_call``).

        Raises:
            PytgcallsError: when the resolved peer is neither a channel nor a chat, so no
                full chat could be fetched.
        """
        self.chat_peer = group

        if not isinstance(group, (InputPeerChannel, InputPeerChat)):
            self.chat_peer = await self.client.get_input_entity(group)

        # Forget the full chat of any previous call: otherwise a peer that is neither a
        # channel nor a chat would silently reuse the old chat's call instead of failing.
        self.full_chat = None

        if isinstance(self.chat_peer, InputPeerChannel):
            self.full_chat = (await self.client(functions.channels.GetFullChannelRequest(group))).full_chat
        elif isinstance(self.chat_peer, InputPeerChat):
            self.full_chat = (await self.client(functions.messages.GetFullChatRequest(group))).full_chat

        if self.full_chat is None:
            raise PytgcallsError(f'Can\'t get full chat by {group}')

        self.group_call = self.full_chat.call

        return self.group_call

    def unregister_update_handlers(self):
        """Remove the raw update handler of this bridge from the client."""
        self.client.remove_event_handler(self._process_update, Raw)

    def register_update_handlers(self):
        """Register ``_process_update`` on the client as a handler for raw updates."""
        self.client.add_event_handler(self._process_update, Raw)

    async def resolve_and_set_join_as(self, join_as):
        """Store the peer used to present the client in the participants list.

        An explicit ``join_as`` is stored as is. When it is ``None`` the full chat's
        ``groupcall_default_join_as`` is used, falling back to the client's own peer.
        """
        if join_as is None:
            self.join_as = self.full_chat.groupcall_default_join_as
            if self.join_as is None:
                self.join_as = await self.get_and_set_self_peer()
        else:
            self.join_as = join_as

    async def send_speaking_group_call_action(self):
        """Call ``messages.SetTyping`` with ``SpeakingInGroupCallAction`` for ``chat_peer``."""
        await self.client(functions.messages.SetTypingRequest(peer=self.chat_peer, action=SpeakingInGroupCallAction()))

    async def join_group_call(self, invite_hash: str, params: str, muted: bool, pre_update_processing: Callable):
        """Call ``phone.JoinGroupCall`` and process the updates of the response.

        Args:
            invite_hash: forwarded as the request's ``invite_hash``.
            params: connection params; wrapped into ``DataJSON`` for the request.
            muted: forwarded as the request's ``muted``.
            pre_update_processing: called without arguments after the request succeeded
                and before any update of the response is processed.

        Raises:
            GroupcallSsrcDuplicateMuch: when Telethon raises ``GroupcallSsrcDuplicateMuchError``.
        """
        try:
            response = await self.client(
                functions.phone.JoinGroupCallRequest(
                    call=self.group_call,
                    join_as=self.join_as,
                    invite_hash=invite_hash,
                    params=DataJSON(data=params),
                    muted=muted,
                )
            )

            pre_update_processing()

            # it is here cuz we need to associate params for connection with group call
            for update in response.updates:
                if isinstance(update, UpdateGroupCallConnection):
                    await self._process_group_call_connection(update)

            self.client._handle_update(response)
        except TelethonGroupcallSsrcDuplicateMuchError as e:
            # Chain explicitly so the original Telethon error is kept as the cause.
            raise GroupcallSsrcDuplicateMuch(e) from e

    def get_event_loop(self) -> AbstractEventLoop:
        """Return the event loop taken from the client in ``__init__``."""
        return self._loop

Classes

class TelethonBridge (client)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class TelethonBridge(MTProtoBridgeBase):
    """Bridge between pytgcalls and a Telethon client.

    Sends the group call related TL requests through the client and wraps incoming
    group call updates before handing them to the callbacks held on the bridge.
    """

    def __init__(self, client):
        """Remember the client's event loop and build the update dispatch table.

        Args:
            client: Telethon client instance; its ``loop`` attribute is stored.
        """
        super().__init__(client)

        self._loop = client.loop

        # Maps the exact update class to the coroutine that wraps and forwards it.
        self._update_to_handler = {
            UpdateGroupCallParticipants: self._process_group_call_participants_update,
            UpdateGroupCall: self._process_group_call_update,
        }

    async def _process_update(self, update):
        """Dispatch a raw update that belongs to the currently tracked group call.

        Updates of an unregistered type, updates without a call, and updates whose
        call id differs from ``self.group_call.id`` are ignored.

        Raises:
            StopPropagation: always, once a matching update has been handled.
        """
        handler = self._update_to_handler.get(type(update))
        if handler is None:
            return

        if not self.group_call or not update.call or update.call.id != self.group_call.id:
            return
        # Keep the stored call in sync with the one carried by the update.
        self.group_call = update.call

        await handler(update)
        raise StopPropagation

    async def _process_group_call_participants_update(self, update):
        """Wrap every participant of the update and pass the result to the participants callback."""
        participants = [GroupCallParticipantWrapper.create(p) for p in update.participants]
        wrapped_update = UpdateGroupCallParticipantsWrapper(participants)

        await self.group_call_participants_update_callback(wrapped_update)

    async def _process_group_call_update(self, update):
        """Forward a group call update to the callback, but only when the call was discarded."""
        if not isinstance(update.call, TelethonGroupCallDiscarded):
            return

        call = GroupCallDiscardedWrapper()  # no info needed
        wrapped_update = UpdateGroupCallWrapper(update.chat_id, call)

        await self.group_call_update_callback(wrapped_update)

    async def _process_group_call_connection(self, update):
        """Wrap the connection params of the update and pass them to the group call update callback.

        The call id and chat id are not taken from the update; literal ``'placeholder'``
        strings are used instead.
        """
        # TODO update to new layer when pyrogram will release new stable version on pypi
        call = GroupCallWrapper('placeholder', update.params)
        wrapped_update = UpdateGroupCallWrapper('placeholder', call)

        await self.group_call_update_callback(wrapped_update)

    async def check_group_call(self) -> bool:
        """Check if the client is in the voice chat.

        Returns:
            ``False`` when no group call or SSRC is set, or when the request fails with
            Telethon's ``GroupcallJoinMissingError``; otherwise whether our SSRC is
            contained in the response of ``phone.CheckGroupCall``.

        Raises:
            BadRequest: for any other Telethon ``BadRequestError``.
        """
        if not self.group_call or not self.my_ssrc:
            return False

        # Converted once; used both in the request and in the membership test below.
        ssrc = int_ssrc(self.my_ssrc)

        try:
            ssrcs_in_group_call = await self.client(
                functions.phone.CheckGroupCallRequest(call=self.group_call, sources=[ssrc])
            )
        except TelethonBadRequestError as e:
            if isinstance(e, TelethonGroupcallJoinMissingError):
                return False
            # Chain explicitly so the original Telethon error is kept as the cause.
            raise BadRequest(e) from e

        return ssrc in ssrcs_in_group_call

    async def leave_current_group_call(self):
        """Call ``phone.LeaveGroupCall`` and hand the returned updates to the client.

        Does nothing when the full chat, its call, or our SSRC is missing.
        """
        if not self.full_chat or not self.full_chat.call or not self.my_ssrc:
            return

        response = await self.client(
            functions.phone.LeaveGroupCallRequest(call=self.full_chat.call, source=int_ssrc(self.my_ssrc))
        )

        self.client._handle_update(response)

    async def edit_group_call_member(self, peer, volume: int = None, muted=False):
        """Call ``phone.EditGroupCallParticipant`` and hand the returned updates to the client.

        Args:
            peer: participant to edit.
            volume: forwarded as the request's ``volume``.
            muted: forwarded as the request's ``muted``.
        """
        # NOTE(review): unlike leave_current_group_call there is no guard for a missing full_chat here.
        response = await self.client(
            functions.phone.EditGroupCallParticipantRequest(
                call=self.full_chat.call,
                participant=peer,
                muted=muted,
                volume=volume,
            )
        )

        self.client._handle_update(response)

    async def get_and_set_self_peer(self):
        """Resolve the client's own input peer, store it in ``my_peer`` and return it."""
        self.my_peer = await self.client.get_me(input_peer=True)

        return self.my_peer

    async def get_and_set_group_call(self, group):
        """Resolve ``group``, load its full chat and store the chat's group call.

        Args:
            group: an ``InputPeerChannel``/``InputPeerChat``, or anything else that the
                client's ``get_input_entity`` can resolve.

        Returns:
            The ``call`` attribute of the fetched full chat (also stored in ``group_call``).

        Raises:
            PytgcallsError: when the resolved peer is neither a channel nor a chat, so no
                full chat could be fetched.
        """
        self.chat_peer = group

        if not isinstance(group, (InputPeerChannel, InputPeerChat)):
            self.chat_peer = await self.client.get_input_entity(group)

        # Forget the full chat of any previous call: otherwise a peer that is neither a
        # channel nor a chat would silently reuse the old chat's call instead of failing.
        self.full_chat = None

        if isinstance(self.chat_peer, InputPeerChannel):
            self.full_chat = (await self.client(functions.channels.GetFullChannelRequest(group))).full_chat
        elif isinstance(self.chat_peer, InputPeerChat):
            self.full_chat = (await self.client(functions.messages.GetFullChatRequest(group))).full_chat

        if self.full_chat is None:
            raise PytgcallsError(f'Can\'t get full chat by {group}')

        self.group_call = self.full_chat.call

        return self.group_call

    def unregister_update_handlers(self):
        """Remove the raw update handler of this bridge from the client."""
        self.client.remove_event_handler(self._process_update, Raw)

    def register_update_handlers(self):
        """Register ``_process_update`` on the client as a handler for raw updates."""
        self.client.add_event_handler(self._process_update, Raw)

    async def resolve_and_set_join_as(self, join_as):
        """Store the peer used to present the client in the participants list.

        An explicit ``join_as`` is stored as is. When it is ``None`` the full chat's
        ``groupcall_default_join_as`` is used, falling back to the client's own peer.
        """
        if join_as is None:
            self.join_as = self.full_chat.groupcall_default_join_as
            if self.join_as is None:
                self.join_as = await self.get_and_set_self_peer()
        else:
            self.join_as = join_as

    async def send_speaking_group_call_action(self):
        """Call ``messages.SetTyping`` with ``SpeakingInGroupCallAction`` for ``chat_peer``."""
        await self.client(functions.messages.SetTypingRequest(peer=self.chat_peer, action=SpeakingInGroupCallAction()))

    async def join_group_call(self, invite_hash: str, params: str, muted: bool, pre_update_processing: Callable):
        """Call ``phone.JoinGroupCall`` and process the updates of the response.

        Args:
            invite_hash: forwarded as the request's ``invite_hash``.
            params: connection params; wrapped into ``DataJSON`` for the request.
            muted: forwarded as the request's ``muted``.
            pre_update_processing: called without arguments after the request succeeded
                and before any update of the response is processed.

        Raises:
            GroupcallSsrcDuplicateMuch: when Telethon raises ``GroupcallSsrcDuplicateMuchError``.
        """
        try:
            response = await self.client(
                functions.phone.JoinGroupCallRequest(
                    call=self.group_call,
                    join_as=self.join_as,
                    invite_hash=invite_hash,
                    params=DataJSON(data=params),
                    muted=muted,
                )
            )

            pre_update_processing()

            # it is here cuz we need to associate params for connection with group call
            for update in response.updates:
                if isinstance(update, UpdateGroupCallConnection):
                    await self._process_group_call_connection(update)

            self.client._handle_update(response)
        except TelethonGroupcallSsrcDuplicateMuchError as e:
            # Chain explicitly so the original Telethon error is kept as the cause.
            raise GroupcallSsrcDuplicateMuch(e) from e

    def get_event_loop(self) -> AbstractEventLoop:
        """Return the event loop taken from the client in ``__init__``."""
        return self._loop

Ancestors

Instance variables

var chat_peer

Inherited from: MTProtoBridgeBase.chat_peer

Chat peer where bot is now

var client

Inherited from: MTProtoBridgeBase.client

Any MTProto client. Pyrogram/Telethon and so on

var full_chat

Inherited from: MTProtoBridgeBase.full_chat

Full chat information

var group_call

Inherited from: MTProtoBridgeBase.group_call

Instance of MTProto's group call

var group_call_participants_update_callback

Inherited from: MTProtoBridgeBase.group_call_participants_update_callback

Native handler of wrapped group call participants update

var group_call_update_callback

Inherited from: MTProtoBridgeBase.group_call_update_callback

Native handler of wrapped group call update

var join_as

Inherited from: MTProtoBridgeBase.join_as

How to present yourself in participants list

var my_peer

Inherited from: MTProtoBridgeBase.my_peer

Client user peer

var my_ssrc

Inherited from: MTProtoBridgeBase.my_ssrc

Client SSRC (Synchronization Source)

Methods

async def check_group_call(self) ‑> bool

Inherited from: MTProtoBridgeBase.check_group_call

Check if client is in a voice chat …

Expand source code
async def check_group_call(self) -> bool:
    """Check if the client is in the voice chat.

    Returns:
        ``False`` when no group call or SSRC is set, or when the request fails with
        Telethon's ``GroupcallJoinMissingError``; otherwise whether our SSRC is
        contained in the response of ``phone.CheckGroupCall``.

    Raises:
        BadRequest: for any other Telethon ``BadRequestError``.
    """
    if not self.group_call or not self.my_ssrc:
        return False

    # Converted once; used both in the request and in the membership test below.
    ssrc = int_ssrc(self.my_ssrc)

    try:
        ssrcs_in_group_call = await self.client(
            functions.phone.CheckGroupCallRequest(call=self.group_call, sources=[ssrc])
        )
    except TelethonBadRequestError as e:
        if isinstance(e, TelethonGroupcallJoinMissingError):
            return False
        # Chain explicitly so the original Telethon error is kept as the cause.
        raise BadRequest(e) from e

    return ssrc in ssrcs_in_group_call
async def edit_group_call_member(self, peer, volume: int = None, muted=False)

Inherited from: MTProtoBridgeBase.edit_group_call_member

call phone.EditGroupCallParticipant

Expand source code
async def edit_group_call_member(self, peer, volume: int = None, muted=False):
    """Call ``phone.EditGroupCallParticipant`` and hand the returned updates to the client.

    Args:
        peer: participant to edit.
        volume: forwarded as the request's ``volume``.
        muted: forwarded as the request's ``muted``.
    """
    request = functions.phone.EditGroupCallParticipantRequest(
        call=self.full_chat.call,
        participant=peer,
        muted=muted,
        volume=volume,
    )
    updates = await self.client(request)

    # Let the client process the updates returned by the request.
    self.client._handle_update(updates)
async def get_and_set_group_call(self, group)

Inherited from: MTProtoBridgeBase.get_and_set_group_call

The group argument can be a peer, an int, a string with a username and so on; all of them need to be supported …

Expand source code
async def get_and_set_group_call(self, group):
    """Resolve ``group``, load its full chat and store the chat's group call.

    Args:
        group: an ``InputPeerChannel``/``InputPeerChat``, or anything else that the
            client's ``get_input_entity`` can resolve.

    Returns:
        The ``call`` attribute of the fetched full chat (also stored in ``group_call``).

    Raises:
        PytgcallsError: when the resolved peer is neither a channel nor a chat, so no
            full chat could be fetched.
    """
    self.chat_peer = group

    if not isinstance(group, (InputPeerChannel, InputPeerChat)):
        self.chat_peer = await self.client.get_input_entity(group)

    # Forget the full chat of any previous call: otherwise a peer that is neither a
    # channel nor a chat would silently reuse the old chat's call instead of failing.
    self.full_chat = None

    if isinstance(self.chat_peer, InputPeerChannel):
        self.full_chat = (await self.client(functions.channels.GetFullChannelRequest(group))).full_chat
    elif isinstance(self.chat_peer, InputPeerChat):
        self.full_chat = (await self.client(functions.messages.GetFullChatRequest(group))).full_chat

    if self.full_chat is None:
        raise PytgcallsError(f'Can\'t get full chat by {group}')

    self.group_call = self.full_chat.call

    return self.group_call
async def get_and_set_self_peer(self)

Inherited from: MTProtoBridgeBase.get_and_set_self_peer

resolve self peer and set to obj field

Expand source code
async def get_and_set_self_peer(self):
    """Resolve the client's own input peer, store it in ``my_peer`` and return it."""
    own_peer = await self.client.get_me(input_peer=True)
    self.my_peer = own_peer
    return own_peer
def get_event_loop(self) ‑> asyncio.events.AbstractEventLoop

Inherited from: MTProtoBridgeBase.get_event_loop

return MTProto client loop

Expand source code
def get_event_loop(self) -> AbstractEventLoop:
    """Return the MTProto client loop stored on the bridge as ``_loop``."""
    loop = self._loop
    return loop
async def join_group_call(self, invite_hash: str, params: str, muted: bool, pre_update_processing: Callable)

Inherited from: MTProtoBridgeBase.join_group_call

call phone.JoinGroupCall with group_call, join_as, invite hash, muted and params …

Expand source code
async def join_group_call(self, invite_hash: str, params: str, muted: bool, pre_update_processing: Callable):
    """Call ``phone.JoinGroupCall`` and process the updates of the response.

    Args:
        invite_hash: forwarded as the request's ``invite_hash``.
        params: connection params; wrapped into ``DataJSON`` for the request.
        muted: forwarded as the request's ``muted``.
        pre_update_processing: called without arguments after the request succeeded
            and before any update of the response is processed.

    Raises:
        GroupcallSsrcDuplicateMuch: when Telethon raises ``GroupcallSsrcDuplicateMuchError``.
    """
    try:
        response = await self.client(
            functions.phone.JoinGroupCallRequest(
                call=self.group_call,
                join_as=self.join_as,
                invite_hash=invite_hash,
                params=DataJSON(data=params),
                muted=muted,
            )
        )

        pre_update_processing()

        # it is here cuz we need to associate params for connection with group call
        for update in response.updates:
            if isinstance(update, UpdateGroupCallConnection):
                await self._process_group_call_connection(update)

        self.client._handle_update(response)
    except TelethonGroupcallSsrcDuplicateMuchError as e:
        # Chain explicitly so the original Telethon error is kept as the cause.
        raise GroupcallSsrcDuplicateMuch(e) from e
async def leave_current_group_call(self)

Inherited from: MTProtoBridgeBase.leave_current_group_call

call phone.LeaveGroupCall and handle returned updates

Expand source code
async def leave_current_group_call(self):
    """Call ``phone.LeaveGroupCall`` and hand the returned updates to the client.

    Does nothing when the full chat, its call, or our SSRC is missing.
    """
    if not (self.full_chat and self.full_chat.call and self.my_ssrc):
        return

    request = functions.phone.LeaveGroupCallRequest(
        call=self.full_chat.call,
        source=int_ssrc(self.my_ssrc),
    )
    updates = await self.client(request)

    # Let the client process the updates returned by the request.
    self.client._handle_update(updates)
def re_register_update_handlers(self)

Inherited from: MTProtoBridgeBase.re_register_update_handlers

Delete and add pytgcalls handler in MTProto client.

def register_update_handlers(self)

Inherited from: MTProtoBridgeBase.register_update_handlers

register handlers

Expand source code
def register_update_handlers(self):
    """Register ``_process_update`` on the client as a handler for ``Raw`` events."""
    handler = self._process_update
    self.client.add_event_handler(handler, Raw)
async def resolve_and_set_join_as(self, join_as)

Inherited from: MTProtoBridgeBase.resolve_and_set_join_as

The join_as argument can be a str or a peer. If it is a str, the peer needs to be resolved. The result is saved to the join_as class field.

Expand source code
async def resolve_and_set_join_as(self, join_as):
    """Store the peer used to present the client in the participants list.

    An explicit ``join_as`` is stored as is. When it is ``None`` the full chat's
    ``groupcall_default_join_as`` is used, falling back to the client's own peer.
    """
    if join_as is not None:
        self.join_as = join_as
        return

    self.join_as = self.full_chat.groupcall_default_join_as
    if self.join_as is not None:
        return

    # No default configured for the chat: join as ourselves.
    self.join_as = await self.get_and_set_self_peer()
async def send_speaking_group_call_action(self)

Inherited from: MTProtoBridgeBase.send_speaking_group_call_action

call messages.SetTyping with SpeakingInGroupCallAction by chat_peer

Expand source code
async def send_speaking_group_call_action(self):
    """Call ``messages.SetTyping`` with ``SpeakingInGroupCallAction`` for ``chat_peer``."""
    request = functions.messages.SetTypingRequest(
        peer=self.chat_peer,
        action=SpeakingInGroupCallAction(),
    )
    await self.client(request)
def unregister_update_handlers(self)

Inherited from: MTProtoBridgeBase.unregister_update_handlers

delete all registered handlers from MTProto client

Expand source code
def unregister_update_handlers(self):
    """Remove the ``_process_update`` handler for ``Raw`` events from the client."""
    handler = self._process_update
    self.client.remove_event_handler(handler, Raw)