diff --git a/lib/pimoroni_yukon/modules/serial_servo.py b/lib/pimoroni_yukon/modules/serial_servo.py index 243c8a2..d75e880 100644 --- a/lib/pimoroni_yukon/modules/serial_servo.py +++ b/lib/pimoroni_yukon/modules/serial_servo.py @@ -4,8 +4,6 @@ from .common import YukonModule, ADC_FLOAT, IO_LOW, IO_HIGH from machine import Pin, UART -from ucollections import OrderedDict -from pimoroni_yukon.errors import OverTemperatureError class Duplexer: diff --git a/lib/pimoroni_yukon/protocols/lx_servos.py b/lib/pimoroni_yukon/protocols/lx_servos.py index 84b835c..940e5d1 100644 --- a/lib/pimoroni_yukon/protocols/lx_servos.py +++ b/lib/pimoroni_yukon/protocols/lx_servos.py @@ -4,21 +4,24 @@ import time import struct +from machine import Pin from pimoroni_yukon.timing import ticks_add, ticks_diff, ticks_ms from pimoroni_yukon.errors import TimeoutError +import pimoroni_yukon.logging as logging from ucollections import namedtuple Command = namedtuple("Command", ("value", "length")) -# LX-16A Protocol +# LX Servo Protocol FRAME_HEADER = 0x55 FRAME_HEADER_LENGTH = 3 FRAME_LENGTH_INDEX = 3 +BROADCAST_ID = 0xFE SERVO_MOVE_TIME_WRITE = Command(1, 7) SERVO_MOVE_TIME_READ = Command(2, 3) SERVO_MOVE_TIME_WAIT_WRITE = Command(7, 7) -SERVO_MOVE_TIME_WAIT_READ = Command(8, 3) +# SERVO_MOVE_TIME_WAIT_READ = Command(8, 3) # Does not appear to be implemented on the LX-16A SERVO_MOVE_START = Command(11, 3) SERVO_MOVE_STOP = Command(12, 3) SERVO_ID_WRITE = Command(13, 4) @@ -44,8 +47,6 @@ SERVO_LED_ERROR_WRITE = Command(35, 4) SERVO_LED_ERROR_READ = Command(36, 3) -BROADCAST_ID = 0xFE - def CheckSum(buffer): checksum = 0 @@ -371,12 +372,19 @@ def SerialServoReadVin(uart, duplexer, id, timeout=1.0): return ret -from machine import Pin class LXServo: + SERVO_MODE = 0 + MOTOR_MODE = 1 + DEFAULT_READ_TIMEOUT = 1.0 + + OVER_TEMPERATURE = 0b001 + OVER_VOLTAGE = 0b010 + OVER_LOADED = 0b100 + def __init__(self, id, uart, duplexer, debug_pin=None): - if id == BROADCAST_ID: - raise ValueError("Cannot create an LXServo with the broadcast id. use the `LXServo.broadcast_` functions instead.") + if id < 0 or id > BROADCAST_ID: + raise ValueError(f"id out of range. Expected 0 to {BROADCAST_ID}") self.__id = id self.__uart = uart @@ -386,19 +394,41 @@ def __init__(self, id, uart, duplexer, debug_pin=None): if self.__debug_pin is not None: self.__debug_pin.init(Pin.OUT) + if self.__id == BROADCAST_ID: + self.__switch_to_servo_mode() + else: + logging.info(self.__message_header() + "Searching for servo ... ", end="") + + self.__send(SERVO_ID_READ) + self.__wait_for_send() + try: + self.__receive("B", LXServo.DEFAULT_READ_TIMEOUT) + except TimeoutError: + raise RuntimeError(self.__message_header() + "Cannot find servo") from None + + logging.info("found") + + self.__read_mode_and_speed(LXServo.DEFAULT_READ_TIMEOUT) + + def __message_header(self): + return f"[Servo{self.__id}] " + @property def id(self): return self.__id @staticmethod - def detect(id, uart, duplexer, timeout=1.0): + def detect(id, uart, duplexer, timeout=DEFAULT_READ_TIMEOUT): try: SerialServoReadID(uart, duplexer, id, timeout) return True except TimeoutError: return False - def move(self, angle, duration): + def move_to(self, angle, duration): + if self.__mode != LXServo.SERVO_MODE: + self.__switch_to_servo_mode() + position = int(((angle / 90) * 360) + 500) position = min(max(position, 0), 1000) @@ -408,150 +438,234 @@ def move(self, angle, duration): self.__send(SERVO_MOVE_TIME_WRITE, "HH", position, ms) - def read_move(self, timeout=1.0): + def read_move(self, timeout=DEFAULT_READ_TIMEOUT): self.__send(SERVO_MOVE_TIME_READ) self.__wait_for_send() - # TODO - return self.__receive("HH", timeout) - def queue_move(self, position, time): - if position < 0: - position = 0 - if position > 1000: - position = 1000 + received = self.__receive("HH", timeout) + if received is None: + return (float("NAN"), float("NAN")) - # TODO - self.__send(SERVO_MOVE_TIME_WAIT_WRITE, "HH", position, time) + angle = ((received[0] - 500) / 360) * 90 + duration = received[1] / 1000.0 + return angle, duration - def read_queued(self, timeout=1.0): + def queue_move(self, angle, duration): + position = int(((angle / 90) * 360) + 500) + position = min(max(position, 0), 1000) + + ms = int(1000.0 * duration + 0.5) + if ms < 0 or ms > 30000: + raise ValueError("duration out of range. Expected 0.0s to 30.0s") + + self.__send(SERVO_MOVE_TIME_WAIT_WRITE, "HH", position, ms) + + # Does not appear to be implemented on the LX-16A + """ + def read_queued(self, timeout=DEFAULT_READ_TIMEOUT): self.__send(SERVO_MOVE_TIME_WAIT_READ) self.__wait_for_send() - # TODO - return self.__receive("HH", timeout) + + received = self.__receive("HH", timeout) + if received is None: + return (float("NAN"), float("NAN")) + + angle = ((received[0] - 500) / 360) * 90 + duration = received[1] / 1000.0 + return angle, duration + """ def start_queued(self): + if self.__mode != LXServo.SERVO_MODE: + self.__switch_to_servo_mode() + self.__send(SERVO_MOVE_START) def stop(self): - self.__send(SERVO_MOVE_STOP) + if self.__id == BROADCAST_ID: + self.__send(SERVO_MOVE_STOP) + self.drive_at(0.0) + else: + if self.__mode == LXServo.SERVO_MODE: + self.__send(SERVO_MOVE_STOP) + else: + self.drive_at(0.0) + + def change_id(self, new_id, timeout=DEFAULT_READ_TIMEOUT): + if self.__id == BROADCAST_ID: + raise ValueError("cannot change the broadcast address") + + if new_id < 0 or new_id >= BROADCAST_ID: + raise ValueError(f"id out of range. Expected 0 to {BROADCAST_ID - 1}") + + logging.info(self.__message_header() + f"Changing ID to {new_id} ... ", end="") - def set_id(self, new_id): self.__send(SERVO_ID_WRITE, "B", new_id) self.__id = new_id - def read_id(self, timeout=1.0): + if self.read_id(timeout) != self.__id: + raise RuntimeError(self.__message_header() + "Cannot find servo") + + logging.info("success") + + def read_id(self, timeout=DEFAULT_READ_TIMEOUT): self.__send(SERVO_ID_READ) self.__wait_for_send() - # TODO return self.__receive("B", timeout) - def adjust_angle_offset(self, amount): - # TODO - self.__send(SERVO_ANGLE_OFFSET_ADJUST, "H", amount) + def apply_angle_offset(self, new_offset): + offset = int((new_offset / 90) * 360) + offset = min(max(offset, -125), 125) + + self.__send(SERVO_ANGLE_OFFSET_ADJUST, "b", offset) - def set_angle_offset(self, new_offset): - # TODO - self.__send(SERVO_ANGLE_OFFSET_WRITE, "H", new_offset) + def save_angle_offset(self): + self.__send(SERVO_ANGLE_OFFSET_WRITE) - def read_angle_offset(self, timeout=1.0): - # TODO + def read_angle_offset(self, timeout=DEFAULT_READ_TIMEOUT): self.__send(SERVO_ANGLE_OFFSET_READ) self.__wait_for_send() - return self.__receive("B", timeout) - def set_angle_limit(self, new_limit): - # TODO - self.__send(SERVO_ANGLE_LIMIT_WRITE, "H", new_limit) + received = self.__receive("b", timeout) + if received is None: + return float("NAN") + + return (received / 360) * 90 + + def set_angle_limits(self, new_lower_limit, new_upper_limit): + lower_position = int(((new_lower_limit / 90) * 360) + 500) + lower_position = max(lower_position, 0) + upper_position = int(((new_upper_limit / 90) * 360) + 500) + upper_position = min(max(upper_position, 0), 1000) + lower_position = min(lower_position, upper_position) + + self.__send(SERVO_ANGLE_LIMIT_WRITE, "HH", lower_position, upper_position) - def read_angle_limit(self, timeout=1.0): - # TODO + def read_angle_limits(self, timeout=DEFAULT_READ_TIMEOUT): self.__send(SERVO_ANGLE_LIMIT_READ) self.__wait_for_send() - return self.__receive("B", timeout) - def set_voltage_limit(self, new_limit): - # TODO - self.__send(SERVO_VIN_LIMIT_WRITE, "H", new_limit) + received = self.__receive("HH", timeout) + if received is None: + return (float("NAN"), float("NAN")) + + lower_limit = ((received[0] - 500) / 360) * 90 + upper_limit = ((received[1] - 500) / 360) * 90 + return lower_limit, upper_limit + + def set_voltage_limits(self, new_lower_limit, new_upper_limit): + lower_millivolts = int(new_lower_limit * 1000) + lower_millivolts = max(lower_millivolts, 4500) + upper_millivolts = int(new_upper_limit * 1000) + upper_millivolts = min(max(upper_millivolts, 4500), 14000) # Servos from factor had 14V set + lower_millivolts = min(lower_millivolts, upper_millivolts) - def read_voltage_limit(self, timeout=1.0): - # TODO + self.__send(SERVO_VIN_LIMIT_WRITE, "HH", lower_millivolts, upper_millivolts) + + def read_voltage_limits(self, timeout=DEFAULT_READ_TIMEOUT): self.__send(SERVO_VIN_LIMIT_READ) self.__wait_for_send() - return self.__receive("H", timeout) - def set_temperature_limit(self, new_limit): - # TODO - self.__send(SERVO_TEMP_MAX_LIMIT_WRITE, "H", new_limit) + received = self.__receive("HH", timeout) + if received is None: + return (float("NAN"), float("NAN")) + + lower_limit = received[0] / 1000 + upper_limit = received[1] / 1000 + return lower_limit, upper_limit - def read_temperature_limit(self, timeout=1.0): - # TODO + def set_max_temperature_limit(self, new_upper_limit): + new_upper_limit = min(max(new_upper_limit, 50), 100) + self.__send(SERVO_TEMP_MAX_LIMIT_WRITE, "B", new_upper_limit) + + def read_max_temperature_limit(self, timeout=DEFAULT_READ_TIMEOUT): self.__send(SERVO_TEMP_MAX_LIMIT_READ) self.__wait_for_send() - return self.__receive("H", timeout) + return self.__receive("B", timeout) - def read_temperature(self, timeout=1.0): + def read_temperature(self, timeout=DEFAULT_READ_TIMEOUT): self.__send(SERVO_TEMP_READ) self.__wait_for_send() return self.__receive("B", timeout) - def read_voltage(self, timeout=1.0): + def read_voltage(self, timeout=DEFAULT_READ_TIMEOUT): self.__send(SERVO_VIN_READ) self.__wait_for_send() - return self.__receive("H", timeout) - def read_position(self, timeout=1.0): - # TODO + received = self.__receive("H", timeout) + if received is None: + return float("NAN") + + return received / 1000 + + def read_angle(self, timeout=DEFAULT_READ_TIMEOUT): self.__send(SERVO_POS_READ) self.__wait_for_send() - return self.__receive("H", timeout) - def set_mode(self, mode, speed): - # TODO - self.__send(SERVO_OR_MOTOR_MODE_WRITE, "BBH", mode, 0, speed) + received = self.__receive("h", timeout) # Angle reports full 360 degree range as signed + if received is None: + return float("NAN") + + return ((received - 500) / 360) * 90 - def read_mode(self, timeout=1.0): - # TODO + def drive_at(self, speed): + value = int(speed * 1000) + value = min(max(value, -1000), 1000) + self.__send(SERVO_OR_MOTOR_MODE_WRITE, "BBh", LXServo.MOTOR_MODE, 0, value) + self.__mode = LXServo.MOTOR_MODE + + def __switch_to_motor_mode(self): + self.__send(SERVO_OR_MOTOR_MODE_WRITE, "BBh", LXServo.MOTOR_MODE, 0, 0) + self.__mode = LXServo.MOTOR_MODE + + def __switch_to_servo_mode(self): + self.__send(SERVO_OR_MOTOR_MODE_WRITE, "BBh", LXServo.SERVO_MODE, 0, 0) + self.__mode = LXServo.SERVO_MODE + + def __read_mode_and_speed(self, timeout): self.__send(SERVO_OR_MOTOR_MODE_READ) self.__wait_for_send() - return self.__receive("H", timeout) - def load(self): - # TODO + received = self.__receive("BBh", timeout) + if received is None: + return (float("NAN"), float("NAN")) + + self.__mode = LXServo.MOTOR_MODE if received[0] == 1 else LXServo.SERVO_MODE + return self.__mode, received[2] / 1000 + + def read_mode(self, timeout=DEFAULT_READ_TIMEOUT): + return self.__read_mode_and_speed(timeout)[0] + + def read_speed(self, timeout=DEFAULT_READ_TIMEOUT): + return self.__read_mode_and_speed(timeout)[1] + + def enable(self): self.__send(SERVO_LOAD_OR_UNLOAD_WRITE, "B", 1) - def unload(self): - # TODO + def disable(self): self.__send(SERVO_LOAD_OR_UNLOAD_WRITE, "B", 0) - def read_loaded(self, timeout=1.0): - # TODO + def is_enabled(self, timeout=DEFAULT_READ_TIMEOUT): self.__send(SERVO_LOAD_OR_UNLOAD_READ) self.__wait_for_send() - return self.__receive("H", timeout) + return self.__receive("B", timeout) == 1 - def led_on(self): - # TODO - self.__send(SERVO_LED_CTRL_WRITE, "B", 1) + def set_led(self, value): + self.__send(SERVO_LED_CTRL_WRITE, "B", 0 if value else 1) # LED state is inverted - def led_off(self): - # TODO - self.__send(SERVO_LED_CTRL_WRITE, "B", 0) - - def read_led(self, timeout=1.0): - # TODO + def is_led_on(self, timeout=DEFAULT_READ_TIMEOUT): self.__send(SERVO_LED_CTRL_READ) self.__wait_for_send() - return self.__receive("H", timeout) + return self.__receive("B", timeout) == 0 # LED state is inverted - def set_led_error(self, value): - # TODO - self.__send(SERVO_LED_ERROR_WRITE, "B", 1) + def configure_faults(self, conditions): + conditions &= LXServo.OVER_TEMPERATURE | LXServo.OVER_VOLTAGE | LXServo.OVER_LOADED + self.__send(SERVO_LED_ERROR_WRITE, "B", conditions) - def read_led_error(self, timeout=1.0): - # TODO + def read_fault_config(self, timeout=DEFAULT_READ_TIMEOUT): self.__send(SERVO_LED_ERROR_READ) self.__wait_for_send() - return self.__receive("H", timeout) + return self.__receive("B", timeout) @staticmethod def __checksum(buffer): @@ -569,13 +683,13 @@ def __send(self, command, fmt="", *data): buffer = bytearray(FRAME_HEADER_LENGTH + command.length) struct.pack_into("