Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receiver: Test data extraction of responses #2698

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 75 additions & 27 deletions lib/logitech_receiver/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,53 @@ class Pairing:
error: Optional[any] = None


def extract_serial(response: bytes) -> str:
"""Extracts serial number from receiver response."""
return response.hex().upper()


def extract_max_devices(response: bytes) -> int:
"""Extracts maximum number of supported devices from response."""
max_devices = response[6]
return int(max_devices)


def extract_remaining_pairings(response: bytes) -> int:
ps = ord(response[2:3])
remaining_pairings = ps - 5 if ps >= 5 else -1
return int(remaining_pairings)


def extract_codename(response: bytes) -> str:
codename = response[2 : 2 + ord(response[1:2])]
return codename.decode("ascii")


def extract_power_switch_location(response: bytes) -> str:
"""Extracts power switch location from response."""
index = response[9] & 0x0F
return hidpp10_constants.POWER_SWITCH_LOCATION[index]


def extract_connection_count(response: bytes) -> int:
"""Extract connection count from receiver response."""
return ord(response[1:2])


def extract_wpid(response: bytes) -> str:
"""Extract wpid from receiver response."""
return response.hex().upper()


def extract_polling_rate(response: bytes) -> int:
"""Returns polling rate in milliseconds."""
return int(response[2])


def extract_device_kind(response: int) -> str:
return hidpp10_constants.DEVICE_KIND[response]


