diff --git a/idasen/__init__.py b/idasen/__init__.py index b94c7a1..fcfb22b 100644 --- a/idasen/__init__.py +++ b/idasen/__init__.py @@ -10,8 +10,8 @@ from typing import Union import asyncio import logging +import struct import sys -import time _UUID_HEIGHT: str = "99fa0021-338a-1024-8a49-009c0215f78a" @@ -28,18 +28,24 @@ # height calculation offset in meters, assumed to be the same for all desks -def _bytes_to_meters(raw: bytearray) -> float: - """Converts a value read from the desk in bytes to meters.""" +def _bytes_to_meters_and_speed(raw: bytearray) -> Tuple[float, int]: + """Converts a value read from the desk in bytes to height in meters and speed.""" raw_len = len(raw) expected_len = 4 assert ( raw_len == expected_len ), f"Expected raw value to be {expected_len} bytes long, got {raw_len} bytes" - high_byte: int = int(raw[1]) - low_byte: int = int(raw[0]) - int_raw: int = (high_byte << 8) + low_byte - return float(int_raw / 10000) + IdasenDesk.MIN_HEIGHT + int_raw, speed = struct.unpack(" bytearray: + """Converts meters to bytes for setting the position on the desk""" + int_raw: int = int((meters - IdasenDesk.MIN_HEIGHT) * 10000) + return bytearray(struct.pack(" bool: @@ -184,7 +190,7 @@ async def monitor(self, callback: Callable[[float], Awaitable[None]]): previous_height = 0.0 async def output_listener(char: BleakGATTCharacteristic, data: bytearray): - height = _bytes_to_meters(data) + height, _ = _bytes_to_meters_and_speed(data) self._logger.debug(f"Got data: {height}m") nonlocal previous_height @@ -248,7 +254,7 @@ async def wakeup(self): This exists for compatibility with the Linak DPG1C controller, it is not necessary with the original idasen controller. - >>> async def example() -> str: + >>> async def example(): ... async with IdasenDesk(mac="AA:AA:AA:AA:AA:AA") as desk: ... await desk.wakeup() >>> asyncio.run(example()) @@ -322,46 +328,26 @@ async def move_to_target(self, target: float): self._moving = True async def do_move() -> None: - previous_height = await self.get_height() - will_move_up = target > previous_height - last_move_time: Optional[float] = None + current_height = await self.get_height() + if current_height == target: + return + + # Wakeup and stop commands are needed in order to + # start the reference input for setting the position + await self._client.write_gatt_char(_UUID_COMMAND, _COMMAND_WAKEUP) + await self._client.write_gatt_char(_UUID_COMMAND, _COMMAND_STOP) + + data = _meters_to_bytes(target) + while True: - height = await self.get_height() - difference = target - height - self._logger.debug(f"{target=} {height=} {difference=}") - if (height < previous_height and will_move_up) or ( - height > previous_height and not will_move_up - ): - self._logger.warning( - "stopped moving because desk safety feature kicked in" - ) - return - - if height == previous_height: - if ( - last_move_time is not None - and time.time() - last_move_time > 0.5 - ): - self._logger.warning( - "desk is not moving anymore. physical button probably " - "pressed" - ) - return - else: - last_move_time = time.time() - - if not self._moving: - return - - if abs(difference) < 0.005: # tolerance of 0.005 meters - self._logger.info(f"reached target of {target:.3f}") - await self._stop() - return - elif difference > 0: - await self.move_up() - elif difference < 0: - await self.move_down() - previous_height = height + await self._client.write_gatt_char(_UUID_REFERENCE_INPUT, data) + await asyncio.sleep(0.4) + + # Stop as soon as the speed is 0, + # which means the desk has reached the target position + speed = await self._get_speed() + if speed == 0: + break self._move_task = asyncio.create_task(do_move()) await self._move_task @@ -400,7 +386,16 @@ async def get_height(self) -> float: >>> asyncio.run(example()) 1.0 """ - return _bytes_to_meters(await self._client.read_gatt_char(_UUID_HEIGHT)) + height, _ = await self._get_height_and_speed() + return height + + async def _get_speed(self) -> int: + _, speed = await self._get_height_and_speed() + return speed + + async def _get_height_and_speed(self) -> Tuple[float, int]: + raw = await self._client.read_gatt_char(_UUID_HEIGHT) + return _bytes_to_meters_and_speed(raw) @staticmethod async def discover() -> Optional[str]: diff --git a/tests/test_idasen.py b/tests/test_idasen.py index 6a1ae4e..3f1fb47 100644 --- a/tests/test_idasen.py +++ b/tests/test_idasen.py @@ -1,4 +1,4 @@ -from idasen import _bytes_to_meters, _is_desk +from idasen import _bytes_to_meters_and_speed, _meters_to_bytes, _is_desk from idasen import IdasenDesk from types import SimpleNamespace from typing import AsyncGenerator @@ -31,10 +31,12 @@ class MockBleakClient: def __init__(self): self._height = 1.0 + self._is_moving = False self.is_connected = False async def __aenter__(self): self._height = 1.0 + self._is_moving = False self.is_connected = True return self @@ -53,21 +55,26 @@ async def start_notify(self, uuid: str, callback: Callable): await callback(None, bytearray([0x10, 0x00, 0x00, 0x00])) async def write_gatt_char( - self, uuid: str, command: bytearray, response: bool = False + self, uuid: str, data: bytearray, response: bool = False ): if uuid == idasen._UUID_COMMAND: - if command == idasen._COMMAND_UP: + if data == idasen._COMMAND_UP: self._height += 0.001 - elif command == idasen._COMMAND_DOWN: + elif data == idasen._COMMAND_DOWN: self._height -= 0.001 + if uuid == idasen._UUID_REFERENCE_INPUT: + assert len(data) == 2 + + data_with_speed = bytearray([data[0], data[1], 0, 0]) + requested_height, _ = _bytes_to_meters_and_speed(data_with_speed) + self._height += min(0.1, max(-0.1, requested_height - self._height)) + + self._is_moving = self._height != requested_height async def read_gatt_char(self, uuid: str) -> bytearray: - norm = self._height - IdasenDesk.MIN_HEIGHT - norm *= 10000 - norm = int(norm) - low_byte = norm & 0xFF - high_byte = (norm >> 8) & 0xFF - return bytearray([low_byte, high_byte, 0x00, 0x00]) + height_bytes = _meters_to_bytes(self._height) + speed_byte = 0x01 if self._is_moving else 0x00 + return bytearray([height_bytes[0], height_bytes[1], 0x00, speed_byte]) @property def address(self) -> str: @@ -139,7 +146,7 @@ async def test_move_to_target_raises(desk: IdasenDesk, target: float): @pytest.mark.parametrize("target", [0.7, 1.1]) async def test_move_to_target(desk: IdasenDesk, target: float): await desk.move_to_target(target) - assert abs(await desk.get_height() - target) < 0.005 + assert abs(await desk.get_height() - target) < 0.001 async def test_move_abort_when_no_movement(): @@ -190,16 +197,17 @@ async def write_gatt_char_mock( @pytest.mark.parametrize( - "raw, result", + "raw, height, speed", [ - (bytearray([0x64, 0x19, 0x00, 0x00]), IdasenDesk.MAX_HEIGHT), - (bytearray([0x00, 0x00, 0x00, 0x00]), IdasenDesk.MIN_HEIGHT), - (bytearray([0x51, 0x04, 0x00, 0x00]), 0.7305), - (bytearray([0x08, 0x08, 0x00, 0x00]), 0.8256), + (bytearray([0x64, 0x19, 0x00, 0x00]), IdasenDesk.MAX_HEIGHT, 0), + (bytearray([0x00, 0x00, 0x00, 0x00]), IdasenDesk.MIN_HEIGHT, 0), + (bytearray([0x51, 0x04, 0x00, 0x00]), 0.7305, 0), + (bytearray([0x08, 0x08, 0x00, 0x00]), 0.8256, 0), + (bytearray([0x08, 0x08, 0x02, 0x01]), 0.8256, 258), ], ) -def test_bytes_to_meters(raw: bytearray, result: float): - assert _bytes_to_meters(raw) == result +def test_bytes_to_meters_and_speed(raw: bytearray, height: float, speed: int): + assert _bytes_to_meters_and_speed(raw) == (height, speed) async def test_fail_to_connect(caplog, monkeypatch):