Module pytgcalls.mtproto.pyrogram_bridge
Expand source code
# tgcalls - a Python binding for C++ library by Telegram
# pytgcalls - a library connecting the Python binding with MTProto
# Copyright (C) 2020-2021 Il`ya (Marshal) <https://github.com/MarshalX>
#
# This file is part of tgcalls and pytgcalls.
#
# tgcalls and pytgcalls is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# tgcalls and pytgcalls is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License v3
# along with tgcalls. If not, see <http://www.gnu.org/licenses/>.
from asyncio import AbstractEventLoop
from typing import Callable
from pyrogram.errors import (
BadRequest as PyrogramBadRequest,
GroupcallSsrcDuplicateMuch as PyrogramGroupcallSsrcDuplicateMuch,
)
from pyrogram.handlers import RawUpdateHandler
from pyrogram.raw import functions, types
from pyrogram.raw.types import GroupCallDiscarded as PyrogramGroupCallDiscarded, InputPeerChannel, InputPeerChat
from pyrogram.utils import get_peer_id
from pytgcalls import PytgcallsError
from pytgcalls.mtproto.data import GroupCallDiscardedWrapper, GroupCallWrapper, GroupCallParticipantWrapper
from pytgcalls.mtproto.data.update import UpdateGroupCallWrapper, UpdateGroupCallParticipantsWrapper
from pytgcalls.mtproto.exceptions import BadRequest, GroupcallSsrcDuplicateMuch
from pytgcalls.utils import int_ssrc
from pyrogram import Client, ContinuePropagation
from pytgcalls.mtproto import MTProtoBridgeBase
class PyrogramBridge(MTProtoBridgeBase):
    """MTProto bridge that talks to Telegram through a Pyrogram ``Client``.

    Group call requests (``phone.*``) are sent with ``client.send`` and raw
    ``UpdateGroupCall`` / ``UpdateGroupCallParticipants`` updates are converted
    into wrapper objects and passed to the callbacks stored on the bridge.
    """

    def __init__(self, client: Client):
        """Store the client and prepare (but do not register) the raw update handler.

        Args:
            client: Pyrogram client used for every request made by this bridge.
        """
        super().__init__(client)

        # Raw update type -> coroutine that wraps the update and forwards it.
        self._update_to_handler = {
            types.UpdateGroupCallParticipants: self._process_group_call_participants_update,
            types.UpdateGroupCall: self._process_group_call_update,
        }

        # Handler group passed to add_handler; None while nothing is registered.
        self._handler_group = None
        self._update_handler = RawUpdateHandler(self._process_update)

    async def _process_update(self, _, update, users, chats):
        """Dispatch a raw update that refers to the current group call.

        Raises:
            ContinuePropagation: if the update type is not handled here, or if it
                does not refer to the call stored in ``self.group_call``.
        """
        handler = self._update_to_handler.get(type(update))
        if handler is None:
            raise ContinuePropagation

        if not self.group_call or not update.call or update.call.id != self.group_call.id:
            raise ContinuePropagation
        # Keep the stored call in sync with the object carried by the update.
        self.group_call = update.call

        await handler(update)

    async def _process_group_call_participants_update(self, update):
        """Wrap the participants of ``update`` and pass them to the participants callback."""
        participants = [GroupCallParticipantWrapper.create(p) for p in update.participants]
        wrapped_update = UpdateGroupCallParticipantsWrapper(participants)

        await self.group_call_participants_update_callback(wrapped_update)

    async def _process_group_call_update(self, update):
        """Wrap a group call update (live or discarded) and pass it to the call callback."""
        if isinstance(update.call, PyrogramGroupCallDiscarded):
            call = GroupCallDiscardedWrapper()  # no info needed
        else:
            call = GroupCallWrapper(update.call.id, update.call.params)

        wrapped_update = UpdateGroupCallWrapper(update.chat_id, call)

        await self.group_call_update_callback(wrapped_update)

    async def check_group_call(self) -> bool:
        """Check whether the client is still joined to the current group call.

        Returns:
            ``False`` when no group call or SSRC is set, or when the server replies
            ``GROUPCALL_JOIN_MISSING``; otherwise the result of ``phone.CheckGroupCall``.

        Raises:
            BadRequest: for any other Pyrogram ``BadRequest``.
        """
        if not self.group_call or not self.my_ssrc:
            return False

        try:
            in_group_call = await self.client.send(
                functions.phone.CheckGroupCall(call=self.group_call, source=int_ssrc(self.my_ssrc))
            )
        except PyrogramBadRequest as e:
            if e.x != '[400 GROUPCALL_JOIN_MISSING]':
                # Chain the original error so the Pyrogram traceback is kept as the cause.
                raise BadRequest(e.x) from e

            in_group_call = False

        return in_group_call

    async def leave_current_group_call(self):
        """Send ``phone.LeaveGroupCall`` and process the returned updates.

        Does nothing when there is no full chat, no call on it, or no SSRC.
        """
        if not self.full_chat or not self.full_chat.call or not self.my_ssrc:
            return

        response = await self.client.send(
            functions.phone.LeaveGroupCall(call=self.full_chat.call, source=int_ssrc(self.my_ssrc))
        )
        await self.client.handle_updates(response)

    async def edit_group_call_member(self, peer, volume: int = None, muted=False):
        """Send ``phone.EditGroupCallParticipant`` for ``peer`` and process the returned updates.

        Args:
            peer: Participant to edit.
            volume: New volume, or ``None`` to leave the field unset in the request.
            muted: Value of the ``muted`` flag sent in the request.
        """
        response = await self.client.send(
            functions.phone.EditGroupCallParticipant(
                call=self.full_chat.call,
                participant=peer,
                muted=muted,
                volume=volume,
            )
        )
        await self.client.handle_updates(response)

    async def get_and_set_self_peer(self):
        """Resolve the client's own user id to a peer, store it in ``my_peer`` and return it."""
        self.my_peer = await self.client.resolve_peer(await self.client.storage.user_id())

        return self.my_peer

    async def get_and_set_group_call(self, group):
        """Get group call input of chat.

        Args:
            group (`InputPeerChannel` | `InputPeerChat` | `str` | `int`): Chat ID in any form.

        Returns:
            `InputGroupCall`.

        Raises:
            PytgcallsError: if the full chat cannot be obtained, including when the
                resolved peer is neither a channel nor a basic chat.
        """
        self.chat_peer = group
        if not isinstance(group, (InputPeerChannel, InputPeerChat)):
            self.chat_peer = await self.client.resolve_peer(group)

        # Use a local so an unsupported peer type cannot fall through to a full chat
        # left on the bridge by an earlier call.
        full_chat = None
        if isinstance(self.chat_peer, InputPeerChannel):
            full_chat = (
                await self.client.send(functions.channels.GetFullChannel(channel=self.chat_peer))
            ).full_chat
        elif isinstance(self.chat_peer, InputPeerChat):
            full_chat = (
                await self.client.send(functions.messages.GetFullChat(chat_id=self.chat_peer.chat_id))
            ).full_chat

        if full_chat is None:
            raise PytgcallsError(f'Can\'t get full chat by {group}')

        self.full_chat = full_chat
        self.group_call = full_chat.call

        return self.group_call

    def unregister_update_handlers(self):
        """Remove the raw update handler from the client if one is registered."""
        if self._handler_group:
            self.client.remove_handler(self._update_handler, self._handler_group)
            self._handler_group = None

    def register_update_handlers(self):
        """Register the raw update handler in a non-positive handler group.

        The group number is derived from the group call id: a positive id is negated,
        a non-positive id is used as is.
        """
        # NOTE(review): presumably a negative group makes Pyrogram run this handler
        # before handlers in the default group 0 - confirm against Pyrogram docs.
        if self.group_call.id > 0:
            self._handler_group = -self.group_call.id
        else:
            self._handler_group = self.group_call.id

        self.client.add_handler(self._update_handler, self._handler_group)

    async def resolve_and_set_join_as(self, join_as):
        """Resolve ``join_as`` to an input peer and store it in ``self.join_as``.

        Args:
            join_as: ``None`` to use the chat's default join-as peer (falling back to
                the client's own peer), a ``str``/``int`` to be resolved, or any other
                object, which is stored unchanged.
        """
        my_peer = await self.get_and_set_self_peer()

        if join_as is None:
            self.join_as = self.full_chat.groupcall_default_join_as
            if self.join_as:
                # convert Peer to InputPeer
                self.join_as = await self.client.resolve_peer(get_peer_id(self.join_as))
            else:
                self.join_as = my_peer
        elif isinstance(join_as, (str, int)):
            self.join_as = await self.client.resolve_peer(join_as)
        else:
            self.join_as = join_as

    async def send_speaking_group_call_action(self):
        """Send ``messages.SetTyping`` with ``SpeakingInGroupCallAction`` to ``chat_peer``."""
        await self.client.send(
            functions.messages.SetTyping(peer=self.chat_peer, action=types.SpeakingInGroupCallAction())
        )

    async def join_group_call(self, invite_hash: str, params: str, muted: bool, pre_update_processing: Callable):
        """Send ``phone.JoinGroupCall`` and process the returned updates.

        Args:
            invite_hash: Invite hash sent with the request.
            params: JSON string wrapped into ``DataJSON`` for the request.
            muted: Value of the ``muted`` flag sent in the request.
            pre_update_processing: Called with no arguments after the request
                succeeds and before the returned updates are handled.

        Raises:
            GroupcallSsrcDuplicateMuch: when Pyrogram raises its error of the same name.
        """
        try:
            response = await self.client.send(
                functions.phone.JoinGroupCall(
                    call=self.group_call,
                    join_as=self.join_as,
                    invite_hash=invite_hash,
                    params=types.DataJSON(data=params),
                    muted=muted,
                )
            )

            pre_update_processing()

            await self.client.handle_updates(response)
        except PyrogramGroupcallSsrcDuplicateMuch as e:
            # Chain the original error so the Pyrogram traceback is kept as the cause.
            raise GroupcallSsrcDuplicateMuch(e.x) from e

    def get_event_loop(self) -> AbstractEventLoop:
        """Return the event loop of the Pyrogram client."""
        return self.client.loop
