diff --git a/hardware/opentrons_hardware/hardware_control/tool_sensors.py b/hardware/opentrons_hardware/hardware_control/tool_sensors.py index a0f9fbcd7457..974378956976 100644 --- a/hardware/opentrons_hardware/hardware_control/tool_sensors.py +++ b/hardware/opentrons_hardware/hardware_control/tool_sensors.py @@ -1,7 +1,17 @@ """Functions for commanding motion limited by tool sensors.""" import asyncio from functools import partial -from typing import Union, List, Iterator, Tuple, Dict, Callable, AsyncContextManager +from typing import ( + Union, + List, + Iterator, + Tuple, + Dict, + Callable, + AsyncContextManager, + Optional, +) +from enum import Enum, auto from logging import getLogger from numpy import float64 from math import copysign @@ -22,6 +32,19 @@ SensorDataType, sensor_fixed_point_conversion, ) +from opentrons_hardware.firmware_bindings.messages.payloads import ( + BindSensorOutputRequestPayload, + SendAccumulatedPressureDataPayload, +) +from opentrons_hardware.firmware_bindings.messages.message_definitions import ( + BindSensorOutputRequest, + SendAccumulatedPressureDataRequest, +) +from opentrons_hardware.firmware_bindings.messages.fields import ( + SensorOutputBindingField, + SensorTypeField, + SensorIdField, +) from opentrons_hardware.sensors.sensor_types import SensorInformation, PressureSensor from opentrons_hardware.sensors.scheduler import SensorScheduler from opentrons_hardware.sensors.utils import SensorThresholdInformation @@ -48,7 +71,11 @@ def _build_pass_step( speed: Dict[NodeId, float], stop_condition: MoveStopCondition = MoveStopCondition.sync_line, ) -> MoveGroupStep: - return create_step( + pipette_nodes = [ + i for i in movers if i in [NodeId.pipette_left, NodeId.pipette_right] + ] + + move_group = create_step( distance={ax: float64(abs(distance[ax])) for ax in movers}, velocity={ ax: float64(speed[ax] * copysign(1.0, distance[ax])) for ax in movers @@ -60,6 +87,65 @@ def _build_pass_step( present_nodes=movers, stop_condition=stop_condition, ) + pipette_move = create_step( + distance={ax: float64(abs(distance[ax])) for ax in movers}, + velocity={ + ax: float64(speed[ax] * copysign(1.0, distance[ax])) for ax in movers + }, + acceleration={}, + # use any node present to calculate duration of the move, assuming the durations + # will be the same + duration=float64(abs(distance[movers[0]] / speed[movers[0]])), + present_nodes=pipette_nodes, + stop_condition=MoveStopCondition.sensor_report, + ) + print(f"Move group for probe {move_group}") + for node in pipette_nodes: + move_group[node] = pipette_move[node] + print(f"Move group for probe {move_group}") + return move_group + + +async def run_pass_output_to_csv( + messenger: CanMessenger, + sensor_driver: SensorDriver, + pressure_sensor: PressureSensor, + mount_speed: float, + plunger_speed: float, + threshold_pascals: float, + head_node: NodeId, + move_group: MoveGroupRunner, +) -> Dict[NodeId, MotorPositionStatus]: + file_heading = [ + "time(s)", + "Pressure(pascals)", + "z_velocity(mm/s)", + "plunger_velocity(mm/s)", + "threshold(pascals)", + ] + sensor_metadata = [0, 0, mount_speed, plunger_speed, threshold_pascals] + sensor_capturer = LogListener( + mount=head_node, + data_file="/var/pressure_sensor_data.csv", + file_heading=file_heading, + sensor_metadata=sensor_metadata, + ) + binding = [SensorOutputBinding.sync, SensorOutputBinding.report] + + async with sensor_driver.bind_output(messenger, pressure_sensor, binding): + messenger.add_listener(sensor_capturer, None) + + async with sensor_capturer: + positions = await move_group.run(can_messenger=messenger) + messenger.remove_listener(sensor_capturer) + + return positions + + +class OutputOptions(Enum): + write_to_csv = auto() + can_bus_only = auto() + none = auto() async def liquid_probe( @@ -70,12 +156,15 @@ async def liquid_probe( plunger_speed: float, mount_speed: float, threshold_pascals: float, - log_pressure: bool = True, + output_format: OutputOptions = OutputOptions.can_bus_only, + data_file: Optional[str] = None, auto_zero_sensor: bool = True, num_baseline_reads: int = 10, sensor_id: SensorId = SensorId.S0, + canbus_output: bool = True, ) -> Dict[NodeId, MotorPositionStatus]: """Move the mount and pipette simultaneously while reading from the pressure sensor.""" + log_file: str = "/var/pressure_sensor_data.csv" if not data_file else data_file sensor_driver = SensorDriver() threshold_fixed_point = threshold_pascals * sensor_fixed_point_conversion pressure_sensor = PressureSensor.build( @@ -90,7 +179,6 @@ async def liquid_probe( ) LOG.debug(f"found baseline pressure: {pressure_baseline} pascals") - binding = [SensorOutputBinding.sync] await sensor_driver.send_stop_threshold(messenger, pressure_sensor) sensor_group = _build_pass_step( @@ -102,33 +190,37 @@ async def liquid_probe( sensor_runner = MoveGroupRunner(move_groups=[[sensor_group]]) - if log_pressure: - file_heading = [ - "time(s)", - "Pressure(pascals)", - "z_velocity(mm/s)", - "plunger_velocity(mm/s)", - "threshold(pascals)", - ] - sensor_metadata = [0, 0, mount_speed, plunger_speed, threshold_pascals] - sensor_capturer = LogListener( - mount=head_node, - data_file="/var/pressure_sensor_data.csv", - file_heading=file_heading, - sensor_metadata=sensor_metadata, + if output_format == OutputOptions.write_to_csv: + return await run_pass_output_to_csv( + messenger, + sensor_driver, + pressure_sensor, + mount_speed, + plunger_speed, + threshold_pascals, + head_node, + sensor_runner, ) - binding.append(SensorOutputBinding.report) - - async with sensor_driver.bind_output(messenger, pressure_sensor, binding): - if log_pressure: - messenger.add_listener(sensor_capturer, None) - - async with sensor_capturer: - positions = await sensor_runner.run(can_messenger=messenger) - messenger.remove_listener(sensor_capturer) - else: - positions = await sensor_runner.run(can_messenger=messenger) + elif output_format == OutputOptions.can_bus_only: + async with sensor_driver.bind_output( + messenger, + pressure_sensor, + [ + SensorOutputBinding.sync, + SensorOutputBinding.report, + ], + ): + return await sensor_runner.run(can_messenger=messenger) + else: # none + async with sensor_driver.bind_output( + messenger, + pressure_sensor, + [ + SensorOutputBinding.sync, + ], + ): + return await sensor_runner.run(can_messenger=messenger) return positions