From 51a94288e2294236a6e802e5d9fa4fa7c0116d51 Mon Sep 17 00:00:00 2001 From: zxzxwu <92432172+zxzxwu@users.noreply.github.com> Date: Mon, 15 Apr 2024 12:16:01 +0000 Subject: [PATCH] Type hint all examples --- bumble/gatt_client.py | 16 ++++++++++++++++ bumble/host.py | 2 +- bumble/profiles/device_information_service.py | 15 ++++++++++++--- examples/async_runner.py | 2 +- examples/battery_client.py | 12 +++++++++--- examples/battery_server.py | 8 +++++--- examples/device_information_client.py | 12 +++++++++--- examples/device_information_server.py | 10 ++++++---- examples/heart_rate_client.py | 12 +++++++++--- examples/heart_rate_server.py | 8 +++++--- examples/keyboard.py | 8 +++++--- examples/run_a2dp_info.py | 10 ++++++---- examples/run_a2dp_sink.py | 13 ++++++++----- examples/run_a2dp_source.py | 10 ++++++---- examples/run_advertiser.py | 10 ++++++---- examples/run_asha_sink.py | 10 ++++++---- examples/run_avrcp.py | 8 +++++--- examples/run_classic_connect.py | 8 +++++--- examples/run_classic_discoverable.py | 10 ++++++---- examples/run_classic_discovery.py | 15 ++++++++++----- examples/run_connect_and_encrypt.py | 10 ++++++---- examples/run_controller.py | 11 +++++++---- examples/run_controller_with_scanner.py | 17 ++++++++++------- examples/run_device_with_snooper.py | 15 ++++++++++----- examples/run_gatt_client.py | 8 +++++--- examples/run_gatt_client_and_server.py | 14 +++++++++----- examples/run_gatt_server.py | 10 ++++++---- examples/run_hfp_handsfree.py | 14 +++++++++----- examples/run_hid_device.py | 10 ++++++---- examples/run_hid_host.py | 10 ++++++---- examples/run_notifier.py | 8 +++++--- examples/run_rfcomm_client.py | 16 +++++++++------- examples/run_rfcomm_server.py | 10 ++++++---- examples/run_scanner.py | 19 ++++++++++++------- 34 files changed, 242 insertions(+), 129 deletions(-) diff --git a/bumble/gatt_client.py b/bumble/gatt_client.py index abb0de5e..c71aabd7 100644 --- a/bumble/gatt_client.py +++ b/bumble/gatt_client.py @@ -90,6 +90,22 @@ logger = logging.getLogger(__name__) +# ----------------------------------------------------------------------------- +# Utils +# ----------------------------------------------------------------------------- + + +def show_services(services: Iterable[ServiceProxy]) -> None: + for service in services: + print(color(str(service), 'cyan')) + + for characteristic in service.characteristics: + print(color(' ' + str(characteristic), 'magenta')) + + for descriptor in characteristic.descriptors: + print(color(' ' + str(descriptor), 'green')) + + # ----------------------------------------------------------------------------- # Proxies # ----------------------------------------------------------------------------- diff --git a/bumble/host.py b/bumble/host.py index fd0a2470..609012ac 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -184,7 +184,7 @@ def __init__( self.long_term_key_provider = None self.link_key_provider = None self.pairing_io_capability_provider = None # Classic only - self.snooper = None + self.snooper: Optional[Snooper] = None # Connect to the source and sink if specified if controller_source: diff --git a/bumble/profiles/device_information_service.py b/bumble/profiles/device_information_service.py index f5c6987f..ecb1c0f8 100644 --- a/bumble/profiles/device_information_service.py +++ b/bumble/profiles/device_information_service.py @@ -19,8 +19,8 @@ import struct from typing import Optional, Tuple -from ..gatt_client import ProfileServiceProxy -from ..gatt import ( +from bumble.gatt_client import ServiceProxy, ProfileServiceProxy, CharacteristicProxy +from bumble.gatt import ( GATT_DEVICE_INFORMATION_SERVICE, GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC, GATT_HARDWARE_REVISION_STRING_CHARACTERISTIC, @@ -104,7 +104,16 @@ def __init__( class DeviceInformationServiceProxy(ProfileServiceProxy): SERVICE_CLASS = DeviceInformationService - def __init__(self, service_proxy): + manufacturer_name: Optional[UTF8CharacteristicAdapter] + model_number: Optional[UTF8CharacteristicAdapter] + serial_number: Optional[UTF8CharacteristicAdapter] + hardware_revision: Optional[UTF8CharacteristicAdapter] + firmware_revision: Optional[UTF8CharacteristicAdapter] + software_revision: Optional[UTF8CharacteristicAdapter] + system_id: Optional[DelegatedCharacteristicAdapter] + ieee_regulatory_certification_data_list: Optional[CharacteristicProxy] + + def __init__(self, service_proxy: ServiceProxy): self.service_proxy = service_proxy for field, uuid in ( diff --git a/examples/async_runner.py b/examples/async_runner.py index 9bc004d3..b29a80f1 100644 --- a/examples/async_runner.py +++ b/examples/async_runner.py @@ -61,7 +61,7 @@ async def func4(x, y): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: print("MAIN: start, loop=", asyncio.get_running_loop()) print("MAIN: invoke func1") func1(1, 2) diff --git a/examples/battery_client.py b/examples/battery_client.py index 3cf11b46..e9105dbc 100644 --- a/examples/battery_client.py +++ b/examples/battery_client.py @@ -21,23 +21,29 @@ import logging from bumble.colors import color from bumble.device import Device +from bumble.hci import Address from bumble.transport import open_transport from bumble.profiles.battery_service import BatteryServiceProxy # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 3: print('Usage: battery_client.py ') print('example: battery_client.py usb:0 E1:CA:72:48:C4:E8') return print('<<< connecting to HCI...') - async with await open_transport(sys.argv[1]) as (hci_source, hci_sink): + async with await open_transport(sys.argv[1]) as hci_transport: print('<<< connected') # Create and start a device - device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) await device.power_on() # Connect to the peer diff --git a/examples/battery_server.py b/examples/battery_server.py index b7f941f1..f509954f 100644 --- a/examples/battery_server.py +++ b/examples/battery_server.py @@ -29,14 +29,16 @@ # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 3: print('Usage: python battery_server.py ') print('example: python battery_server.py device1.json usb:0') return - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + async with await open_transport_or_link(sys.argv[2]) as hci_transport: + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) # Add a Battery Service to the GATT sever battery_service = BatteryService(lambda _: random.randint(0, 100)) diff --git a/examples/device_information_client.py b/examples/device_information_client.py index 416aa2f1..f6d51ba1 100644 --- a/examples/device_information_client.py +++ b/examples/device_information_client.py @@ -21,12 +21,13 @@ import logging from bumble.colors import color from bumble.device import Device, Peer +from bumble.hci import Address from bumble.profiles.device_information_service import DeviceInformationServiceProxy from bumble.transport import open_transport # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 3: print( 'Usage: device_information_client.py ' @@ -35,11 +36,16 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport(sys.argv[1]) as (hci_source, hci_sink): + async with await open_transport(sys.argv[1]) as hci_transport: print('<<< connected') # Create and start a device - device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) await device.power_on() # Connect to the peer diff --git a/examples/device_information_server.py b/examples/device_information_server.py index d437caee..92474bc3 100644 --- a/examples/device_information_server.py +++ b/examples/device_information_server.py @@ -28,14 +28,16 @@ # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 3: print('Usage: python device_info_server.py ') print('example: python device_info_server.py device1.json usb:0') return - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + async with await open_transport_or_link(sys.argv[2]) as hci_transport: + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) # Add a Device Information Service to the GATT sever device_information_service = DeviceInformationService( @@ -64,7 +66,7 @@ async def main(): # Go! await device.power_on() await device.start_advertising(auto_restart=True) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/heart_rate_client.py b/examples/heart_rate_client.py index ecfcffbc..ea7bc365 100644 --- a/examples/heart_rate_client.py +++ b/examples/heart_rate_client.py @@ -21,23 +21,29 @@ import logging from bumble.colors import color from bumble.device import Device +from bumble.hci import Address from bumble.transport import open_transport from bumble.profiles.heart_rate_service import HeartRateServiceProxy # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 3: print('Usage: heart_rate_client.py ') print('example: heart_rate_client.py usb:0 E1:CA:72:48:C4:E8') return print('<<< connecting to HCI...') - async with await open_transport(sys.argv[1]) as (hci_source, hci_sink): + async with await open_transport(sys.argv[1]) as hci_transport: print('<<< connected') # Create and start a device - device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) await device.power_on() # Connect to the peer diff --git a/examples/heart_rate_server.py b/examples/heart_rate_server.py index fad809f0..e40d5db4 100644 --- a/examples/heart_rate_server.py +++ b/examples/heart_rate_server.py @@ -33,14 +33,16 @@ # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 3: print('Usage: python heart_rate_server.py ') print('example: python heart_rate_server.py device1.json usb:0') return - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + async with await open_transport_or_link(sys.argv[2]) as hci_transport: + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) # Keep track of accumulated expended energy energy_start_time = time.time() diff --git a/examples/keyboard.py b/examples/keyboard.py index 314a805d..f2afe189 100644 --- a/examples/keyboard.py +++ b/examples/keyboard.py @@ -416,7 +416,7 @@ async def serve(websocket, _path): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 4: print( 'Usage: python keyboard.py ' @@ -434,9 +434,11 @@ async def main(): ) return - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: # Create a device to manage the host - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) command = sys.argv[3] if command == 'connect': diff --git a/examples/run_a2dp_info.py b/examples/run_a2dp_info.py index 3a35695c..e05c87ed 100644 --- a/examples/run_a2dp_info.py +++ b/examples/run_a2dp_info.py @@ -139,18 +139,20 @@ async def find_a2dp_service(connection): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 4: print('Usage: run_a2dp_info.py ') print('example: run_a2dp_info.py classic1.json usb:0 14:7D:DA:4E:53:A8') return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True # Start the controller @@ -187,7 +189,7 @@ async def main(): client = await AVDTP_Protocol.connect(connection, avdtp_version) # Discover all endpoints on the remote device - endpoints = await client.discover_remote_endpoints() + endpoints = list(await client.discover_remote_endpoints()) print(f'@@@ Found {len(endpoints)} endpoints') for endpoint in endpoints: print('@@@', endpoint) diff --git a/examples/run_a2dp_sink.py b/examples/run_a2dp_sink.py index 61bdce32..ca1af84f 100644 --- a/examples/run_a2dp_sink.py +++ b/examples/run_a2dp_sink.py @@ -19,6 +19,7 @@ import sys import os import logging +from typing import Any, Dict from bumble.device import Device from bumble.transport import open_transport_or_link @@ -41,7 +42,7 @@ SbcMediaCodecInformation, ) -Context = {'output': None} +Context: Dict[Any, Any] = {'output': None} # ----------------------------------------------------------------------------- @@ -104,7 +105,7 @@ def on_rtp_packet(packet): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 4: print( 'Usage: run_a2dp_sink.py ' @@ -114,14 +115,16 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') with open(sys.argv[3], 'wb') as sbc_file: Context['output'] = sbc_file # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True # Setup the SDP to expose the sink service @@ -162,7 +165,7 @@ async def main(): await device.set_discoverable(True) await device.set_connectable(True) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_a2dp_source.py b/examples/run_a2dp_source.py index 46452293..a1f955b5 100644 --- a/examples/run_a2dp_source.py +++ b/examples/run_a2dp_source.py @@ -114,7 +114,7 @@ async def stream_packets(read_function, protocol): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 4: print( 'Usage: run_a2dp_source.py ' @@ -126,11 +126,13 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True # Setup the SDP to expose the SRC service @@ -186,7 +188,7 @@ async def read(byte_count): await device.set_discoverable(True) await device.set_connectable(True) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_advertiser.py b/examples/run_advertiser.py index fb594267..4c67d10c 100644 --- a/examples/run_advertiser.py +++ b/examples/run_advertiser.py @@ -28,7 +28,7 @@ # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print( 'Usage: run_advertiser.py [type] [address]' @@ -50,10 +50,12 @@ async def main(): target = None print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) if advertising_type.is_scannable: device.scan_response_data = bytes( @@ -66,7 +68,7 @@ async def main(): await device.power_on() await device.start_advertising(advertising_type=advertising_type, target=target) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_asha_sink.py b/examples/run_asha_sink.py index 2d6f0d53..105eb751 100644 --- a/examples/run_asha_sink.py +++ b/examples/run_asha_sink.py @@ -49,7 +49,7 @@ # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 4: print( 'Usage: python run_asha_sink.py ' @@ -60,8 +60,10 @@ async def main(): audio_out = open(sys.argv[3], 'wb') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + async with await open_transport_or_link(sys.argv[2]) as hci_transport: + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) # Handler for audio control commands def on_audio_control_point_write(_connection, value): @@ -197,7 +199,7 @@ def on_data(data): await device.power_on() await device.start_advertising(auto_restart=True) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_avrcp.py b/examples/run_avrcp.py index 4bb41437..793e0007 100644 --- a/examples/run_avrcp.py +++ b/examples/run_avrcp.py @@ -331,7 +331,7 @@ async def set_absolute_volume(self, volume: int) -> None: # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print( 'Usage: run_avrcp_controller.py ' @@ -341,11 +341,13 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True # Setup the SDP to expose the sink service diff --git a/examples/run_classic_connect.py b/examples/run_classic_connect.py index 0acaedd2..362e6b8f 100644 --- a/examples/run_classic_connect.py +++ b/examples/run_classic_connect.py @@ -32,7 +32,7 @@ # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print( 'Usage: run_classic_connect.py ' @@ -42,11 +42,13 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True device.le_enabled = False await device.power_on() diff --git a/examples/run_classic_discoverable.py b/examples/run_classic_discoverable.py index 076a9ec5..52f47fc4 100644 --- a/examples/run_classic_discoverable.py +++ b/examples/run_classic_discoverable.py @@ -91,18 +91,20 @@ # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print('Usage: run_classic_discoverable.py ') print('example: run_classic_discoverable.py classic1.json usb:04b4:f901') return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True device.sdp_service_records = SDP_SERVICE_RECORDS await device.power_on() @@ -111,7 +113,7 @@ async def main(): await device.set_discoverable(True) await device.set_connectable(True) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_classic_discovery.py b/examples/run_classic_discovery.py index 569c8b38..af35bb70 100644 --- a/examples/run_classic_discovery.py +++ b/examples/run_classic_discovery.py @@ -20,8 +20,8 @@ import os import logging from bumble.colors import color - from bumble.device import Device +from bumble.hci import Address from bumble.transport import open_transport_or_link from bumble.core import DeviceClass @@ -53,22 +53,27 @@ def on_inquiry_result(self, address, class_of_device, data, rssi): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 2: print('Usage: run_classic_discovery.py ') print('example: run_classic_discovery.py usb:04b4:f901') return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[1]) as hci_transport: print('<<< connected') - device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) device.listener = DiscoveryListener() await device.power_on() await device.start_discovery() - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_connect_and_encrypt.py b/examples/run_connect_and_encrypt.py index b541a0e9..3b9d1800 100644 --- a/examples/run_connect_and_encrypt.py +++ b/examples/run_connect_and_encrypt.py @@ -25,7 +25,7 @@ # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print( 'Usage: run_connect_and_encrypt.py ' @@ -37,11 +37,13 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) await device.power_on() # Connect to the peer @@ -56,7 +58,7 @@ async def main(): print(f'!!! Encryption failed: {error}') return - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_controller.py b/examples/run_controller.py index 596ac8b1..05dedfcd 100644 --- a/examples/run_controller.py +++ b/examples/run_controller.py @@ -36,7 +36,7 @@ # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 4: print( 'Usage: run_controller.py ' @@ -49,7 +49,7 @@ async def main(): return print('>>> connecting to HCI...') - async with await open_transport_or_link(sys.argv[3]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[3]) as hci_transport: print('>>> connected') # Create a local link @@ -57,7 +57,10 @@ async def main(): # Create a first controller using the packet source/sink as its host interface controller1 = Controller( - 'C1', host_source=hci_source, host_sink=hci_sink, link=link + 'C1', + host_source=hci_transport.source, + host_sink=hci_transport.sink, + link=link, ) controller1.random_address = sys.argv[1] @@ -98,7 +101,7 @@ async def main(): await device.start_advertising() await device.start_scanning() - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_controller_with_scanner.py b/examples/run_controller_with_scanner.py index 9603cff8..9e935a96 100644 --- a/examples/run_controller_with_scanner.py +++ b/examples/run_controller_with_scanner.py @@ -20,9 +20,9 @@ import sys import os from bumble.colors import color - from bumble.device import Device from bumble.controller import Controller +from bumble.hci import Address from bumble.link import LocalLink from bumble.transport import open_transport_or_link @@ -45,14 +45,14 @@ def on_advertisement(self, advertisement): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 2: print('Usage: run_controller.py ') print('example: run_controller_with_scanner.py serial:/dev/pts/14,1000000') return print('>>> connecting to HCI...') - async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[1]) as hci_transport: print('>>> connected') # Create a local link @@ -60,22 +60,25 @@ async def main(): # Create a first controller using the packet source/sink as its host interface controller1 = Controller( - 'C1', host_source=hci_source, host_sink=hci_sink, link=link + 'C1', + host_source=hci_transport.source, + host_sink=hci_transport.sink, + link=link, + public_address='E0:E1:E2:E3:E4:E5', ) - controller1.address = 'E0:E1:E2:E3:E4:E5' # Create a second controller using the same link controller2 = Controller('C2', link=link) # Create a device with a scanner listener device = Device.with_hci( - 'Bumble', 'F0:F1:F2:F3:F4:F5', controller2, controller2 + 'Bumble', Address('F0:F1:F2:F3:F4:F5'), controller2, controller2 ) device.listener = ScannerListener() await device.power_on() await device.start_scanning() - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_device_with_snooper.py b/examples/run_device_with_snooper.py index be9a40e8..87307e8d 100644 --- a/examples/run_device_with_snooper.py +++ b/examples/run_device_with_snooper.py @@ -20,31 +20,36 @@ import os import logging from bumble.colors import color - +from bumble.hci import Address from bumble.device import Device from bumble.transport import open_transport_or_link from bumble.snoop import BtSnooper # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 3: print('Usage: run_device_with_snooper.py ') print('example: run_device_with_snooper.py usb:0 btsnoop.log') return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[1]) as hci_transport: print('<<< connected') - device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) with open(sys.argv[2], "wb") as snoop_file: device.host.snooper = BtSnooper(snoop_file) await device.power_on() await device.start_scanning() - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_gatt_client.py b/examples/run_gatt_client.py index dcf8a1b9..45481591 100644 --- a/examples/run_gatt_client.py +++ b/examples/run_gatt_client.py @@ -69,7 +69,7 @@ async def on_connection(self, connection): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print( 'Usage: run_gatt_client.py ' @@ -79,11 +79,13 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device to manage the host, with a custom listener - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.listener = Listener(device) await device.power_on() diff --git a/examples/run_gatt_client_and_server.py b/examples/run_gatt_client_and_server.py index 609fe183..e25d14cf 100644 --- a/examples/run_gatt_client_and_server.py +++ b/examples/run_gatt_client_and_server.py @@ -19,21 +19,21 @@ import os import logging from bumble.colors import color - from bumble.core import ProtocolError from bumble.controller import Controller from bumble.device import Device, Peer +from bumble.hci import Address from bumble.host import Host from bumble.link import LocalLink from bumble.gatt import ( Service, Characteristic, Descriptor, - show_services, GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR, GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, GATT_DEVICE_INFORMATION_SERVICE, ) +from bumble.gatt_client import show_services # ----------------------------------------------------------------------------- @@ -43,7 +43,7 @@ def on_connection(self, connection): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: # Create a local link link = LocalLink() @@ -51,14 +51,18 @@ async def main(): client_controller = Controller("client controller", link=link) client_host = Host() client_host.controller = client_controller - client_device = Device("client", address='F0:F1:F2:F3:F4:F5', host=client_host) + client_device = Device( + "client", address=Address('F0:F1:F2:F3:F4:F5'), host=client_host + ) await client_device.power_on() # Setup a stack for the server server_controller = Controller("server controller", link=link) server_host = Host() server_host.controller = server_controller - server_device = Device("server", address='F6:F7:F8:F9:FA:FB', host=server_host) + server_device = Device( + "server", address=Address('F6:F7:F8:F9:FA:FB'), host=server_host + ) server_device.listener = ServerListener() await server_device.power_on() diff --git a/examples/run_gatt_server.py b/examples/run_gatt_server.py index 46d42a28..874115cf 100644 --- a/examples/run_gatt_server.py +++ b/examples/run_gatt_server.py @@ -71,7 +71,7 @@ def my_custom_write_with_error(connection, value): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print( 'Usage: run_gatt_server.py ' @@ -81,11 +81,13 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device to manage the host - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.listener = Listener(device) # Add a few entries to the device's GATT server @@ -146,7 +148,7 @@ async def main(): else: await device.start_advertising(auto_restart=True) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_hfp_handsfree.py b/examples/run_hfp_handsfree.py index 6c5f3a19..5433284c 100644 --- a/examples/run_hfp_handsfree.py +++ b/examples/run_hfp_handsfree.py @@ -84,14 +84,14 @@ def on_ag_indicator(indicator): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print('Usage: run_classic_hfp.py ') print('example: run_classic_hfp.py classic2.json usb:04b4:f901') return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Hands-Free profile configuration. @@ -116,7 +116,9 @@ async def main(): ) # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True # Create and register a server @@ -128,7 +130,9 @@ async def main(): # Advertise the HFP RFComm channel in the SDP device.sdp_service_records = { - 0x00010001: hfp.sdp_records(0x00010001, channel_number, configuration) + 0x00010001: hfp.make_hf_sdp_records( + 0x00010001, channel_number, configuration + ) } # Let's go! @@ -164,7 +168,7 @@ async def serve(websocket: websockets.WebSocketServerProtocol, _path): await websockets.serve(serve, 'localhost', 8989) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_hid_device.py b/examples/run_hid_device.py index 26bc9961..2287be09 100644 --- a/examples/run_hid_device.py +++ b/examples/run_hid_device.py @@ -489,7 +489,7 @@ async def serve(websocket, _path): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print( 'Usage: python run_hid_device.py ' @@ -601,11 +601,13 @@ def on_virtual_cable_unplug_cb(): asyncio.create_task(handle_virtual_cable_unplug()) print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True # Create and register HID device @@ -742,7 +744,7 @@ async def menu(): print("Executing in Web mode") await keyboard_device(hid_device) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_hid_host.py b/examples/run_hid_host.py index 7519b4e1..cc17cc13 100644 --- a/examples/run_hid_host.py +++ b/examples/run_hid_host.py @@ -275,7 +275,7 @@ async def get_stream_reader(pipe) -> asyncio.StreamReader: # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 4: print( 'Usage: run_hid_host.py ' @@ -324,11 +324,13 @@ def on_hid_virtual_cable_unplug_cb(): asyncio.create_task(handle_virtual_cable_unplug()) print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< CONNECTED') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True # Create HID host and start it @@ -557,7 +559,7 @@ async def menu(): # Interrupt Channel await hid_host.connect_interrupt_channel() - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_notifier.py b/examples/run_notifier.py index 5f6def3b..869e716c 100644 --- a/examples/run_notifier.py +++ b/examples/run_notifier.py @@ -57,18 +57,20 @@ def on_my_characteristic_subscription(peer, enabled): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print('Usage: run_notifier.py ') print('example: run_notifier.py device1.json usb:0') return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device to manage the host - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.listener = Listener(device) # Add a few entries to the device's GATT server diff --git a/examples/run_rfcomm_client.py b/examples/run_rfcomm_client.py index 39ee7763..9232dc95 100644 --- a/examples/run_rfcomm_client.py +++ b/examples/run_rfcomm_client.py @@ -165,7 +165,7 @@ async def tcp_server(tcp_port, rfcomm_session): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 5: print( 'Usage: run_rfcomm_client.py ' @@ -178,11 +178,13 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True await device.power_on() @@ -192,8 +194,8 @@ async def main(): connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) print(f'=== Connected to {connection.peer_address}!') - channel = sys.argv[4] - if channel == 'discover': + channel_str = sys.argv[4] + if channel_str == 'discover': await list_rfcomm_channels(connection) return @@ -213,7 +215,7 @@ async def main(): rfcomm_mux = await rfcomm_client.start() print('@@@ Started') - channel = int(channel) + channel = int(channel_str) print(f'### Opening session for channel {channel}...') try: session = await rfcomm_mux.open_dlc(channel) @@ -229,7 +231,7 @@ async def main(): tcp_port = int(sys.argv[5]) asyncio.create_task(tcp_server(tcp_port, session)) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_rfcomm_server.py b/examples/run_rfcomm_server.py index 41915a43..14bc7eba 100644 --- a/examples/run_rfcomm_server.py +++ b/examples/run_rfcomm_server.py @@ -107,7 +107,7 @@ async def run(self, port): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 4: print( 'Usage: run_rfcomm_server.py ' @@ -124,11 +124,13 @@ async def main(): uuid = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE' print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True # Create a TCP server @@ -153,7 +155,7 @@ async def main(): await device.set_discoverable(True) await device.set_connectable(True) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_scanner.py b/examples/run_scanner.py index 4a094b9f..5748b486 100644 --- a/examples/run_scanner.py +++ b/examples/run_scanner.py @@ -20,27 +20,31 @@ import os import logging from bumble.colors import color - +from bumble.hci import Address from bumble.device import Device from bumble.transport import open_transport_or_link # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 2: print('Usage: run_scanner.py [filter]') print('example: run_scanner.py usb:0') return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[1]) as hci_transport: print('<<< connected') filter_duplicates = len(sys.argv) == 3 and sys.argv[2] == 'filter' - device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) - @device.on('advertisement') - def _(advertisement): + def on_adv(advertisement): address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[ advertisement.address.address_type ] @@ -67,10 +71,11 @@ def _(advertisement): f'{advertisement.data.to_string(separator)}' ) + device.on('advertisement', on_adv) await device.power_on() await device.start_scanning(filter_duplicates=filter_duplicates) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # -----------------------------------------------------------------------------