Classes
class PyrogramBridge (client: pyrogram.client.Client)
-
Pyrogram-backed implementation of MTProtoBridgeBase: sends group call requests through a pyrogram Client and forwards raw group call updates to the bridge callbacks.
Expand source code
class PyrogramBridge(MTProtoBridgeBase):
    """MTProto bridge that talks to Telegram through a Pyrogram ``Client``.

    Group call requests (``phone.*``) are sent with ``client.send`` and raw
    ``UpdateGroupCall`` / ``UpdateGroupCallParticipants`` updates are converted
    into wrapper objects and passed to the callbacks stored on the bridge.
    """

    def __init__(self, client: Client):
        """Store the client and prepare (but do not register) the raw update handler.

        Args:
            client: Pyrogram client used for every request made by this bridge.
        """
        super().__init__(client)

        # Raw update type -> coroutine that wraps the update and forwards it.
        self._update_to_handler = {
            types.UpdateGroupCallParticipants: self._process_group_call_participants_update,
            types.UpdateGroupCall: self._process_group_call_update,
        }

        # Handler group passed to add_handler; None while nothing is registered.
        self._handler_group = None
        self._update_handler = RawUpdateHandler(self._process_update)

    async def _process_update(self, _, update, users, chats):
        """Dispatch a raw update that refers to the current group call.

        Raises:
            ContinuePropagation: if the update type is not handled here, or if it
                does not refer to the call stored in ``self.group_call``.
        """
        handler = self._update_to_handler.get(type(update))
        if handler is None:
            raise ContinuePropagation

        if not self.group_call or not update.call or update.call.id != self.group_call.id:
            raise ContinuePropagation
        # Keep the stored call in sync with the object carried by the update.
        self.group_call = update.call

        await handler(update)

    async def _process_group_call_participants_update(self, update):
        """Wrap the participants of ``update`` and pass them to the participants callback."""
        participants = [GroupCallParticipantWrapper.create(p) for p in update.participants]
        wrapped_update = UpdateGroupCallParticipantsWrapper(participants)

        await self.group_call_participants_update_callback(wrapped_update)

    async def _process_group_call_update(self, update):
        """Wrap a group call update (live or discarded) and pass it to the call callback."""
        if isinstance(update.call, PyrogramGroupCallDiscarded):
            call = GroupCallDiscardedWrapper()  # no info needed
        else:
            call = GroupCallWrapper(update.call.id, update.call.params)

        wrapped_update = UpdateGroupCallWrapper(update.chat_id, call)

        await self.group_call_update_callback(wrapped_update)

    async def check_group_call(self) -> bool:
        """Check whether the client is still joined to the current group call.

        Returns:
            ``False`` when no group call or SSRC is set, or when the server replies
            ``GROUPCALL_JOIN_MISSING``; otherwise the result of ``phone.CheckGroupCall``.

        Raises:
            BadRequest: for any other Pyrogram ``BadRequest``.
        """
        if not self.group_call or not self.my_ssrc:
            return False

        try:
            in_group_call = await self.client.send(
                functions.phone.CheckGroupCall(call=self.group_call, source=int_ssrc(self.my_ssrc))
            )
        except PyrogramBadRequest as e:
            if e.x != '[400 GROUPCALL_JOIN_MISSING]':
                # Chain the original error so the Pyrogram traceback is kept as the cause.
                raise BadRequest(e.x) from e

            in_group_call = False

        return in_group_call

    async def leave_current_group_call(self):
        """Send ``phone.LeaveGroupCall`` and process the returned updates.

        Does nothing when there is no full chat, no call on it, or no SSRC.
        """
        if not self.full_chat or not self.full_chat.call or not self.my_ssrc:
            return

        response = await self.client.send(
            functions.phone.LeaveGroupCall(call=self.full_chat.call, source=int_ssrc(self.my_ssrc))
        )
        await self.client.handle_updates(response)

    async def edit_group_call_member(self, peer, volume: int = None, muted=False):
        """Send ``phone.EditGroupCallParticipant`` for ``peer`` and process the returned updates.

        Args:
            peer: Participant to edit.
            volume: New volume, or ``None`` to leave the field unset in the request.
            muted: Value of the ``muted`` flag sent in the request.
        """
        response = await self.client.send(
            functions.phone.EditGroupCallParticipant(
                call=self.full_chat.call,
                participant=peer,
                muted=muted,
                volume=volume,
            )
        )
        await self.client.handle_updates(response)

    async def get_and_set_self_peer(self):
        """Resolve the client's own user id to a peer, store it in ``my_peer`` and return it."""
        self.my_peer = await self.client.resolve_peer(await self.client.storage.user_id())

        return self.my_peer

    async def get_and_set_group_call(self, group):
        """Get group call input of chat.

        Args:
            group (`InputPeerChannel` | `InputPeerChat` | `str` | `int`): Chat ID in any form.

        Returns:
            `InputGroupCall`.

        Raises:
            PytgcallsError: if the full chat cannot be obtained, including when the
                resolved peer is neither a channel nor a basic chat.
        """
        self.chat_peer = group
        if not isinstance(group, (InputPeerChannel, InputPeerChat)):
            self.chat_peer = await self.client.resolve_peer(group)

        # Use a local so an unsupported peer type cannot fall through to a full chat
        # left on the bridge by an earlier call.
        full_chat = None
        if isinstance(self.chat_peer, InputPeerChannel):
            full_chat = (
                await self.client.send(functions.channels.GetFullChannel(channel=self.chat_peer))
            ).full_chat
        elif isinstance(self.chat_peer, InputPeerChat):
            full_chat = (
                await self.client.send(functions.messages.GetFullChat(chat_id=self.chat_peer.chat_id))
            ).full_chat

        if full_chat is None:
            raise PytgcallsError(f'Can\'t get full chat by {group}')

        self.full_chat = full_chat
        self.group_call = full_chat.call

        return self.group_call

    def unregister_update_handlers(self):
        """Remove the raw update handler from the client if one is registered."""
        if self._handler_group:
            self.client.remove_handler(self._update_handler, self._handler_group)
            self._handler_group = None

    def register_update_handlers(self):
        """Register the raw update handler in a non-positive handler group.

        The group number is derived from the group call id: a positive id is negated,
        a non-positive id is used as is.
        """
        # NOTE(review): presumably a negative group makes Pyrogram run this handler
        # before handlers in the default group 0 - confirm against Pyrogram docs.
        if self.group_call.id > 0:
            self._handler_group = -self.group_call.id
        else:
            self._handler_group = self.group_call.id

        self.client.add_handler(self._update_handler, self._handler_group)

    async def resolve_and_set_join_as(self, join_as):
        """Resolve ``join_as`` to an input peer and store it in ``self.join_as``.

        Args:
            join_as: ``None`` to use the chat's default join-as peer (falling back to
                the client's own peer), a ``str``/``int`` to be resolved, or any other
                object, which is stored unchanged.
        """
        my_peer = await self.get_and_set_self_peer()

        if join_as is None:
            self.join_as = self.full_chat.groupcall_default_join_as
            if self.join_as:
                # convert Peer to InputPeer
                self.join_as = await self.client.resolve_peer(get_peer_id(self.join_as))
            else:
                self.join_as = my_peer
        elif isinstance(join_as, (str, int)):
            self.join_as = await self.client.resolve_peer(join_as)
        else:
            self.join_as = join_as

    async def send_speaking_group_call_action(self):
        """Send ``messages.SetTyping`` with ``SpeakingInGroupCallAction`` to ``chat_peer``."""
        await self.client.send(
            functions.messages.SetTyping(peer=self.chat_peer, action=types.SpeakingInGroupCallAction())
        )

    async def join_group_call(self, invite_hash: str, params: str, muted: bool, pre_update_processing: Callable):
        """Send ``phone.JoinGroupCall`` and process the returned updates.

        Args:
            invite_hash: Invite hash sent with the request.
            params: JSON string wrapped into ``DataJSON`` for the request.
            muted: Value of the ``muted`` flag sent in the request.
            pre_update_processing: Called with no arguments after the request
                succeeds and before the returned updates are handled.

        Raises:
            GroupcallSsrcDuplicateMuch: when Pyrogram raises its error of the same name.
        """
        try:
            response = await self.client.send(
                functions.phone.JoinGroupCall(
                    call=self.group_call,
                    join_as=self.join_as,
                    invite_hash=invite_hash,
                    params=types.DataJSON(data=params),
                    muted=muted,
                )
            )

            pre_update_processing()

            await self.client.handle_updates(response)
        except PyrogramGroupcallSsrcDuplicateMuch as e:
            # Chain the original error so the Pyrogram traceback is kept as the cause.
            raise GroupcallSsrcDuplicateMuch(e.x) from e

    def get_event_loop(self) -> AbstractEventLoop:
        """Return the event loop of the Pyrogram client."""
        return self.client.loop