class Receiver:
"""A generic Receiver instance, mostly implementing the interface used on Unifying, Nano, and LightSpeed receivers"
The paired devices are available through the sequence interface.
Expand Down Expand Up @@ -126,9 +173,9 @@ def initialize(self, product_info: dict):
# read the receiver information subregister, so we can find out max_devices
serial_reply = self.read_register(Registers.RECEIVER_INFO, _IR.receiver_information)
if serial_reply:
self.serial = serial_reply[1:5].hex().upper()
self.max_devices = serial_reply[6]
if self.max_devices <= 0 or self.max_devices > 6:
self.serial = extract_serial(serial_reply[1:5])
self.max_devices = extract_max_devices(serial_reply)
if not (1 <= self.max_devices <= 6):
self.max_devices = product_info.get("max_devices", 1)
else: # handle receivers that don't have a serial number specially (i.e., c534)
self.serial = None
Expand Down Expand Up @@ -161,8 +208,7 @@ def remaining_pairings(self, cache=True):
if self._remaining_pairings is None or not cache:
ps = self.read_register(Registers.RECEIVER_CONNECTION)
if ps is not None:
ps = ord(ps[2:3])
self._remaining_pairings = ps - 5 if ps >= 5 else -1
self._remaining_pairings = extract_remaining_pairings(ps)
return self._remaining_pairings

def enable_connection_notifications(self, enable=True):
Expand All @@ -189,22 +235,21 @@ def enable_connection_notifications(self, enable=True):
def device_codename(self, n):
codename = self.read_register(Registers.RECEIVER_INFO, _IR.device_name + n - 1)
if codename:
codename = codename[2 : 2 + ord(codename[1:2])]
return codename.decode("ascii")
return extract_codename(codename)

def notify_devices(self):
"""Scan all devices."""
if self.handle:
if not self.write_register(Registers.RECEIVER_CONNECTION, 0x02):
logger.warning("%s: failed to trigger device link notifications", self)

def notification_information(self, number, notification):
def notification_information(self, number, notification) -> tuple[bool, bool, str, str]:
"""Extract information from unifying-style notification"""
assert notification.address != 0x02
online = not bool(notification.data[0] & 0x40)
encrypted = bool(notification.data[0] & 0x20) or notification.address == 0x10
kind = hidpp10_constants.DEVICE_KIND[notification.data[0] & 0x0F]
wpid = (notification.data[2:3] + notification.data[1:2]).hex().upper()
kind = extract_device_kind(notification.data[0] & 0x0F)
wpid = extract_wpid(notification.data[2:3] + notification.data[1:2])
return online, encrypted, wpid, kind

def device_pairing_information(self, n: int) -> dict:
Expand All @@ -214,28 +259,29 @@ def device_pairing_information(self, n: int) -> dict:
power_switch = "(unknown)"
pair_info = self.read_register(Registers.RECEIVER_INFO, _IR.pairing_information + n - 1)
if pair_info: # a receiver that uses Unifying-style pairing registers
wpid = pair_info[3:5].hex().upper()
kind = hidpp10_constants.DEVICE_KIND[pair_info[7] & 0x0F]
polling_rate = str(pair_info[2]) + "ms"
wpid = extract_wpid(pair_info[3:5])
kind = extract_device_kind(pair_info[7] & 0x0F)
polling_rate_ms = extract_polling_rate(pair_info)
polling_rate = f"{polling_rate_ms}ms"
elif not self.receiver_kind == "unifying": # may be an old Nano receiver
device_info = self.read_register(Registers.RECEIVER_INFO, 0x04) # undocumented
if device_info:
logger.warning("using undocumented register for device wpid")
wpid = device_info[3:5].hex().upper()
kind = hidpp10_constants.DEVICE_KIND[0x00] # unknown kind
wpid = extract_wpid(device_info[3:5])
kind = extract_device_kind(0x00) # unknown kind
else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information - non-unifying")
else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information")
pair_info = self.read_register(Registers.RECEIVER_INFO, _IR.extended_pairing_information + n - 1)
if pair_info:
power_switch = hidpp10_constants.POWER_SWITCH_LOCATION[pair_info[9] & 0x0F]
serial = pair_info[1:5].hex().upper()
power_switch = extract_power_switch_location(pair_info)
serial = extract_serial(pair_info[1:5])
else: # some Nano receivers?
pair_info = self.read_register(0x2D5) # undocumented and questionable
if pair_info:
logger.warning("using undocumented register for device serial number")
serial = pair_info[1:5].hex().upper()
serial = extract_serial(pair_info[1:5])
return {"wpid": wpid, "kind": kind, "polling": polling_rate, "serial": serial, "power_switch": power_switch}

def register_new_device(self, number, notification=None):
Expand Down Expand Up @@ -281,7 +327,9 @@ def set_lock(self, lock_closed=True, device=0, timeout=0):

def count(self):
count = self.read_register(Registers.RECEIVER_CONNECTION)
return 0 if count is None else ord(count[1:2])
if count is None:
return 0
return extract_connection_count(count)

def request(self, request_id, *params):
if bool(self):
Expand Down Expand Up @@ -406,7 +454,7 @@ def __init__(self, *args, **kwargs):

def initialize(self, product_info: dict):
serial_reply = self.read_register(Registers.BOLT_UNIQUE_ID)
self.serial = serial_reply.hex().upper()
self.serial = extract_serial(serial_reply)
self.max_devices = product_info.get("max_devices", 1)

def device_codename(self, n):
Expand All @@ -418,9 +466,9 @@ def device_codename(self, n):
def device_pairing_information(self, n: int) -> dict:
pair_info = self.read_register(Registers.RECEIVER_INFO, _IR.bolt_pairing_information + n)
if pair_info:
wpid = (pair_info[3:4] + pair_info[2:3]).hex().upper()
kind = hidpp10_constants.DEVICE_KIND[pair_info[1] & 0x0F]
serial = pair_info[4:8].hex().upper()
wpid = extract_wpid(pair_info[3:4] + pair_info[2:3])
kind = extract_device_kind(pair_info[1] & 0x0F)
serial = extract_serial(pair_info[4:8])
return {"wpid": wpid, "kind": kind, "polling": None, "serial": serial, "power_switch": "(unknown)"}
else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="can't read Bolt pairing register")
Expand Down Expand Up @@ -478,8 +526,8 @@ def notification_information(self, number, notification):
assert notification.address == 0x02
online = True
encrypted = bool(notification.data[0] & 0x80)
kind = hidpp10_constants.DEVICE_KIND[_get_kind_from_index(self, number)]
wpid = "00" + notification.data[2:3].hex().upper()
kind = extract_device_kind(_get_kind_from_index(self, number))
wpid = extract_wpid("00" + notification.data[2:3])
return online, encrypted, wpid, kind

def device_pairing_information(self, number: int) -> dict:
Expand All @@ -488,11 +536,11 @@ def device_pairing_information(self, number: int) -> dict:
if not wpid:
logger.error("Unable to get wpid from udev for device %d of %s", number, self)
raise exceptions.NoSuchDevice(number=number, receiver=self, error="Not present 27Mhz device")
kind = hidpp10_constants.DEVICE_KIND[_get_kind_from_index(self, number)]
kind = extract_device_kind(_get_kind_from_index(self, number))
return {"wpid": wpid, "kind": kind, "polling": "", "serial": None, "power_switch": "(unknown)"}


def _get_kind_from_index(receiver, index):
def _get_kind_from_index(receiver, index: int) -> int:
"""Get device kind from 27Mhz device index"""
# From drivers/hid/hid-logitech-dj.c
if index == 1: # mouse
Expand Down
82 changes: 82 additions & 0 deletions tests/logitech_receiver/test_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,85 @@ def test_receiver_factory_no_device(device_info, responses):

with pytest.raises(exceptions.NoSuchDevice):
r.device_pairing_information(1)


def test_extract_serial_number():
response = b'\x03\x16\xcc\x9c\xb4\x05\x06"\x00\x00\x00\x00\x00\x00\x00\x00'

serial_number = receiver.extract_serial(response[1:5])

assert serial_number == "16CC9CB4"


def test_extract_max_devices():
response = b'\x03\x16\xcc\x9c\xb4\x05\x06"\x00\x00\x00\x00\x00\x00\x00\x00'

max_devices = receiver.extract_max_devices(response)

assert max_devices == 6


@pytest.mark.parametrize(
"response, expected_remaining_pairings",
[
(b"\x00\x03\x00", -1),
(b"\x00\x02\t", 4),
],
)
def test_extract_remaining_pairings(response, expected_remaining_pairings):
remaining_pairings = receiver.extract_remaining_pairings(response)

assert remaining_pairings == expected_remaining_pairings


def test_extract_codename():
response = b"A\x04K520"

codename = receiver.extract_codename(response)

assert codename == "K520"


def test_extract_power_switch_location():
response = b"0\x19\x8e>\xb8\x06\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00"

ps_location = receiver.extract_power_switch_location(response)

assert ps_location == "base"


def test_extract_connection_count():
response = b"\x00\x03\x00"

connection_count = receiver.extract_connection_count(response)

assert connection_count == 3


def test_extract_wpid():
response = b"@\x82"

res = receiver.extract_wpid(response)

assert res == "4082"


def test_extract_polling_rate():
response = b"\x08@\x82\x04\x02\x02\x07\x00\x00\x00\x00\x00\x00\x00"

polling_rate = receiver.extract_polling_rate(response)

assert polling_rate == 130


@pytest.mark.parametrize(
"data, expected_device_kind",
[
(0x00, "unknown"),
(0x03, "numpad"),
],
)
def test_extract_device_kind(data, expected_device_kind):
device_kind = receiver.extract_device_kind(data)

assert str(device_kind) == expected_device_kind
Loading