From 1f24785408d6d673e575a3335a28c55d7f2cea93 Mon Sep 17 00:00:00 2001 From: Lenn Date: Fri, 22 Mar 2024 22:19:05 +0100 Subject: [PATCH] Add is_charging and is_wired, and add None options for callbacks --- motionblindsble/device.py | 95 +++++++++++++++++++++++++++++---------- tests/test_device.py | 22 +++++---- 2 files changed, 86 insertions(+), 31 deletions(-) diff --git a/motionblindsble/device.py b/motionblindsble/device.py index adb1b4a..da4eb21 100644 --- a/motionblindsble/device.py +++ b/motionblindsble/device.py @@ -305,7 +305,9 @@ class MotionDevice: _running_type: MotionRunningType | None = None _position: int | None _tilt: int | None - _battery: int | None + _battery_percentage: int | None + _is_charging: bool | None + _is_wired: bool | None _speed: int | None _end_position_info: MotionPositionInfo | None _received_end_position_info_event: Event @@ -325,20 +327,33 @@ class MotionDevice: _disabled_connection_callbacks: list[MotionCallback] _status_callbacks: list[ Callable[ - [int, int, int, MotionSpeedLevel | None, MotionPositionInfo], None + [ + int | None, + int | None, + int | None, + bool | None, + bool | None, + MotionSpeedLevel | None, + MotionPositionInfo | None, + ], + None, ] ] - _feedback_callbacks: list[Callable[[int, int, MotionPositionInfo], None]] - _position_callbacks: list[Callable[[int, int], None]] - _battery_callbacks: list[Callable[[int], None]] + _feedback_callbacks: list[ + Callable[[int | None, int | None, MotionPositionInfo | None], None] + ] + _position_callbacks: list[Callable[[int | None, int | None], None]] + _battery_callbacks: list[ + Callable[[int | None, bool | None, bool | None], None] + ] _speed_callbacks: list[Callable[[MotionSpeedLevel | None], None]] _end_position_callbacks: list[Callable[[MotionSpeedLevel | None], None]] _connection_callbacks: list[Callable[[MotionConnectionType], None]] _calibration_callbacks: list[ Callable[[MotionCalibrationType | None], None] ] - _running_callbacks: list[Callable[[MotionRunningType], None]] - _signal_strength_callbacks: list[Callable[[int], None]] + _running_callbacks: list[Callable[[MotionRunningType | None], None]] + _signal_strength_callbacks: list[Callable[[int | None], None]] def __init__( self, @@ -561,7 +576,10 @@ def _notification_callback( elif decrypted_message.startswith(MotionNotificationType.STATUS.value): position: int = decrypted_message_bytes[6] tilt: int = decrypted_message_bytes[7] - battery_percentage: int = decrypted_message_bytes[17] + battery: int = decrypted_message_bytes[17] + battery_percentage = min([battery & 0x7F, 100]) + is_charging: bool = bool(battery & 0x80) + is_wired: bool = battery == 0xFF try: speed_level: MotionSpeedLevel | None = MotionSpeedLevel( decrypted_message_bytes[12] @@ -582,6 +600,8 @@ def _notification_callback( position, tilt, battery_percentage, + is_charging, + is_wired, speed_level, end_position_info, ) @@ -601,7 +621,8 @@ def _disconnect_callback(self, _: BleakClient) -> None: "(%s) Automatically reconnecting using create_task", self.ble_device.address, ) - self._create_task(target=self.connect()) # type: ignore[call-arg] + # type: ignore[call-arg] + self._create_task(target=self.connect()) else: _LOGGER.debug( "(%s) Automatically reconnecting", self.ble_device.address @@ -893,6 +914,8 @@ def update_status( position: int | None, tilt: int | None, battery_percentage: int | None, + is_charging: bool | None, + is_wired: bool | None, speed_level: MotionSpeedLevel | None, end_position_info: MotionPositionInfo | None, ) -> None: @@ -901,19 +924,21 @@ def update_status( _LOGGER.debug( ( "(%s) Received status; position: %s, tilt: %s, " - "battery: %s, speed: %s, end positions: %s, " - "favorite position set: %s" + "battery percentage: %s, is charging: %s, is wired: %s, " + "speed: %s, end positions: %s, favorite position set: %s" ), self.ble_device.address, str(position), str(tilt), str(battery_percentage), + is_charging, + is_wired, speed_level.name if speed_level is not None else None, end_position_info.end_positions.name, end_position_info.favorite_position, ) self.update_position(position, tilt) - self.update_battery(battery_percentage) + self.update_battery(battery_percentage, is_charging, is_wired) self.update_speed(speed_level) self.update_end_position_info(end_position_info) if self._is_connection_callback_disabled(MotionCallback.STATUS): @@ -923,6 +948,8 @@ def update_status( position, tilt, battery_percentage, + is_charging, + is_wired, speed_level, end_position_info, ) @@ -1000,18 +1027,28 @@ def update_running( for running_callback in self._running_callbacks: running_callback(running_type) - def update_battery(self, battery_percentage: int | None) -> None: + def update_battery( + self, + battery_percentage: int | None, + is_charging: bool | None, + is_wired: bool | None, + ) -> None: """Update the battery percentage.""" _LOGGER.debug( - "(%s) Updating battery: %s", + "(%s) Updating battery; percentage: %s, " + "is charging: %s, is wired: %s", self.ble_device.address, battery_percentage, + is_charging, + is_wired, ) - self._battery = battery_percentage + self._battery_percentage = battery_percentage + self._is_charging = is_charging + self._is_wired = is_wired if self._is_connection_callback_disabled(MotionCallback.BATTERY): return for battery_callback in self._battery_callbacks: - battery_callback(battery_percentage) + battery_callback(battery_percentage, is_charging, is_wired) def update_speed(self, speed_level: MotionSpeedLevel | None) -> None: """Update the speed to a particular speed level.""" @@ -1091,7 +1128,16 @@ def update_signal_strength(self, rssi: int | None) -> None: def register_status_callback( self, callback: Callable[ - [int, int, int, MotionSpeedLevel | None, MotionPositionInfo], None + [ + int | None, + int | None, + int | None, + bool | None, + bool | None, + MotionSpeedLevel | None, + MotionPositionInfo | None, + ], + None, ], ) -> None: """Register the callback used to update when status is received. @@ -1099,7 +1145,10 @@ def register_status_callback( self._status_callbacks.append(callback) def register_feedback_callback( - self, callback: Callable[[int, int, MotionPositionInfo], None] + self, + callback: Callable[ + [int | None, int | None, MotionPositionInfo | None], None + ], ) -> None: """Register the callback used to update when feedback is received. Includes position, tilt, and end position info.""" @@ -1112,13 +1161,13 @@ def register_position_callback( self._position_callbacks.append(callback) def register_battery_callback( - self, callback: Callable[[int], None] + self, callback: Callable[[int | None, bool | None, bool | None], None] ) -> None: """Register the callback used to update the battery percentage.""" self._battery_callbacks.append(callback) def register_end_position_callback( - self, callback: Callable[[MotionPositionInfo], None] + self, callback: Callable[[MotionPositionInfo | None], None] ) -> None: """Register the callback used to update the end position info.""" self._end_position_callbacks.append(callback) @@ -1136,19 +1185,19 @@ def register_connection_callback( self._connection_callbacks.append(callback) def register_calibration_callback( - self, callback: Callable[[MotionCalibrationType], None] + self, callback: Callable[[MotionCalibrationType | None], None] ) -> None: """Register the callback used to update the calibration status.""" self._calibration_callbacks.append(callback) def register_running_callback( - self, callback: Callable[[MotionRunningType], None] + self, callback: Callable[[MotionRunningType | None], None] ) -> None: """Register the callback used to update the running type.""" self._running_callbacks.append(callback) def register_signal_strength_callback( - self, callback: Callable[[int], None] + self, callback: Callable[[int | None], None] ) -> None: """Register the callback used to update the signal strength.""" self._signal_strength_callbacks.append(callback) diff --git a/tests/test_device.py b/tests/test_device.py index 9daf82f..c649228 100644 --- a/tests/test_device.py +++ b/tests/test_device.py @@ -625,14 +625,16 @@ async def test_notification_callback(self) -> None: + "00" + "FFFF" + "00" - + "60" + + "FF" ), ) for status_callback in device._status_callbacks: status_callback.assert_called_once_with( 100, 180, - 96, + 100, + True, + True, MotionSpeedLevel.MEDIUM, device._end_position_info, ) @@ -659,12 +661,12 @@ async def test_notification_callback(self) -> None: + "00" + "0000" + "00" - + "0A" + + "8F" ), ) for status_callback in device._status_callbacks: status_callback.assert_called_with( - 0, 0, 10, MotionSpeedLevel.HIGH, device._end_position_info + 0, 0, 15, True, False, MotionSpeedLevel.HIGH, device._end_position_info ) assert device._end_position_info is not None @@ -689,7 +691,7 @@ async def test_notification_callback(self) -> None: ) for status_callback in device._status_callbacks: status_callback.assert_called_with( - 0, 0, 10, None, device._end_position_info + 0, 0, 10, False, False, None, device._end_position_info ) @patch("motionblindsble.device.MotionCrypt.encrypt", return_value="AA") @@ -804,13 +806,17 @@ async def test_disabled_connection_callbacks(self) -> None: 5, 5, 10, + True, + False, MotionSpeedLevel.HIGH, MotionPositionInfo(0x0E, 0xFFFF), ], [ "_position", "_tilt", - "_battery", + "_battery_percentage", + "_is_charging", + "_is_wired", "_speed", "_end_position_info", ], @@ -868,8 +874,8 @@ async def test_disabled_connection_callbacks(self) -> None: device.remove_battery_callback, MotionCallback.BATTERY, device.update_battery, - [5], - ["_battery"], + [5, True, False], + ["_battery_percentage", "_is_charging", "_is_wired"], ), ( device.register_speed_callback,