Ancestors
- MTProtoBridgeBase
- abc.ABC
Instance variables
var chat_peer
-
Inherited from:
MTProtoBridgeBase
.chat_peer
Chat peer where bot is now
var client
-
Inherited from:
MTProtoBridgeBase
.client
Any MTProto client. Pyrogram/Telethon and so on
var full_chat
-
Inherited from:
MTProtoBridgeBase
.full_chat
Full chat information
var group_call
-
Inherited from:
MTProtoBridgeBase
.group_call
Instance of MTProto's group call
var group_call_participants_update_callback
-
Inherited from:
MTProtoBridgeBase
.group_call_participants_update_callback
Native handler of wrapped group call participants update
var group_call_update_callback
-
Inherited from:
MTProtoBridgeBase
.group_call_update_callback
Native handler of wrapped group call update
var join_as
-
Inherited from:
MTProtoBridgeBase
.join_as
How to present yourself in participants list
var my_peer
-
Inherited from:
MTProtoBridgeBase
.my_peer
Client user peer
var my_ssrc
-
Inherited from:
MTProtoBridgeBase
.my_ssrc
Client SSRC (Synchronization Source)
Methods
async def check_group_call(self) ‑> bool
-
Inherited from:
MTProtoBridgeBase
.check_group_call
Check if client is in a voice chat …
Expand source code
async def check_group_call(self) -> bool:
    """Ask Telegram whether this client is still joined to the current group call.

    Returns:
        ``False`` when no group call or SSRC is set, or when the server replies
        ``GROUPCALL_JOIN_MISSING``; otherwise the result of ``phone.CheckGroupCall``.

    Raises:
        BadRequest: for any other Pyrogram ``BadRequest``.
    """
    if not (self.group_call and self.my_ssrc):
        return False

    try:
        request = functions.phone.CheckGroupCall(call=self.group_call, source=int_ssrc(self.my_ssrc))
        return await self.client.send(request)
    except PyrogramBadRequest as error:
        # A missing join only means "not in the call"; everything else is re-raised wrapped.
        if error.x == '[400 GROUPCALL_JOIN_MISSING]':
            return False
        raise BadRequest(error.x)
