Skip to content

Commit

Permalink
Updated to use the move characteristic for setting the desk position,…
Browse files Browse the repository at this point in the history
… which always stops at the right height and avoids having to monitor the height
  • Loading branch information
sroebert committed Jan 5, 2024
1 parent 6ab68bc commit 2d93b0c
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 67 deletions.
93 changes: 44 additions & 49 deletions idasen/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from typing import Union
import asyncio
import logging
import struct
import sys
import time


_UUID_HEIGHT: str = "99fa0021-338a-1024-8a49-009c0215f78a"
Expand All @@ -28,18 +28,24 @@


# height calculation offset in meters, assumed to be the same for all desks
def _bytes_to_meters(raw: bytearray) -> float:
"""Converts a value read from the desk in bytes to meters."""
def _bytes_to_meters_and_speed(raw: bytearray) -> Tuple[float, int]:
"""Converts a value read from the desk in bytes to height in meters and speed."""
raw_len = len(raw)
expected_len = 4
assert (
raw_len == expected_len
), f"Expected raw value to be {expected_len} bytes long, got {raw_len} bytes"

high_byte: int = int(raw[1])
low_byte: int = int(raw[0])
int_raw: int = (high_byte << 8) + low_byte
return float(int_raw / 10000) + IdasenDesk.MIN_HEIGHT
int_raw, speed = struct.unpack("<Hh", raw)
meters = float(int(int_raw) / 10000) + IdasenDesk.MIN_HEIGHT

return meters, int(speed)


def _meters_to_bytes(meters: float) -> bytearray:
"""Converts meters to bytes for setting the position on the desk"""
int_raw: int = int((meters - IdasenDesk.MIN_HEIGHT) * 10000)
return bytearray(struct.pack("<H", int_raw))


def _is_desk(device: BLEDevice, adv: AdvertisementData) -> bool:
Expand Down Expand Up @@ -184,7 +190,7 @@ async def monitor(self, callback: Callable[[float], Awaitable[None]]):
previous_height = 0.0

async def output_listener(char: BleakGATTCharacteristic, data: bytearray):
height = _bytes_to_meters(data)
height, _ = _bytes_to_meters_and_speed(data)
self._logger.debug(f"Got data: {height}m")

nonlocal previous_height
Expand Down Expand Up @@ -248,7 +254,7 @@ async def wakeup(self):
This exists for compatibility with the Linak DPG1C controller,
it is not necessary with the original idasen controller.
>>> async def example() -> str:
>>> async def example():
... async with IdasenDesk(mac="AA:AA:AA:AA:AA:AA") as desk:
... await desk.wakeup()
>>> asyncio.run(example())
Expand Down Expand Up @@ -322,46 +328,26 @@ async def move_to_target(self, target: float):
self._moving = True

async def do_move() -> None:
previous_height = await self.get_height()
will_move_up = target > previous_height
last_move_time: Optional[float] = None
current_height = await self.get_height()
if current_height == target:
return

# Wakeup and stop commands are needed in order to
# start the reference input for setting the position
await self._client.write_gatt_char(_UUID_COMMAND, _COMMAND_WAKEUP)
await self._client.write_gatt_char(_UUID_COMMAND, _COMMAND_STOP)

data = _meters_to_bytes(target)

while True:
height = await self.get_height()
difference = target - height
self._logger.debug(f"{target=} {height=} {difference=}")
if (height < previous_height and will_move_up) or (
height > previous_height and not will_move_up
):
self._logger.warning(
"stopped moving because desk safety feature kicked in"
)
return

if height == previous_height:
if (
last_move_time is not None
and time.time() - last_move_time > 0.5
):
self._logger.warning(
"desk is not moving anymore. physical button probably "
"pressed"
)
return
else:
last_move_time = time.time()

if not self._moving:
return

if abs(difference) < 0.005: # tolerance of 0.005 meters
self._logger.info(f"reached target of {target:.3f}")
await self._stop()
return
elif difference > 0:
await self.move_up()
elif difference < 0:
await self.move_down()
previous_height = height
await self._client.write_gatt_char(_UUID_REFERENCE_INPUT, data)
await asyncio.sleep(0.4)

# Stop as soon as the speed is 0,
# which means the desk has reached the target position
speed = await self._get_speed()
if speed == 0:
break

