From 7e728ea29e8e7df8029877df823d13990464c734 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Wed, 15 Jan 2020 11:57:19 -0500 Subject: [PATCH] fix(api): smoothie driver: Limit high currents to moving axes (#4729) V1 habitually issues calls to the smoothie driver's move() function that include only the axes that are going to move (basically because of the mover setup that underpins motion control in V1). In V2, we have a unified source of control for motor commands that always sends a full axis spec. The smoothie driver was setting high currents for all axes specified in the motion command, whether those axes moved or not. That meant that most of the time, any command would set all axes to high currents. This is unnecessary, generates heat, and may overstress the power supply, leading to unexpected skips in certain high-friction situations. Instead, use the code we already have to cut down on gcode command length to determine which axes will actually move and only set those axes to high currents. Closes #4714 --- .../drivers/smoothie_drivers/driver_3_0.py | 132 +++++++++--------- api/tests/opentrons/drivers/test_driver.py | 10 +- 2 files changed, 73 insertions(+), 69 deletions(-) diff --git a/api/src/opentrons/drivers/smoothie_drivers/driver_3_0.py b/api/src/opentrons/drivers/smoothie_drivers/driver_3_0.py index 6f1bd4f494c..7f5cc88c343 100755 --- a/api/src/opentrons/drivers/smoothie_drivers/driver_3_0.py +++ b/api/src/opentrons/drivers/smoothie_drivers/driver_3_0.py @@ -4,7 +4,7 @@ import logging from time import sleep from threading import Event, RLock -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional, Union from math import isclose from serial.serialutil import SerialException # type: ignore @@ -222,7 +222,7 @@ def _byte_array_to_hex_string(byte_array): return res -def _parse_switch_values(raw_switch_values): +def _parse_switch_values(raw_switch_values: str) -> Dict[str, bool]: if not raw_switch_values or not isinstance(raw_switch_values, str): raise ParseError( 'Unexpected argument to _parse_switch_values: {}'.format( @@ -442,7 +442,7 @@ def read_pipette_model(self, mount) -> Optional[str]: return res - def write_pipette_id(self, mount, data_string): + def write_pipette_id(self, mount: str, data_string: str): ''' Writes to an attached pipette's ID memory location The ID is unique to this pipette, and is a string of unknown length @@ -458,7 +458,7 @@ def write_pipette_id(self, mount, data_string): self._write_to_pipette( GCODES['WRITE_INSTRUMENT_ID'], mount, data_string) - def write_pipette_model(self, mount, data_string): + def write_pipette_model(self, mount: str, data_string: str): ''' Writes to an attached pipette's MODEL memory location The MODEL is a unique string for this model of pipette @@ -473,7 +473,9 @@ def write_pipette_model(self, mount, data_string): self._write_to_pipette( GCODES['WRITE_INSTRUMENT_MODEL'], mount, data_string) - def update_pipette_config(self, axis, data): + def update_pipette_config( + self, axis: str, data: Dict[str, float])\ + -> Dict[str, Dict[str, float]]: ''' Updates the following configs for a given pipette mount based on the detected pipette type: @@ -481,6 +483,10 @@ def update_pipette_config(self, axis, data): - Max Travel M365.1 - endstop debounce M365.2 (NOT for zprobe debounce) - retract from endstop distance M365.3 + + Returns the data as the value of a dict with the axis as a key. + For instance, calling update_pipette_config('B', {'retract': 2}) + would return (if successful) {'B': {'retract': 2}} ''' if self.simulating: return {axis: data} @@ -491,7 +497,7 @@ def update_pipette_config(self, axis, data): 'max_travel': 'M365.1', 'home': 'M365.0'} - res_msg = {axis: {}} + res_msg: Dict[str, Dict[str, float]] = {axis: {}} for key, value in data.items(): if key == 'debounce': @@ -509,7 +515,7 @@ def update_pipette_config(self, axis, data): # FIXME (JG 9/28/17): Should have a more thought out # way of simulating vs really running - def connect(self, port=None): + def connect(self, port: str = None): if environ.get('ENABLE_VIRTUAL_SMOOTHIE', '').lower() == 'true': self.simulating = True return @@ -523,12 +529,12 @@ def disconnect(self): self._connection = None self.simulating = True - def is_connected(self): + def is_connected(self) -> bool: if not self._connection: return False return self._connection.is_open - def _connect_to_port(self, port=None): + def _connect_to_port(self, port: str = None): try: smoothie_id = environ.get('OT_SMOOTHIE_ID', 'AMA') self._connection = serial_communication.connect( @@ -546,12 +552,12 @@ def _connect_to_port(self, port=None): raise SerialException(error_msg) @property - def port(self): + def port(self) -> Optional[str]: if not self._connection: return None return self._connection.port - def get_fw_version(self): + def get_fw_version(self) -> str: ''' Queries Smoothieware for it's build version, and returns the parsed response. @@ -574,7 +580,7 @@ def get_fw_version(self): return version @property - def position(self): + def position(self) -> Dict[str, float]: """ Instead of sending M114.2 we are storing target values in self._position since movement and home commands are blocking and @@ -587,28 +593,19 @@ def position(self): return {k.upper(): v for k, v in self._position.items()} @property - def switch_state(self): + def switch_state(self) -> Dict[str, bool]: '''Returns the state of all SmoothieBoard limit switches''' res = self._send_command(GCODES['LIMIT_SWITCH_STATUS']) return _parse_switch_values(res) - def update_homed_flags(self, flags=None): + def update_homed_flags( + self, flags: Dict[str, bool] = None): ''' Returns Smoothieware's current homing-status, which is a dictionary of boolean values for each axis (XYZABC). If an axis is False, then it still needs to be homed, and it's coordinate cannot be trusted. Smoothieware sets it's internal homing flags for all axes to False when it has yet to home since booting/restarting, or an endstop/homing error - - returns: dict - { - 'X': False, - 'Y': True, - 'Z': False, - 'A': True, - 'B': False, - 'C': True - } ''' if flags and isinstance(flags, dict): self.homed_flags.update(flags) @@ -618,7 +615,7 @@ def update_homed_flags(self, flags=None): elif self.is_connected(): - def _recursive_update_homed_flags(retries): + def _recursive_update_homed_flags(retries: int): try: res = self._send_command(GCODES['HOMING_STATUS']) flags = _parse_homing_status_values(res) @@ -634,7 +631,7 @@ def _recursive_update_homed_flags(retries): _recursive_update_homed_flags(DEFAULT_COMMAND_RETRIES) @property - def current(self): + def current(self) -> Dict[str, float]: return self._current_settings['now'] @property @@ -642,18 +639,18 @@ def speed(self): pass @property - def steps_per_mm(self): + def steps_per_mm(self) -> Dict[str, float]: return self._steps_per_mm @contextlib.contextmanager - def restore_speed(self, value): + def restore_speed(self, value: Union[float, str]): self.set_speed(value, update=False) try: yield finally: self.set_speed(self._combined_speed) - def set_speed(self, value, update=True): + def set_speed(self, value: Union[float, str], update: bool = True): ''' set total axes movement speed in mm/second''' if update: self._combined_speed = float(value) @@ -669,14 +666,15 @@ def pop_speed(self): self.set_speed(self._saved_axes_speed) @contextlib.contextmanager - def restore_axis_max_speed(self, new_max_speeds): + def restore_axis_max_speed(self, new_max_speeds: Dict[str, float]): self.set_axis_max_speed(new_max_speeds, update=False) try: yield finally: self.set_axis_max_speed(self._max_speed_settings) - def set_axis_max_speed(self, settings, update=True): + def set_axis_max_speed( + self, settings: Dict[str, float], update: bool = True): ''' Sets the maximum speed (mm/sec) that a given axis will move @@ -703,7 +701,7 @@ def push_axis_max_speed(self): def pop_axis_max_speed(self): self.set_axis_max_speed(self._saved_max_speed_settings) - def set_acceleration(self, settings): + def set_acceleration(self, settings: Dict[str, float]): ''' Sets the acceleration (mm/sec^2) that a given axis will move @@ -727,7 +725,7 @@ def push_acceleration(self): def pop_acceleration(self): self.set_acceleration(self._saved_acceleration) - def set_active_current(self, settings): + def set_active_current(self, settings: Dict[str, float]): ''' Sets the amperage of each motor for when it is activated by driver. Values are initialized from the `robot_config.high_current` values, @@ -760,7 +758,7 @@ def push_active_current(self): def pop_active_current(self): self.set_active_current(self._active_current_settings['saved']) - def set_dwelling_current(self, settings): + def set_dwelling_current(self, settings: Dict[str, float]): ''' Sets the amperage of each motor for when it is dwelling. Values are initialized from the `robot_config.log_current` values, @@ -793,7 +791,8 @@ def push_dwelling_current(self): def pop_dwelling_current(self): self.set_dwelling_current(self._dwelling_current_settings['saved']) - def _save_current(self, settings, axes_active=True): + def _save_current( + self, settings: Dict[str, float], axes_active: bool = True): ''' Sets the current in Amperes (A) by axis. Currents are limited to be between 0.0-2.0 amps per axis motor. @@ -821,7 +820,7 @@ def _set_saved_current(self): ''' self._send_command(self._generate_current_command()) - def _generate_current_command(self): + def _generate_current_command(self) -> str: ''' Returns a constructed GCode string that contains this driver's axis-current settings, plus a small delay to wait for those settings @@ -841,7 +840,7 @@ def _generate_current_command(self): log.debug("_generate_current_command: {}".format(command)) return command - def disengage_axis(self, axes): + def disengage_axis(self, axes: str): ''' Disable the stepper-motor-driver's 36v output to motor This is a safe GCODE to send to Smoothieware, as it will automatically @@ -858,7 +857,7 @@ def disengage_axis(self, axes): for axis in axes: self.engaged_axes[axis] = False - def dwell_axes(self, axes): + def dwell_axes(self, axes: str): ''' Sets motors to low current, for when they are not moving. @@ -877,7 +876,7 @@ def dwell_axes(self, axes): if dwelling_currents: self._save_current(dwelling_currents, axes_active=False) - def activate_axes(self, axes): + def activate_axes(self, axes: str): ''' Sets motors to a high current, for when they are moving and/or must hold position @@ -1025,7 +1024,7 @@ def _handle_return(self, ret_code: str): if is_alarm or is_error: raise SmoothieError(ret_code) - def _remove_unwanted_characters(self, command, response): + def _remove_unwanted_characters(self, command: str, response: str) -> str: # smoothieware can enter a weird state, where it repeats back # the sent command at the beginning of its response. # Check for this echo, and strips the command from the response @@ -1177,7 +1176,7 @@ def _setup(self): self.pop_acceleration() log.debug("setup done") - def update_steps_per_mm(self, data): + def update_steps_per_mm(self, data: Dict[str, float]): # Using M92, update steps per mm for a given axis if self.simulating: self.steps_per_mm.update(data) @@ -1195,7 +1194,7 @@ def update_steps_per_mm(self, data): self.steps_per_mm[axis] = value self._send_command(GCODES['STEPS_PER_MM'] + cmd) - def _read_from_pipette(self, gcode, mount) -> Optional[str]: + def _read_from_pipette(self, gcode: str, mount: str) -> Optional[str]: ''' Read from an attached pipette's internal memory. The gcode used determines which portion of memory is read and returned. @@ -1209,8 +1208,8 @@ def _read_from_pipette(self, gcode, mount) -> Optional[str]: String (str) with value 'left' or 'right' ''' allowed_mounts = {'left': 'L', 'right': 'R'} - mount = allowed_mounts.get(mount) - if not mount: + allowed_mount = allowed_mounts.get(mount) + if not allowed_mount: raise ValueError('Unexpected mount: {}'.format(mount)) try: # EMI interference from both plunger motors has been found to @@ -1219,18 +1218,19 @@ def _read_from_pipette(self, gcode, mount) -> Optional[str]: self.disengage_axis('ZABC') self.delay(PIPETTE_READ_DELAY) # request from Smoothieware the information from that pipette - res = self._send_command(gcode + mount, suppress_error_msg=True) + res = self._send_command( + gcode + allowed_mount, suppress_error_msg=True) if res: res = _parse_instrument_data(res) - assert mount in res + assert allowed_mount in res # data is read/written as strings of HEX characters # to avoid firmware weirdness in how it parses GCode arguments - return _byte_array_to_ascii_string(res[mount]) + return _byte_array_to_ascii_string(res[allowed_mount]) except (ParseError, AssertionError, SmoothieError): pass return None - def _write_to_pipette(self, gcode, mount, data_string): + def _write_to_pipette(self, gcode: str, mount: str, data_string: str): ''' Write to an attached pipette's internal memory. The gcode used determines which portion of memory is written to. @@ -1246,8 +1246,8 @@ def _write_to_pipette(self, gcode, mount, data_string): String (str) that is of unkown length ''' allowed_mounts = {'left': 'L', 'right': 'R'} - mount = allowed_mounts.get(mount) - if not mount: + allowed_mount = allowed_mounts.get(mount) + if not allowed_mount: raise ValueError('Unexpected mount: {}'.format(mount)) if not isinstance(data_string, str): raise ValueError( @@ -1261,14 +1261,14 @@ def _write_to_pipette(self, gcode, mount, data_string): # to avoid firmware weirdness in how it parses GCode arguments byte_string = _byte_array_to_hex_string( bytearray(data_string.encode())) - command = gcode + mount + byte_string + command = gcode + allowed_mount + byte_string log.debug("_write_to_pipette: {}".format(command)) self._send_command(command) # ----------- END Private functions ----------- # # ----------- Public interface ---------------- # - def move(self, target, home_flagged_axes=False): + def move(self, target: Dict[str, float], home_flagged_axes: bool = False): ''' Move to the `target` Smoothieware coordinate, along any of the size axes, XYZABC. @@ -1288,7 +1288,7 @@ def move(self, target, home_flagged_axes=False): ''' self.run_flag.wait() - def valid_movement(coords, axis): + def valid_movement(coords: Optional[float], axis: str) -> bool: return not ( (axis in DISABLE_AXES) or (coords is None) or @@ -1296,7 +1296,7 @@ def valid_movement(coords, axis): rel_tol=1e-05, abs_tol=1e-08) ) - def create_coords_list(coords_dict): + def create_coords_list(coords_dict: Dict[str, float]) -> List[str]: return [ axis + str(round(coords, GCODE_ROUNDING_PRECISION)) for axis, coords in sorted(coords_dict.items()) @@ -1319,11 +1319,11 @@ def create_coords_list(coords_dict): non_moving_axes = ''.join([ ax for ax in AXES - if ax not in target.keys() + if not valid_movement(target.get(ax), ax) ]) self.dwell_axes(non_moving_axes) - self.activate_axes(target.keys()) - + self.activate_axes(''.join(ax for ax in target.keys() + if valid_movement(target[ax], ax))) # include the current-setting gcodes within the moving gcode string # to reduce latency, since we're setting current so much command = self._generate_current_command() @@ -1351,7 +1351,9 @@ def create_coords_list(coords_dict): self._update_position(target) - def home(self, axis=AXES, disabled=DISABLE_AXES): + def home(self, + axis: str = AXES, + disabled: str = DISABLE_AXES) -> Dict[str, float]: self.run_flag.wait() @@ -1459,7 +1461,8 @@ def fast_home(self, axis, safety_margin): disabled = ''.join([ax for ax in AXES if ax not in axis.upper()]) return self.home(axis=axis, disabled=disabled) - def unstick_axes(self, axes, distance=None, speed=None): + def unstick_axes( + self, axes: str, distance: float = None, speed: float = None): ''' The plunger axes on OT2 can build up static friction over time and when it's cold. To get over this, the robot can move that plunger at @@ -1522,7 +1525,7 @@ def resume(self): if not self.simulating: self.run_flag.set() - def delay(self, seconds): + def delay(self, seconds: float): # per http://smoothieware.org/supported-g-codes: # In grbl mode P is float seconds to comply with gcode standards command = '{code}P{seconds}'.format( @@ -1532,7 +1535,8 @@ def delay(self, seconds): log.debug("delay: {}".format(command)) self._send_command(command, timeout=int(seconds) + 1) - def probe_axis(self, axis, probing_distance) -> Dict[str, float]: + def probe_axis( + self, axis: str, probing_distance: float) -> Dict[str, float]: if axis.upper() in AXES: self.engaged_axes[axis] = True command = GCODES['PROBE'] + axis.upper() + str(probing_distance) @@ -1568,13 +1572,13 @@ def read_button(self): def read_window_switches(self): return gpio.read_window_switches() - def set_lights(self, button=None, rails=None): + def set_lights(self, button: bool = None, rails: bool = None): if button is not None: gpio.set_button_light(blue=button) if rails is not None: gpio.set_rail_lights(rails) - def get_lights(self): + def get_lights(self) -> Dict[str, bool]: return {'button': gpio.get_button_light()[2], 'rails': gpio.get_rail_lights()} @@ -1591,7 +1595,7 @@ def kill(self): self._reset_from_error() self._setup() - def home_flagged_axes(self, axes_string): + def home_flagged_axes(self, axes_string: str): ''' Given a list of axes to check, this method will home each axis if Smoothieware's internal flag sets it as needing to be homed diff --git a/api/tests/opentrons/drivers/test_driver.py b/api/tests/opentrons/drivers/test_driver.py index dde279934f6..f721bfc6541 100755 --- a/api/tests/opentrons/drivers/test_driver.py +++ b/api/tests/opentrons/drivers/test_driver.py @@ -268,11 +268,11 @@ def _parse_position_response(arg): smoothie.move({ 'X': 10.987654321, - 'Y': 1.12345678, - 'Z': 2, - 'A': 3, - 'B': 4, - 'C': 5}) + 'Y': 2.12345678, + 'Z': 2.5, + 'A': 3.5, + 'B': 4.25, + 'C': 5.55}) expected = [ # Set active axes high ['M907 A0.8 B0.05 C0.05 X1.25 Y1.25 Z0.8 G4P0.005 G0.+[BC].+'],