Module pytgcalls.implementation.group_call_raw
Expand source code
# tgcalls - a Python binding for C++ library by Telegram
# pytgcalls - a library connecting the Python binding with MTProto
# Copyright (C) 2020-2021 Il`ya (Marshal) <https://github.com/MarshalX>
#
# This file is part of tgcalls and pytgcalls.
#
# tgcalls and pytgcalls is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# tgcalls and pytgcalls is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License v3
# along with tgcalls. If not, see <http://www.gnu.org/licenses/>.
from typing import Callable
import tgcalls
from pytgcalls.implementation import GroupCall
class GroupCallRaw(GroupCall):
def __init__(
self,
mtproto_bridge,
on_played_data: Callable[['GroupCallRaw', int], bytes] = None,
on_recorded_data: Callable[['GroupCallRaw', bytes, int], None] = None,
enable_logs_to_console=False,
path_to_log_file=None,
outgoing_audio_bitrate_kbit=128,
):
super().__init__(mtproto_bridge, enable_logs_to_console, path_to_log_file, outgoing_audio_bitrate_kbit)
self.__is_playout_paused = False
self.__is_recording_paused = False
self.__raw_audio_device_descriptor = None
self.on_played_data = on_played_data
self.on_recorded_data = on_recorded_data
def __create_and_return_raw_audio_device_descriptor(self):
self.__raw_audio_device_descriptor = tgcalls.RawAudioDeviceDescriptor()
self.__raw_audio_device_descriptor.getPlayedBufferCallback = self.__get_played_buffer_callback
self.__raw_audio_device_descriptor.setRecordedBufferCallback = self.__set_recorded_buffer_callback
self.__raw_audio_device_descriptor.isPlayoutPaused = self.__is_playout_paused_callback
self.__raw_audio_device_descriptor.isRecordingPaused = self.__is_recording_paused_callback
return self.__raw_audio_device_descriptor
def _setup_and_start_group_call(self):
self._start_native_group_call(self.__create_and_return_raw_audio_device_descriptor())
def __get_played_buffer_callback(self, length: int):
frame = b''
if self.on_played_data:
data = self.on_played_data(self, length)
if data:
frame = data
return frame.ljust(length, b'\0')
def __set_recorded_buffer_callback(self, frame: bytes, length: int):
if self.on_recorded_data:
self.on_recorded_data(self, frame, length)
def pause_playout(self):
"""Pause playout (playing from callback)."""
self.__is_playout_paused = True
def resume_playout(self):
"""Resume playout (playing from callback)."""
self.__is_playout_paused = False
def pause_recording(self):
"""Pause recording (output to callback)."""
self.__is_recording_paused = True
def resume_recording(self):
"""Resume recording (output to callback)."""
self.__is_recording_paused = False
def __is_playout_paused_callback(self):
return self.__is_playout_paused
def __is_recording_paused_callback(self):
return self.__is_recording_paused
"""Stop requesting new data to send."""
stop_playout = pause_playout
"""Stop getting raw data."""
stop_output = pause_recording
Classes
class GroupCallRaw (mtproto_bridge, on_played_data: Callable[[_ForwardRef('GroupCallRaw'), int], bytes] = None, on_recorded_data: Callable[[_ForwardRef('GroupCallRaw'), bytes, int], NoneType] = None, enable_logs_to_console=False, path_to_log_file=None, outgoing_audio_bitrate_kbit=128)-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class GroupCallRaw(GroupCall): def __init__( self, mtproto_bridge, on_played_data: Callable[['GroupCallRaw', int], bytes] = None, on_recorded_data: Callable[['GroupCallRaw', bytes, int], None] = None, enable_logs_to_console=False, path_to_log_file=None, outgoing_audio_bitrate_kbit=128, ): super().__init__(mtproto_bridge, enable_logs_to_console, path_to_log_file, outgoing_audio_bitrate_kbit) self.__is_playout_paused = False self.__is_recording_paused = False self.__raw_audio_device_descriptor = None self.on_played_data = on_played_data self.on_recorded_data = on_recorded_data def __create_and_return_raw_audio_device_descriptor(self): self.__raw_audio_device_descriptor = tgcalls.RawAudioDeviceDescriptor() self.__raw_audio_device_descriptor.getPlayedBufferCallback = self.__get_played_buffer_callback self.__raw_audio_device_descriptor.setRecordedBufferCallback = self.__set_recorded_buffer_callback self.__raw_audio_device_descriptor.isPlayoutPaused = self.__is_playout_paused_callback self.__raw_audio_device_descriptor.isRecordingPaused = self.__is_recording_paused_callback return self.__raw_audio_device_descriptor def _setup_and_start_group_call(self): self._start_native_group_call(self.__create_and_return_raw_audio_device_descriptor()) def __get_played_buffer_callback(self, length: int): frame = b'' if self.on_played_data: data = self.on_played_data(self, length) if data: frame = data return frame.ljust(length, b'\0') def __set_recorded_buffer_callback(self, frame: bytes, length: int): if self.on_recorded_data: self.on_recorded_data(self, frame, length) def pause_playout(self): """Pause playout (playing from callback).""" self.__is_playout_paused = True def resume_playout(self): """Resume playout (playing from callback).""" self.__is_playout_paused = False def pause_recording(self): """Pause recording (output to callback).""" self.__is_recording_paused = True def resume_recording(self): """Resume recording (output to callback).""" self.__is_recording_paused = False def __is_playout_paused_callback(self): return self.__is_playout_paused def __is_recording_paused_callback(self): return self.__is_recording_paused """Stop requesting new data to send.""" stop_playout = pause_playout """Stop getting raw data.""" stop_output = pause_recordingAncestors
- GroupCall
- abc.ABC
- GroupCallDispatcherMixin
- pytgcalls.dispatcher.dispatcher_mixin.DispatcherMixin
- GroupCallNative
Class variables
var SEND_ACTION_UPDATE_EACH-
Inherited from:
GroupCall.SEND_ACTION_UPDATE_EACHHow often to send speaking action to chat
Instance variables
var enable_action-
Inherited from:
GroupCall.enable_actionIs enable sending of speaking action
var invite_hash-
Inherited from:
GroupCall.invite_hashHash from invite link to join as speaker
var is_connected-
Inherited from:
GroupCall.is_connectedIs connected to voice chat via tgcalls
Methods
async def edit_group_call(self, volume: int = None, muted=False)-
Inherited from:
GroupCall.edit_group_callEdit own settings of group call …
async def edit_group_call_member(self, peer, volume: int = None, muted=False)-
Inherited from:
GroupCall.edit_group_call_memberEdit setting of user in voice chat (required voice chat management permission) …
async def leave_current_group_call(self)-
Inherited from:
GroupCall.leave_current_group_callLeave group call from server side (MTProto part).
def on_network_status_changed(self, func: Callable) ‑> Callable-
Inherited from:
GroupCall.on_network_status_changedWhen a status of network will be changed …
def on_participant_list_updated(self, func: Callable) ‑> Callable-
Inherited from:
GroupCall.on_participant_list_updatedWhen a list of participant will be updated …
def pause_playout(self)-
Pause playout (playing from callback).
Expand source code
def pause_playout(self): """Pause playout (playing from callback).""" self.__is_playout_paused = True def pause_recording(self)-
Pause recording (output to callback).
Expand source code
def pause_recording(self): """Pause recording (output to callback).""" self.__is_recording_paused = True def print_available_playout_devices(self)-
Inherited from:
GroupCall.print_available_playout_devicesPrint name and guid of available playout audio devices in system. Just helper method …
def print_available_recording_devices(self)-
Inherited from:
GroupCall.print_available_recording_devicesPrint name and guid of available recording audio devices in system. Just helper method …
async def reconnect(self)-
Inherited from:
GroupCall.reconnectConnect to voice chat using the same native instance.
def resume_playout(self)-
Resume playout (playing from callback).
Expand source code
def resume_playout(self): """Resume playout (playing from callback).""" self.__is_playout_paused = False def resume_recording(self)-
Resume recording (output to callback).
Expand source code
def resume_recording(self): """Resume recording (output to callback).""" self.__is_recording_paused = False async def set_is_mute(self, is_muted: bool)-
Inherited from:
GroupCall.set_is_muteSet is mute …
async def set_my_volume(self, volume)-
Inherited from:
GroupCall.set_my_volumeSet volume for current client …
async def start(self, group, join_as=None, invite_hash: Optional[str] = None, enable_action=True)-
Inherited from:
GroupCall.startStart voice chat (join and play/record from initial values) …
async def stop(self)-
Inherited from:
GroupCall.stopProperly stop tgcalls, remove MTProto handler, leave from server side.
def stop_output(self)-
Pause recording (output to callback).
Expand source code
def pause_recording(self): """Pause recording (output to callback).""" self.__is_recording_paused = True def stop_playout(self)-
Pause playout (playing from callback).
Expand source code
def pause_playout(self): """Pause playout (playing from callback).""" self.__is_playout_paused = True