Source code for bt_manager.codecs

from __future__ import unicode_literals
from collections import namedtuple
from bt_manager import ffi

A2DP_CODECS = {'SBC': 0x00,
               'MPEG12': 0x01,
               'MPEG24': 0x02,
               'ATRAC': 0x03,
               }
"""
Enumeration of codec types supported by A2DP profile
"""

SBCCodecConfig = namedtuple('SBCCodecConfig',
                            'channel_mode frequency allocation_method '
                            'subbands block_length min_bitpool '
                            'max_bitpool')
"""
Named tuple collection of SBC A2DP audio profile properties
"""


[docs]class SBCSamplingFrequency: """Indicates with which sampling frequency the SBC frame has been encoded.""" FREQ_16KHZ = (1 << 3) FREQ_32KHZ = (1 << 2) FREQ_44_1KHZ = (1 << 1) FREQ_48KHZ = 1 ALL = 0xF
[docs]class SBCBlocks: """The block size with which the stream has been encoded""" BLOCKS_4 = (1 << 3) BLOCKS_8 = (1 << 2) BLOCKS_12 = (1 << 1) BLOCKS_16 = 1 ALL = 0xF
[docs]class SBCChannelMode: """Indicate with which channel mode the frame has been encoded. The number of channels depends on this information.""" CHANNEL_MODE_MONO = (1 << 3) CHANNEL_MODE_DUAL = (1 << 2) CHANNEL_MODE_STEREO = (1 << 1) CHANNEL_MODE_JOINT_STEREO = 1 ALL = 0xF
[docs]class SBCAllocationMethod: """Indicates how the bit pool is allocated to different subbands. Either it is based on the loudness of the sub band signal or on the signal to noise ratio.""" SNR = (1 << 1) LOUDNESS = 1 ALL = 0x3
[docs]class SBCSubbands: """indicates the number of subbands with which the frame has been encoded""" SUBBANDS_4 = (1 << 1) SUBBANDS_8 = 1 ALL = 0x3
[docs]class SBCCodec: """ Python cass wrapper around CFFI calls into the SBC codec implemented in C. The main API is defined by sbc.h with additional functions added to encapsulate an SBC payload as part of an RTP packet. The API extensions for RTP are designed to work directly with bluetooth media transport file descriptors for the A2DP profile. So, the basic idea is that this class is instantiated as part of a media endpoint implementation in order to encode or decode data carried on the media transport. .. note:: You need two separate instantiations of this class if you wish to encode and decode at the same time. Although the class implementation is the same, the underlying C implementation requires separate `sbc_t` instances. :param namedtuple config: Media endpoint negotiated configuration parameters. These are not used directly by the codec here but translated to parameters usable by the codec. See :py:class:`.SBCCodecConfig` """ def __init__(self, config): import sys try: self.codec = ffi.verify(b'#include "rtpsbc.h"', libraries=[b'rtpsbc'], ext_package=b'rtpsbc') except: print 'Exception:', sys.exc_info()[0] self.config = ffi.new('sbc_t *') self.ts = ffi.new('unsigned int *', 0) self.seq_num = ffi.new('unsigned int *', 0) self._init_sbc_config(config) self.codec.sbc_init(self.config, 0) def _init_sbc_config(self, config): """ Translator from namedtuple config representation to the sbc_t type. :param namedtuple config: See :py:class:`.SBCCodecConfig` :returns: """ if (config.channel_mode == SBCChannelMode.CHANNEL_MODE_MONO): self.config.mode = self.codec.SBC_MODE_MONO elif (config.channel_mode == SBCChannelMode.CHANNEL_MODE_STEREO): self.config.mode = self.codec.SBC_MODE_STEREO elif (config.channel_mode == SBCChannelMode.CHANNEL_MODE_DUAL): self.config.mode = self.codec.SBC_MODE_DUAL_CHANNEL elif (config.channel_mode == SBCChannelMode.CHANNEL_MODE_JOINT_STEREO): self.config.mode = self.codec.SBC_MODE_JOINT_STEREO if (config.frequency == SBCSamplingFrequency.FREQ_16KHZ): self.config.frequency = self.codec.SBC_FREQ_16000 elif (config.frequency == SBCSamplingFrequency.FREQ_32KHZ): self.config.frequency = self.codec.SBC_FREQ_32000 elif (config.frequency == SBCSamplingFrequency.FREQ_44_1KHZ): self.config.frequency = self.codec.SBC_FREQ_44100 elif (config.frequency == SBCSamplingFrequency.FREQ_48KHZ): self.config.frequency = self.codec.SBC_FREQ_48000 if (config.allocation_method == SBCAllocationMethod.LOUDNESS): self.config.allocation = self.codec.SBC_AM_LOUDNESS elif (config.allocation_method == SBCAllocationMethod.SNR): self.config.allocation = self.codec.SBC_AM_SNR if (config.subbands == SBCSubbands.SUBBANDS_4): self.config.subbands = self.codec.SBC_SB_4 elif (config.subbands == SBCSubbands.SUBBANDS_8): self.config.subbands = self.codec.SBC_SB_8 if (config.block_length == SBCBlocks.BLOCKS_4): self.config.blocks = self.codec.SBC_BLK_4 elif (config.block_length == SBCBlocks.BLOCKS_8): self.config.blocks = self.codec.SBC_BLK_8 elif (config.block_length == SBCBlocks.BLOCKS_12): self.config.blocks = self.codec.SBC_BLK_12 elif (config.block_length == SBCBlocks.BLOCKS_16): self.config.blocks = self.codec.SBC_BLK_16 self.config.bitpool = config.max_bitpool self.config.endian = self.codec.SBC_LE
[docs] def encode(self, fd, mtu, data): """ Encode the supplied data (byte array) and write to the media transport file descriptor encapsulated as RTP packets. The encoder will calculate the required number of SBC frames and encapsulate as RTP to fit the MTU size. :param int fd: Media transport file descriptor :param int mtu: Media transport MTU size as returned when the media transport was acquired. :param array{byte} data: Data to encode and send over the media transport. :return: """ self.codec.rtp_sbc_encode_to_fd(self.config, ffi.new('char[]', data), len(data), mtu, self.ts, self.seq_num, fd)
[docs] def decode(self, fd, mtu, max_len=2560): """ Read the media transport descriptor, depay the RTP payload and decode the SBC frames into a byte array. The maximum number of bytes to be returned may be passed as an argument and all available bytes are returned to the caller. :param int fd: Media transport file descriptor :param int mtu: Media transport MTU size as returned when the media transport was acquired. :param int max_len: Optional. Set maximum number of bytes to read. :return data: Decoded data bytes as an array. :rtype: array{byte} """ output_buffer = ffi.new('char[]', max_len) sz = self.codec.rtp_sbc_decode_from_fd(self.config, output_buffer, max_len, mtu, fd) return ffi.buffer(output_buffer[0:sz])