From 67e729c7987ff2a3bada7b893e8d13eeac8a2e1b Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Fri, 5 Apr 2024 23:46:06 +0800 Subject: [PATCH] HFP AG implementation --- bumble/hfp.py | 756 +++++++++++++++++++++++++++++++--- examples/run_hfp_gateway.py | 185 ++------- examples/run_hfp_handsfree.py | 4 +- setup.cfg | 2 +- tests/hfp_test.py | 278 +++++++++++-- 5 files changed, 1002 insertions(+), 223 deletions(-) diff --git a/bumble/hfp.py b/bumble/hfp.py index 145523d5..9a944813 100644 --- a/bumble/hfp.py +++ b/bumble/hfp.py @@ -15,6 +15,9 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +from __future__ import annotations + +import collections import collections.abc import logging import asyncio @@ -22,16 +25,32 @@ import enum import traceback import pyee -from typing import Dict, List, Union, Set, Any, Optional, Type, TYPE_CHECKING +import re +from typing import ( + Dict, + List, + Union, + Set, + Any, + Optional, + Type, + Tuple, + ClassVar, + Iterable, + TYPE_CHECKING, +) from typing_extensions import Self from bumble import at +from bumble import device from bumble import rfcomm +from bumble import sdp from bumble.colors import color from bumble.core import ( ProtocolError, BT_GENERIC_AUDIO_SERVICE, BT_HANDSFREE_SERVICE, + BT_HEADSET_AUDIO_GATEWAY_SERVICE, BT_L2CAP_PROTOCOL_ID, BT_RFCOMM_PROTOCOL_ID, ) @@ -40,15 +59,6 @@ CodingFormat, CodecID, ) -from bumble.sdp import ( - DataElement, - ServiceAttribute, - SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, - SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, - SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, - SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, - SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, -) # ----------------------------------------------------------------------------- @@ -66,6 +76,9 @@ def __init__(self, error_name: str = '', details: str = ''): super().__init__(None, 'hfp', error_name, details) +class HfLoopTermination(HfpProtocolError): ... + + # ----------------------------------------------------------------------------- # Protocol Support # ----------------------------------------------------------------------------- @@ -329,6 +342,21 @@ class CallInfo: type: Optional[int] = None +class CmeError(enum.IntEnum): + """ + CME ERROR codes (partial listed). + + TS 127 007 - V6.8.0, 9.2.1 General errors + """ + + PHONE_FAILURE = 0 + OPERATION_NOT_ALLOWED = 3 + OPERATION_NOT_SUPPORTED = 4 + MEMORY_FULL_ = 20 + INVALID_INDEX = 21 + NOT_FOUND = 22 + + # ----------------------------------------------------------------------------- # Hands-Free Control Interoperability Requirements # ----------------------------------------------------------------------------- @@ -402,12 +430,21 @@ class CallInfo: @dataclasses.dataclass -class Configuration: +class HfConfiguration: supported_hf_features: List[HfFeature] supported_hf_indicators: List[HfIndicator] supported_audio_codecs: List[AudioCodec] +@dataclasses.dataclass +class AgConfiguration: + supported_ag_features: Iterable[AgFeature] + supported_ag_indicators: collections.abc.Sequence[AgIndicatorState] + supported_hf_indicators: Iterable[HfIndicator] + supported_ag_call_hold_operations: Iterable[CallHoldOperation] + supported_audio_codecs: Iterable[AudioCodec] + + class AtResponseType(enum.Enum): """ Indicates if a response is expected from an AT command, and if multiple responses are accepted. @@ -435,18 +472,122 @@ def parse_from(cls: Type[Self], buffer: bytearray) -> Self: ) +@dataclasses.dataclass +class AtCommand: + class SubCode(str, enum.Enum): + NONE = '' + SET = '=' + TEST = '=?' + READ = '?' + + code: str + sub_code: SubCode + parameters: list + + _PARSE_PATTERN: ClassVar[re.Pattern] = re.compile( + r'AT\+(?P[A-Z]+)(?P=\?|=|\?)?(?P.*)' + ) + + @classmethod + def parse_from(cls: Type[Self], buffer: bytearray) -> Self: + if not (match := cls._PARSE_PATTERN.fullmatch(buffer.decode())): + if buffer.startswith(b'ATA'): + return cls(code='A', sub_code=AtCommand.SubCode.NONE, parameters=[]) + if buffer.startswith(b'ATD'): + return cls( + code='D', sub_code=AtCommand.SubCode.NONE, parameters=[buffer[3:]] + ) + raise HfpProtocolError('Invalid command') + + parameters = [] + if parameters_text := match.group('parameters'): + parameters = at.parse_parameters(parameters_text.encode()) + + return cls( + code=match.group('code'), + sub_code=AtCommand.SubCode(match.group('sub_code') or ''), + parameters=parameters, + ) + + @dataclasses.dataclass class AgIndicatorState: - description: str - index: int + description: AgIndicator supported_values: Set[int] current_status: int + index: Optional[int] = None + enabled: bool = True + + @property + def supported_values_text(self) -> str: + min_value = min(self.supported_values) + max_value = max(self.supported_values) + if len(self.supported_values) == (max_value - min_value + 1): + return f'({min_value}-{max_value})' + else: + return f'({",".join(str(v) for v in self.supported_values)})' + + @property + def on_test_text(self) -> str: + return f'(\"{self.description.value}\",{self.supported_values_text})' + + @classmethod + def call(cls: Type[Self]) -> Self: + return cls( + description=AgIndicator.CALL, supported_values={0, 1}, current_status=0 + ) + + @classmethod + def callsetup(cls: Type[Self]) -> Self: + return cls( + description=AgIndicator.CALL_SETUP, + supported_values={0, 1, 2, 3}, + current_status=0, + ) + + @classmethod + def callheld(cls: Type[Self]) -> Self: + return cls( + description=AgIndicator.CALL_HELD, + supported_values={0, 1, 2}, + current_status=0, + ) + + @classmethod + def service(cls: Type[Self]) -> Self: + return cls( + description=AgIndicator.SERVICE, supported_values={0, 1}, current_status=0 + ) + + @classmethod + def signal(cls: Type[Self]) -> Self: + return cls( + description=AgIndicator.SIGNAL, + supported_values={0, 1, 2, 3, 4, 5}, + current_status=0, + ) + + @classmethod + def roam(cls: Type[Self]) -> Self: + return cls( + description=AgIndicator.CALL, supported_values={0, 1}, current_status=0 + ) + + @classmethod + def battchg(cls: Type[Self]) -> Self: + return cls( + description=AgIndicator.BATTERY_CHARGE, + supported_values={0, 1, 2, 3, 4, 5}, + current_status=0, + ) @dataclasses.dataclass class HfIndicatorState: + indicator: HfIndicator supported: bool = False enabled: bool = False + current_status: int = 0 class HfProtocol(pyee.EventEmitter): @@ -477,14 +618,14 @@ class HfProtocol(pyee.EventEmitter): command_lock: asyncio.Lock if TYPE_CHECKING: response_queue: asyncio.Queue[AtResponse] - unsolicited_queue: asyncio.Queue[AtResponse] + unsolicited_queue: asyncio.Queue[Optional[AtResponse]] else: response_queue: asyncio.Queue unsolicited_queue: asyncio.Queue read_buffer: bytearray active_codec: AudioCodec - def __init__(self, dlc: rfcomm.DLC, configuration: Configuration) -> None: + def __init__(self, dlc: rfcomm.DLC, configuration: HfConfiguration) -> None: super().__init__() # Configure internal state. @@ -494,13 +635,14 @@ def __init__(self, dlc: rfcomm.DLC, configuration: Configuration) -> None: self.unsolicited_queue = asyncio.Queue() self.read_buffer = bytearray() self.active_codec = AudioCodec.CVSD + self._slc_initialized = False # Build local features. self.supported_hf_features = sum(configuration.supported_hf_features) self.supported_audio_codecs = configuration.supported_audio_codecs self.hf_indicators = { - indicator: HfIndicatorState() + indicator: HfIndicatorState(indicator=indicator) for indicator in configuration.supported_hf_indicators } @@ -511,6 +653,9 @@ def __init__(self, dlc: rfcomm.DLC, configuration: Configuration) -> None: # Bind the AT reader to the RFCOMM channel. self.dlc.sink = self._read_at + self.dlc.multiplexer.l2cap_channel.on( + 'close', lambda: self.unsolicited_queue.put_nowait(None) + ) def supports_hf_feature(self, feature: HfFeature) -> bool: return (self.supported_hf_features & feature) != 0 @@ -621,7 +766,7 @@ async def initiate_slc(self): # If both the HF and AG do support the Codec Negotiation feature # then the HF shall send the AT+BAC= command to # the AG to notify the AG of the available codecs in the HF. - codecs = [str(c) for c in self.supported_audio_codecs] + codecs = [str(c.value) for c in self.supported_audio_codecs] await self.execute_command(f"AT+BAC={','.join(codecs)}") # 4.2.1.3 AG Indicators @@ -639,7 +784,7 @@ async def initiate_slc(self): self.ag_indicators = [] for index, indicator in enumerate(response.parameters): - description = indicator[0].decode() + description = AgIndicator(indicator[0].decode()) supported_values = [] for value in indicator[1]: value = value.split(b'-') @@ -697,7 +842,7 @@ async def initiate_slc(self): # shall send the AT+BIND= command to the AG # to notify the AG of the supported indicators’ assigned numbers in the # HF. The AG shall respond with OK - indicators = [str(i) for i in self.hf_indicators.keys()] + indicators = [str(i.value) for i in self.hf_indicators] await self.execute_command(f"AT+BIND={','.join(indicators)}") # After having provided the AG with the HF indicators it supports, @@ -733,6 +878,7 @@ async def initiate_slc(self): self.hf_indicators[indicator].enabled = True logger.info("SLC setup completed") + self._slc_initialized = True async def setup_audio_connection(self): """4.11.2 Audio Connection Setup by HF.""" @@ -829,6 +975,8 @@ async def update_ag_indicator(self, index: int, value: int): async def handle_unsolicited(self): """Handle unsolicited result codes sent by the audio gateway.""" result = await self.unsolicited_queue.get() + if not result: + raise HfLoopTermination() if result.code == "+BCS": await self.setup_codec_connection(int(result.parameters[0])) elif result.code == "+CIEV": @@ -846,14 +994,352 @@ async def run(self): """ try: - await self.initiate_slc() + if not self._slc_initialized: + await self.initiate_slc() while True: await self.handle_unsolicited() + except HfLoopTermination: + logger.info('Loop terminated') except Exception: logger.error("HFP-HF protocol failed with the following error:") logger.error(traceback.format_exc()) +class AgProtocol(pyee.EventEmitter): + """ + Implementation for the Audio-Gateway side of the Hands-Free profile. + + Reference specification Hands-Free Profile v1.8. + + Emitted events: + slc_complete: Emit when SLC procedure is completed. + codec_negotiation: When codec is renegotiated, notify the new codec. + Args: + active_codec: AudioCodec + hf_indicator: When HF update their indicators, notify the new state. + Args: + hf_indicator: HfIndicator + codec_connection_request: Emit when HF sends AT+BCC to request codec connection. + answer: Emit when HF sends ATA to answer phone call. + hang_up: Emit when HF sends AT+CHUP to hang up phone call. + dial: Emit when HF sends ATD to dial phone call. + """ + + supported_hf_features: int + supported_hf_indicators: Set[HfIndicator] + supported_audio_codecs: List[AudioCodec] + + supported_ag_features: int + supported_ag_call_hold_operations: List[CallHoldOperation] + + ag_indicators: List[AgIndicatorState] + hf_indicators: collections.OrderedDict[HfIndicator, HfIndicatorState] + + dlc: rfcomm.DLC + command_lock: asyncio.Lock + + read_buffer: bytearray + active_codec: AudioCodec + + indicator_report_enabled: bool + inband_ringtone_enabled: bool + cme_error_enabled: bool + _remained_slc_setup_features: Set[HfFeature] + + def __init__(self, dlc: rfcomm.DLC, configuration: AgConfiguration) -> None: + super().__init__() + + # Configure internal state. + self.dlc = dlc + self.command_lock = asyncio.Lock() + self.read_buffer = bytearray() + self.active_codec = AudioCodec.CVSD + + # Build local features. + self.supported_ag_features = sum(configuration.supported_ag_features) + self.supported_ag_call_hold_operations = list( + configuration.supported_ag_call_hold_operations + ) + self.ag_indicators = list(configuration.supported_ag_indicators) + self.supported_hf_indicators = set(configuration.supported_hf_indicators) + self.inband_ringtone_enabled = True + self._remained_slc_setup_features = set() + + # Clear remote features. + self.supported_hf_features = 0 + self.supported_audio_codecs = [] + self.indicator_report_enabled = False + self.cme_error_enabled = False + + self.hf_indicators = collections.OrderedDict() + + # Bind the AT reader to the RFCOMM channel. + self.dlc.sink = self._read_at + + def supports_hf_feature(self, feature: HfFeature) -> bool: + return (self.supported_hf_features & feature) != 0 + + def supports_ag_feature(self, feature: AgFeature) -> bool: + return (self.supported_ag_features & feature) != 0 + + def _read_at(self, data: bytes): + """ + Reads AT messages from the RFCOMM channel. + """ + # Append to the read buffer. + self.read_buffer.extend(data) + + # Locate header and trailer. + trailer = self.read_buffer.find(b'\r') + if trailer == -1: + return + + # Isolate the AT response code and parameters. + raw_command = self.read_buffer[:trailer] + command = AtCommand.parse_from(raw_command) + logger.debug(f"<<< {raw_command.decode()}") + + # Consume the response bytes. + self.read_buffer = self.read_buffer[trailer + 1 :] + + if command.sub_code == AtCommand.SubCode.TEST: + handler_name = f'_on_{command.code.lower()}_test' + elif command.sub_code == AtCommand.SubCode.READ: + handler_name = f'_on_{command.code.lower()}_read' + else: + handler_name = f'_on_{command.code.lower()}' + + if handler := getattr(self, handler_name, None): + handler(*command.parameters) + else: + logger.warning('Handler %s not found', handler_name) + self.send_response('ERROR') + + def send_response(self, response: str) -> None: + """Sends an AT response.""" + self.dlc.write(f'\r\n{response}\r\n') + + def send_cme_error(self, error_code: CmeError) -> None: + """Sends an CME ERROR response. + + If CME Error is not enabled by HF, sends ERROR instead. + """ + if self.cme_error_enabled: + self.send_response(f'+CME ERROR: {error_code.value}') + else: + self.send_error() + + def send_ok(self) -> None: + """Sends an OK response.""" + self.send_response('OK') + + def send_error(self) -> None: + """Sends an ERROR response.""" + self.send_response('ERROR') + + def set_inband_ringtone_enabled(self, enabled: bool) -> None: + """Enables or disables in-band ringtone.""" + + self.inband_ringtone_enabled = enabled + self.send_response(f'+BSIR: {1 if enabled else 0}') + + def update_ag_indicator(self, indicator: AgIndicator, value: int) -> None: + """Updates AG indicator. + + Args: + indicator: Name of the indicator. + value: new value of the indicator. + """ + + search_result = next( + ( + (index, state) + for index, state in enumerate(self.ag_indicators) + if state.description == indicator + ), + None, + ) + if not search_result: + raise KeyError(f'{indicator} is not supported.') + + index, indicator_state = search_result + if not self.indicator_report_enabled: + logger.warning('AG indicator report is disabled') + if not indicator_state.enabled: + logger.warning(f'AG indicator {indicator} is disabled') + + indicator_state.current_status = value + self.send_response(f'+CIEV: {index+1},{value}') + + async def negotiate_codec(self, codec: AudioCodec) -> None: + """Starts codec negotiation.""" + + if not self.supports_ag_feature(AgFeature.CODEC_NEGOTIATION): + logger.warning('Local does not support Codec Negotiation') + if not self.supports_hf_feature(HfFeature.CODEC_NEGOTIATION): + logger.warning('Peer does not support Codec Negotiation') + if codec not in self.supported_audio_codecs: + logger.warning(f'{codec} is not supported by peer') + + at_bcs_future = asyncio.get_running_loop().create_future() + self.once('codec_negotiation', at_bcs_future.set_result) + self.send_response(f'+BCS: {codec.value}') + if (new_codec := await at_bcs_future) != codec: + raise HfpProtocolError(f'Expect codec: {codec}, but get {new_codec}') + + def _check_remained_slc_commands(self) -> None: + if not self._remained_slc_setup_features: + self.emit('slc_complete') + + def _on_brsf(self, hf_features: bytes) -> None: + self.supported_hf_features = int(hf_features) + self.send_response(f'+BRSF: {self.supported_ag_features}') + self.send_ok() + + if self.supports_hf_feature( + HfFeature.HF_INDICATORS + ) and self.supports_ag_feature(AgFeature.HF_INDICATORS): + self._remained_slc_setup_features.add(HfFeature.HF_INDICATORS) + + if self.supports_hf_feature( + HfFeature.THREE_WAY_CALLING + ) and self.supports_ag_feature(AgFeature.THREE_WAY_CALLING): + self._remained_slc_setup_features.add(HfFeature.THREE_WAY_CALLING) + + def _on_bac(self, *args) -> None: + self.supported_audio_codecs = [AudioCodec(int(value)) for value in args] + self.send_ok() + + def _on_bcs(self, codec: bytes) -> None: + self.active_codec = AudioCodec(int(codec)) + self.send_ok() + self.emit('codec_negotiation', self.active_codec) + + def _on_cind_test(self) -> None: + if not self.ag_indicators: + self.send_cme_error(CmeError.NOT_FOUND) + return + + indicator_list_str = ",".join( + indicator.on_test_text for indicator in self.ag_indicators + ) + self.send_response(f'+CIND: {indicator_list_str}') + self.send_ok() + + def _on_cind_read(self) -> None: + if not self.ag_indicators: + self.send_cme_error(CmeError.NOT_FOUND) + return + + indicator_list_str = ",".join( + str(indicator.current_status) for indicator in self.ag_indicators + ) + self.send_response(f'+CIND: {indicator_list_str}') + self.send_ok() + + self._check_remained_slc_commands() + + def _on_cmer( + self, + mode: bytes, + keypad: Optional[bytes] = None, + display: Optional[bytes] = None, + indicator: bytes = b'', + ) -> None: + if int(mode) != 3 or keypad or display or int(indicator) not in (0, 1): + logger.error( + f'Unexpected values: mode={mode!r}, keypad={keypad!r}, ' + f'display={display!r}, indicator={indicator!r}' + ) + self.send_cme_error(CmeError.INVALID_INDEX) + + self.indicator_report_enabled = bool(int(indicator)) + self.send_ok() + + def _on_cmee(self, enabled: bytes) -> None: + self.cme_error_enabled = bool(int(enabled)) + self.send_ok() + + def _on_bind(self, *args) -> None: + if not self.supports_ag_feature(AgFeature.HF_INDICATORS): + self.send_error() + return + + peer_supported_indicators = set( + HfIndicator(int(indicator)) for indicator in args + ) + self.hf_indicators = collections.OrderedDict( + { + indicator: HfIndicatorState(indicator=indicator) + for indicator in self.supported_hf_indicators.intersection( + peer_supported_indicators + ) + } + ) + self.send_ok() + + def _on_bind_test(self) -> None: + if not self.supports_ag_feature(AgFeature.HF_INDICATORS): + self.send_error() + return + + hf_indicator_list_str = ",".join( + str(indicator.value) for indicator in self.supported_hf_indicators + ) + self.send_response(f'+BIND: ({hf_indicator_list_str})') + self.send_ok() + + def _on_bind_read(self) -> None: + if not self.supports_ag_feature(AgFeature.HF_INDICATORS): + self.send_error() + return + + for indicator in self.hf_indicators: + self.send_response(f'+BIND: {indicator.value},1') + + self.send_ok() + + self._remained_slc_setup_features.remove(HfFeature.HF_INDICATORS) + self._check_remained_slc_commands() + + def _on_biev(self, index_bytes: bytes, value_bytes: bytes) -> None: + if not self.supports_ag_feature(AgFeature.HF_INDICATORS): + self.send_error() + return + + index = HfIndicator(int(index_bytes)) + if index not in self.hf_indicators: + self.send_error() + return + + self.hf_indicators[index].current_status = int(value_bytes) + self.emit('hf_indicator', self.hf_indicators[index]) + self.send_ok() + + def _on_bia(self, *args) -> None: + for enabled, state in zip(args, self.ag_indicators): + state.enabled = bool(int(enabled)) + self.send_ok() + + def _on_bcc(self) -> None: + self.emit('codec_connection_request') + self.send_ok() + + def _on_a(self) -> None: + """ATA handler.""" + self.emit('answer') + self.send_ok() + + def _on_d(self, number: bytes) -> None: + """ATD handler.""" + self.emit('dial', number.decode()) + self.send_ok() + + def _on_chup(self) -> None: + self.emit('hang_up') + self.send_ok() + + # ----------------------------------------------------------------------------- # Normative SDP definitions # ----------------------------------------------------------------------------- @@ -907,9 +1393,12 @@ class AgSdpFeature(enum.IntFlag): VOICE_RECOGNITION_TEST = 0x80 -def sdp_records( - service_record_handle: int, rfcomm_channel: int, configuration: Configuration -) -> List[ServiceAttribute]: +def make_hf_sdp_records( + service_record_handle: int, + rfcomm_channel: int, + configuration: HfConfiguration, + version: ProfileVersion = ProfileVersion.V1_8, +) -> List[sdp.ServiceAttribute]: """ Generates the SDP record for HFP Hands-Free support. @@ -941,53 +1430,226 @@ def sdp_records( hf_supported_features |= HfSdpFeature.WIDE_BAND return [ - ServiceAttribute( - SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, - DataElement.unsigned_integer_32(service_record_handle), + sdp.ServiceAttribute( + sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + sdp.DataElement.unsigned_integer_32(service_record_handle), + ), + sdp.ServiceAttribute( + sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + sdp.DataElement.sequence( + [ + sdp.DataElement.uuid(BT_HANDSFREE_SERVICE), + sdp.DataElement.uuid(BT_GENERIC_AUDIO_SERVICE), + ] + ), + ), + sdp.ServiceAttribute( + sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.DataElement.sequence( + [ + sdp.DataElement.sequence( + [sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)] + ), + sdp.DataElement.sequence( + [ + sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID), + sdp.DataElement.unsigned_integer_8(rfcomm_channel), + ] + ), + ] + ), + ), + sdp.ServiceAttribute( + sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.DataElement.sequence( + [ + sdp.DataElement.sequence( + [ + sdp.DataElement.uuid(BT_HANDSFREE_SERVICE), + sdp.DataElement.unsigned_integer_16(version), + ] + ) + ] + ), + ), + sdp.ServiceAttribute( + sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + sdp.DataElement.unsigned_integer_16(hf_supported_features), + ), + ] + + +def make_ag_sdp_records( + service_record_handle: int, + rfcomm_channel: int, + configuration: AgConfiguration, + version: ProfileVersion = ProfileVersion.V1_8, +) -> List[sdp.ServiceAttribute]: + """ + Generates the SDP record for HFP Audio-Gateway support. + + The record exposes the features supported in the input configuration, + and the allocated RFCOMM channel. + """ + + ag_supported_features = 0 + + if AgFeature.EC_NR in configuration.supported_ag_features: + ag_supported_features |= AgSdpFeature.EC_NR + if AgFeature.THREE_WAY_CALLING in configuration.supported_ag_features: + ag_supported_features |= AgSdpFeature.THREE_WAY_CALLING + if ( + AgFeature.ENHANCED_VOICE_RECOGNITION_STATUS + in configuration.supported_ag_features + ): + ag_supported_features |= AgSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS + if AgFeature.VOICE_RECOGNITION_TEST in configuration.supported_ag_features: + ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_TEST + if AgFeature.IN_BAND_RING_TONE_CAPABILITY in configuration.supported_ag_features: + ag_supported_features |= AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY + if AgFeature.VOICE_RECOGNITION_FUNCTION in configuration.supported_ag_features: + ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_FUNCTION + if AudioCodec.MSBC in configuration.supported_audio_codecs: + ag_supported_features |= AgSdpFeature.WIDE_BAND + + return [ + sdp.ServiceAttribute( + sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + sdp.DataElement.unsigned_integer_32(service_record_handle), ), - ServiceAttribute( - SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, - DataElement.sequence( + sdp.ServiceAttribute( + sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + sdp.DataElement.sequence( [ - DataElement.uuid(BT_HANDSFREE_SERVICE), - DataElement.uuid(BT_GENERIC_AUDIO_SERVICE), + sdp.DataElement.uuid(BT_HEADSET_AUDIO_GATEWAY_SERVICE), + sdp.DataElement.uuid(BT_GENERIC_AUDIO_SERVICE), ] ), ), - ServiceAttribute( - SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, - DataElement.sequence( + sdp.ServiceAttribute( + sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.DataElement.sequence( [ - DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]), - DataElement.sequence( + sdp.DataElement.sequence( + [sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)] + ), + sdp.DataElement.sequence( [ - DataElement.uuid(BT_RFCOMM_PROTOCOL_ID), - DataElement.unsigned_integer_8(rfcomm_channel), + sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID), + sdp.DataElement.unsigned_integer_8(rfcomm_channel), ] ), ] ), ), - ServiceAttribute( - SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, - DataElement.sequence( + sdp.ServiceAttribute( + sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.DataElement.sequence( [ - DataElement.sequence( + sdp.DataElement.sequence( [ - DataElement.uuid(BT_HANDSFREE_SERVICE), - DataElement.unsigned_integer_16(ProfileVersion.V1_8), + sdp.DataElement.uuid(BT_HEADSET_AUDIO_GATEWAY_SERVICE), + sdp.DataElement.unsigned_integer_16(version), ] ) ] ), ), - ServiceAttribute( - SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, - DataElement.unsigned_integer_16(hf_supported_features), + sdp.ServiceAttribute( + sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + sdp.DataElement.unsigned_integer_16(ag_supported_features), ), ] +async def find_hf_sdp_record( + connection: device.Connection, +) -> Optional[Tuple[int, ProfileVersion, HfSdpFeature]]: + """Searches a Hands-Free SDP record from remote device. + + Args: + connection: ACL connection to make SDP search. + + Returns: + Dictionary mapping from channel number to service class UUID list. + """ + async with sdp.Client(connection) as sdp_client: + search_result = await sdp_client.search_attributes( + uuids=[BT_HANDSFREE_SERVICE], + attribute_ids=[ + sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + ], + ) + for attribute_lists in search_result: + channel: Optional[int] = None + version: Optional[ProfileVersion] = None + features: Optional[HfSdpFeature] = None + for attribute in attribute_lists: + # The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]]. + if attribute.id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID: + protocol_descriptor_list = attribute.value.value + channel = protocol_descriptor_list[1].value[1].value + elif ( + attribute.id + == sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID + ): + profile_descriptor_list = attribute.value.value + version = ProfileVersion(profile_descriptor_list[0].value[1].value) + elif attribute.id == sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID: + features = HfSdpFeature(attribute.value.value) + if not channel or not version or features is None: + logger.warning(f"Bad result {attribute_lists}.") + return None + return (channel, version, features) + return None + + +async def find_ag_sdp_record( + connection: device.Connection, +) -> Optional[Tuple[int, ProfileVersion, AgSdpFeature]]: + """Searches an Audio-Gateway SDP record from remote device. + + Args: + connection: ACL connection to make SDP search. + + Returns: + Dictionary mapping from channel number to service class UUID list. + """ + async with sdp.Client(connection) as sdp_client: + search_result = await sdp_client.search_attributes( + uuids=[BT_HEADSET_AUDIO_GATEWAY_SERVICE], + attribute_ids=[ + sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + ], + ) + for attribute_lists in search_result: + channel: Optional[int] = None + version: Optional[ProfileVersion] = None + features: Optional[AgSdpFeature] = None + for attribute in attribute_lists: + # The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]]. + if attribute.id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID: + protocol_descriptor_list = attribute.value.value + channel = protocol_descriptor_list[1].value[1].value + elif ( + attribute.id + == sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID + ): + profile_descriptor_list = attribute.value.value + version = ProfileVersion(profile_descriptor_list[0].value[1].value) + elif attribute.id == sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID: + features = AgSdpFeature(attribute.value.value) + if not channel or not version or features is None: + logger.warning(f"Bad result {attribute_lists}.") + return None + return (channel, version, features) + return None + + # ----------------------------------------------------------------------------- # ESCO Codec Default Parameters # ----------------------------------------------------------------------------- diff --git a/examples/run_hfp_gateway.py b/examples/run_hfp_gateway.py index c3b392da..a05fdff5 100644 --- a/examples/run_hfp_gateway.py +++ b/examples/run_hfp_gateway.py @@ -16,127 +16,57 @@ # Imports # ----------------------------------------------------------------------------- import asyncio +import contextlib +import json import sys import os import logging +import websockets -from bumble.colors import color +from typing import Optional import bumble.core from bumble.device import Device from bumble.transport import open_transport_or_link from bumble.core import ( - BT_HANDSFREE_SERVICE, - BT_RFCOMM_PROTOCOL_ID, BT_BR_EDR_TRANSPORT, ) from bumble import rfcomm, hfp from bumble.hci import HCI_SynchronousDataPacket -from bumble.sdp import ( - Client as SDP_Client, - DataElement, - ServiceAttribute, - SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, - SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, - SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, -) logger = logging.getLogger(__name__) -# ----------------------------------------------------------------------------- -# pylint: disable-next=too-many-nested-blocks -async def list_rfcomm_channels(device, connection): - # Connect to the SDP Server - sdp_client = SDP_Client(connection) - await sdp_client.connect() - - # Search for services that support the Handsfree Profile - search_result = await sdp_client.search_attributes( - [BT_HANDSFREE_SERVICE], - [ - SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, - SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, - SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, +def _default_configuration() -> hfp.AgConfiguration: + return hfp.AgConfiguration( + supported_ag_features=[ + hfp.AgFeature.HF_INDICATORS, + hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY, + hfp.AgFeature.REJECT_CALL, + hfp.AgFeature.CODEC_NEGOTIATION, + hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED, ], + supported_ag_indicators=[ + hfp.AgIndicatorState.call(), + hfp.AgIndicatorState.service(), + hfp.AgIndicatorState.callsetup(), + hfp.AgIndicatorState.callsetup(), + hfp.AgIndicatorState.signal(), + hfp.AgIndicatorState.roam(), + hfp.AgIndicatorState.battchg(), + ], + supported_hf_indicators=[ + hfp.HfIndicator.ENHANCED_SAFETY, + hfp.HfIndicator.BATTERY_LEVEL, + ], + supported_ag_call_hold_operations=[], + supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC], ) - print(color('==================================', 'blue')) - print(color('Handsfree Services:', 'yellow')) - rfcomm_channels = [] - # pylint: disable-next=too-many-nested-blocks - for attribute_list in search_result: - # Look for the RFCOMM Channel number - protocol_descriptor_list = ServiceAttribute.find_attribute_in_list( - attribute_list, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID - ) - if protocol_descriptor_list: - for protocol_descriptor in protocol_descriptor_list.value: - if len(protocol_descriptor.value) >= 2: - if protocol_descriptor.value[0].value == BT_RFCOMM_PROTOCOL_ID: - print(color('SERVICE:', 'green')) - print( - color(' RFCOMM Channel:', 'cyan'), - protocol_descriptor.value[1].value, - ) - rfcomm_channels.append(protocol_descriptor.value[1].value) - - # List profiles - bluetooth_profile_descriptor_list = ( - ServiceAttribute.find_attribute_in_list( - attribute_list, - SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, - ) - ) - if bluetooth_profile_descriptor_list: - if bluetooth_profile_descriptor_list.value: - if ( - bluetooth_profile_descriptor_list.value[0].type - == DataElement.SEQUENCE - ): - bluetooth_profile_descriptors = ( - bluetooth_profile_descriptor_list.value - ) - else: - # Sometimes, instead of a list of lists, we just - # find a list. Fix that - bluetooth_profile_descriptors = [ - bluetooth_profile_descriptor_list - ] - - print(color(' Profiles:', 'green')) - for ( - bluetooth_profile_descriptor - ) in bluetooth_profile_descriptors: - version_major = ( - bluetooth_profile_descriptor.value[1].value >> 8 - ) - version_minor = ( - bluetooth_profile_descriptor.value[1].value - & 0xFF - ) - print( - ' ' - f'{bluetooth_profile_descriptor.value[0].value}' - f' - version {version_major}.{version_minor}' - ) - - # List service classes - service_class_id_list = ServiceAttribute.find_attribute_in_list( - attribute_list, SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID - ) - if service_class_id_list: - if service_class_id_list.value: - print(color(' Service Classes:', 'green')) - for service_class_id in service_class_id_list.value: - print(' ', service_class_id.value) - - await sdp_client.disconnect() - return rfcomm_channels # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 4: print( 'Usage: run_hfp_gateway.py ' @@ -149,11 +79,13 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True await device.power_on() @@ -164,13 +96,14 @@ async def main(): print(f'=== Connected to {connection.peer_address}!') # Get a list of all the Handsfree services (should only be 1) - channels = await list_rfcomm_channels(device, connection) - if len(channels) == 0: + if not (hfp_record := await hfp.find_hf_sdp_record(connection)): print('!!! no service found') return # Pick the first one - channel = channels[0] + channel, version, hf_sdp_features = hfp_record + print(f'HF version: {version}') + print(f'HF features: {hf_sdp_features}') # Request authentication print('*** Authenticating...') @@ -205,51 +138,9 @@ def on_sco(connection_handle: int, packet: HCI_SynchronousDataPacket): device.host.on('sco_packet', on_sco) - # Protocol loop (just for testing at this point) - protocol = hfp.HfpProtocol(session) - while True: - line = await protocol.next_line() - - if line.startswith('AT+BRSF='): - protocol.send_response_line('+BRSF: 30') - protocol.send_response_line('OK') - elif line.startswith('AT+CIND=?'): - protocol.send_response_line( - '+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),' - '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),' - '("callheld",(0-2))' - ) - protocol.send_response_line('OK') - elif line.startswith('AT+CIND?'): - protocol.send_response_line('+CIND: 0,0,1,4,1,5,0') - protocol.send_response_line('OK') - elif line.startswith('AT+CMER='): - protocol.send_response_line('OK') - elif line.startswith('AT+CHLD=?'): - protocol.send_response_line('+CHLD: 0') - protocol.send_response_line('OK') - elif line.startswith('AT+BTRH?'): - protocol.send_response_line('+BTRH: 0') - protocol.send_response_line('OK') - elif line.startswith('AT+CLIP='): - protocol.send_response_line('OK') - elif line.startswith('AT+VGS='): - protocol.send_response_line('OK') - elif line.startswith('AT+BIA='): - protocol.send_response_line('OK') - elif line.startswith('AT+BVRA='): - protocol.send_response_line( - '+BVRA: 1,1,12AA,1,1,"Message 1 from Janina"' - ) - elif line.startswith('AT+XEVENT='): - protocol.send_response_line('OK') - elif line.startswith('AT+XAPL='): - protocol.send_response_line('OK') - else: - print(color('UNSUPPORTED AT COMMAND', 'red')) - protocol.send_response_line('ERROR') + ag_protocol = hfp.AgProtocol(session, _default_configuration()) - await hci_source.wait_for_termination() + await hci_transport.source.terminated # ----------------------------------------------------------------------------- diff --git a/examples/run_hfp_handsfree.py b/examples/run_hfp_handsfree.py index 40d36bde..6c5f3a19 100644 --- a/examples/run_hfp_handsfree.py +++ b/examples/run_hfp_handsfree.py @@ -37,7 +37,7 @@ # ----------------------------------------------------------------------------- -def on_dlc(dlc: rfcomm.DLC, configuration: hfp.Configuration): +def on_dlc(dlc: rfcomm.DLC, configuration: hfp.HfConfiguration): print('*** DLC connected', dlc) global hf_protocol hf_protocol = HfProtocol(dlc, configuration) @@ -96,7 +96,7 @@ async def main(): # Hands-Free profile configuration. # TODO: load configuration from file. - configuration = hfp.Configuration( + configuration = hfp.HfConfiguration( supported_hf_features=[ hfp.HfFeature.THREE_WAY_CALLING, hfp.HfFeature.REMOTE_VOLUME_CONTROL, diff --git a/setup.cfg b/setup.cfg index dbede6fa..080eb68f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -83,7 +83,7 @@ build = build >= 0.7 test = pytest >= 8.0 - pytest-asyncio >= 0.21.1 + pytest-asyncio >= 0.23.5 pytest-html >= 3.2.0 coverage >= 6.4 development = diff --git a/tests/hfp_test.py b/tests/hfp_test.py index dc281805..0f721c0a 100644 --- a/tests/hfp_test.py +++ b/tests/hfp_test.py @@ -19,8 +19,9 @@ import logging import os import pytest +import pytest_asyncio -from typing import Tuple +from typing import Tuple, Optional from .test_utils import TwoDevices from bumble import core @@ -35,10 +36,73 @@ logger = logging.getLogger(__name__) +# ----------------------------------------------------------------------------- +def _default_hf_configuration() -> hfp.HfConfiguration: + return hfp.HfConfiguration( + supported_hf_features=[ + hfp.HfFeature.CODEC_NEGOTIATION, + hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED, + hfp.HfFeature.HF_INDICATORS, + ], + supported_hf_indicators=[ + hfp.HfIndicator.ENHANCED_SAFETY, + hfp.HfIndicator.BATTERY_LEVEL, + ], + supported_audio_codecs=[ + hfp.AudioCodec.CVSD, + hfp.AudioCodec.MSBC, + ], + ) + + +# ----------------------------------------------------------------------------- +def _default_hf_sdp_features() -> hfp.HfSdpFeature: + return hfp.HfSdpFeature.WIDE_BAND + + +# ----------------------------------------------------------------------------- +def _default_ag_configuration() -> hfp.AgConfiguration: + return hfp.AgConfiguration( + supported_ag_features=[ + hfp.AgFeature.HF_INDICATORS, + hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY, + hfp.AgFeature.REJECT_CALL, + hfp.AgFeature.CODEC_NEGOTIATION, + hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED, + ], + supported_ag_indicators=[ + hfp.AgIndicatorState.call(), + hfp.AgIndicatorState.service(), + hfp.AgIndicatorState.callsetup(), + hfp.AgIndicatorState.callsetup(), + hfp.AgIndicatorState.signal(), + hfp.AgIndicatorState.roam(), + hfp.AgIndicatorState.battchg(), + ], + supported_hf_indicators=[ + hfp.HfIndicator.ENHANCED_SAFETY, + hfp.HfIndicator.BATTERY_LEVEL, + ], + supported_ag_call_hold_operations=[], + supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC], + ) + + +# ----------------------------------------------------------------------------- +def _default_ag_sdp_features() -> hfp.AgSdpFeature: + return hfp.AgSdpFeature.WIDE_BAND | hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY + + # ----------------------------------------------------------------------------- async def make_hfp_connections( - hf_config: hfp.Configuration, -) -> Tuple[hfp.HfProtocol, hfp.HfpProtocol]: + hf_config: Optional[hfp.HfConfiguration] = None, + ag_config: Optional[hfp.AgConfiguration] = None, +): + if not hf_config: + hf_config = _default_hf_configuration() + if not ag_config: + ag_config = _default_ag_configuration() + # Setup devices devices = TwoDevices() await devices.setup_connection() @@ -55,38 +119,200 @@ async def make_hfp_connections( # Setup HFP connection hf = hfp.HfProtocol(client_dlc, hf_config) - ag = hfp.HfpProtocol(server_dlc) - return hf, ag + ag = hfp.AgProtocol(server_dlc, ag_config) + + await hf.initiate_slc() + return (hf, ag) # ----------------------------------------------------------------------------- +@pytest_asyncio.fixture +async def hfp_connections(): + hf, ag = await make_hfp_connections() + hf_loop_task = asyncio.create_task(hf.run()) + + try: + yield (hf, ag) + finally: + # Close the coroutine. + hf.unsolicited_queue.put_nowait(None) + await hf_loop_task +# ----------------------------------------------------------------------------- @pytest.mark.asyncio -async def test_slc(): - hf_config = hfp.Configuration( - supported_hf_features=[], supported_hf_indicators=[], supported_audio_codecs=[] - ) - hf, ag = await make_hfp_connections(hf_config) - - async def ag_loop(): - while line := await ag.next_line(): - if line.startswith('AT+BRSF'): - ag.send_response_line('+BRSF: 0') - elif line.startswith('AT+CIND=?'): - ag.send_response_line( - '+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),' - '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),' - '("callheld",(0-2))' +async def test_slc_with_minimal_features(): + hf, ag = await make_hfp_connections( + hfp.HfConfiguration( + supported_audio_codecs=[], + supported_hf_features=[], + supported_hf_indicators=[], + ), + hfp.AgConfiguration( + supported_ag_call_hold_operations=[], + supported_ag_features=[], + supported_ag_indicators=[ + hfp.AgIndicatorState( + description=hfp.AgIndicator.CALL, + supported_values={0, 1}, + current_status=0, ) - elif line.startswith('AT+CIND?'): - ag.send_response_line('+CIND: 0,0,1,4,1,5,0') - ag.send_response_line('OK') + ], + supported_hf_indicators=[], + supported_audio_codecs=[], + ), + ) - ag_task = asyncio.create_task(ag_loop()) + assert hf.supported_ag_features == ag.supported_ag_features + assert hf.supported_hf_features == ag.supported_hf_features + for a, b in zip(hf.ag_indicators, ag.ag_indicators): + assert a.description == b.description + assert a.current_status == b.current_status - await hf.initiate_slc() - ag_task.cancel() + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_slc(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + + assert hf.supported_ag_features == ag.supported_ag_features + assert hf.supported_hf_features == ag.supported_hf_features + for a, b in zip(hf.ag_indicators, ag.ag_indicators): + assert a.description == b.description + assert a.current_status == b.current_status + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_ag_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + + future = asyncio.get_running_loop().create_future() + hf.on('ag_indicator', future.set_result) + + ag.update_ag_indicator(hfp.AgIndicator.CALL, 1) + + indicator: hfp.AgIndicatorState = await future + assert indicator.current_status == 1 + assert indicator.description == hfp.AgIndicator.CALL + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_hf_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + + future = asyncio.get_running_loop().create_future() + ag.on('hf_indicator', future.set_result) + + await hf.execute_command('AT+BIEV=2,100') + + indicator: hfp.HfIndicatorState = await future + assert indicator.current_status == 100 + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_codec_negotiation( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] +): + hf, ag = hfp_connections + + futures = [ + asyncio.get_running_loop().create_future(), + asyncio.get_running_loop().create_future(), + ] + hf.on('codec_negotiation', futures[0].set_result) + ag.on('codec_negotiation', futures[1].set_result) + await ag.negotiate_codec(hfp.AudioCodec.MSBC) + + assert await futures[0] == await futures[1] + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_dial(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + NUMBER = 'ATD123456789' + + future = asyncio.get_running_loop().create_future() + ag.on('dial', future.set_result) + await hf.execute_command(f'ATD{NUMBER}') + + number: str = await future + assert number == NUMBER + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_answer(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + + future = asyncio.get_running_loop().create_future() + ag.on('answer', lambda: future.set_result(None)) + await hf.answer_incoming_call() + + await future + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_reject_incoming_call( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] +): + hf, ag = hfp_connections + + future = asyncio.get_running_loop().create_future() + ag.on('hang_up', lambda: future.set_result(None)) + await hf.reject_incoming_call() + + await future + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_terminate_call(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + + future = asyncio.get_running_loop().create_future() + ag.on('hang_up', lambda: future.set_result(None)) + await hf.terminate_call() + + await future + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_hf_sdp_record(): + devices = TwoDevices() + await devices.setup_connection() + + devices[0].sdp_service_records[1] = hfp.make_hf_sdp_records( + 1, 2, _default_hf_configuration(), hfp.ProfileVersion.V1_8 + ) + + assert await hfp.find_hf_sdp_record(devices.connections[1]) == ( + 2, + hfp.ProfileVersion.V1_8, + _default_hf_sdp_features(), + ) + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_ag_sdp_record(): + devices = TwoDevices() + await devices.setup_connection() + + devices[0].sdp_service_records[1] = hfp.make_ag_sdp_records( + 1, 2, _default_ag_configuration(), hfp.ProfileVersion.V1_8 + ) + + assert await hfp.find_ag_sdp_record(devices.connections[1]) == ( + 2, + hfp.ProfileVersion.V1_8, + _default_ag_sdp_features(), + ) # -----------------------------------------------------------------------------