async def edit_group_call_member(self, peer, volume: int = None, muted=False)
-
Inherited from:
MTProtoBridgeBase
.edit_group_call_member
call phone.EditGroupCallParticipant
Expand source code
async def edit_group_call_member(self, peer, volume: int = None, muted=False):
    """Send ``phone.EditGroupCallParticipant`` for ``peer`` and process the returned updates.

    Args:
        peer: Participant to edit.
        volume: New volume, or ``None`` to leave the field unset in the request.
        muted: Value of the ``muted`` flag sent in the request.
    """
    request = functions.phone.EditGroupCallParticipant(
        call=self.full_chat.call,
        participant=peer,
        muted=muted,
        volume=volume,
    )
    updates = await self.client.send(request)
    await self.client.handle_updates(updates)
async def get_and_set_group_call(self, group)
-
Get group call input of chat.
Args
group (InputPeerChannel | InputPeerChat | str | int): Chat ID in any form.
Returns
InputGroupCall.
Expand source code
async def get_and_set_group_call(self, group):
    """Get group call input of chat.

    Args:
        group (`InputPeerChannel` | `InputPeerChat` | `str` | `int`): Chat ID in any form.

    Returns:
        `InputGroupCall`.

    Raises:
        PytgcallsError: if the full chat cannot be obtained, including when the
            resolved peer is neither a channel nor a basic chat.
    """
    self.chat_peer = group
    if not isinstance(group, (InputPeerChannel, InputPeerChat)):
        self.chat_peer = await self.client.resolve_peer(group)

    # Use a local so an unsupported peer type cannot fall through to a full chat
    # left on the bridge by an earlier call.
    full_chat = None
    if isinstance(self.chat_peer, InputPeerChannel):
        full_chat = (
            await self.client.send(functions.channels.GetFullChannel(channel=self.chat_peer))
        ).full_chat
    elif isinstance(self.chat_peer, InputPeerChat):
        full_chat = (
            await self.client.send(functions.messages.GetFullChat(chat_id=self.chat_peer.chat_id))
        ).full_chat

    if full_chat is None:
        raise PytgcallsError(f'Can\'t get full chat by {group}')

    self.full_chat = full_chat
    self.group_call = full_chat.call

    return self.group_call
