Module pytgcalls.mtproto.telethon_bridge
Expand source code
# tgcalls - a Python binding for C++ library by Telegram
# pytgcalls - a library connecting the Python binding with MTProto
# Copyright (C) 2020-2021 Il`ya (Marshal) <https://github.com/MarshalX>
#
# This file is part of tgcalls and pytgcalls.
#
# tgcalls and pytgcalls is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# tgcalls and pytgcalls is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License v3
# along with tgcalls. If not, see <http://www.gnu.org/licenses/>.
from asyncio import AbstractEventLoop
from typing import Callable
from telethon.errors import (
BadRequestError as TelethonBadRequestError,
GroupcallJoinMissingError as TelethonGroupcallJoinMissingError,
GroupcallSsrcDuplicateMuchError as TelethonGroupcallSsrcDuplicateMuchError,
)
from telethon.events import Raw, StopPropagation
from telethon.tl import functions
from telethon.tl.types import (
DataJSON,
GroupCallDiscarded as TelethonGroupCallDiscarded,
SpeakingInGroupCallAction,
UpdateGroupCall,
UpdateGroupCallConnection,
UpdateGroupCallParticipants,
InputPeerChat,
InputPeerChannel,
)
from pytgcalls import PytgcallsError
from pytgcalls.mtproto import MTProtoBridgeBase
from pytgcalls.mtproto.data import GroupCallDiscardedWrapper, GroupCallParticipantWrapper, GroupCallWrapper
from pytgcalls.mtproto.data.update import UpdateGroupCallParticipantsWrapper, UpdateGroupCallWrapper
from pytgcalls.mtproto.exceptions import BadRequest, GroupcallSsrcDuplicateMuch
from pytgcalls.utils import int_ssrc
class TelethonBridge(MTProtoBridgeBase):
    """MTProto bridge that performs group call requests through a Telethon client.

    Raw Telethon updates for the tracked group call are wrapped and forwarded
    to the ``group_call_update_callback`` and
    ``group_call_participants_update_callback`` attributes of the bridge.
    """

    def __init__(self, client):
        """Keep the client's event loop and build the raw-update dispatch table.

        Args:
            client: client object passed to the base class; its ``loop``
                attribute is stored and returned by :meth:`get_event_loop`.
        """
        super().__init__(client)

        self._loop = client.loop

        # Raw update type -> coroutine that wraps it and calls the matching callback.
        self._update_to_handler = {
            UpdateGroupCallParticipants: self._process_group_call_participants_update,
            UpdateGroupCall: self._process_group_call_update,
        }

    async def _process_update(self, update):
        """Dispatch a raw update if it belongs to the currently tracked group call.

        Updates of an unregistered type, and updates whose ``call.id`` differs
        from ``self.group_call.id``, are ignored.

        Raises:
            StopPropagation: after a matching update has been handled.
        """
        handler = self._update_to_handler.get(type(update))
        if handler is None:
            return

        if not self.group_call or not update.call or update.call.id != self.group_call.id:
            return
        # Keep the most recent call object delivered by the server.
        self.group_call = update.call

        await handler(update)
        raise StopPropagation

    async def _process_group_call_participants_update(self, update):
        """Wrap every participant of the update and pass the result to the participants callback."""
        participants = [GroupCallParticipantWrapper.create(p) for p in update.participants]
        wrapped_update = UpdateGroupCallParticipantsWrapper(participants)

        await self.group_call_participants_update_callback(wrapped_update)

    async def _process_group_call_update(self, update):
        """Forward a group call update, but only when the call has been discarded."""
        if not isinstance(update.call, TelethonGroupCallDiscarded):
            return

        call = GroupCallDiscardedWrapper()  # no info needed
        wrapped_update = UpdateGroupCallWrapper(update.chat_id, call)

        await self.group_call_update_callback(wrapped_update)

    async def _process_group_call_connection(self, update):
        """Forward the connection ``params`` of the update through the group call callback.

        Both the call id and the chat id of the wrappers are filled with the
        literal string ``'placeholder'``.
        """
        # TODO update to new layer when pyrogram will release new stable version on pypi
        call = GroupCallWrapper('placeholder', update.params)
        wrapped_update = UpdateGroupCallWrapper('placeholder', call)

        await self.group_call_update_callback(wrapped_update)

    async def check_group_call(self) -> bool:
        """Ask the server whether our SSRC is still listed in the group call.

        Returns:
            ``False`` when no group call or SSRC is set, or when the server
            answers with ``GroupcallJoinMissingError``; otherwise whether our
            SSRC is contained in the response.

        Raises:
            BadRequest: for any other Telethon ``BadRequestError``.
        """
        if not self.group_call or not self.my_ssrc:
            return False

        # Convert once so the value we look up is exactly the value we queried.
        my_source = int_ssrc(self.my_ssrc)
        try:
            ssrcs_in_group_call = await self.client(
                functions.phone.CheckGroupCallRequest(call=self.group_call, sources=[my_source])
            )
        except TelethonBadRequestError as e:
            if isinstance(e, TelethonGroupcallJoinMissingError):
                return False
            raise BadRequest(e) from e

        return my_source in ssrcs_in_group_call

    async def leave_current_group_call(self):
        """Send ``phone.LeaveGroupCall`` and pass the response to the client's update handling.

        Does nothing when the full chat, its call, or our SSRC is missing.
        """
        if not self.full_chat or not self.full_chat.call or not self.my_ssrc:
            return

        response = await self.client(
            functions.phone.LeaveGroupCallRequest(call=self.full_chat.call, source=int_ssrc(self.my_ssrc))
        )
        # NOTE: `_handle_update` is a private method of the client object.
        self.client._handle_update(response)

    async def edit_group_call_member(self, peer, volume: int = None, muted=False):
        """Send ``phone.EditGroupCallParticipant`` for ``peer`` and handle the returned updates.

        Args:
            peer: participant to edit.
            volume: value passed as ``volume`` of the request (``None`` by default).
            muted: value passed as ``muted`` of the request.
        """
        response = await self.client(
            functions.phone.EditGroupCallParticipantRequest(
                call=self.full_chat.call,
                participant=peer,
                muted=muted,
                volume=volume,
            )
        )
        self.client._handle_update(response)

    async def get_and_set_self_peer(self):
        """Fetch the client's own input peer, store it in ``my_peer`` and return it."""
        self.my_peer = await self.client.get_me(input_peer=True)

        return self.my_peer

    async def get_and_set_group_call(self, group):
        """Resolve ``group``, load its full chat and remember its group call.

        Args:
            group: an ``InputPeerChannel``/``InputPeerChat``, or anything
                accepted by the client's ``get_input_entity``.

        Returns:
            The ``call`` attribute of the loaded full chat.

        Raises:
            PytgcallsError: when the resolved peer is neither a channel nor a
                chat, or the request returned no full chat.
        """
        self.chat_peer = group
        if not isinstance(group, (InputPeerChannel, InputPeerChat)):
            self.chat_peer = await self.client.get_input_entity(group)

        # Forget the previously loaded chat; otherwise a peer of an unsupported
        # type would silently reuse the full chat (and call) of an earlier group.
        self.full_chat = None
        if isinstance(self.chat_peer, InputPeerChannel):
            self.full_chat = (await self.client(functions.channels.GetFullChannelRequest(group))).full_chat
        elif isinstance(self.chat_peer, InputPeerChat):
            self.full_chat = (await self.client(functions.messages.GetFullChatRequest(group))).full_chat

        if self.full_chat is None:
            raise PytgcallsError(f'Can\'t get full chat by {group}')

        self.group_call = self.full_chat.call

        return self.group_call

    def unregister_update_handlers(self):
        """Remove the raw-update handler of this bridge from the client."""
        self.client.remove_event_handler(self._process_update, Raw)

    def register_update_handlers(self):
        """Add ``_process_update`` to the client as a handler for ``Raw`` events."""
        self.client.add_event_handler(self._process_update, Raw)

    async def resolve_and_set_join_as(self, join_as):
        """Store the peer to join the call as.

        An explicit ``join_as`` is stored unchanged. Otherwise the full chat's
        ``groupcall_default_join_as`` is used, falling back to our own peer
        when that is ``None``.
        """
        if join_as is None:
            self.join_as = self.full_chat.groupcall_default_join_as
            if self.join_as is None:
                self.join_as = await self.get_and_set_self_peer()
        else:
            self.join_as = join_as

    async def send_speaking_group_call_action(self):
        """Send ``messages.SetTyping`` with ``SpeakingInGroupCallAction`` to ``chat_peer``."""
        await self.client(functions.messages.SetTypingRequest(peer=self.chat_peer, action=SpeakingInGroupCallAction()))

    async def join_group_call(self, invite_hash: str, params: str, muted: bool, pre_update_processing: Callable):
        """Send ``phone.JoinGroupCall`` and process the updates of the response.

        Args:
            invite_hash: value passed as ``invite_hash`` of the request.
            params: string wrapped into ``DataJSON`` as the request ``params``.
            muted: value passed as ``muted`` of the request.
            pre_update_processing: callable invoked without arguments after the
                request succeeded and before any update is processed.

        Raises:
            GroupcallSsrcDuplicateMuch: when Telethon raises
                ``GroupcallSsrcDuplicateMuchError``.
        """
        try:
            response = await self.client(
                functions.phone.JoinGroupCallRequest(
                    call=self.group_call,
                    join_as=self.join_as,
                    invite_hash=invite_hash,
                    params=DataJSON(data=params),
                    muted=muted,
                )
            )

            pre_update_processing()

            # it is here cuz we need to associate params for connection with group call
            for update in response.updates:
                if isinstance(update, UpdateGroupCallConnection):
                    await self._process_group_call_connection(update)

            self.client._handle_update(response)
        except TelethonGroupcallSsrcDuplicateMuchError as e:
            raise GroupcallSsrcDuplicateMuch(e) from e

    def get_event_loop(self) -> AbstractEventLoop:
        """Return the event loop taken from the client in ``__init__``."""
        return self._loop
