Skip to content

Commit

Permalink
Allows more flexible scanning
Browse files Browse the repository at this point in the history
This patch uses the BleakScanner interface to simplify the scanning as
well as provide more means of connecting. With an address, a BLEDevice,
or with a user provided scanner.

This change will allow me to implement a Home Assistant integration
using [Bluetooth
discovery](https://developers.home-assistant.io/docs/network_discovery/#best-practices-for-library-authors).

Other changes: `black`
  • Loading branch information
ViViDboarder committed Aug 25, 2022
1 parent 339abca commit 4bc845c
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 20 deletions.
1 change: 1 addition & 0 deletions pyhatchbabyrest/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
COLOR_GRADIENT = (254, 254, 254) # setting this color turns on Gradient mode
CHAR_TX = "02240002-5efd-47eb-9c1a-de53f7a2b232"
CHAR_FEEDBACK = "02260002-5efd-47eb-9c1a-de53f7a2b232"
BT_MANUFACTURER_ID = 1076


class PyHatchBabyRestSound(IntEnum):
Expand Down
56 changes: 36 additions & 20 deletions pyhatchbabyrest/pyhatchbabyrestasync.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,53 @@
import asyncio
from typing import Optional
from typing import Union

from bleak import BleakClient, discover # type: ignore
from bleak import BleakClient
from bleak import BleakScanner
from bleak.backends.device import BLEDevice

from .constants import CHAR_TX, CHAR_FEEDBACK, PyHatchBabyRestSound
from .constants import BT_MANUFACTURER_ID
from .constants import CHAR_FEEDBACK
from .constants import CHAR_TX
from .constants import PyHatchBabyRestSound


class PyHatchBabyRestAsync(object):
""" An asynchronous interface to a Hatch Baby Rest device using bleak. """
def __init__(self, addr: str = None):
"""An asynchronous interface to a Hatch Baby Rest device using bleak."""

def __init__(
self,
address_or_ble_device: Union[str, BLEDevice, None],
scanner: Optional[BleakScanner],
):
loop = asyncio.get_event_loop()
devices = loop.run_until_complete(discover())

for device in devices:
if addr:
if device.address == addr:
self.device = device
break
else:
try:
if 1076 in device.metadata["manufacturer_data"].keys():
self.device = device
break
except KeyError:
pass
scanner = BleakScanner() if scanner is None else scanner

if not address_or_ble_device:
device = loop.run_until_complete(
scanner.find_device_by_filter(
lambda device, _: BT_MANUFACTURER_ID
in device.metadata["manufacturer_data"].keys()
)
)
elif isinstance(address_or_ble_device, BLEDevice):
device = address_or_ble_device
else:
device = loop.run_until_complete(
scanner.find_device_by_address(address_or_ble_device)
)

if device is None:
raise RuntimeError(
"No address provided and could not find device via scan."
"No address or BLEDevice provided and could not find device via scan."
)

self.device = device

loop.run_until_complete(self._refresh_data())

async def _send_command(self, command: str):
""" Send a command do the device.
"""Send a command do the device.
:param command: The command to send.
"""
Expand Down

0 comments on commit 4bc845c

Please sign in to comment.