Skip to content

Commit

Permalink
feat(hardware): add acceleration to pick up tip for 96 channel (#12944)
Browse files Browse the repository at this point in the history
  • Loading branch information
caila-marashaj authored Jul 27, 2023
1 parent c1cb8bc commit 6c38267
Show file tree
Hide file tree
Showing 17 changed files with 451 additions and 178 deletions.
2 changes: 1 addition & 1 deletion api/src/opentrons/config/defaults_ot3.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
OT3AxisKind.Z: 35,
OT3AxisKind.P: 15,
OT3AxisKind.Z_G: 50,
OT3AxisKind.Q: 5.5,
OT3AxisKind.Q: 10,
},
low_throughput={
OT3AxisKind.X: 350,
Expand Down
69 changes: 47 additions & 22 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@
create_gripper_jaw_home_group,
create_gripper_jaw_hold_group,
create_tip_action_group,
create_tip_action_home_group,
PipetteAction,
create_gear_motor_home_group,
motor_nodes,
LIMIT_SWITCH_OVERTRAVEL_DISTANCE,
map_pipette_type_to_sensor_id,
Expand Down Expand Up @@ -259,6 +258,7 @@ def __init__(
self._estop_detector: Optional[EstopDetector] = None
self._estop_state_machine = EstopStateMachine(detector=None)
self._position = self._get_home_position()
self._gear_motor_position: Dict[NodeId, float] = {}
self._encoder_position = self._get_home_position()
self._motor_status = {}
self._check_updates = check_updates
Expand Down Expand Up @@ -330,6 +330,10 @@ def _build_system_hardware(
usb_messenger=usb_messenger,
)

@property
def gear_motor_position(self) -> Dict[NodeId, float]:
return self._gear_motor_position

def _motor_nodes(self) -> Set[NodeId]:
"""Get a list of the motor controller nodes of all attached and ok devices."""
return motor_nodes(self._subsystem_manager.targets)
Expand Down Expand Up @@ -593,42 +597,63 @@ async def home(
]
async with self._monitor_overpressure(moving_pipettes):
positions = await asyncio.gather(*coros)
# TODO(CM): default gear motor homing routine to have some acceleration
if Axis.Q in checked_axes:
await self.tip_action(
[Axis.Q],
self.axis_bounds[Axis.Q][1] - self.axis_bounds[Axis.Q][0],
self._configuration.motion_settings.max_speed_discontinuity.high_throughput[
distance=self.axis_bounds[Axis.Q][1] - self.axis_bounds[Axis.Q][0],
velocity=self._configuration.motion_settings.max_speed_discontinuity.high_throughput[
Axis.to_kind(Axis.Q)
],
tip_action="home",
)
for position in positions:
self._handle_motor_status_response(position)
return axis_convert(self._position, 0.0)

def _filter_move_group(self, move_group: MoveGroup) -> MoveGroup:
new_group: MoveGroup = []
for step in move_group:
new_group.append(
{
node: axis_step
for node, axis_step in step.items()
if node in self._motor_nodes()
}
)
return new_group

async def tip_action(
self,
axes: Sequence[Axis],
distance: float,
speed: float,
moves: Optional[List[Move[Axis]]] = None,
distance: Optional[float] = None,
velocity: Optional[float] = None,
tip_action: str = "home",
back_off: Optional[bool] = False,
) -> None:
if tip_action == "home":
speed = speed * -1
runner = MoveGroupRunner(
move_groups=create_tip_action_home_group(axes, distance, speed)
# TODO: split this into two functions for homing and 'clamp'
move_group = []
# make sure either moves or distance and velocity is not None
assert bool(moves) ^ (bool(distance) and bool(velocity))
if moves is not None:
move_group = create_tip_action_group(
moves, [NodeId.pipette_left], tip_action
)
else:
runner = MoveGroupRunner(
move_groups=[
create_tip_action_group(
axes, distance, speed, cast(PipetteAction, tip_action)
)
]
elif distance is not None and velocity is not None:
move_group = create_gear_motor_home_group(
float(distance), float(velocity), back_off
)

runner = MoveGroupRunner(
move_groups=[move_group],
ignore_stalls=True if not ff.stall_detection_enabled() else False,
)
positions = await runner.run(can_messenger=self._messenger)
for axis, point in positions.items():
self._position.update({axis: point[0]})
self._encoder_position.update({axis: point[1]})
if NodeId.pipette_left in positions:
self._gear_motor_position = {
NodeId.pipette_left: positions[NodeId.pipette_left][0]
}
else:
log.debug("no position returned from NodeId.pipette_left")

@requires_update
async def gripper_grip_jaw(
Expand Down
22 changes: 12 additions & 10 deletions api/src/opentrons/hardware_control/backends/ot3simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
create_gripper_jaw_hold_group,
create_gripper_jaw_grip_group,
create_gripper_jaw_home_group,
create_tip_action_group,
PipetteAction,
NODEID_SUBSYSTEM,
motor_nodes,
target_to_subsystem,
Expand Down Expand Up @@ -139,6 +137,7 @@ def __init__(
self._initialized = False
self._lights = {"button": False, "rails": False}
self._estop_state_machine = EstopStateMachine(detector=None)
self._gear_motor_position: Dict[NodeId, float] = {}

def _sanitize_attached_instrument(
mount: OT3Mount, passed_ai: Optional[Dict[str, Optional[str]]] = None
Expand Down Expand Up @@ -203,6 +202,10 @@ def initialized(self, value: bool) -> None:
def eeprom_data(self) -> EEPROMData:
return EEPROMData()

@property
def gear_motor_position(self) -> Dict[NodeId, float]:
return self._gear_motor_position

@property
def board_revision(self) -> BoardRevision:
"""Get the board revision"""
Expand Down Expand Up @@ -372,17 +375,15 @@ async def get_tip_present(self, mount: OT3Mount, tip_state: TipStateType) -> Non
"""Get the state of the tip ejector flag for a given mount."""
pass

@ensure_yield
async def tip_action(
self,
axes: Sequence[Axis],
distance: float = 33,
speed: float = -5.5,
tip_action: str = "drop",
moves: Optional[List[Move[Axis]]] = None,
distance: Optional[float] = None,
velocity: Optional[float] = None,
tip_action: str = "home",
back_off: Optional[bool] = False,
) -> None:
_ = create_tip_action_group(
axes, distance, speed, cast(PipetteAction, tip_action)
)
pass

def _attached_to_mount(
self, mount: OT3Mount, expected_instr: Optional[PipetteName]
Expand Down Expand Up @@ -528,6 +529,7 @@ def axis_bounds(self) -> OT3AxisMap[Tuple[float, float]]:
Axis.Y: phony_bounds,
Axis.X: phony_bounds,
Axis.Z_G: phony_bounds,
Axis.Q: phony_bounds,
}

@property
Expand Down
80 changes: 49 additions & 31 deletions api/src/opentrons/hardware_control/backends/ot3utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Shared utilities for ot3 hardware control."""
from typing import Dict, Iterable, List, Set, Tuple, TypeVar, Sequence, cast
from typing import Dict, Iterable, List, Set, Tuple, TypeVar, cast, Sequence, Optional
from typing_extensions import Literal
from logging import getLogger
from opentrons.config.defaults_ot3 import DEFAULT_CALIBRATION_AXIS_MAX_SPEED
Expand Down Expand Up @@ -228,13 +228,16 @@ def get_system_constraints(
) -> "SystemConstraints[Axis]":
conf_by_pip = config.by_gantry_load(gantry_load)
constraints = {}
for axis_kind in [
axis_kind_list = [
OT3AxisKind.P,
OT3AxisKind.X,
OT3AxisKind.Y,
OT3AxisKind.Z,
OT3AxisKind.Z_G,
]:
]
if gantry_load == GantryLoad.HIGH_THROUGHPUT:
axis_kind_list.append(OT3AxisKind.Q)
for axis_kind in axis_kind_list:
for axis in Axis.of_kind(axis_kind):
constraints[axis] = AxisConstraints.build(
conf_by_pip["acceleration"][axis_kind],
Expand Down Expand Up @@ -373,38 +376,53 @@ def create_home_groups(
return [home_group, backoff_group]


def create_tip_action_home_group(
axes: Sequence[Axis], distance: float, velocity: float
) -> List[MoveGroup]:
current_nodes = [axis_to_node(ax) for ax in axes]
home_group = [
create_tip_action_step(
velocity={node_id: np.float64(velocity) for node_id in current_nodes},
distance={node_id: np.float64(distance) for node_id in current_nodes},
present_nodes=current_nodes,
action=PipetteTipActionType.home,
)
]

backoff_group = [
create_tip_action_backoff_step(
velocity={node_id: np.float64(velocity / 2) for node_id in current_nodes}
)
]
return [home_group, backoff_group]
def create_tip_action_group(
moves: Sequence[Move[Axis]],
present_nodes: Iterable[NodeId],
action: str,
) -> MoveGroup:
move_group: MoveGroup = []
for move in moves:
unit_vector = move.unit_vector
for block in move.blocks:
if block.time < (3.0 / interrupts_per_sec):
continue
velocities = unit_vector_multiplication(unit_vector, block.initial_speed)
accelerations = unit_vector_multiplication(unit_vector, block.acceleration)
step = create_tip_action_step(
velocity=_convert_to_node_id_dict(velocities),
acceleration=_convert_to_node_id_dict(accelerations),
duration=block.time,
present_nodes=present_nodes,
action=PipetteTipActionType[action],
)
move_group.append(step)
return move_group


def create_tip_action_group(
axes: Sequence[Axis], distance: float, velocity: float, action: PipetteAction
def create_gear_motor_home_group(
distance: float,
velocity: float,
backoff: Optional[bool] = False,
) -> MoveGroup:
current_nodes = [axis_to_node(ax) for ax in axes]
step = create_tip_action_step(
velocity={node_id: np.float64(velocity) for node_id in current_nodes},
distance={node_id: np.float64(distance) for node_id in current_nodes},
present_nodes=current_nodes,
action=PipetteTipActionType[action],
move_group: MoveGroup = []
home_step = create_tip_action_step(
velocity={NodeId.pipette_left: np.float64(-1 * velocity)},
acceleration={NodeId.pipette_left: np.float64(0)},
duration=np.float64(distance / velocity),
present_nodes=[NodeId.pipette_left],
action=PipetteTipActionType.home,
)
return [step]
move_group.append(home_step)

if backoff:
backoff_group = create_tip_action_backoff_step(
velocity={
node_id: np.float64(velocity / 2) for node_id in [NodeId.pipette_left]
}
)
move_group.append(backoff_group)
return move_group


def create_gripper_jaw_grip_group(
Expand Down
Loading

0 comments on commit 6c38267

Please sign in to comment.