Classes
class TelethonBridge (client)
-
Telethon-backed implementation of MTProtoBridgeBase: sends group call requests through a Telethon client and forwards the wrapped group call updates to the bridge callbacks.
Expand source code
class TelethonBridge(MTProtoBridgeBase):
    """MTProto bridge that performs group call requests through a Telethon client.

    Raw Telethon updates for the tracked group call are wrapped and forwarded
    to the ``group_call_update_callback`` and
    ``group_call_participants_update_callback`` attributes of the bridge.
    """

    def __init__(self, client):
        """Keep the client's event loop and build the raw-update dispatch table.

        Args:
            client: client object passed to the base class; its ``loop``
                attribute is stored and returned by :meth:`get_event_loop`.
        """
        super().__init__(client)

        self._loop = client.loop

        # Raw update type -> coroutine that wraps it and calls the matching callback.
        self._update_to_handler = {
            UpdateGroupCallParticipants: self._process_group_call_participants_update,
            UpdateGroupCall: self._process_group_call_update,
        }

    async def _process_update(self, update):
        """Dispatch a raw update if it belongs to the currently tracked group call.

        Updates of an unregistered type, and updates whose ``call.id`` differs
        from ``self.group_call.id``, are ignored.

        Raises:
            StopPropagation: after a matching update has been handled.
        """
        handler = self._update_to_handler.get(type(update))
        if handler is None:
            return

        if not self.group_call or not update.call or update.call.id != self.group_call.id:
            return
        # Keep the most recent call object delivered by the server.
        self.group_call = update.call

        await handler(update)
        raise StopPropagation

    async def _process_group_call_participants_update(self, update):
        """Wrap every participant of the update and pass the result to the participants callback."""
        participants = [GroupCallParticipantWrapper.create(p) for p in update.participants]
        wrapped_update = UpdateGroupCallParticipantsWrapper(participants)

        await self.group_call_participants_update_callback(wrapped_update)

    async def _process_group_call_update(self, update):
        """Forward a group call update, but only when the call has been discarded."""
        if not isinstance(update.call, TelethonGroupCallDiscarded):
            return

        call = GroupCallDiscardedWrapper()  # no info needed
        wrapped_update = UpdateGroupCallWrapper(update.chat_id, call)

        await self.group_call_update_callback(wrapped_update)

    async def _process_group_call_connection(self, update):
        """Forward the connection ``params`` of the update through the group call callback.

        Both the call id and the chat id of the wrappers are filled with the
        literal string ``'placeholder'``.
        """
        # TODO update to new layer when pyrogram will release new stable version on pypi
        call = GroupCallWrapper('placeholder', update.params)
        wrapped_update = UpdateGroupCallWrapper('placeholder', call)

        await self.group_call_update_callback(wrapped_update)

    async def check_group_call(self) -> bool:
        """Ask the server whether our SSRC is still listed in the group call.

        Returns:
            ``False`` when no group call or SSRC is set, or when the server
            answers with ``GroupcallJoinMissingError``; otherwise whether our
            SSRC is contained in the response.

        Raises:
            BadRequest: for any other Telethon ``BadRequestError``.
        """
        if not self.group_call or not self.my_ssrc:
            return False

        # Convert once so the value we look up is exactly the value we queried.
        my_source = int_ssrc(self.my_ssrc)
        try:
            ssrcs_in_group_call = await self.client(
                functions.phone.CheckGroupCallRequest(call=self.group_call, sources=[my_source])
            )
        except TelethonBadRequestError as e:
            if isinstance(e, TelethonGroupcallJoinMissingError):
                return False
            raise BadRequest(e) from e

        return my_source in ssrcs_in_group_call

    async def leave_current_group_call(self):
        """Send ``phone.LeaveGroupCall`` and pass the response to the client's update handling.

        Does nothing when the full chat, its call, or our SSRC is missing.
        """
        if not self.full_chat or not self.full_chat.call or not self.my_ssrc:
            return

        response = await self.client(
            functions.phone.LeaveGroupCallRequest(call=self.full_chat.call, source=int_ssrc(self.my_ssrc))
        )
        # NOTE: `_handle_update` is a private method of the client object.
        self.client._handle_update(response)

    async def edit_group_call_member(self, peer, volume: int = None, muted=False):
        """Send ``phone.EditGroupCallParticipant`` for ``peer`` and handle the returned updates.

        Args:
            peer: participant to edit.
            volume: value passed as ``volume`` of the request (``None`` by default).
            muted: value passed as ``muted`` of the request.
        """
        response = await self.client(
            functions.phone.EditGroupCallParticipantRequest(
                call=self.full_chat.call,
                participant=peer,
                muted=muted,
                volume=volume,
            )
        )
        self.client._handle_update(response)

    async def get_and_set_self_peer(self):
        """Fetch the client's own input peer, store it in ``my_peer`` and return it."""
        self.my_peer = await self.client.get_me(input_peer=True)

        return self.my_peer

    async def get_and_set_group_call(self, group):
        """Resolve ``group``, load its full chat and remember its group call.

        Args:
            group: an ``InputPeerChannel``/``InputPeerChat``, or anything
                accepted by the client's ``get_input_entity``.

        Returns:
            The ``call`` attribute of the loaded full chat.

        Raises:
            PytgcallsError: when the resolved peer is neither a channel nor a
                chat, or the request returned no full chat.
        """
        self.chat_peer = group
        if not isinstance(group, (InputPeerChannel, InputPeerChat)):
            self.chat_peer = await self.client.get_input_entity(group)

        # Forget the previously loaded chat; otherwise a peer of an unsupported
        # type would silently reuse the full chat (and call) of an earlier group.
        self.full_chat = None
        if isinstance(self.chat_peer, InputPeerChannel):
            self.full_chat = (await self.client(functions.channels.GetFullChannelRequest(group))).full_chat
        elif isinstance(self.chat_peer, InputPeerChat):
            self.full_chat = (await self.client(functions.messages.GetFullChatRequest(group))).full_chat

        if self.full_chat is None:
            raise PytgcallsError(f'Can\'t get full chat by {group}')

        self.group_call = self.full_chat.call

        return self.group_call

    def unregister_update_handlers(self):
        """Remove the raw-update handler of this bridge from the client."""
        self.client.remove_event_handler(self._process_update, Raw)

    def register_update_handlers(self):
        """Add ``_process_update`` to the client as a handler for ``Raw`` events."""
        self.client.add_event_handler(self._process_update, Raw)

    async def resolve_and_set_join_as(self, join_as):
        """Store the peer to join the call as.

        An explicit ``join_as`` is stored unchanged. Otherwise the full chat's
        ``groupcall_default_join_as`` is used, falling back to our own peer
        when that is ``None``.
        """
        if join_as is None:
            self.join_as = self.full_chat.groupcall_default_join_as
            if self.join_as is None:
                self.join_as = await self.get_and_set_self_peer()
        else:
            self.join_as = join_as

    async def send_speaking_group_call_action(self):
        """Send ``messages.SetTyping`` with ``SpeakingInGroupCallAction`` to ``chat_peer``."""
        await self.client(functions.messages.SetTypingRequest(peer=self.chat_peer, action=SpeakingInGroupCallAction()))

    async def join_group_call(self, invite_hash: str, params: str, muted: bool, pre_update_processing: Callable):
        """Send ``phone.JoinGroupCall`` and process the updates of the response.

        Args:
            invite_hash: value passed as ``invite_hash`` of the request.
            params: string wrapped into ``DataJSON`` as the request ``params``.
            muted: value passed as ``muted`` of the request.
            pre_update_processing: callable invoked without arguments after the
                request succeeded and before any update is processed.

        Raises:
            GroupcallSsrcDuplicateMuch: when Telethon raises
                ``GroupcallSsrcDuplicateMuchError``.
        """
        try:
            response = await self.client(
                functions.phone.JoinGroupCallRequest(
                    call=self.group_call,
                    join_as=self.join_as,
                    invite_hash=invite_hash,
                    params=DataJSON(data=params),
                    muted=muted,
                )
            )

            pre_update_processing()

            # it is here cuz we need to associate params for connection with group call
            for update in response.updates:
                if isinstance(update, UpdateGroupCallConnection):
                    await self._process_group_call_connection(update)

            self.client._handle_update(response)
        except TelethonGroupcallSsrcDuplicateMuchError as e:
            raise GroupcallSsrcDuplicateMuch(e) from e

    def get_event_loop(self) -> AbstractEventLoop:
        """Return the event loop taken from the client in ``__init__``."""
        return self._loop
