From 8385478b338bffed9146a77317017865aadf9843 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thi=C3=A9baud=20Fuchs?= Date: Wed, 17 Apr 2024 16:58:35 +0200 Subject: [PATCH 1/2] Add new mouse peripheral and tests --- examples/crazy-mouse.py | 24 +++ facedancer/devices/mouse.py | 168 ++++++++++++++++++ test/loopback_fullspeed.py | 104 +++++++++++ test/loopback_highspeed.py | 104 +++++++++++ test/scripts/requirements.txt | 1 + test/scripts/test_loopback.py | 142 +++++++++++++++ .../test_loopback_randomize_packetsize.py | 148 +++++++++++++++ test/scripts/test_speedtest.py | 141 +++++++++++++++ test/scripts/test_speedtest_one_by_one.py | 142 +++++++++++++++ test/speedtest_enumeration.py | 110 ++++++++++++ test/speedtest_fullspeed.py | 71 ++++++++ test/speedtest_highspeed.py | 104 +++++++++++ 12 files changed, 1259 insertions(+) create mode 100755 examples/crazy-mouse.py create mode 100644 facedancer/devices/mouse.py create mode 100755 test/loopback_fullspeed.py create mode 100755 test/loopback_highspeed.py create mode 100644 test/scripts/requirements.txt create mode 100644 test/scripts/test_loopback.py create mode 100644 test/scripts/test_loopback_randomize_packetsize.py create mode 100644 test/scripts/test_speedtest.py create mode 100644 test/scripts/test_speedtest_one_by_one.py create mode 100755 test/speedtest_enumeration.py create mode 100755 test/speedtest_fullspeed.py create mode 100755 test/speedtest_highspeed.py diff --git a/examples/crazy-mouse.py b/examples/crazy-mouse.py new file mode 100755 index 00000000..c1f11394 --- /dev/null +++ b/examples/crazy-mouse.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 +# +# This file is part of FaceDancer. +# +""" USB mouse device example, makes the mouse go crazy on screen """ + +import asyncio + +from facedancer import main +from facedancer.devices.mouse import USBMouseDevice + +device = USBMouseDevice() + + +async def crazy_mouse(): + """Makes the mouse oscillate""" + while 1: + device.set_x(-10) + await asyncio.sleep(0.1) + device.set_x(10) + await asyncio.sleep(0.1) + + +main(device, crazy_mouse()) diff --git a/facedancer/devices/mouse.py b/facedancer/devices/mouse.py new file mode 100644 index 00000000..756fa47a --- /dev/null +++ b/facedancer/devices/mouse.py @@ -0,0 +1,168 @@ +""" +Create a basic mouse device with three buttons and two axis +""" + +from . import default_main +from .. import * +from ..classes.hid.descriptor import * +from ..classes.hid.usage import * + + +@use_inner_classes_automatically +class USBMouseDevice(USBDevice): + """Simple USB mouse device.""" + + name: str = "USB Mouse Device" + product_string: str = "Non-suspicious Mouse" + + # Local mouse state + _x: int = 0 + _y: int = 0 + _wheel: int = 0 + _trigger: bool = False + _secondary: bool = False + _tertiary: bool = False + + class MouseConfiguration(USBConfiguration): + """Primary configuration : act as a mouse""" + + max_power: int = 100 + self_powered: bool = False + supports_remote_wakeup: bool = True + + class MouseInterface(USBInterface): + """Core HID interface for our mouse""" + + name: str = "Generic USB mouse interface" + class_number: int = 3 # Human Interface Device class number + + class MouseEventEndpoint(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 1 + direction: USBDirection = USBDirection.IN + transfer_type: USBTransferType = USBTransferType.INTERRUPT + interval: int = 10 + + class MouseHIDDescriptor(USBClassDescriptor): + """Container for the mouse HID report descriptors""" + + number: int = USBDescriptorTypeNumber.HID + + # raw descriptor fields + bLength: bytes = b"\x09" + bHIDDescriptorType: bytes = b"\x21" # HID descriptor type + bcdHID: bytes = b"\x11\x01" # HID 1.11 + bCountryCode: bytes = b"\x00" + bNumDescriptors: bytes = b"\x01" + bDescriptorType: bytes = b"\x22" # Report descriptor type + wDescriptorLength: bytes = ( + b"\x3e\x00" # 62 -- TODO should be computed automatically + ) + + raw: bytes = ( + bLength + + bHIDDescriptorType + + bcdHID + + bCountryCode + + bNumDescriptors + + bDescriptorType + + wDescriptorLength + ) + + class MouseReportDescriptor(HIDReportDescriptor): + """Defines the mouse report descriptor : + * X/Y axis + * three buttons (trigger/primary, secondary, tertiary) + """ + + fields: tuple = ( + USAGE_PAGE(HIDUsagePage.GENERIC_DESKTOP), + USAGE(HIDGenericDesktopUsage.MOUSE), + COLLECTION(HIDCollection.APPLICATION), + USAGE(HIDGenericDesktopUsage.POINTER), + COLLECTION(HIDCollection.PHYSICAL), + USAGE_PAGE(HIDUsagePage.BUTTONS), + USAGE_MINIMUM(0x01), # see HID 1.11 + USAGE_MAXIMUM(0x03), + LOGICAL_MINIMUM(0x0), + LOGICAL_MAXIMUM(0x01), + REPORT_SIZE(1), + REPORT_COUNT(3), + INPUT(variable=True, relative=False), + REPORT_SIZE(5), + REPORT_COUNT(1), + INPUT(variable=True, constant=True), + USAGE_PAGE(HIDUsagePage.GENERIC_DESKTOP), + USAGE(HIDGenericDesktopUsage.X), + USAGE(HIDGenericDesktopUsage.Y), + LOGICAL_MINIMUM(0x81), # -127 + LOGICAL_MAXIMUM(0x7F), # 127 + REPORT_SIZE(8), + REPORT_COUNT(2), + INPUT(variable=True, relative=True), + USAGE(HIDGenericDesktopUsage.WHEEL), + LOGICAL_MINIMUM(0x81), # -127 + LOGICAL_MAXIMUM(0x7F), # 127 + REPORT_SIZE(8), + REPORT_COUNT(1), + INPUT(variable=True, relative=True), + END_COLLECTION(), + END_COLLECTION(), + ) + + @class_request_handler(number=USBStandardRequests.GET_INTERFACE) + @to_this_interface + def handle_get_interface_request(self, request): + # Silently stall GET_INTERFACE class requests. + request.stall() + + def set_x(self, x: int): + """Set X axis translation""" + self._x = x + + def set_y(self, y: int): + """Set Y axis translation""" + self._y = y + + def set_wheel(self, rotation: int): + """Set rotation""" + self._wheel = rotation + + def set_trigger(self, down: bool): + """Set down to True to trigger primary button""" + self._trigger = down + + def set_secondary(self, down: bool): + """Set down to True to trigger secondary button""" + self._secondary = down + + def set_tertiary(self, down: bool): + """Set down to True to trigger tertiary button""" + self._tertiary = down + + def _get_buttons_state(self): + """Create buttons report from current state""" + state = 0x00 + + if self._trigger: + state |= 1 << 0 + if self._secondary: + state |= 1 << 1 + if self._tertiary: + state |= 1 << 2 + + return state + + def handle_data_requested(self, endpoint: USBEndpoint): + """Provide data once per host request.""" + endpoint.send( + self._get_buttons_state().to_bytes(1, "little") + + self._x.to_bytes(1, "little", signed=True) + + self._y.to_bytes(1, "little", signed=True) + + self._wheel.to_bytes(1, "little", signed=True) + ) + + +if __name__ == "__main__": + default_main(USBMouseDevice) diff --git a/test/loopback_fullspeed.py b/test/loopback_fullspeed.py new file mode 100755 index 00000000..b448b5f1 --- /dev/null +++ b/test/loopback_fullspeed.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +# +# +""" +Create a basic mouse device with three buttons and two axis +""" + +from facedancer.devices import default_main +from facedancer import * + + +@use_inner_classes_automatically +class USBLoopback(USBDevice): + """Loopback on EP1""" + + name: str = "USB EP1 Loopback" + product_string: str = "Loopback device" + max_packet_size_ep0: int = 64 + device_speed : DeviceSpeed = DeviceSpeed.FULL + + EP_MAX_SIZE = 64 + buffer = [None, None, None, None, None] + + class USBLoopbackConfiguration(USBConfiguration): + """Primary configuration : act as a mouse""" + + max_power: int = 100 + self_powered: bool = False + supports_remote_wakeup: bool = True + ep_in_ready: bool = False + + class USBLoopbackInterface(USBInterface): + """Core interface""" + + name: str = "Loopback device" + class_number: int = 0xff # Vendor class + + class USBLoopbackOUT1(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 1 + direction: USBDirection = USBDirection.OUT + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 64 + + class USBLoopbackIN1(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 1 + direction: USBDirection = USBDirection.IN + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 64 + + class USBLoopbackOUT2(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 2 + direction: USBDirection = USBDirection.OUT + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 64 + + class USBLoopbackIN2(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 2 + direction: USBDirection = USBDirection.IN + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 64 + + class USBLoopbackOUT3(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 3 + direction: USBDirection = USBDirection.OUT + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 64 + + class USBLoopbackIN3(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 3 + direction: USBDirection = USBDirection.IN + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 64 + + @class_request_handler(number=USBStandardRequests.GET_INTERFACE) + @to_this_interface + def handle_get_interface_request(self, request): + # Silently stall GET_INTERFACE class requests. + request.stall() + + def handle_data_received(self, ep, data): + print(f"received {len(data)} bytes on {ep}") + self.buffer[ep.number] = data + + def handle_data_requested(self, ep): + """Provide data once per host request.""" + if self.buffer[ep.number] is not None: + self.send(ep.number, self.buffer[ep.number]) + self.buffer[ep.number] = None + + +if __name__ == "__main__": + default_main(USBLoopback) diff --git a/test/loopback_highspeed.py b/test/loopback_highspeed.py new file mode 100755 index 00000000..25b1aef7 --- /dev/null +++ b/test/loopback_highspeed.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +# +# +""" +Create a basic mouse device with three buttons and two axis +""" + +from facedancer.devices import default_main +from facedancer import * + + +@use_inner_classes_automatically +class USBLoopback(USBDevice): + """Loopback on EP1""" + + name: str = "USB EP1 Loopback" + product_string: str = "Loopback device" + max_packet_size_ep0: int = 64 + device_speed : DeviceSpeed = DeviceSpeed.HIGH + + EP_MAX_SIZE = 512 + buffer = [None, None, None, None, None] + + class USBLoopbackConfiguration(USBConfiguration): + """Primary configuration : act as a mouse""" + + max_power: int = 100 + self_powered: bool = False + supports_remote_wakeup: bool = True + ep_in_ready: bool = False + + class USBLoopbackInterface(USBInterface): + """Core interface""" + + name: str = "Loopback device" + class_number: int = 0xff # Vendor class + + class USBLoopbackOUT1(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 1 + direction: USBDirection = USBDirection.OUT + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 512 + + class USBLoopbackIN1(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 1 + direction: USBDirection = USBDirection.IN + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 512 + + class USBLoopbackOUT2(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 2 + direction: USBDirection = USBDirection.OUT + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 512 + + class USBLoopbackIN2(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 2 + direction: USBDirection = USBDirection.IN + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 512 + + class USBLoopbackOUT3(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 3 + direction: USBDirection = USBDirection.OUT + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 512 + + class USBLoopbackIN3(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 3 + direction: USBDirection = USBDirection.IN + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 512 + + @class_request_handler(number=USBStandardRequests.GET_INTERFACE) + @to_this_interface + def handle_get_interface_request(self, request): + # Silently stall GET_INTERFACE class requests. + request.stall() + + def handle_data_received(self, ep, data): + print(f"received {len(data)} bytes on {ep}") + self.buffer[ep.number] = data + + def handle_data_requested(self, ep): + """Provide data once per host request.""" + if self.buffer[ep.number] is not None: + self.send(ep.number, self.buffer[ep.number]) + self.buffer[ep.number] = None + + +if __name__ == "__main__": + default_main(USBLoopback) diff --git a/test/scripts/requirements.txt b/test/scripts/requirements.txt new file mode 100644 index 00000000..7e2effa8 --- /dev/null +++ b/test/scripts/requirements.txt @@ -0,0 +1 @@ +pyusb==1.2.1 diff --git a/test/scripts/test_loopback.py b/test/scripts/test_loopback.py new file mode 100644 index 00000000..5ed413b7 --- /dev/null +++ b/test/scripts/test_loopback.py @@ -0,0 +1,142 @@ +#!/usr/bin/python3 +# Copyright 2023 Quarkslab + +""" +Test the integrity of data sent by a device on its EP OUT, sent back to us with its EP IN. +""" +import sys +import time +import random +import array +import usb.core +import usb.util + +ENDP_BURST_SIZE = 4 +TOTAL_TIME_NS = 0 +BUFFER_SIZE = int(100*1e3) + + +def check(byte_array, packet_size, reference_array): + """ + Check the received buffers against what has been sent, to check for integrity + """ + for i in range(len(byte_array) // packet_size): + packet_in = byte_array[i * packet_size:i*packet_size + packet_size] + packet_ref = reference_array[i * + packet_size:i*packet_size + packet_size] + if packet_in != packet_ref: + print(f"Error at {i} ") + print(packet_in) + print(packet_ref) + return False + return True + + +def send_next_packet(head, ep_in, ep_out, endp_max_packet_size, buffer_in, buffer_out): + global TOTAL_TIME_NS + start = time.time_ns() + packet_size = min(endp_max_packet_size, len(buffer_out) - head) + # print(f"sending {packet_size} bytes on {ep_out}") + ep_out.write(buffer_out[head:head + packet_size]) + # print(f"reading on {ep_in}") + buf = ep_in.read(packet_size) + stop = time.time_ns() + TOTAL_TIME_NS += (stop - start) + + num_read = len(buf) + if num_read != packet_size: + print("retrying packet ! ") + return send_next_packet(head, ep_in, ep_out, endp_max_packet_size, buffer_in, buffer_out) + buffer_in[head:head + packet_size] = buf + head += num_read + return head + + +if __name__ == "__main__": + # find our device + dev = usb.core.find(idVendor=0x610b, idProduct=0x4653) + + if dev is None: + raise ValueError('Device not found') + + if dev.speed == usb.util.SPEED_SUPER: + ENDP_BURST_SIZE = 4 + print(f"USB30 Superspeed burst {ENDP_BURST_SIZE}") + else: + print("USB20") + ENDP_BURST_SIZE = 1 + + print("Configuration of the device :") + + for cfg in dev: + sys.stdout.write(str(cfg) + '\n') + + # get an endpoint instance + cfg = dev.get_active_configuration() + intf = cfg[(0, 0)] + + ep_in = list(usb.util.find_descriptor( + intf, + find_all=True, + # match the first OUT endpoint + custom_match=lambda e: \ + usb.util.endpoint_direction(e.bEndpointAddress) == \ + usb.util.ENDPOINT_IN)) + + ep_out = list(usb.util.find_descriptor( + intf, + # match the first OUT endpoint + find_all=True, + custom_match=lambda e: \ + usb.util.endpoint_direction(e.bEndpointAddress) == \ + usb.util.ENDPOINT_OUT)) + + assert ep_in is not None + assert ep_out is not None + # assert ep_out.wMaxPacketSize == ep_in.wMaxPacketSize + + print("Reading ...") + + ROUNDS = 4 + SUCCESS = True + + fails = [] + for i in range(len(ep_in)): + TOTAL_TIME_NS = 0 + print(f"EP {i+1}") + try: + endp_max_packet_size = ENDP_BURST_SIZE * ep_out[i].wMaxPacketSize + buffer_out = array.array( + 'B', [int(random.random() * 255) for i in range(BUFFER_SIZE)]) + buffer_in = array.array('B', [0 for i in range(BUFFER_SIZE)]) + + START = time.time_ns() + head = 0 + num_sent = 0 + total_to_send = len(buffer_out) + while head < total_to_send: + try: + head = send_next_packet( + head, ep_in[i], ep_out[i], endp_max_packet_size, buffer_in, buffer_out) + sys.stdout.write(f"\r{100. * head/total_to_send} % sent") + except usb.core.USBTimeoutError: # HydraUSB3 tends to timeout when handling USB3 + print("error timeout, retrying ! ") + + STOP = time.time_ns() + sys.stdout.write("\r") + if check(buffer_in, endp_max_packet_size, buffer_out): + print( + f"Success ! Transfer rate with python processing {len(buffer_in) / ((STOP - START) * 1e-9) * 1e-6} MB/s") + print( + f"Success ! Transfer rate with only transfer {len(buffer_in) / ((TOTAL_TIME_NS) * 1e-9) * 1e-6} MB/s") + else: + print("Error") + fails.append(i) + except: + fails.append(i) + +print( + f"There have been {len(fails)} fails. Endpoints {[ep + 1 for ep in fails]} failed ") + +if len(fails) == 0: + print("Test successful ! ") diff --git a/test/scripts/test_loopback_randomize_packetsize.py b/test/scripts/test_loopback_randomize_packetsize.py new file mode 100644 index 00000000..aaaedf8f --- /dev/null +++ b/test/scripts/test_loopback_randomize_packetsize.py @@ -0,0 +1,148 @@ +#!/usr/bin/python3 +# Copyright 2023 Quarkslab + +""" +Test the integrity of data sent by a device on its EP OUT, sent back to us with its EP IN. +This time, randomize the size of individual packets, to check if the device can handle packet sizes different from its max. +""" + +import sys +import time +import random +import usb.core +import usb.util +import array + +ENDP_BURST_SIZE = 4 +TOTAL_TIME_NS = 0 +BUFFER_SIZE = int(100 * 1e3) + + +def check(byte_array, packet_size, reference_array): + """ + Check the received buffers against what has been sent, to check for integrity + """ + for i in range(len(byte_array) // packet_size): + packet_in = byte_array[i * packet_size:i*packet_size + packet_size] + packet_ref = reference_array[i * + packet_size:i*packet_size + packet_size] + if packet_in != packet_ref: + print(f"Error at {i} \r\n") + print(packet_in) + print(packet_ref) + return False + break + return True + + +def send_next_packet(head, ep_in, ep_out, endp_max_packet_size, buffer_in, buffer_out): + """ + Send next packet from buffer_out at head, receive it back in buffer_in. + Saves the time diff in TOTAL_TIME_NS, to try to remove python processing from the measurement. + """ + global TOTAL_TIME_NS + start = time.time_ns() + packet_size = random.randint( + 0, min(endp_max_packet_size, len(buffer_out) - head)) + ep_out.write(buffer_out[head:head + packet_size]) + buf = ep_in.read(packet_size) + stop = time.time_ns() + TOTAL_TIME_NS += (stop - start) + + num_read = len(buf) + if num_read != packet_size: + print("retrying packet ! \r\n") + return send_next_packet(head, ep_in, ep_out, endp_max_packet_size, buffer_in, buffer_out) + buffer_in[head:head + packet_size] = buf + head += num_read + return head + + +if __name__ == "__main__": + # find our device + dev = usb.core.find(idVendor=0x610b, idProduct=0x4653) + + if dev is None: + raise ValueError('Device not found') + + if dev.speed == usb.util.SPEED_SUPER: + ENDP_BURST_SIZE = 4 + print(f"USB30 Superspeed burst {ENDP_BURST_SIZE}") + else: + print("USB20") + ENDP_BURST_SIZE = 1 + + print("Configuration of the device :") + + for cfg in dev: + sys.stdout.write(str(cfg) + '\n') + + # get an endpoint instance + cfg = dev.get_active_configuration() + intf = cfg[(0, 0)] + + ep_in = list(usb.util.find_descriptor( + intf, + find_all=True, + # match the first OUT endpoint + custom_match=lambda e: \ + usb.util.endpoint_direction(e.bEndpointAddress) == \ + usb.util.ENDPOINT_IN)) + + ep_out = list(usb.util.find_descriptor( + intf, + # match the first OUT endpoint + find_all=True, + custom_match=lambda e: \ + usb.util.endpoint_direction(e.bEndpointAddress) == \ + usb.util.ENDPOINT_OUT)) + + assert ep_in is not None + assert ep_out is not None + # assert ep_out.wMaxPacketSize == ep_in.wMaxPacketSize + + print("Reading ...") + + ROUNDS = 4 + SUCCESS = True + + fails = [] + for i in range(len(ep_in)): + TOTAL_TIME_NS = 0 + print(f"EP {i}") + try: + endp_max_packet_size = ENDP_BURST_SIZE * ep_out[i].wMaxPacketSize + + buffer_out = array.array( + 'B', [int(random.random() * 255) for i in range(BUFFER_SIZE)]) + buffer_in = array.array('B', [0 for i in range(BUFFER_SIZE)]) + + START = time.time_ns() + head = 0 + while head < len(buffer_out): + try: + head = send_next_packet( + head, ep_in[i], ep_out[i], endp_max_packet_size, buffer_in, buffer_out) + except usb.core.USBTimeoutError: + head = send_next_packet( + head, ep_in[i], ep_out[i], endp_max_packet_size, buffer_in, buffer_out) + + STOP = time.time_ns() + + if check(buffer_in, endp_max_packet_size, buffer_out): + print( + f"Success ! Transfer rate with python processing {len(buffer_in) / ((STOP - START) * 1e-9) * 1e-6} MB/s") + print( + f"Success ! Transfer rate with only transfer {len(buffer_in) / ((TOTAL_TIME_NS) * 1e-9) * 1e-6} MB/s") + else: + print("Error") + fails.append(i) + except Exception as e: + print(e) + fails.append(i) + +print( + f"There have been {len(fails)} fails. Endpoints {[ep + 1 for ep in fails]} failed \r\n") + +if len(fails) == 0: + print("Test successful ! \r\n") diff --git a/test/scripts/test_speedtest.py b/test/scripts/test_speedtest.py new file mode 100644 index 00000000..98b21046 --- /dev/null +++ b/test/scripts/test_speedtest.py @@ -0,0 +1,141 @@ +#!/usr/bin/python3 +# Copyright 2023 Quarkslab + +""" +Tests the speed of a USB device with one EP IN and one EP OUT. +Data is sent to the EP OUT, and read from EP IN, without integrity checks. + +This program can also write its results in a CSV, and repeat the measurement count times. +""" + +import sys +import time +import random +import array +import argparse +import csv +import usb.core +import usb.util + + +ENDP_BURST_SIZE = 1 +TOTAL_TRANSFER_SIZE_KB = 500 + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + prog="Hydradancer speedtest", description="Measuring write/read transfer rate") + csv_group = parser.add_argument_group( + "CSV export", "Export results to CSV") + csv_group.add_argument("--csv", action="store", help="Export to CSV") + parser.add_argument("--count", default=1, action="store", + help="Number of runs", type=int) + args = parser.parse_args() + + # find our device + dev = usb.core.find(idVendor=0x610b, idProduct=0x4653) + + # was it found? + if dev is None: + raise ValueError('Device not found') + + if dev.speed == usb.util.SPEED_SUPER: + ENDP_BURST_SIZE = 4 + print(f"USB30 Superspeed burst {ENDP_BURST_SIZE}") + else: + print("USB20") + ENDP_BURST_SIZE = 1 + + # set the active configuration. With no arguments, the first + # configuration will be the active one + # dev.set_configuration() + + print("Configuration of the device :") + + for cfg in dev: + sys.stdout.write(str(cfg) + '\n') + + # get an endpoint instance + cfg = dev.get_active_configuration() + intf = cfg[(0, 0)] + + ep_in = usb.util.find_descriptor( + intf, + # match the first OUT endpoint + custom_match=lambda e: \ + usb.util.endpoint_direction(e.bEndpointAddress) == \ + usb.util.ENDPOINT_IN) + + ep_out = usb.util.find_descriptor( + intf, + # match the first OUT endpoint + custom_match=lambda e: \ + usb.util.endpoint_direction(e.bEndpointAddress) == \ + usb.util.ENDPOINT_OUT) + + assert ep_in is not None + assert ep_out is not None + assert ep_out.wMaxPacketSize == ep_in.wMaxPacketSize + + print("Reading ...") + + ROUNDS = 4 + SUCCESS = True + + endp_max_packet_size = ENDP_BURST_SIZE * ep_out.wMaxPacketSize + + # our test device will send fullsize packets + buffer_size = int(((TOTAL_TRANSFER_SIZE_KB * 1e3) // + endp_max_packet_size) * endp_max_packet_size) + buffer_out = array.array( + 'B', [int(random.random() * 255) for i in range(buffer_size)]) + buffer_in = array.array('B', [0 for i in range(buffer_size)]) + + write_transfer_time_diff = [] + read_transfer_time_diff = [] + + for i in range(args.count): + try: + START = time.time_ns() + effectively_written = ep_out.write(buffer_out, timeout=10000) + STOP = time.time_ns() + write_transfer_rate = effectively_written / \ + ((STOP - START) * 1e-9) * 1e-6 + write_transfer_time_diff.append(STOP - START) + + if effectively_written != len(buffer_out): + print("Error, wrote less than expected") + exit(1) + + print(f"Transfer rate write {write_transfer_rate} MB/s") + except usb.core.USBTimeoutError: + write_transfer_time_diff.append(-1) + print("Error timeout") + + try: + START = time.time_ns() + effectively_read = ep_in.read(buffer_in, timeout=10000) + STOP = time.time_ns() + read_transfer_rate = effectively_read / \ + ((STOP - START) * 1e-9) * 1e-6 + read_transfer_time_diff.append(STOP - START) + + if effectively_read != len(buffer_in): + print("Error, read less than expected") + exit(1) + + print(f"Transfer rate read {read_transfer_rate} MB/s") + except usb.core.USBTimeoutError: + read_transfer_time_diff.append(-1) + print("Error timeout") + + if args.csv is not None: + with open(args.csv, 'w', newline='', encoding='utf-8') as csvFile: + fieldnames = [ + 'Write(ns)', 'Read(ns)', 'Transfer size write (byte)', 'Transfer size read (byte)'] + writer = csv.DictWriter( + csvFile, fieldnames=fieldnames, dialect='excel') + + writer.writeheader() + for i in range(len(read_transfer_time_diff)): + writer.writerow({'Write(ns)': write_transfer_time_diff[i], 'Read(ns)': read_transfer_time_diff[i], + 'Transfer size write (byte)': effectively_written, 'Transfer size read (byte)': effectively_read}) diff --git a/test/scripts/test_speedtest_one_by_one.py b/test/scripts/test_speedtest_one_by_one.py new file mode 100644 index 00000000..628247cd --- /dev/null +++ b/test/scripts/test_speedtest_one_by_one.py @@ -0,0 +1,142 @@ +#!/usr/bin/python3 +# Copyright 2023 Quarkslab + +""" +Tests the speed of a USB device with one EP IN and one EP OUT. +Data is sent to the EP OUT, and read from EP IN, without integrity checks. + +NOTE : contrary to test_speedtest, this one sends/reads data of size the maximum packet size * max burst. +This was made for devices that can't handle libusb optimizations when sending large buffers. + +This program can also write its results in a CSV, and repeat the measurement count times. +""" + +import sys +import time +import random +import array +import argparse +import csv +import usb.core +import usb.util + + +ENDP_BURST_SIZE = 1 +TOTAL_TRANSFER_SIZE_KB = 500 + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + prog="Hydradancer speedtest", description="Measuring write/read transfer rate") + csv_group = parser.add_argument_group( + "CSV export", "Export results to CSV") + csv_group.add_argument("--csv", action="store", help="Export to CSV") + parser.add_argument("--count", default=1, action="store", + help="Number of runs", type=int) + args = parser.parse_args() + + # find our device + dev = usb.core.find(idVendor=0x610b, idProduct=0x4653) + + # was it found? + if dev is None: + raise ValueError('Device not found') + + if dev.speed == usb.util.SPEED_SUPER: + ENDP_BURST_SIZE = 4 + print(f"USB30 Superspeed burst {ENDP_BURST_SIZE}") + else: + print("USB20") + ENDP_BURST_SIZE = 1 + + # set the active configuration. With no arguments, the first + # configuration will be the active one + # dev.set_configuration() + + print("Configuration of the device :") + + for cfg in dev: + sys.stdout.write(str(cfg) + '\n') + + # get an endpoint instance + cfg = dev.get_active_configuration() + intf = cfg[(0, 0)] + + ep_in = usb.util.find_descriptor( + intf, + # match the first OUT endpoint + custom_match=lambda e: \ + usb.util.endpoint_direction(e.bEndpointAddress) == \ + usb.util.ENDPOINT_IN) + + ep_out = usb.util.find_descriptor( + intf, + # match the first OUT endpoint + custom_match=lambda e: \ + usb.util.endpoint_direction(e.bEndpointAddress) == \ + usb.util.ENDPOINT_OUT) + + assert ep_in is not None + assert ep_out is not None + assert ep_out.wMaxPacketSize == ep_in.wMaxPacketSize + + print("Reading ...") + + ROUNDS = 4 + SUCCESS = True + + endp_max_packet_size = ENDP_BURST_SIZE * ep_out.wMaxPacketSize + buffer_size = int(((TOTAL_TRANSFER_SIZE_KB * 1e3) // + endp_max_packet_size) * endp_max_packet_size) + buffer_out = array.array( + 'B', [int(random.random() * 255) for i in range(buffer_size)]) + buffer_in = array.array('B', [0 for i in range(buffer_size)]) + random_packet = array.array( + 'B', [int(random.random() * 255) for i in range(endp_max_packet_size)]) + + write_transfer_time_diff = [] + read_transfer_time_diff = [] + + for i in range(args.count): + try: + remaining_write = len(buffer_out) + START = time.time_ns() + while remaining_write != 0: + packet_size = min(remaining_write, endp_max_packet_size) + remaining_write -= ep_out.write( + random_packet[:packet_size], timeout=10000) + STOP = time.time_ns() + write_transfer_rate = len(buffer_out) / \ + ((STOP - START) * 1e-9) * 1e-6 + write_transfer_time_diff.append(STOP - START) + print(f"Transfer rate write {write_transfer_rate} MB/s") + except usb.core.USBTimeoutError: + write_transfer_time_diff.append(-1) + print("Error timeout") + + try: + remaining_read = len(buffer_in) + START = time.time_ns() + while remaining_read != 0: + packet_size = min(remaining_read, endp_max_packet_size) + buffer = ep_in.read(packet_size, timeout=10000) + remaining_read -= len(buffer) + STOP = time.time_ns() + read_transfer_rate = len(buffer_in) / \ + ((STOP - START) * 1e-9) * 1e-6 + read_transfer_time_diff.append(STOP - START) + print(f"Transfer rate read {read_transfer_rate} MB/s") + except usb.core.USBTimeoutError: + read_transfer_time_diff.append(-1) + print("Error timeout") + + if args.csv is not None: + with open(args.csv, 'w', newline='', encoding='utf-8') as csvFile: + fieldnames = [ + 'Write(ns)', 'Read(ns)', 'Transfer size write (byte)', 'Transfer size read (byte)'] + writer = csv.DictWriter( + csvFile, fieldnames=fieldnames, dialect='excel') + + writer.writeheader() + for i in range(len(read_transfer_time_diff)): + writer.writerow({'Write(ns)': write_transfer_time_diff[i], 'Read(ns)': read_transfer_time_diff[i], 'Transfer size write (byte)': len( + buffer_out), 'Transfer size read (byte)': len(buffer_in)}) diff --git a/test/speedtest_enumeration.py b/test/speedtest_enumeration.py new file mode 100755 index 00000000..affdd94d --- /dev/null +++ b/test/speedtest_enumeration.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python3 + +from facedancer import main +from facedancer import * +from facedancer.classes import USBDeviceClass +import logging +import time +import datetime + + +class EndOfEnumerationException(Exception): + pass + + +@use_inner_classes_automatically +class SomeDevice(USBDevice): + """ Emulate a USB Device""" + + device_class: int = 0 + device_subclass: int = 0 + protocol_revision_number: int = 0 + + max_packet_size_ep0: int = 64 + vendor_id: int = 0x610b + product_id: int = 0x4653 + manufacturer_string: str = "FaceDancer" + product_string: str = "Generic USB Device" + serial_number_string: str = "S/N 3420E" + supported_languages: tuple = (LanguageIDs.ENGLISH_US,) + device_revision: int = 0 + usb_spec_version: int = 0x0002 + device_speed : DeviceSpeed = DeviceSpeed.FULL + + class SomeConfiguration(USBConfiguration): + configuration_number: int = 1 + configuration_string: str = None + self_powered: bool = False + supports_remote_wakeup: bool = True + max_power: int = 500 + + class SomeTemplate(USBInterface): + number: int = 0 + class_number: int = USBDeviceClass.VENDOR_SPECIFIC + subclass_number: int = 0 + protocol_number: int = 0 + interface_string: str = None + + class INEndpoint(USBEndpoint): + number: int = 1 + direction: USBDirection = USBDirection.IN + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 64 + interval: int = 0 + + def handle_data_requested(self): + self.send(b"Hello!") + + class OUTEndpoint(USBEndpoint): + number: int = 1 + direction: USBDirection = USBDirection.OUT + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 64 + interval: int = 0 + + def handle_data_received(self, data): + logging.info(f"Received data: {data}") + + def handle_data_received(self, endpoint, data): + super().handle_data_received(endpoint, data) + + @standard_request_handler(number=USBStandardRequests.SET_CONFIGURATION) + @to_device + def handle_set_configuration_request(self, request): + """ Handle SET_CONFIGURATION requests; per USB2 [9.4.7] """ + print("received SET_CONFIGURATION request") + + # If the host is requesting configuration zero, they're asking + # us to drop our configuration. + if request.value == 0: + self.configuration = None + request.acknowledge() + + # Otherwise, we'll find a given configuration and apply it. + else: + try: + self.configuration = self.configurations[request.value] + request.acknowledge() + except KeyError: + request.stall() + + # Notify the backend of the reconfiguration, in case + # it needs to e.g. set up endpoints accordingly + self.backend.configured(self.configuration) + raise EndOfEnumerationException() + + +if __name__ == "__main__": + + count = 1000 + + START = datetime.datetime.now() + for i in range(count): + print("Start of enumeration") + try: + main(SomeDevice) + except EndOfEnumerationException: + print("End of enumeration") + STOP = datetime.datetime.now() + + print(f"Success, enumerated {count} devices, took {STOP-START}") diff --git a/test/speedtest_fullspeed.py b/test/speedtest_fullspeed.py new file mode 100755 index 00000000..7527eabb --- /dev/null +++ b/test/speedtest_fullspeed.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +# +# +""" +USB full-speed speedtest +""" + +from facedancer.devices import default_main +from facedancer import * + + +@use_inner_classes_automatically +class USBSpeedtest(USBDevice): + """Loopback on EP1""" + + name: str = "USB full-speed speedtest" + product_string: str = "USB full-speed speedtest" + max_packet_size_ep0: int = 64 + device_speed : DeviceSpeed = DeviceSpeed.FULL + + EP_MAX_SIZE = 64 + buffer = [None, None, None, None, None] + random_buffer = [i % 256 for i in range(64)] + + class USBSpeedtestConfiguration(USBConfiguration): + """USB full-speed speedtest""" + + max_power: int = 100 + self_powered: bool = False + supports_remote_wakeup: bool = True + + class USBSpeedtestInterface(USBInterface): + """Core interface""" + + name: str = "USB full-speed speedtest" + class_number: int = 0xff # Vendor class + + class USBSpeedtestOUT(USBEndpoint): + """Interrupt OUT endpoint""" + + number: int = 1 + direction: USBDirection = USBDirection.OUT + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 64 + + class USBSpeedtestIN(USBEndpoint): + """Interrupt IN endpoint""" + + number: int = 2 + direction: USBDirection = USBDirection.IN + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 64 + + @class_request_handler(number=USBStandardRequests.GET_INTERFACE) + @to_this_interface + def handle_get_interface_request(self, request): + # Silently stall GET_INTERFACE class requests. + request.stall() + + def handle_data_received(self, ep, data): + print(f"received {len(data)} bytes on {ep}") + self.buffer[ep.number] = data + + def handle_data_requested(self, ep): + """Provide data once per host request.""" + print(f"sending {len(self.random_buffer)} bytes on {ep}") + self.send(ep.number, self.random_buffer) + + +if __name__ == "__main__": + default_main(USBSpeedtest) diff --git a/test/speedtest_highspeed.py b/test/speedtest_highspeed.py new file mode 100755 index 00000000..7b970c51 --- /dev/null +++ b/test/speedtest_highspeed.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +# +# +""" +USB High-speed speedtest +""" + +from facedancer.devices import default_main +from facedancer import * + + +@use_inner_classes_automatically +class USBSpeedtest(USBDevice): + """USB High-speed speedtest""" + + name: str = "USB High-speed speedtest" + product_string: str = "USB High-speed speedtest" + max_packet_size_ep0: int = 64 + device_speed : DeviceSpeed = DeviceSpeed.HIGH + + EP_MAX_SIZE = 512 + buffer = [None, None, None, None, None] + random_buffer = [i % 256 for i in range(512)] + + + class USBSpeedtestConfiguration(USBConfiguration): + """Primary configuration : act as a mouse""" + + max_power: int = 100 + self_powered: bool = False + supports_remote_wakeup: bool = True + + class USBSpeedtestInterface(USBInterface): + """Core interface""" + + name: str = "USB High-speed speedtest" + class_number: int = 0xff # Vendor class + + class USBSpeedtestOUT1(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 1 + direction: USBDirection = USBDirection.OUT + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 512 + + class USBSpeedtestIN1(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 1 + direction: USBDirection = USBDirection.IN + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 512 + + class USBSpeedtestOUT2(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 2 + direction: USBDirection = USBDirection.OUT + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 512 + + class USBSpeedtestIN2(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 2 + direction: USBDirection = USBDirection.IN + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 512 + + class USBSpeedtestOUT3(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 3 + direction: USBDirection = USBDirection.OUT + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 512 + + class USBSpeedtestIN3(USBEndpoint): + """Interrupt IN endpoint for guaranteed max latency""" + + number: int = 3 + direction: USBDirection = USBDirection.IN + transfer_type: USBTransferType = USBTransferType.BULK + max_packet_size: int = 512 + + @class_request_handler(number=USBStandardRequests.GET_INTERFACE) + @to_this_interface + def handle_get_interface_request(self, request): + # Silently stall GET_INTERFACE class requests. + request.stall() + + def handle_data_received(self, ep, data): + print(f"received {len(data)} bytes on {ep}") + self.buffer[ep.number] = data + + def handle_data_requested(self, ep): + """Provide data once per host request.""" + print(f"sending {len(self.random_buffer)} bytes on {ep}") + self.send(ep.number, self.random_buffer) + + +if __name__ == "__main__": + default_main(USBSpeedtest) From 7484c03c80f80fd3d1b601c614248a5ea33adeee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thi=C3=A9baud=20Fuchs?= Date: Thu, 6 Jun 2024 13:32:22 +0200 Subject: [PATCH 2/2] Use priming in speedtests --- test/speedtest_fullspeed.py | 12 ++++++++++++ test/speedtest_highspeed.py | 11 +++++++++++ 2 files changed, 23 insertions(+) diff --git a/test/speedtest_fullspeed.py b/test/speedtest_fullspeed.py index 7527eabb..71be19c4 100755 --- a/test/speedtest_fullspeed.py +++ b/test/speedtest_fullspeed.py @@ -66,6 +66,18 @@ def handle_data_requested(self, ep): print(f"sending {len(self.random_buffer)} bytes on {ep}") self.send(ep.number, self.random_buffer) + def handle_buffer_empty(self, endpoint: USBEndpoint): + """ Handler called when a given endpoint first has an empty buffer. + + Often, an empty buffer indicates an opportunity to queue data + for sending ('prime an endpoint'), but doesn't necessarily mean + that the host is planning on reading the data. + + This function is called only once per buffer. + """ + print(f"priming {len(self.random_buffer)} bytes on {endpoint}") + self.send(endpoint.number, self.random_buffer) + if __name__ == "__main__": default_main(USBSpeedtest) diff --git a/test/speedtest_highspeed.py b/test/speedtest_highspeed.py index 7b970c51..ad21e62a 100755 --- a/test/speedtest_highspeed.py +++ b/test/speedtest_highspeed.py @@ -99,6 +99,17 @@ def handle_data_requested(self, ep): print(f"sending {len(self.random_buffer)} bytes on {ep}") self.send(ep.number, self.random_buffer) + def handle_buffer_empty(self, endpoint: USBEndpoint): + """ Handler called when a given endpoint first has an empty buffer. + + Often, an empty buffer indicates an opportunity to queue data + for sending ('prime an endpoint'), but doesn't necessarily mean + that the host is planning on reading the data. + + This function is called only once per buffer. + """ + print(f"priming {len(self.random_buffer)} bytes on {endpoint}") + self.send(endpoint.number, self.random_buffer) if __name__ == "__main__": default_main(USBSpeedtest)