async def get_and_set_self_peer(self)
-
Inherited from:
MTProtoBridgeBase
.get_and_set_self_peer
resolve self peer and set to obj field
Expand source code
async def get_and_set_self_peer(self):
    """Resolve the client's own user id to a peer, store it in ``my_peer`` and return it."""
    own_user_id = await self.client.storage.user_id()
    self.my_peer = await self.client.resolve_peer(own_user_id)
    return self.my_peer
def get_event_loop(self) ‑> asyncio.events.AbstractEventLoop
-
Inherited from:
MTProtoBridgeBase
.get_event_loop
return MTProto client loop
Expand source code
def get_event_loop(self) -> AbstractEventLoop:
    """Return the event loop of the Pyrogram client."""
    client = self.client
    return client.loop
async def join_group_call(self, invite_hash: str, params: str, muted: bool, pre_update_processing: Callable)
-
Inherited from:
MTProtoBridgeBase
.join_group_call
call phone.JoinGroupCall with group_call, join_as, invite hash, muted and params …
Expand source code
async def join_group_call(self, invite_hash: str, params: str, muted: bool, pre_update_processing: Callable):
    """Send ``phone.JoinGroupCall`` and process the returned updates.

    Args:
        invite_hash: Invite hash sent with the request.
        params: JSON string wrapped into ``DataJSON`` for the request.
        muted: Value of the ``muted`` flag sent in the request.
        pre_update_processing: Called with no arguments after the request
            succeeds and before the returned updates are handled.

    Raises:
        GroupcallSsrcDuplicateMuch: when Pyrogram raises its error of the same name.
    """
    try:
        join_request = functions.phone.JoinGroupCall(
            call=self.group_call,
            join_as=self.join_as,
            invite_hash=invite_hash,
            params=types.DataJSON(data=params),
            muted=muted,
        )
        updates = await self.client.send(join_request)

        # The hook must run before the updates from the response are dispatched.
        pre_update_processing()
        await self.client.handle_updates(updates)
    except PyrogramGroupcallSsrcDuplicateMuch as error:
        raise GroupcallSsrcDuplicateMuch(error.x)