self._move_task = asyncio.create_task(do_move())
await self._move_task
Expand Down Expand Up @@ -400,7 +386,16 @@ async def get_height(self) -> float:
>>> asyncio.run(example())
1.0
"""
return _bytes_to_meters(await self._client.read_gatt_char(_UUID_HEIGHT))
height, _ = await self._get_height_and_speed()
return height

async def _get_speed(self) -> int:
_, speed = await self._get_height_and_speed()
return speed

async def _get_height_and_speed(self) -> Tuple[float, int]:
raw = await self._client.read_gatt_char(_UUID_HEIGHT)
return _bytes_to_meters_and_speed(raw)

@staticmethod
async def discover() -> Optional[str]:
Expand Down
44 changes: 26 additions & 18 deletions tests/test_idasen.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from idasen import _bytes_to_meters, _is_desk
from idasen import _bytes_to_meters_and_speed, _meters_to_bytes, _is_desk
from idasen import IdasenDesk
from types import SimpleNamespace
from typing import AsyncGenerator
Expand Down Expand Up @@ -31,10 +31,12 @@ class MockBleakClient:

def __init__(self):
self._height = 1.0
self._is_moving = False
self.is_connected = False

async def __aenter__(self):
self._height = 1.0
self._is_moving = False
self.is_connected = True
return self

Expand All @@ -53,21 +55,26 @@ async def start_notify(self, uuid: str, callback: Callable):
await callback(None, bytearray([0x10, 0x00, 0x00, 0x00]))

async def write_gatt_char(
self, uuid: str, command: bytearray, response: bool = False
self, uuid: str, data: bytearray, response: bool = False
):
if uuid == idasen._UUID_COMMAND:
if command == idasen._COMMAND_UP:
if data == idasen._COMMAND_UP:
self._height += 0.001
elif command == idasen._COMMAND_DOWN:
elif data == idasen._COMMAND_DOWN:
self._height -= 0.001
if uuid == idasen._UUID_REFERENCE_INPUT:
assert len(data) == 2

data_with_speed = bytearray([data[0], data[1], 0, 0])
requested_height, _ = _bytes_to_meters_and_speed(data_with_speed)
self._height += min(0.1, max(-0.1, requested_height - self._height))

self._is_moving = self._height != requested_height

async def read_gatt_char(self, uuid: str) -> bytearray:
norm = self._height - IdasenDesk.MIN_HEIGHT
norm *= 10000
norm = int(norm)
low_byte = norm & 0xFF
high_byte = (norm >> 8) & 0xFF
return bytearray([low_byte, high_byte, 0x00, 0x00])
height_bytes = _meters_to_bytes(self._height)
speed_byte = 0x01 if self._is_moving else 0x00
return bytearray([height_bytes[0], height_bytes[1], 0x00, speed_byte])

@property
def address(self) -> str:
Expand Down Expand Up @@ -139,7 +146,7 @@ async def test_move_to_target_raises(desk: IdasenDesk, target: float):
@pytest.mark.parametrize("target", [0.7, 1.1])
async def test_move_to_target(desk: IdasenDesk, target: float):
await desk.move_to_target(target)
assert abs(await desk.get_height() - target) < 0.005
assert abs(await desk.get_height() - target) < 0.001


async def test_move_abort_when_no_movement():
Expand Down Expand Up @@ -190,16 +197,17 @@ async def write_gatt_char_mock(


@pytest.mark.parametrize(
"raw, result",
"raw, height, speed",
[
(bytearray([0x64, 0x19, 0x00, 0x00]), IdasenDesk.MAX_HEIGHT),
(bytearray([0x00, 0x00, 0x00, 0x00]), IdasenDesk.MIN_HEIGHT),
(bytearray([0x51, 0x04, 0x00, 0x00]), 0.7305),
(bytearray([0x08, 0x08, 0x00, 0x00]), 0.8256),
(bytearray([0x64, 0x19, 0x00, 0x00]), IdasenDesk.MAX_HEIGHT, 0),
(bytearray([0x00, 0x00, 0x00, 0x00]), IdasenDesk.MIN_HEIGHT, 0),
(bytearray([0x51, 0x04, 0x00, 0x00]), 0.7305, 0),
(bytearray([0x08, 0x08, 0x00, 0x00]), 0.8256, 0),
(bytearray([0x08, 0x08, 0x02, 0x01]), 0.8256, 258),
],
)
def test_bytes_to_meters(raw: bytearray, result: float):
assert _bytes_to_meters(raw) == result
def test_bytes_to_meters_and_speed(raw: bytearray, height: float, speed: int):
assert _bytes_to_meters_and_speed(raw) == (height, speed)


async def test_fail_to_connect(caplog, monkeypatch):
Expand Down

0 comments on commit 2d93b0c

Please sign in to comment.