Skip to content

Commit

Permalink
Add is_charging and is_wired, and add None options for callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
LennP committed Mar 22, 2024
1 parent 3ac068d commit 1f24785
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 31 deletions.
95 changes: 72 additions & 23 deletions motionblindsble/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,9 @@ class MotionDevice:
_running_type: MotionRunningType | None = None
_position: int | None
_tilt: int | None
_battery: int | None
_battery_percentage: int | None
_is_charging: bool | None
_is_wired: bool | None
_speed: int | None
_end_position_info: MotionPositionInfo | None
_received_end_position_info_event: Event
Expand All @@ -325,20 +327,33 @@ class MotionDevice:
_disabled_connection_callbacks: list[MotionCallback]
_status_callbacks: list[
Callable[
[int, int, int, MotionSpeedLevel | None, MotionPositionInfo], None
[
int | None,
int | None,
int | None,
bool | None,
bool | None,
MotionSpeedLevel | None,
MotionPositionInfo | None,
],
None,
]
]
_feedback_callbacks: list[Callable[[int, int, MotionPositionInfo], None]]
_position_callbacks: list[Callable[[int, int], None]]
_battery_callbacks: list[Callable[[int], None]]
_feedback_callbacks: list[
Callable[[int | None, int | None, MotionPositionInfo | None], None]
]
_position_callbacks: list[Callable[[int | None, int | None], None]]
_battery_callbacks: list[
Callable[[int | None, bool | None, bool | None], None]
]
_speed_callbacks: list[Callable[[MotionSpeedLevel | None], None]]
_end_position_callbacks: list[Callable[[MotionSpeedLevel | None], None]]
_connection_callbacks: list[Callable[[MotionConnectionType], None]]
_calibration_callbacks: list[
Callable[[MotionCalibrationType | None], None]
]
_running_callbacks: list[Callable[[MotionRunningType], None]]
_signal_strength_callbacks: list[Callable[[int], None]]
_running_callbacks: list[Callable[[MotionRunningType | None], None]]
_signal_strength_callbacks: list[Callable[[int | None], None]]

