Module pytgcalls.implementation.group_call_raw
Expand source code
# tgcalls - a Python binding for C++ library by Telegram
# pytgcalls - a library connecting the Python binding with MTProto
# Copyright (C) 2020-2021 Il`ya (Marshal) <https://github.com/MarshalX>
#
# This file is part of tgcalls and pytgcalls.
#
# tgcalls and pytgcalls is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# tgcalls and pytgcalls is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License v3
# along with tgcalls. If not, see <http://www.gnu.org/licenses/>.
from typing import Callable
import tgcalls
from pytgcalls.implementation import GroupCall
class GroupCallRaw(GroupCall):
def __init__(
self,
mtproto_bridge,
on_played_data: Callable[['GroupCallRaw', int], bytes] = None,
on_recorded_data: Callable[['GroupCallRaw', bytes, int], None] = None,
enable_logs_to_console=False,
path_to_log_file=None,
outgoing_audio_bitrate_kbit=128,
):
super().__init__(mtproto_bridge, enable_logs_to_console, path_to_log_file, outgoing_audio_bitrate_kbit)
self.__is_playout_paused = False
self.__is_recording_paused = False
self.__raw_audio_device_descriptor = None
self.on_played_data = on_played_data
self.on_recorded_data = on_recorded_data
def __create_and_return_raw_audio_device_descriptor(self):
self.__raw_audio_device_descriptor = tgcalls.RawAudioDeviceDescriptor()
self.__raw_audio_device_descriptor.getPlayedBufferCallback = self.__get_played_buffer_callback
self.__raw_audio_device_descriptor.setRecordedBufferCallback = self.__set_recorded_buffer_callback
self.__raw_audio_device_descriptor.isPlayoutPaused = self.__is_playout_paused_callback
self.__raw_audio_device_descriptor.isRecordingPaused = self.__is_recording_paused_callback
return self.__raw_audio_device_descriptor
def _setup_and_start_group_call(self):
self._start_native_group_call(self.__create_and_return_raw_audio_device_descriptor())
def __get_played_buffer_callback(self, length: int):
frame = b''
if self.on_played_data:
data = self.on_played_data(self, length)
if data:
frame = data
return frame.ljust(length, b'\0')
def __set_recorded_buffer_callback(self, frame: bytes, length: int):
if self.on_recorded_data:
self.on_recorded_data(self, frame, length)
def pause_playout(self):
"""Pause playout (playing from callback)."""
self.__is_playout_paused = True
def resume_playout(self):
"""Resume playout (playing from callback)."""
self.__is_playout_paused = False
def pause_recording(self):
"""Pause recording (output to callback)."""
self.__is_recording_paused = True
def resume_recording(self):
"""Resume recording (output to callback)."""
self.__is_recording_paused = False
def __is_playout_paused_callback(self):
return self.__is_playout_paused
def __is_recording_paused_callback(self):
return self.__is_recording_paused
"""Stop requesting new data to send."""
stop_playout = pause_playout
"""Stop getting raw data."""
stop_output = pause_recording
Classes
class GroupCallRaw (mtproto_bridge, on_played_data: Callable[[_ForwardRef('GroupCallRaw'), int], bytes] = None, on_recorded_data: Callable[[_ForwardRef('GroupCallRaw'), bytes, int], NoneType] = None, enable_logs_to_console=False, path_to_log_file=None, outgoing_audio_bitrate_kbit=128)
-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class GroupCallRaw(GroupCall): def __init__( self, mtproto_bridge, on_played_data: Callable[['GroupCallRaw', int], bytes] = None, on_recorded_data: Callable[['GroupCallRaw', bytes, int], None] = None, enable_logs_to_console=False, path_to_log_file=None, outgoing_audio_bitrate_kbit=128, ): super().__init__(mtproto_bridge, enable_logs_to_console, path_to_log_file, outgoing_audio_bitrate_kbit) self.__is_playout_paused = False self.__is_recording_paused = False self.__raw_audio_device_descriptor = None self.on_played_data = on_played_data self.on_recorded_data = on_recorded_data def __create_and_return_raw_audio_device_descriptor(self): self.__raw_audio_device_descriptor = tgcalls.RawAudioDeviceDescriptor() self.__raw_audio_device_descriptor.getPlayedBufferCallback = self.__get_played_buffer_callback self.__raw_audio_device_descriptor.setRecordedBufferCallback = self.__set_recorded_buffer_callback self.__raw_audio_device_descriptor.isPlayoutPaused = self.__is_playout_paused_callback self.__raw_audio_device_descriptor.isRecordingPaused = self.__is_recording_paused_callback return self.__raw_audio_device_descriptor def _setup_and_start_group_call(self): self._start_native_group_call(self.__create_and_return_raw_audio_device_descriptor()) def __get_played_buffer_callback(self, length: int): frame = b'' if self.on_played_data: data = self.on_played_data(self, length) if data: frame = data return frame.ljust(length, b'\0') def __set_recorded_buffer_callback(self, frame: bytes, length: int): if self.on_recorded_data: self.on_recorded_data(self, frame, length) def pause_playout(self): """Pause playout (playing from callback).""" self.__is_playout_paused = True def resume_playout(self): """Resume playout (playing from callback).""" self.__is_playout_paused = False def pause_recording(self): """Pause recording (output to callback).""" self.__is_recording_paused = True def resume_recording(self): """Resume recording (output to callback).""" self.__is_recording_paused = False def __is_playout_paused_callback(self): return self.__is_playout_paused def __is_recording_paused_callback(self): return self.__is_recording_paused """Stop requesting new data to send.""" stop_playout = pause_playout """Stop getting raw data.""" stop_output = pause_recording
Ancestors
- GroupCall
- abc.ABC
- GroupCallDispatcherMixin
- pytgcalls.dispatcher.dispatcher_mixin.DispatcherMixin
- GroupCallNative
Class variables
var SEND_ACTION_UPDATE_EACH
-
Inherited from:
GroupCall
.SEND_ACTION_UPDATE_EACH
How often to send speaking action to chat
Instance variables
var enable_action
-
Inherited from:
GroupCall
.enable_action
Is enable sending of speaking action
var invite_hash
-
Inherited from:
GroupCall
.invite_hash
Hash from invite link to join as speaker
var is_connected
-
Inherited from:
GroupCall
.is_connected
Is connected to voice chat via tgcalls
Methods
async def edit_group_call(self, volume: int = None, muted=False)
-
Inherited from:
GroupCall
.edit_group_call
Edit own settings of group call …
async def edit_group_call_member(self, peer, volume: int = None, muted=False)
-
Inherited from:
GroupCall
.edit_group_call_member
Edit setting of user in voice chat (required voice chat management permission) …
async def leave_current_group_call(self)
-
Inherited from:
GroupCall
.leave_current_group_call
Leave group call from server side (MTProto part).
def on_network_status_changed(self, func: Callable) ‑> Callable
-
Inherited from:
GroupCall
.on_network_status_changed
When a status of network will be changed …
def on_participant_list_updated(self, func: Callable) ‑> Callable
-
Inherited from:
GroupCall
.on_participant_list_updated
When a list of participant will be updated …
def pause_playout(self)
-
Pause playout (playing from callback).
Expand source code
def pause_playout(self): """Pause playout (playing from callback).""" self.__is_playout_paused = True
def pause_recording(self)
-
Pause recording (output to callback).
Expand source code
def pause_recording(self): """Pause recording (output to callback).""" self.__is_recording_paused = True
def print_available_playout_devices(self)
-
Inherited from:
GroupCall
.print_available_playout_devices
Print name and guid of available playout audio devices in system. Just helper method …
def print_available_recording_devices(self)
-
Inherited from:
GroupCall
.print_available_recording_devices
Print name and guid of available recording audio devices in system. Just helper method …
async def reconnect(self)
-
Inherited from:
GroupCall
.reconnect
Connect to voice chat using the same native instance.
def resume_playout(self)
-
Resume playout (playing from callback).
Expand source code
def resume_playout(self): """Resume playout (playing from callback).""" self.__is_playout_paused = False
def resume_recording(self)
-
Resume recording (output to callback).
Expand source code
def resume_recording(self): """Resume recording (output to callback).""" self.__is_recording_paused = False
async def set_is_mute(self, is_muted: bool)
-
Inherited from:
GroupCall
.set_is_mute
Set is mute …
async def set_my_volume(self, volume)
-
Inherited from:
GroupCall
.set_my_volume
Set volume for current client …
async def start(self, group, join_as=None, invite_hash: Optional[str] = None, enable_action=True)
-
Inherited from:
GroupCall
.start
Start voice chat (join and play/record from initial values) …
async def stop(self)
-
Inherited from:
GroupCall
.stop
Properly stop tgcalls, remove MTProto handler, leave from server side.
def stop_output(self)
-
Pause recording (output to callback).
Expand source code
def pause_recording(self): """Pause recording (output to callback).""" self.__is_recording_paused = True
def stop_playout(self)
-
Pause playout (playing from callback).
Expand source code
def pause_playout(self): """Pause playout (playing from callback).""" self.__is_playout_paused = True