Skip to content

Commit

Permalink
Merge pull request #470 from zxzxwu/examples
Browse files Browse the repository at this point in the history
Type hint all examples
  • Loading branch information
zxzxwu authored Apr 15, 2024
2 parents 8758856 + 51a9428 commit 51321ca
Show file tree
Hide file tree
Showing 34 changed files with 242 additions and 129 deletions.
16 changes: 16 additions & 0 deletions bumble/gatt_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,22 @@
logger = logging.getLogger(__name__)


# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------


def show_services(services: Iterable[ServiceProxy]) -> None:
for service in services:
print(color(str(service), 'cyan'))

for characteristic in service.characteristics:
print(color(' ' + str(characteristic), 'magenta'))

for descriptor in characteristic.descriptors:
print(color(' ' + str(descriptor), 'green'))


# -----------------------------------------------------------------------------
# Proxies
# -----------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion bumble/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def __init__(
self.long_term_key_provider = None
self.link_key_provider = None
self.pairing_io_capability_provider = None # Classic only
self.snooper = None
self.snooper: Optional[Snooper] = None

# Connect to the source and sink if specified
if controller_source:
Expand Down
15 changes: 12 additions & 3 deletions bumble/profiles/device_information_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import struct
from typing import Optional, Tuple

from ..gatt_client import ProfileServiceProxy
from ..gatt import (
from bumble.gatt_client import ServiceProxy, ProfileServiceProxy, CharacteristicProxy
from bumble.gatt import (
GATT_DEVICE_INFORMATION_SERVICE,
GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC,
GATT_HARDWARE_REVISION_STRING_CHARACTERISTIC,
Expand Down Expand Up @@ -104,7 +104,16 @@ def __init__(
class DeviceInformationServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = DeviceInformationService

def __init__(self, service_proxy):
manufacturer_name: Optional[UTF8CharacteristicAdapter]
model_number: Optional[UTF8CharacteristicAdapter]
serial_number: Optional[UTF8CharacteristicAdapter]
hardware_revision: Optional[UTF8CharacteristicAdapter]
firmware_revision: Optional[UTF8CharacteristicAdapter]
software_revision: Optional[UTF8CharacteristicAdapter]
system_id: Optional[DelegatedCharacteristicAdapter]
ieee_regulatory_certification_data_list: Optional[CharacteristicProxy]

def __init__(self, service_proxy: ServiceProxy):
self.service_proxy = service_proxy

for field, uuid in (
Expand Down
2 changes: 1 addition & 1 deletion examples/async_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def func4(x, y):


# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
print("MAIN: start, loop=", asyncio.get_running_loop())
print("MAIN: invoke func1")
func1(1, 2)
Expand Down
12 changes: 9 additions & 3 deletions examples/battery_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,29 @@
import logging
from bumble.colors import color
from bumble.device import Device
from bumble.hci import Address
from bumble.transport import open_transport
from bumble.profiles.battery_service import BatteryServiceProxy


# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: battery_client.py <transport-spec> <bluetooth-address>')
print('example: battery_client.py usb:0 E1:CA:72:48:C4:E8')
return

print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected')

# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
await device.power_on()

# Connect to the peer
Expand Down
8 changes: 5 additions & 3 deletions examples/battery_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@


# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: python battery_server.py <device-config> <transport-spec>')
print('example: python battery_server.py device1.json usb:0')
return

async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)

# Add a Battery Service to the GATT sever
battery_service = BatteryService(lambda _: random.randint(0, 100))
Expand Down
12 changes: 9 additions & 3 deletions examples/device_information_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
import logging
from bumble.colors import color
from bumble.device import Device, Peer
from bumble.hci import Address
from bumble.profiles.device_information_service import DeviceInformationServiceProxy
from bumble.transport import open_transport


# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print(
'Usage: device_information_client.py <transport-spec> <bluetooth-address>'
Expand All @@ -35,11 +36,16 @@ async def main():
return

print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected')

# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
await device.power_on()

# Connect to the peer
Expand Down
10 changes: 6 additions & 4 deletions examples/device_information_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@


# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: python device_info_server.py <device-config> <transport-spec>')
print('example: python device_info_server.py device1.json usb:0')
return

async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)

# Add a Device Information Service to the GATT sever
device_information_service = DeviceInformationService(
Expand Down Expand Up @@ -64,7 +66,7 @@ async def main():
# Go!
await device.power_on()
await device.start_advertising(auto_restart=True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()


# -----------------------------------------------------------------------------
Expand Down
12 changes: 9 additions & 3 deletions examples/heart_rate_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,29 @@
import logging
from bumble.colors import color
from bumble.device import Device
from bumble.hci import Address
from bumble.transport import open_transport
from bumble.profiles.heart_rate_service import HeartRateServiceProxy


# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: heart_rate_client.py <transport-spec> <bluetooth-address>')
print('example: heart_rate_client.py usb:0 E1:CA:72:48:C4:E8')
return

print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected')

# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
await device.power_on()

# Connect to the peer
Expand Down
8 changes: 5 additions & 3 deletions examples/heart_rate_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@


# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: python heart_rate_server.py <device-config> <transport-spec>')
print('example: python heart_rate_server.py device1.json usb:0')
return

async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)

# Keep track of accumulated expended energy
energy_start_time = time.time()
Expand Down
8 changes: 5 additions & 3 deletions examples/keyboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ async def serve(websocket, _path):


# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: python keyboard.py <device-config> <transport-spec> <command>'
Expand All @@ -434,9 +434,11 @@ async def main():
)
return

async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
# Create a device to manage the host
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)

command = sys.argv[3]
if command == 'connect':
Expand Down
10 changes: 6 additions & 4 deletions examples/run_a2dp_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,18 +139,20 @@ async def find_a2dp_service(connection):


# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print('Usage: run_a2dp_info.py <device-config> <transport-spec> <bt-addr>')
print('example: run_a2dp_info.py classic1.json usb:0 14:7D:DA:4E:53:A8')
return

print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')

# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True

# Start the controller
Expand Down Expand Up @@ -187,7 +189,7 @@ async def main():
client = await AVDTP_Protocol.connect(connection, avdtp_version)

# Discover all endpoints on the remote device
endpoints = await client.discover_remote_endpoints()
endpoints = list(await client.discover_remote_endpoints())
print(f'@@@ Found {len(endpoints)} endpoints')
for endpoint in endpoints:
print('@@@', endpoint)
Expand Down
13 changes: 8 additions & 5 deletions examples/run_a2dp_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import sys
import os
import logging
from typing import Any, Dict

from bumble.device import Device
from bumble.transport import open_transport_or_link
Expand All @@ -41,7 +42,7 @@
SbcMediaCodecInformation,
)

Context = {'output': None}
Context: Dict[Any, Any] = {'output': None}


# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -104,7 +105,7 @@ def on_rtp_packet(packet):


# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: run_a2dp_sink.py <device-config> <transport-spec> <sbc-file> '
Expand All @@ -114,14 +115,16 @@ async def main():
return

print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')

with open(sys.argv[3], 'wb') as sbc_file:
Context['output'] = sbc_file

# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True

# Setup the SDP to expose the sink service
Expand Down Expand Up @@ -162,7 +165,7 @@ async def main():
await device.set_discoverable(True)
await device.set_connectable(True)

await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()


# -----------------------------------------------------------------------------
Expand Down
10 changes: 6 additions & 4 deletions examples/run_a2dp_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ async def stream_packets(read_function, protocol):


# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: run_a2dp_source.py <device-config> <transport-spec> <sbc-file> '
Expand All @@ -126,11 +126,13 @@ async def main():
return

print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')

# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True

# Setup the SDP to expose the SRC service
Expand Down Expand Up @@ -186,7 +188,7 @@ async def read(byte_count):
await device.set_discoverable(True)
await device.set_connectable(True)

await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()


# -----------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 51321ca

Please sign in to comment.