Ancestors
- MTProtoBridgeBase
- abc.ABC
Instance variables
var chat_peer
-
Inherited from:
MTProtoBridgeBase
.chat_peer
Chat peer where bot is now
var client
-
Inherited from:
MTProtoBridgeBase
.client
Any MTProto client. Pyrogram/Telethon and so on
var full_chat
-
Inherited from:
MTProtoBridgeBase
.full_chat
Full chat information
var group_call
-
Inherited from:
MTProtoBridgeBase
.group_call
Instance of MTProto's group call
var group_call_participants_update_callback
-
Inherited from:
MTProtoBridgeBase
.group_call_participants_update_callback
Native handler of wrapped group call participants update
var group_call_update_callback
-
Inherited from:
MTProtoBridgeBase
.group_call_update_callback
Native handler of wrapped group call update
var join_as
-
Inherited from:
MTProtoBridgeBase
.join_as
How to present yourself in participants list
var my_peer
-
Inherited from:
MTProtoBridgeBase
.my_peer
Client user peer
var my_ssrc
-
Inherited from:
MTProtoBridgeBase
.my_ssrc
Client SSRC (Synchronization Source)
Methods
async def check_group_call(self) ‑> bool
-
Inherited from:
MTProtoBridgeBase
.check_group_call
Check if client is in a voice chat …
Expand source code
async def check_group_call(self) -> bool:
    """Ask the server whether our SSRC is still listed in the group call.

    Returns:
        ``False`` when no group call or SSRC is set, or when the server
        answers with ``GroupcallJoinMissingError``; otherwise whether our
        SSRC is contained in the response.

    Raises:
        BadRequest: for any other Telethon ``BadRequestError``.
    """
    if not self.group_call or not self.my_ssrc:
        return False

    # Convert once so the value we look up is exactly the value we queried.
    my_source = int_ssrc(self.my_ssrc)
    try:
        ssrcs_in_group_call = await self.client(
            functions.phone.CheckGroupCallRequest(call=self.group_call, sources=[my_source])
        )
    except TelethonBadRequestError as e:
        if isinstance(e, TelethonGroupcallJoinMissingError):
            return False
        # Chain explicitly so the original Telethon error is kept as the cause.
        raise BadRequest(e) from e

    return my_source in ssrcs_in_group_call