def __init__(
self,
Expand Down Expand Up @@ -561,7 +576,10 @@ def _notification_callback(
elif decrypted_message.startswith(MotionNotificationType.STATUS.value):
position: int = decrypted_message_bytes[6]
tilt: int = decrypted_message_bytes[7]
battery_percentage: int = decrypted_message_bytes[17]
battery: int = decrypted_message_bytes[17]
battery_percentage = min([battery & 0x7F, 100])
is_charging: bool = bool(battery & 0x80)
is_wired: bool = battery == 0xFF
try:
speed_level: MotionSpeedLevel | None = MotionSpeedLevel(
decrypted_message_bytes[12]
Expand All @@ -582,6 +600,8 @@ def _notification_callback(
position,
tilt,
battery_percentage,
is_charging,
is_wired,
speed_level,
end_position_info,
)
Expand All @@ -601,7 +621,8 @@ def _disconnect_callback(self, _: BleakClient) -> None:
"(%s) Automatically reconnecting using create_task",
self.ble_device.address,
)
self._create_task(target=self.connect()) # type: ignore[call-arg]
# type: ignore[call-arg]
self._create_task(target=self.connect())
else:
_LOGGER.debug(
"(%s) Automatically reconnecting", self.ble_device.address
Expand Down Expand Up @@ -893,6 +914,8 @@ def update_status(
position: int | None,
tilt: int | None,
battery_percentage: int | None,
is_charging: bool | None,
is_wired: bool | None,
speed_level: MotionSpeedLevel | None,
end_position_info: MotionPositionInfo | None,
) -> None:
Expand All @@ -901,19 +924,21 @@ def update_status(
_LOGGER.debug(
(
"(%s) Received status; position: %s, tilt: %s, "
"battery: %s, speed: %s, end positions: %s, "
"favorite position set: %s"
"battery percentage: %s, is charging: %s, is wired: %s, "
"speed: %s, end positions: %s, favorite position set: %s"
),
self.ble_device.address,
str(position),
str(tilt),
str(battery_percentage),
is_charging,
is_wired,
speed_level.name if speed_level is not None else None,
end_position_info.end_positions.name,
end_position_info.favorite_position,
)
self.update_position(position, tilt)
self.update_battery(battery_percentage)
self.update_battery(battery_percentage, is_charging, is_wired)
self.update_speed(speed_level)
self.update_end_position_info(end_position_info)
if self._is_connection_callback_disabled(MotionCallback.STATUS):
Expand All @@ -923,6 +948,8 @@ def update_status(
position,
tilt,
battery_percentage,
is_charging,
is_wired,
speed_level,
end_position_info,
)
Expand Down Expand Up @@ -1000,18 +1027,28 @@ def update_running(
for running_callback in self._running_callbacks:
running_callback(running_type)

def update_battery(self, battery_percentage: int | None) -> None:
def update_battery(
self,
battery_percentage: int | None,
is_charging: bool | None,
is_wired: bool | None,
) -> None:
"""Update the battery percentage."""
_LOGGER.debug(
"(%s) Updating battery: %s",
"(%s) Updating battery; percentage: %s, "
"is charging: %s, is wired: %s",
self.ble_device.address,
battery_percentage,
is_charging,
is_wired,
)
self._battery = battery_percentage
self._battery_percentage = battery_percentage
self._is_charging = is_charging
self._is_wired = is_wired
if self._is_connection_callback_disabled(MotionCallback.BATTERY):
return
for battery_callback in self._battery_callbacks:
battery_callback(battery_percentage)
battery_callback(battery_percentage, is_charging, is_wired)

def update_speed(self, speed_level: MotionSpeedLevel | None) -> None:
"""Update the speed to a particular speed level."""
Expand Down Expand Up @@ -1091,15 +1128,27 @@ def update_signal_strength(self, rssi: int | None) -> None:
def register_status_callback(
self,
callback: Callable[
[int, int, int, MotionSpeedLevel | None, MotionPositionInfo], None
[
int | None,
int | None,
int | None,
bool | None,
bool | None,
MotionSpeedLevel | None,
MotionPositionInfo | None,
],
None,
],
) -> None:
"""Register the callback used to update when status is received.
Includes position, tilt, battery percentage and end position info."""
self._status_callbacks.append(callback)

def register_feedback_callback(
self, callback: Callable[[int, int, MotionPositionInfo], None]
self,
callback: Callable[
[int | None, int | None, MotionPositionInfo | None], None
],
) -> None:
"""Register the callback used to update when feedback is received.
Includes position, tilt, and end position info."""
Expand All @@ -1112,13 +1161,13 @@ def register_position_callback(
self._position_callbacks.append(callback)

def register_battery_callback(
self, callback: Callable[[int], None]
self, callback: Callable[[int | None, bool | None, bool | None], None]
) -> None:
"""Register the callback used to update the battery percentage."""
self._battery_callbacks.append(callback)

def register_end_position_callback(
self, callback: Callable[[MotionPositionInfo], None]
self, callback: Callable[[MotionPositionInfo | None], None]
) -> None:
"""Register the callback used to update the end position info."""
self._end_position_callbacks.append(callback)
Expand All @@ -1136,19 +1185,19 @@ def register_connection_callback(
self._connection_callbacks.append(callback)

def register_calibration_callback(
self, callback: Callable[[MotionCalibrationType], None]
self, callback: Callable[[MotionCalibrationType | None], None]
) -> None:
"""Register the callback used to update the calibration status."""
self._calibration_callbacks.append(callback)

def register_running_callback(
self, callback: Callable[[MotionRunningType], None]
self, callback: Callable[[MotionRunningType | None], None]
) -> None:
"""Register the callback used to update the running type."""
self._running_callbacks.append(callback)

def register_signal_strength_callback(
self, callback: Callable[[int], None]
self, callback: Callable[[int | None], None]
) -> None:
"""Register the callback used to update the signal strength."""
self._signal_strength_callbacks.append(callback)
Expand Down
22 changes: 14 additions & 8 deletions tests/test_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,14 +625,16 @@ async def test_notification_callback(self) -> None:
+ "00"
+ "FFFF"
+ "00"
+ "60"
+ "FF"
),
)
for status_callback in device._status_callbacks:
status_callback.assert_called_once_with(
100,
180,
96,
100,
True,
True,
MotionSpeedLevel.MEDIUM,
device._end_position_info,
)
Expand All @@ -659,12 +661,12 @@ async def test_notification_callback(self) -> None:
+ "00"
+ "0000"
+ "00"
+ "0A"
+ "8F"
),
)
for status_callback in device._status_callbacks:
status_callback.assert_called_with(
0, 0, 10, MotionSpeedLevel.HIGH, device._end_position_info
0, 0, 15, True, False, MotionSpeedLevel.HIGH, device._end_position_info
)
assert device._end_position_info is not None

Expand All @@ -689,7 +691,7 @@ async def test_notification_callback(self) -> None:
)
for status_callback in device._status_callbacks:
status_callback.assert_called_with(
0, 0, 10, None, device._end_position_info
0, 0, 10, False, False, None, device._end_position_info
)

@patch("motionblindsble.device.MotionCrypt.encrypt", return_value="AA")
Expand Down Expand Up @@ -804,13 +806,17 @@ async def test_disabled_connection_callbacks(self) -> None:
5,
5,
10,
True,
False,
MotionSpeedLevel.HIGH,
MotionPositionInfo(0x0E, 0xFFFF),
],
[
"_position",
"_tilt",
"_battery",
"_battery_percentage",
"_is_charging",
"_is_wired",
"_speed",
"_end_position_info",
],
Expand Down Expand Up @@ -868,8 +874,8 @@ async def test_disabled_connection_callbacks(self) -> None:
device.remove_battery_callback,
MotionCallback.BATTERY,
device.update_battery,
[5],
["_battery"],
[5, True, False],
["_battery_percentage", "_is_charging", "_is_wired"],
),
(
device.register_speed_callback,
Expand Down

0 comments on commit 1f24785

Please sign in to comment.