async def leave_current_group_call(self)
-
Inherited from:
MTProtoBridgeBase
.leave_current_group_call
call phone.LeaveGroupCall and handle returned updates
Expand source code
async def leave_current_group_call(self):
    """Send ``phone.LeaveGroupCall`` and process the returned updates.

    Does nothing when there is no full chat, no call on it, or no SSRC.
    """
    can_leave = self.full_chat and self.full_chat.call and self.my_ssrc
    if not can_leave:
        return

    leave_request = functions.phone.LeaveGroupCall(
        call=self.full_chat.call,
        source=int_ssrc(self.my_ssrc),
    )
    updates = await self.client.send(leave_request)
    await self.client.handle_updates(updates)
def re_register_update_handlers(self)
-
Inherited from:
MTProtoBridgeBase
.re_register_update_handlers
Delete and add pytgcalls handler in MTProto client.
def register_update_handlers(self)
-
Inherited from:
MTProtoBridgeBase
.register_update_handlers
register handlers
Expand source code
def register_update_handlers(self):
    """Register the raw update handler in a non-positive handler group.

    The group number is derived from the group call id: a positive id is negated,
    a non-positive id is used as is. Previously the negated value was always
    overwritten by the raw id because the ``else`` was missing.
    """
    # NOTE(review): presumably a negative group makes Pyrogram run this handler
    # before handlers in the default group 0 - confirm against Pyrogram docs.
    if self.group_call.id > 0:
        self._handler_group = -self.group_call.id
    else:
        self._handler_group = self.group_call.id

    self.client.add_handler(self._update_handler, self._handler_group)