async def edit_group_call_member(self, peer, volume: int = None, muted=False)
-
Inherited from:
MTProtoBridgeBase
.edit_group_call_member
call phone.EditGroupCallParticipant
Expand source code
async def edit_group_call_member(self, peer, volume: int = None, muted=False):
    """Send ``phone.EditGroupCallParticipant`` for ``peer`` and handle the returned updates.

    Args:
        peer: participant to edit.
        volume: value passed as ``volume`` of the request (``None`` by default).
        muted: value passed as ``muted`` of the request.
    """
    request = functions.phone.EditGroupCallParticipantRequest(
        call=self.full_chat.call,
        participant=peer,
        muted=muted,
        volume=volume,
    )
    updates = await self.client(request)
    # NOTE: `_handle_update` is a private method of the client object.
    self.client._handle_update(updates)
async def get_and_set_group_call(self, group)
-
Inherited from:
MTProtoBridgeBase
.get_and_set_group_call
The group argument can be a peer, an int, a string with a username, and so on; all of them need to be supported …
Expand source code
async def get_and_set_group_call(self, group):
    """Resolve ``group``, load its full chat and remember its group call.

    Args:
        group: an ``InputPeerChannel``/``InputPeerChat``, or anything
            accepted by the client's ``get_input_entity``.

    Returns:
        The ``call`` attribute of the loaded full chat.

    Raises:
        PytgcallsError: when the resolved peer is neither a channel nor a
            chat, or the request returned no full chat.
    """
    self.chat_peer = group
    if not isinstance(group, (InputPeerChannel, InputPeerChat)):
        self.chat_peer = await self.client.get_input_entity(group)

    # Forget the previously loaded chat; otherwise a peer of an unsupported
    # type would silently reuse the full chat (and call) of an earlier group.
    self.full_chat = None
    if isinstance(self.chat_peer, InputPeerChannel):
        self.full_chat = (await self.client(functions.channels.GetFullChannelRequest(group))).full_chat
    elif isinstance(self.chat_peer, InputPeerChat):
        self.full_chat = (await self.client(functions.messages.GetFullChatRequest(group))).full_chat

    if self.full_chat is None:
        raise PytgcallsError(f'Can\'t get full chat by {group}')

    self.group_call = self.full_chat.call

    return self.group_call
