diff --git a/bumble/hfp.py b/bumble/hfp.py index a0596fc7..ea645897 100644 --- a/bumble/hfp.py +++ b/bumble/hfp.py @@ -204,17 +204,22 @@ class HfIndicator(enum.IntEnum): BATTERY_LEVEL = 0x02 # Battery level feature -class CallHoldOperation(enum.IntEnum): +class CallHoldOperation(enum.Enum): """ Call Hold supported operations (normative). AT Commands Reference Guide, 3.5.2.3.12 +CHLD - Call Holding Services. """ - RELEASE_ALL_HELD_CALLS = 0 # Release all held calls - RELEASE_ALL_ACTIVE_CALLS = 1 # Release all active calls, accept other - HOLD_ALL_ACTIVE_CALLS = 2 # Place all active calls on hold, accept other - ADD_HELD_CALL = 3 # Adds a held call to conversation + RELEASE_ALL_HELD_CALLS = "0" # Release all held calls + RELEASE_ALL_ACTIVE_CALLS = "1" # Release all active calls, accept other + RELEASE_SPECIFIC_CALL = "1x" # Release a specific call X + HOLD_ALL_ACTIVE_CALLS = "2" # Place all active calls on hold, accept other + HOLD_ALL_CALLS_EXCEPT = "2x" # Place all active calls except call X + ADD_HELD_CALL = "3" # Adds a held call to conversation + CONNECT_TWO_CALLS = ( + "4" # Connects the two calls and disconnects the subscriber from both calls + ) class ResponseHoldStatus(enum.IntEnum): @@ -335,10 +340,82 @@ class CallInfo: status: CallInfoStatus mode: CallInfoMode multi_party: CallInfoMultiParty - number: Optional[int] = None + number: Optional[str] = None type: Optional[int] = None +@dataclasses.dataclass +class CallLineIdentification: + """ + Calling Line Identification notification. + + TS 127 007 - V6.8.0, 7.6 Calling line identification presentation +CLIP, but only + number, type and alpha are meaningful in HFP. + + Attributes: + number: String type phone number of format specified by `type`. + type: Type of address octet in integer format (refer TS 24.008 [8] subclause + 10.5.4.7). + subaddr: String type subaddress of format specified by `satype`. + satype: Type of subaddress octet in integer format (refer TS 24.008 [8] + subclause 10.5.4.8). + alpha: Optional string type alphanumeric representation of number corresponding + to the entry found in phonebook; used character set should be the one selected + with command Select TE Character Set +CSCS. + cli_validity: 0 CLI valid, 1 CLI has been withheld by the originator, 2 CLI is + not available due to interworking problems or limitations of originating + network. + """ + + number: str + type: int + subaddr: Optional[str] = None + satype: Optional[int] = None + alpha: Optional[str] = None + cli_validity: Optional[int] = None + + @classmethod + def parse_from(cls: Type[Self], parameters: List[bytes]) -> Self: + return cls( + number=parameters[0].decode(), + type=int(parameters[1]), + subaddr=parameters[2].decode() if len(parameters) >= 3 else None, + satype=( + int(parameters[3]) if len(parameters) >= 4 and parameters[3] else None + ), + alpha=parameters[4].decode() if len(parameters) >= 5 else None, + cli_validity=( + int(parameters[5]) if len(parameters) >= 6 and parameters[5] else None + ), + ) + + def to_clip_string(self) -> str: + return ','.join( + str(arg) if arg else '' + for arg in [ + self.number, + self.type, + self.subaddr, + self.satype, + self.alpha, + self.cli_validity, + ] + ) + + +class VoiceRecognitionState(enum.IntEnum): + """ + vrec values provided in AT+BVRA command. + + Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007. + """ + + DISABLE = 0 + ENABLE = 1 + # (Enhanced Voice Recognition Status only) HF is ready to accept audio. + ENHANCED_READY = 2 + + class CmeError(enum.IntEnum): """ CME ERROR codes (partial listed). @@ -359,7 +436,7 @@ class CmeError(enum.IntEnum): # ----------------------------------------------------------------------------- # Response codes. -RESPONSE_CODES = [ +RESPONSE_CODES = { "+APLSIRI", "+BAC", "+BCC", @@ -390,10 +467,10 @@ class CmeError(enum.IntEnum): "+XAPL", "A", "D", -] +} # Unsolicited responses and statuses. -UNSOLICITED_CODES = [ +UNSOLICITED_CODES = { "+APLSIRI", "+BCS", "+BIND", @@ -411,10 +488,10 @@ class CmeError(enum.IntEnum): "NO ANSWER", "NO CARRIER", "RING", -] +} # Status codes -STATUS_CODES = [ +STATUS_CODES = { "+CME ERROR", "BLACKLISTED", "BUSY", @@ -423,7 +500,7 @@ class CmeError(enum.IntEnum): "NO ANSWER", "NO CARRIER", "OK", -] +} @dataclasses.dataclass @@ -626,10 +703,25 @@ class HfProtocol(pyee.EventEmitter): ag_indicator: When AG update their indicators, notify the new state. Args: ag_indicator: AgIndicator + speaker_volume: Emitted when AG update speaker volume autonomously. + Args: + volume: Int + microphone_volume: Emitted when AG update microphone volume autonomously. + Args: + volume: Int + microphone_volume: Emitted when AG sends a ringtone request. + Args: + None + cli_notification: Emitted when notify the call metadata on line. + Args: + cli_notification: CallLineIdentification + voice_recognition: Emitted when AG starts voice recognition autonomously. + Args: + vrec: VoiceRecognitionState """ - class HfLoopTermination(HfpProtocolError): ... - """Termination signal for run() loop.""" + class HfLoopTermination(HfpProtocolError): + """Termination signal for run() loop.""" supported_hf_features: int supported_audio_codecs: List[AudioCodec] @@ -651,7 +743,11 @@ class HfLoopTermination(HfpProtocolError): ... read_buffer: bytearray active_codec: AudioCodec - def __init__(self, dlc: rfcomm.DLC, configuration: HfConfiguration) -> None: + def __init__( + self, + dlc: rfcomm.DLC, + configuration: HfConfiguration, + ) -> None: super().__init__() # Configure internal state. @@ -841,7 +937,7 @@ async def initiate_slc(self): if self.supports_hf_feature( HfFeature.THREE_WAY_CALLING - ) and self.supports_ag_feature(HfFeature.THREE_WAY_CALLING): + ) and self.supports_ag_feature(AgFeature.THREE_WAY_CALLING): # After the HF has enabled the “Indicators status update” function in # the AG, and if the “Call waiting and 3-way calling” bit was set in the # supported features bitmap by both the HF and the AG, the HF shall @@ -854,9 +950,8 @@ async def initiate_slc(self): ) self.supported_ag_call_hold_operations = [ - CallHoldOperation(int(operation)) - for operation in response.parameters[0] - if not b'x' in operation + CallHoldOperation(operation.decode()) + for operation in response.parameters ] # 4.2.1.4 HF Indicators @@ -987,7 +1082,7 @@ async def query_current_calls(self) -> List[CallInfo]: multi_party=CallInfoMultiParty(int(response.parameters[4])), ) if len(response.parameters) >= 7: - call_info.number = int(response.parameters[5]) + call_info.number = response.parameters[5] call_info.type = int(response.parameters[6]) calls.append(call_info) return calls @@ -1010,6 +1105,21 @@ async def handle_unsolicited(self): await self.update_ag_indicator( int(result.parameters[0]), int(result.parameters[1]) ) + elif result.code == "+VGS": + self.emit('speaker_volume', int(result.parameters[0])) + elif result.code == "+VGM": + self.emit('microphone_volume', int(result.parameters[0])) + elif result.code == "RING": + self.emit('ring') + elif result.code == "+CLIP": + self.emit( + 'cli_notification', CallLineIdentification.parse_from(result.parameters) + ) + elif result.code == "+BVRA": + # TODO: Support Enhanced Voice Recognition. + self.emit( + 'voice_recognition', VoiceRecognitionState(int(result.parameters[0])) + ) else: logging.info(f"unhandled unsolicited response {result.code}") @@ -1050,6 +1160,14 @@ class AgProtocol(pyee.EventEmitter): answer: Emit when HF sends ATA to answer phone call. hang_up: Emit when HF sends AT+CHUP to hang up phone call. dial: Emit when HF sends ATD to dial phone call. + voice_recognition: Emit when HF requests voice recognition state. + Args: + vrec: VoiceRecognitionState + call_hold: Emit when HF requests call hold operation. + Args: + operation: CallHoldOperation + call_index: Optional[int] + """ supported_hf_features: int @@ -1066,10 +1184,12 @@ class AgProtocol(pyee.EventEmitter): read_buffer: bytearray active_codec: AudioCodec + calls: List[CallInfo] indicator_report_enabled: bool inband_ringtone_enabled: bool cme_error_enabled: bool + cli_notification_enabled: bool _remained_slc_setup_features: Set[HfFeature] def __init__(self, dlc: rfcomm.DLC, configuration: AgConfiguration) -> None: @@ -1079,6 +1199,7 @@ def __init__(self, dlc: rfcomm.DLC, configuration: AgConfiguration) -> None: self.dlc = dlc self.read_buffer = bytearray() self.active_codec = AudioCodec.CVSD + self.calls = [] # Build local features. self.supported_ag_features = sum(configuration.supported_ag_features) @@ -1095,6 +1216,7 @@ def __init__(self, dlc: rfcomm.DLC, configuration: AgConfiguration) -> None: self.supported_audio_codecs = [] self.indicator_report_enabled = False self.cme_error_enabled = False + self.cli_notification_enabled = False self.hf_indicators = collections.OrderedDict() @@ -1168,6 +1290,21 @@ def set_inband_ringtone_enabled(self, enabled: bool) -> None: self.inband_ringtone_enabled = enabled self.send_response(f'+BSIR: {1 if enabled else 0}') + def set_speaker_volume(self, level: int) -> None: + """Reports speaker volume.""" + + self.send_response(f'+VGS: {level}') + + def set_microphone_volume(self, level: int) -> None: + """Reports microphone volume.""" + + self.send_response(f'+VGM: {level}') + + def send_ring(self) -> None: + """Sends RING command to trigger ringtone on HF.""" + + self.send_response('RING') + def update_ag_indicator(self, indicator: AgIndicator, value: int) -> None: """Updates AG indicator. @@ -1212,6 +1349,14 @@ async def negotiate_codec(self, codec: AudioCodec) -> None: if (new_codec := await at_bcs_future) != codec: raise HfpProtocolError(f'Expect codec: {codec}, but get {new_codec}') + def send_cli_notification(self, cli: CallLineIdentification) -> None: + """Sends +CLIP CLI notification.""" + + if not self.cli_notification_enabled: + logger.warning('Try to send CLIP while CLI notification is not enabled') + + self.send_response(f'+CLIP: {cli.to_clip_string()}') + def _check_remained_slc_commands(self) -> None: if not self._remained_slc_setup_features: self.emit('slc_complete') @@ -1240,6 +1385,52 @@ def _on_bcs(self, codec: bytes) -> None: self.send_ok() self.emit('codec_negotiation', self.active_codec) + def _on_bvra(self, vrec: bytes) -> None: + self.send_ok() + self.emit('voice_recognition', VoiceRecognitionState(int(vrec))) + + def _on_chld(self, operation_code: bytes) -> None: + call_index: Optional[int] = None + if len(operation_code) > 1: + call_index = int(operation_code[1:]) + operation_code = operation_code[:1] + b'x' + try: + operation = CallHoldOperation(operation_code.decode()) + except: + logger.error(f'Invalid operation: {operation_code.decode()}') + self.send_cme_error(CmeError.OPERATION_NOT_SUPPORTED) + return + + if operation not in self.supported_ag_call_hold_operations: + logger.error(f'Unsupported operation: {operation_code.decode()}') + self.send_cme_error(CmeError.OPERATION_NOT_SUPPORTED) + + if call_index is not None and not any( + call.index == call_index for call in self.calls + ): + logger.error(f'No matching call {call_index}') + self.send_cme_error(CmeError.INVALID_INDEX) + + # Real three-way calls have more complicated situations, but this is not a popular issue - let users to handle the remaining :) + + self.send_ok() + self.emit('call_hold', operation, call_index) + + def _on_chld_test(self) -> None: + if not self.supports_ag_feature(AgFeature.THREE_WAY_CALLING): + self.send_error() + return + + self.send_response( + '+CHLD:' + + ','.join( + operation.value for operation in self.supported_ag_call_hold_operations + ) + ) + self.send_ok() + self._remained_slc_setup_features.remove(HfFeature.THREE_WAY_CALLING) + self._check_remained_slc_commands() + def _on_cind_test(self) -> None: if not self.ag_indicators: self.send_cme_error(CmeError.NOT_FOUND) @@ -1364,6 +1555,26 @@ def _on_chup(self) -> None: self.emit('hang_up') self.send_ok() + def _on_clcc(self) -> None: + for call in self.calls: + response = ( + f'+CLCC: {call.index}' + f',{call.direction.value}' + f',{call.status.value}' + f',{call.mode.value}' + f',{call.multi_party.value}' + f',\"{call.number}\"' + if call.number is not None + else '' f',{call.type}' if call.type is not None else '' + ) + self.send_response(response) + self.send_ok() + + def _on_clip(self, enabled: bytes) -> None: + if not self.supports_hf_feature(HfFeature.CLI_PRESENTATION_CAPABILITY): + logger.error('Remote doesn not support CLI but sends AT+CLIP') + self.cli_notification_enabled = True if enabled == b'1' else False + # ----------------------------------------------------------------------------- # Normative SDP definitions @@ -1596,7 +1807,7 @@ async def find_hf_sdp_record( connection: ACL connection to make SDP search. Returns: - Dictionary mapping from channel number to service class UUID list. + Tuple of (, , ) """ async with sdp.Client(connection) as sdp_client: search_result = await sdp_client.search_attributes( @@ -1640,7 +1851,7 @@ async def find_ag_sdp_record( connection: ACL connection to make SDP search. Returns: - Dictionary mapping from channel number to service class UUID list. + Tuple of (, , ) """ async with sdp.Client(connection) as sdp_client: search_result = await sdp_client.search_attributes( diff --git a/tests/hfp_test.py b/tests/hfp_test.py index 241a71bc..21c723dc 100644 --- a/tests/hfp_test.py +++ b/tests/hfp_test.py @@ -43,6 +43,9 @@ def _default_hf_configuration() -> hfp.HfConfiguration: hfp.HfFeature.CODEC_NEGOTIATION, hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED, hfp.HfFeature.HF_INDICATORS, + hfp.HfFeature.ENHANCED_CALL_STATUS, + hfp.HfFeature.THREE_WAY_CALLING, + hfp.HfFeature.CLI_PRESENTATION_CAPABILITY, ], supported_hf_indicators=[ hfp.HfIndicator.ENHANCED_SAFETY, @@ -57,7 +60,11 @@ def _default_hf_configuration() -> hfp.HfConfiguration: # ----------------------------------------------------------------------------- def _default_hf_sdp_features() -> hfp.HfSdpFeature: - return hfp.HfSdpFeature.WIDE_BAND + return ( + hfp.HfSdpFeature.WIDE_BAND + | hfp.HfSdpFeature.THREE_WAY_CALLING + | hfp.HfSdpFeature.CLI_PRESENTATION_CAPABILITY + ) # ----------------------------------------------------------------------------- @@ -69,6 +76,8 @@ def _default_ag_configuration() -> hfp.AgConfiguration: hfp.AgFeature.REJECT_CALL, hfp.AgFeature.CODEC_NEGOTIATION, hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED, + hfp.AgFeature.ENHANCED_CALL_STATUS, + hfp.AgFeature.THREE_WAY_CALLING, ], supported_ag_indicators=[ hfp.AgIndicatorState.call(), @@ -83,14 +92,26 @@ def _default_ag_configuration() -> hfp.AgConfiguration: hfp.HfIndicator.ENHANCED_SAFETY, hfp.HfIndicator.BATTERY_LEVEL, ], - supported_ag_call_hold_operations=[], + supported_ag_call_hold_operations=[ + hfp.CallHoldOperation.ADD_HELD_CALL, + hfp.CallHoldOperation.HOLD_ALL_ACTIVE_CALLS, + hfp.CallHoldOperation.HOLD_ALL_CALLS_EXCEPT, + hfp.CallHoldOperation.RELEASE_ALL_ACTIVE_CALLS, + hfp.CallHoldOperation.RELEASE_ALL_HELD_CALLS, + hfp.CallHoldOperation.RELEASE_SPECIFIC_CALL, + hfp.CallHoldOperation.CONNECT_TWO_CALLS, + ], supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC], ) # ----------------------------------------------------------------------------- def _default_ag_sdp_features() -> hfp.AgSdpFeature: - return hfp.AgSdpFeature.WIDE_BAND | hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY + return ( + hfp.AgSdpFeature.WIDE_BAND + | hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY + | hfp.AgSdpFeature.THREE_WAY_CALLING + ) # ----------------------------------------------------------------------------- @@ -165,6 +186,7 @@ async def test_slc_with_minimal_features(): assert hf.supported_ag_features == ag.supported_ag_features assert hf.supported_hf_features == ag.supported_hf_features + assert hf.supported_ag_call_hold_operations == ag.supported_ag_call_hold_operations for a, b in zip(hf.ag_indicators, ag.ag_indicators): assert a.indicator == b.indicator assert a.current_status == b.current_status @@ -177,6 +199,7 @@ async def test_slc(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): assert hf.supported_ag_features == ag.supported_ag_features assert hf.supported_hf_features == ag.supported_hf_features + assert hf.supported_ag_call_hold_operations == ag.supported_ag_call_hold_operations for a, b in zip(hf.ag_indicators, ag.ag_indicators): assert a.indicator == b.indicator assert a.current_status == b.current_status @@ -281,6 +304,175 @@ async def test_terminate_call(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProto await future +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_query_calls_without_calls( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] +): + hf, ag = hfp_connections + + await hf.query_current_calls() == [] + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_query_calls_with_calls( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] +): + hf, ag = hfp_connections + ag.calls.append( + hfp.CallInfo( + index=1, + direction=hfp.CallInfoDirection.MOBILE_ORIGINATED_CALL, + status=hfp.CallInfoStatus.ACTIVE, + mode=hfp.CallInfoMode.VOICE, + multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE, + number='123456789', + ) + ) + + await hf.query_current_calls() == ag.calls + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +@pytest.mark.parametrize( + "operation,", + ( + hfp.CallHoldOperation.RELEASE_ALL_HELD_CALLS, + hfp.CallHoldOperation.RELEASE_ALL_ACTIVE_CALLS, + hfp.CallHoldOperation.HOLD_ALL_ACTIVE_CALLS, + hfp.CallHoldOperation.ADD_HELD_CALL, + hfp.CallHoldOperation.CONNECT_TWO_CALLS, + ), +) +async def test_hold_call_without_call_index( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol], + operation: hfp.CallHoldOperation, +): + hf, ag = hfp_connections + call_hold_future = asyncio.get_running_loop().create_future() + ag.on("call_hold", lambda op, index: call_hold_future.set_result((op, index))) + + await hf.execute_command(f"AT+CHLD={operation.value}") + + assert (await call_hold_future) == (operation, None) + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +@pytest.mark.parametrize( + "operation,", + ( + hfp.CallHoldOperation.RELEASE_SPECIFIC_CALL, + hfp.CallHoldOperation.HOLD_ALL_CALLS_EXCEPT, + ), +) +async def test_hold_call_with_call_index( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol], + operation: hfp.CallHoldOperation, +): + hf, ag = hfp_connections + call_hold_future = asyncio.get_running_loop().create_future() + ag.on("call_hold", lambda op, index: call_hold_future.set_result((op, index))) + ag.calls.append( + hfp.CallInfo( + index=1, + direction=hfp.CallInfoDirection.MOBILE_ORIGINATED_CALL, + status=hfp.CallInfoStatus.ACTIVE, + mode=hfp.CallInfoMode.VOICE, + multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE, + number=123456789, + ) + ) + + await hf.execute_command(f"AT+CHLD={operation.value.replace('x', '1')}") + + assert (await call_hold_future) == (operation, 1) + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_ring(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + ring_future = asyncio.get_running_loop().create_future() + hf.on("ring", lambda: ring_future.set_result(None)) + + ag.send_ring() + + await ring_future + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_speaker_volume(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + speaker_volume_future = asyncio.get_running_loop().create_future() + hf.on("speaker_volume", speaker_volume_future.set_result) + + ag.set_speaker_volume(10) + + assert await speaker_volume_future == 10 + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_microphone_volume( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] +): + hf, ag = hfp_connections + microphone_volume_future = asyncio.get_running_loop().create_future() + hf.on("microphone_volume", microphone_volume_future.set_result) + + ag.set_microphone_volume(10) + + assert await microphone_volume_future == 10 + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_cli_notification(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + cli_notification_future = asyncio.get_running_loop().create_future() + hf.on("cli_notification", cli_notification_future.set_result) + + ag.send_cli_notification( + hfp.CallLineIdentification(number="\"123456789\"", type=129, alpha="\"Bumble\"") + ) + + assert await cli_notification_future == hfp.CallLineIdentification( + number="123456789", type=129, alpha="Bumble", subaddr="", satype=None + ) + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_voice_recognition_from_hf( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] +): + hf, ag = hfp_connections + voice_recognition_future = asyncio.get_running_loop().create_future() + ag.on("voice_recognition", voice_recognition_future.set_result) + + await hf.execute_command("AT+BVRA=1") + + assert await voice_recognition_future == hfp.VoiceRecognitionState.ENABLE + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_voice_recognition_from_ag( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] +): + hf, ag = hfp_connections + voice_recognition_future = asyncio.get_running_loop().create_future() + hf.on("voice_recognition", voice_recognition_future.set_result) + + ag.send_response("+BVRA: 1") + + assert await voice_recognition_future == hfp.VoiceRecognitionState.ENABLE + + # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_hf_sdp_record():