async def resolve_and_set_join_as(self, join_as)
-
Inherited from:
MTProtoBridgeBase
.resolve_and_set_join_as
The join_as argument can be a str, an int or a peer. If it is a str or an int, it is resolved to a peer first. The result is saved to the join_as field of the class.
Expand source code
async def resolve_and_set_join_as(self, join_as):
    """Resolve ``join_as`` to an input peer and store it in ``self.join_as``.

    Args:
        join_as: ``None`` to use the chat's default join-as peer (falling back to
            the client's own peer), a ``str``/``int`` to be resolved, or any other
            object, which is stored unchanged.
    """
    own_peer = await self.get_and_set_self_peer()

    if join_as is not None:
        if isinstance(join_as, (str, int)):
            self.join_as = await self.client.resolve_peer(join_as)
        else:
            self.join_as = join_as
        return

    self.join_as = self.full_chat.groupcall_default_join_as
    if not self.join_as:
        self.join_as = own_peer
        return

    # convert Peer to InputPeer
    self.join_as = await self.client.resolve_peer(get_peer_id(self.join_as))
async def send_speaking_group_call_action(self)
-
Inherited from:
MTProtoBridgeBase
.send_speaking_group_call_action
call messages.SetTyping with SpeakingInGroupCallAction by chat_peer
Expand source code
async def send_speaking_group_call_action(self):
    """Send ``messages.SetTyping`` with ``SpeakingInGroupCallAction`` to ``chat_peer``."""
    typing_request = functions.messages.SetTyping(
        peer=self.chat_peer,
        action=types.SpeakingInGroupCallAction(),
    )
    await self.client.send(typing_request)
def unregister_update_handlers(self)
-
Inherited from:
MTProtoBridgeBase
.unregister_update_handlers
delete all registered handlers from MTProto client
Expand source code
def unregister_update_handlers(self):
    """Remove the raw update handler from the client if one is registered."""
    registered_group = self._handler_group
    if not registered_group:
        return

    self.client.remove_handler(self._update_handler, registered_group)
    self._handler_group = None