async def get_and_set_self_peer(self)
-
Inherited from:
MTProtoBridgeBase
.get_and_set_self_peer
resolve self peer and set to obj field
Expand source code
async def get_and_set_self_peer(self):
    """Fetch the client's own input peer, store it in ``my_peer`` and return it."""
    me = await self.client.get_me(input_peer=True)
    self.my_peer = me
    return self.my_peer
def get_event_loop(self) ‑> asyncio.events.AbstractEventLoop
-
Inherited from:
MTProtoBridgeBase
.get_event_loop
return MTProto client loop
Expand source code
def get_event_loop(self) -> AbstractEventLoop:
    """Return the event loop stored on the bridge as ``_loop``."""
    loop = self._loop
    return loop
async def join_group_call(self, invite_hash: str, params: str, muted: bool, pre_update_processing: Callable)
-
Inherited from:
MTProtoBridgeBase
.join_group_call
call phone.JoinGroupCall with group_call, join_as, invite hash, muted and params …
Expand source code
async def join_group_call(self, invite_hash: str, params: str, muted: bool, pre_update_processing: Callable):
    """Send ``phone.JoinGroupCall`` and process the updates of the response.

    Args:
        invite_hash: value passed as ``invite_hash`` of the request.
        params: string wrapped into ``DataJSON`` as the request ``params``.
        muted: value passed as ``muted`` of the request.
        pre_update_processing: callable invoked without arguments after the
            request succeeded and before any update is processed.

    Raises:
        GroupcallSsrcDuplicateMuch: when Telethon raises
            ``GroupcallSsrcDuplicateMuchError``.
    """
    try:
        response = await self.client(
            functions.phone.JoinGroupCallRequest(
                call=self.group_call,
                join_as=self.join_as,
                invite_hash=invite_hash,
                params=DataJSON(data=params),
                muted=muted,
            )
        )

        pre_update_processing()

        # it is here cuz we need to associate params for connection with group call
        for update in response.updates:
            if isinstance(update, UpdateGroupCallConnection):
                await self._process_group_call_connection(update)

        # NOTE: `_handle_update` is a private method of the client object.
        self.client._handle_update(response)
    except TelethonGroupcallSsrcDuplicateMuchError as e:
        # Chain explicitly so the original Telethon error is kept as the cause.
        raise GroupcallSsrcDuplicateMuch(e) from e
async def leave_current_group_call(self)
-
Inherited from:
MTProtoBridgeBase
.leave_current_group_call
call phone.LeaveGroupCall and handle returned updates
Expand source code
async def leave_current_group_call(self):
    """Send ``phone.LeaveGroupCall`` and pass the response to the client's update handling.

    Does nothing when the full chat, its call, or our SSRC is missing.
    """
    if not (self.full_chat and self.full_chat.call and self.my_ssrc):
        return

    request = functions.phone.LeaveGroupCallRequest(
        call=self.full_chat.call,
        source=int_ssrc(self.my_ssrc),
    )
    updates = await self.client(request)
    # NOTE: `_handle_update` is a private method of the client object.
    self.client._handle_update(updates)
def re_register_update_handlers(self)
-
Inherited from:
MTProtoBridgeBase
.re_register_update_handlers
Delete and add pytgcalls handler in MTProto client.
def register_update_handlers(self)
-
Inherited from:
MTProtoBridgeBase
.register_update_handlers
register handlers
Expand source code
def register_update_handlers(self):
    """Add ``_process_update`` to the client as a handler for ``Raw`` events."""
    client = self.client
    client.add_event_handler(self._process_update, Raw)
async def resolve_and_set_join_as(self, join_as)
-
Inherited from:
MTProtoBridgeBase
.resolve_and_set_join_as
The join_as argument can be a str or a peer. If it is a str, the peer needs to be resolved. Saves join_as to the class field.
Expand source code
async def resolve_and_set_join_as(self, join_as):
    """Store the peer to join the call as.

    An explicit ``join_as`` is stored unchanged. Otherwise the full chat's
    ``groupcall_default_join_as`` is used, falling back to our own peer
    when that is ``None``.
    """
    if join_as is not None:
        self.join_as = join_as
        return

    # No explicit choice: start from the chat default, then fall back to ourselves.
    self.join_as = self.full_chat.groupcall_default_join_as
    if self.join_as is None:
        self.join_as = await self.get_and_set_self_peer()
async def send_speaking_group_call_action(self)
-
Inherited from:
MTProtoBridgeBase
.send_speaking_group_call_action
call messages.SetTyping with SpeakingInGroupCallAction by chat_peer
Expand source code
async def send_speaking_group_call_action(self):
    """Send ``messages.SetTyping`` with ``SpeakingInGroupCallAction`` to ``chat_peer``."""
    request = functions.messages.SetTypingRequest(
        peer=self.chat_peer,
        action=SpeakingInGroupCallAction(),
    )
    await self.client(request)
def unregister_update_handlers(self)
-
Inherited from:
MTProtoBridgeBase
.unregister_update_handlers
delete all registered handlers from MTProto client
Expand source code
def unregister_update_handlers(self):
    """Remove ``_process_update`` as a ``Raw`` event handler from the client."""
    client = self.client
    client.remove_event_handler(self._process_update, Raw)