From 93249af44b8030ad497d5be3addb85d1110b417d Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Thu, 9 Jun 2022 09:11:59 -0400 Subject: [PATCH] refactor(api,hardware): o3: add capacitive_pass (#10613) * refactor(hardware): capture sensor data The sensor scheduler has a method to tell sensors to control the sync line that can optionally log data, but sometimes we want to _capture_ the data and not touch the sync line at all. A new capture_output context manager will capture streaming data from the pipette sensor and yield it to an asyncio queue for later processing. * refactor(hardware): add capacitive_pass This new method in tool_sensors works similarly to capacitive_probe, and has similar arguments, but rather than stopping motion when a threshold is exceeded it runs the entire motion and returns a list of all data captured during the motion. * refactor(api): ot3: add capacitive_sweep This hardware control method sweeps the specified axis while capturing capacitive sensor data. --- .../backends/ot3controller.py | 22 ++++- .../hardware_control/backends/ot3simulator.py | 10 ++ api/src/opentrons/hardware_control/ot3api.py | 29 ++++++ .../hardware_control/test_ot3_api.py | 72 +++++++++++++++ .../hardware_control/tool_sensors.py | 55 +++++++++-- .../opentrons_hardware/sensors/scheduler.py | 55 ++++++++++- hardware/tests/conftest.py | 10 +- .../hardware_control/test_tool_sensors.py | 81 +++++++++++++++- .../sensors/test_scheduler.py | 92 +++++++++++++++++++ 9 files changed, 409 insertions(+), 17 deletions(-) create mode 100644 hardware/tests/opentrons_hardware/sensors/test_scheduler.py diff --git a/api/src/opentrons/hardware_control/backends/ot3controller.py b/api/src/opentrons/hardware_control/backends/ot3controller.py index 7b9dcaf9cdb..dc15c04bac7 100644 --- a/api/src/opentrons/hardware_control/backends/ot3controller.py +++ b/api/src/opentrons/hardware_control/backends/ot3controller.py @@ -79,7 +79,10 @@ from opentrons_hardware.hardware_control.types import NodeMap from opentrons_hardware.hardware_control.tools import detector, types as ohc_tool_types -from opentrons_hardware.hardware_control.tool_sensors import capacitive_probe +from opentrons_hardware.hardware_control.tool_sensors import ( + capacitive_probe, + capacitive_pass, +) if TYPE_CHECKING: from opentrons_shared_data.pipette.dev_types import PipetteName, PipetteModel @@ -670,3 +673,20 @@ async def capacitive_probe( ) self._position[axis_to_node(moving)] = pos + + async def capacitive_pass( + self, + mount: OT3Mount, + moving: OT3Axis, + distance_mm: float, + speed_mm_per_s: float, + ) -> List[float]: + data = await capacitive_pass( + self._messenger, + sensor_node_for_mount(mount), + axis_to_node(moving), + distance_mm, + speed_mm_per_s, + ) + self._position[axis_to_node(moving)] += distance_mm + return data diff --git a/api/src/opentrons/hardware_control/backends/ot3simulator.py b/api/src/opentrons/hardware_control/backends/ot3simulator.py index 56db23fb558..5b871ac51d3 100644 --- a/api/src/opentrons/hardware_control/backends/ot3simulator.py +++ b/api/src/opentrons/hardware_control/backends/ot3simulator.py @@ -457,3 +457,13 @@ async def capacitive_probe( speed_mm_per_s: float, ) -> None: self._position[axis_to_node(moving)] += distance_mm + + async def capacitive_pass( + self, + mount: OT3Mount, + moving: OT3Axis, + distance_mm: float, + speed_mm_per_s: float, + ) -> List[float]: + self._position[axis_to_node(moving)] += distance_mm + return [] diff --git a/api/src/opentrons/hardware_control/ot3api.py b/api/src/opentrons/hardware_control/ot3api.py index 5ec22faa53d..f0dbe3a7c57 100644 --- a/api/src/opentrons/hardware_control/ot3api.py +++ b/api/src/opentrons/hardware_control/ot3api.py @@ -1312,3 +1312,32 @@ async def capacitive_probe( end_pos = await self.gantry_position(mount) await self.move_to(mount, pass_start_pos) return moving_axis.of_point(end_pos) + + async def capacitive_sweep( + self, + mount: OT3Mount, + moving_axis: OT3Axis, + begin: top_types.Point, + end: top_types.Point, + speed_mm_s: float, + ) -> List[float]: + if moving_axis not in [ + OT3Axis.X, + OT3Axis.Y, + ] and moving_axis != OT3Axis.by_mount(mount): + raise RuntimeError( + "Probing must be done with a gantry axis or the mount of the sensing" + " tool" + ) + sweep_distance = moving_axis.of_point( + machine_vector_from_deck_vector( + end - begin, self._transforms.deck_calibration.attitude + ) + ) + + await self.move_to(mount, begin) + values = await self._backend.capacitive_pass( + mount, moving_axis, sweep_distance, speed_mm_s + ) + await self.move_to(mount, begin) + return values diff --git a/api/tests/opentrons/hardware_control/test_ot3_api.py b/api/tests/opentrons/hardware_control/test_ot3_api.py index 902d74c01df..c64626935b6 100644 --- a/api/tests/opentrons/hardware_control/test_ot3_api.py +++ b/api/tests/opentrons/hardware_control/test_ot3_api.py @@ -100,9 +100,29 @@ def _update_position( ot3_hardware._backend._position[axis_to_node(moving)] += distance_mm / 2 mock_probe.side_effect = _update_position + yield mock_probe +@pytest.fixture +def mock_backend_capacitive_pass( + ot3_hardware: ThreadManager[OT3API], +) -> Iterator[AsyncMock]: + backend = ot3_hardware.managed_obj._backend + with patch.object( + backend, "capacitive_pass", AsyncMock(spec=backend.capacitive_pass) + ) as mock_pass: + + async def _update_position( + mount: OT3Mount, moving: OT3Axis, distance_mm: float, speed_mm_per_s: float + ) -> None: + ot3_hardware._backend._position[axis_to_node(moving)] += distance_mm / 2 + return [1, 2, 3, 4, 5, 6, 8] + + mock_pass.side_effect = _update_position + yield mock_pass + + @pytest.mark.parametrize( "mount,moving", [ @@ -224,3 +244,55 @@ async def test_capacitive_probe_invalid_axes( await ot3_hardware.capacitive_probe(mount, moving, 2, fake_settings) mock_move_to.assert_not_called() mock_backend_capacitive_probe.assert_not_called() + + +@pytest.mark.parametrize( + "axis,begin,end,distance", + [ + # Points must be passed through the attitude transform and therefore + # flipped + (OT3Axis.X, Point(0, 0, 0), Point(1, 0, 0), -1), + (OT3Axis.Y, Point(0, 0, 0), Point(0, -1, 0), 1), + ], +) +async def test_capacitive_sweep( + axis: OT3Axis, + begin: Point, + end: Point, + distance: float, + ot3_hardware: ThreadManager[OT3API], + mock_move_to: AsyncMock, + mock_backend_capacitive_pass: AsyncMock, +) -> None: + data = await ot3_hardware.capacitive_sweep(OT3Mount.RIGHT, axis, begin, end, 3) + assert data == [1, 2, 3, 4, 5, 6, 8] + mock_backend_capacitive_pass.assert_called_once_with( + OT3Mount.RIGHT, axis, distance, 3 + ) + + +@pytest.mark.parametrize( + "mount,moving", + ( + [OT3Mount.RIGHT, OT3Axis.Z_L], + [OT3Mount.LEFT, OT3Axis.Z_R], + [OT3Mount.RIGHT, OT3Axis.P_L], + [OT3Mount.RIGHT, OT3Axis.P_R], + [OT3Mount.LEFT, OT3Axis.P_L], + [OT3Mount.RIGHT, OT3Axis.P_R], + ), +) +async def test_capacitive_sweep_invalid_axes( + ot3_hardware: ThreadManager[OT3API], + mock_move_to: AsyncMock, + mock_backend_capacitive_probe: AsyncMock, + mount: OT3Mount, + moving: OT3Axis, + fake_settings: CapacitivePassSettings, +) -> None: + with pytest.raises(RuntimeError, match=r"Probing must be done with.*"): + await ot3_hardware.capacitive_sweep( + mount, moving, Point(0, 0, 0), Point(1, 0, 0), 2 + ) + mock_move_to.assert_not_called() + mock_backend_capacitive_probe.assert_not_called() diff --git a/hardware/opentrons_hardware/hardware_control/tool_sensors.py b/hardware/opentrons_hardware/hardware_control/tool_sensors.py index d20ce232573..4d45549c7a5 100644 --- a/hardware/opentrons_hardware/hardware_control/tool_sensors.py +++ b/hardware/opentrons_hardware/hardware_control/tool_sensors.py @@ -1,5 +1,6 @@ """Functions for commanding motion limited by tool sensors.""" -from typing import Union +import asyncio +from typing import Union, List, Iterator from logging import getLogger from numpy import float64 from math import copysign @@ -16,13 +17,28 @@ SensorDataType, ) from opentrons_hardware.drivers.can_bus.can_messenger import CanMessenger -from opentrons_hardware.hardware_control.motion import MoveStopCondition, create_step +from opentrons_hardware.hardware_control.motion import ( + MoveStopCondition, + create_step, + MoveGroupStep, +) from opentrons_hardware.hardware_control.move_group_runner import MoveGroupRunner LOG = getLogger(__name__) ProbeTarget = Union[Literal[NodeId.pipette_left, NodeId.pipette_right, NodeId.gripper]] +def _build_pass_step(mover: NodeId, distance: float, speed: float) -> MoveGroupStep: + return create_step( + distance={mover: float64(abs(distance))}, + velocity={mover: float64(speed * copysign(1.0, distance))}, + acceleration={}, + duration=float64(abs(distance / speed)), + present_nodes=[mover], + stop_condition=MoveStopCondition.cap_sensor, + ) + + async def capacitive_probe( messenger: CanMessenger, tool: ProbeTarget, @@ -53,14 +69,7 @@ async def capacitive_probe( if not threshold: raise RuntimeError("Could not set threshold for probe") LOG.info(f"starting capacitive probe with threshold {threshold.to_float()}") - pass_group = create_step( - distance={mover: float64(abs(distance))}, - velocity={mover: float64(speed * copysign(1.0, distance))}, - acceleration={}, - duration=float64(abs(distance / speed)), - present_nodes=[mover], - stop_condition=MoveStopCondition.cap_sensor, - ) + pass_group = _build_pass_step(mover, distance, speed) runner = MoveGroupRunner(move_groups=[[pass_group]]) async with sensor_scheduler.bind_sync( SensorInformation(sensor_type=SensorType.capacitive, node_id=tool), @@ -69,3 +78,29 @@ async def capacitive_probe( ): position = await runner.run(can_messenger=messenger) return position[mover] + + +async def capacitive_pass( + messenger: CanMessenger, + tool: ProbeTarget, + mover: NodeId, + distance: float, + speed: float, +) -> List[float]: + """Move the specified axis while capturing capacitive sensor readings.""" + sensor_scheduler = SensorScheduler() + sensor = SensorInformation(sensor_type=SensorType.capacitive, node_id=tool) + pass_group = _build_pass_step(mover, distance, speed) + runner = MoveGroupRunner(move_groups=[[pass_group]]) + await runner.prep(messenger) + async with sensor_scheduler.capture_output(sensor, messenger) as output_queue: + await runner.execute(messenger) + + def _drain() -> Iterator[float]: + while True: + try: + yield output_queue.get_nowait() + except asyncio.QueueEmpty: + break + + return list(_drain()) diff --git a/hardware/opentrons_hardware/sensors/scheduler.py b/hardware/opentrons_hardware/sensors/scheduler.py index 39719baf5d7..e5703102e79 100644 --- a/hardware/opentrons_hardware/sensors/scheduler.py +++ b/hardware/opentrons_hardware/sensors/scheduler.py @@ -3,12 +3,13 @@ import logging from contextlib import asynccontextmanager -from typing import Optional, Type, TypeVar, Callable, AsyncIterator +from typing import Optional, Type, TypeVar, Callable, AsyncIterator, cast from opentrons_hardware.firmware_bindings.constants import ( NodeId, SensorOutputBinding, SensorType, + MessageId, ) from opentrons_hardware.firmware_bindings.arbitration_id import ArbitrationId @@ -304,3 +305,55 @@ async def bind_sync( ) ), ) + + @asynccontextmanager + async def capture_output( + self, + target_sensor: SensorInformation, + can_messenger: CanMessenger, + ) -> AsyncIterator["asyncio.Queue[float]"]: + """While acquired, capture the sensor's logging output.""" + response_queue: "asyncio.Queue[float]" = asyncio.Queue() + + def _logging_listener( + message: MessageDefinition, arb_id: ArbitrationId + ) -> None: + payload = cast(ReadFromSensorResponse, message).payload + response_queue.put_nowait( + SensorDataType.build(payload.sensor_data).to_float() + ) + + def _filter(arbitration_id: ArbitrationId) -> bool: + return ( + NodeId(arbitration_id.parts.originating_node_id) + == target_sensor.node_id + ) and ( + MessageId(arbitration_id.parts.message_id) + == MessageId.read_sensor_response + ) + + can_messenger.add_listener(_logging_listener, _filter) + await can_messenger.send( + node_id=target_sensor.node_id, + message=BindSensorOutputRequest( + payload=BindSensorOutputRequestPayload( + sensor=SensorTypeField(target_sensor.sensor_type), + binding=SensorOutputBindingField(SensorOutputBinding.report.value), + ) + ), + ) + try: + yield response_queue + finally: + can_messenger.remove_listener(_logging_listener) + await can_messenger.send( + node_id=target_sensor.node_id, + message=BindSensorOutputRequest( + payload=BindSensorOutputRequestPayload( + sensor=SensorTypeField(target_sensor.sensor_type), + binding=SensorOutputBindingField( + SensorOutputBinding.none.value + ), + ) + ), + ) diff --git a/hardware/tests/conftest.py b/hardware/tests/conftest.py index c0e714412ee..a62b4add53c 100644 --- a/hardware/tests/conftest.py +++ b/hardware/tests/conftest.py @@ -20,7 +20,9 @@ class MockCanMessageNotifier: def __init__(self) -> None: """Constructor.""" - self._listeners: List[MessageListenerCallback] = [] + self._listeners: List[ + Tuple[MessageListenerCallback, Optional[MessageListenerCallbackFilter]] + ] = [] def add_listener( self, @@ -28,11 +30,13 @@ def add_listener( filter: Optional[MessageListenerCallbackFilter] = None, ) -> None: """Add listener.""" - self._listeners.append(listener) + self._listeners.append((listener, filter)) def notify(self, message: MessageDefinition, arbitration_id: ArbitrationId) -> None: """Notify.""" - for listener in self._listeners: + for listener, filter in self._listeners: + if filter and not filter(arbitration_id): + continue listener(message, arbitration_id) diff --git a/hardware/tests/opentrons_hardware/hardware_control/test_tool_sensors.py b/hardware/tests/opentrons_hardware/hardware_control/test_tool_sensors.py index ac75fa3be9f..43941f4e09f 100644 --- a/hardware/tests/opentrons_hardware/hardware_control/test_tool_sensors.py +++ b/hardware/tests/opentrons_hardware/hardware_control/test_tool_sensors.py @@ -7,17 +7,27 @@ from opentrons_hardware.firmware_bindings.messages.message_definitions import ( ExecuteMoveGroupRequest, MoveCompleted, + ReadFromSensorResponse, ) from opentrons_hardware.firmware_bindings.messages import MessageDefinition -from opentrons_hardware.firmware_bindings.messages.payloads import MoveCompletedPayload -from opentrons_hardware.firmware_bindings.utils import UInt8Field, UInt32Field +from opentrons_hardware.firmware_bindings.messages.payloads import ( + MoveCompletedPayload, + ReadFromSensorResponsePayload, +) +from opentrons_hardware.firmware_bindings.utils import ( + UInt8Field, + UInt32Field, + Int32Field, +) from opentrons_hardware.drivers.can_bus.can_messenger import CanMessenger +from opentrons_hardware.firmware_bindings.messages.fields import SensorTypeField from tests.conftest import CanLoopback from opentrons_hardware.hardware_control.tool_sensors import ( capacitive_probe, + capacitive_pass, ProbeTarget, ) from opentrons_hardware.firmware_bindings.constants import ( @@ -136,3 +146,70 @@ def move_responder( ANY, log=ANY, ) + + +@pytest.mark.parametrize( + "target_node,motor_node,distance,speed,", + [ + (NodeId.pipette_left, NodeId.head_l, 10, 10), + (NodeId.pipette_right, NodeId.head_r, 10, -10), + (NodeId.gripper, NodeId.gripper_z, -10, 10), + (NodeId.pipette_left, NodeId.gantry_x, -10, -10), + (NodeId.gripper, NodeId.gantry_y, 10, 10), + ], +) +async def test_capacitive_sweep( + mock_messenger: AsyncMock, + message_send_loopback: CanLoopback, + mock_sensor_threshold: AsyncMock, + mock_bind_sync: AsyncMock, + target_node: ProbeTarget, + motor_node: NodeId, + distance: float, + speed: float, +) -> None: + """Test capacitive sweep.""" + + def move_responder( + node_id: NodeId, message: MessageDefinition + ) -> List[Tuple[NodeId, MessageDefinition, NodeId]]: + message.payload.serialize() + if isinstance(message, ExecuteMoveGroupRequest): + sensor_values: List[Tuple[NodeId, MessageDefinition, NodeId]] = [ + ( + NodeId.host, + ReadFromSensorResponse( + payload=ReadFromSensorResponsePayload( + sensor=SensorTypeField(SensorType.capacitive.value), + sensor_data=Int32Field(i << 16), + ) + ), + target_node, + ) + for i in range(10) + ] + move_ack: List[Tuple[NodeId, MessageDefinition, NodeId]] = [ + ( + NodeId.host, + MoveCompleted( + payload=MoveCompletedPayload( + group_id=UInt8Field(0), + seq_id=UInt8Field(0), + current_position_um=UInt32Field(10000), + encoder_position=UInt32Field(10000), + ack_id=UInt8Field(0), + ) + ), + motor_node, + ), + ] + return sensor_values + move_ack + else: + return [] + + message_send_loopback.add_responder(move_responder) + + result = await capacitive_pass( + mock_messenger, target_node, motor_node, distance, speed + ) + assert result == list(range(10)) diff --git a/hardware/tests/opentrons_hardware/sensors/test_scheduler.py b/hardware/tests/opentrons_hardware/sensors/test_scheduler.py new file mode 100644 index 00000000000..43602c44d17 --- /dev/null +++ b/hardware/tests/opentrons_hardware/sensors/test_scheduler.py @@ -0,0 +1,92 @@ +"""Tests for the sensor scheduler.""" + +import mock +import asyncio +from typing import Iterator +from opentrons_hardware.sensors import scheduler, utils +from opentrons_hardware.firmware_bindings.constants import ( + NodeId, + SensorType, + SensorOutputBinding, +) +from opentrons_hardware.firmware_bindings.arbitration_id import ( + ArbitrationId, + ArbitrationIdParts, +) + +from opentrons_hardware.firmware_bindings.utils import Int32Field + +from opentrons_hardware.firmware_bindings.messages.fields import ( + SensorTypeField, + SensorOutputBindingField, +) + +from opentrons_hardware.firmware_bindings.messages.message_definitions import ( + ReadFromSensorResponse, + BindSensorOutputRequest, +) +from opentrons_hardware.firmware_bindings.messages.payloads import ( + BindSensorOutputRequestPayload, + ReadFromSensorResponsePayload, +) + +from tests.conftest import MockCanMessageNotifier + + +async def test_capture_output( + mock_messenger: mock.AsyncMock, + can_message_notifier: MockCanMessageNotifier, +) -> None: + """Test that data is received from the polling function.""" + subject = scheduler.SensorScheduler() + stim_message = BindSensorOutputRequest( + payload=BindSensorOutputRequestPayload( + sensor=SensorTypeField(SensorType.capacitive), + binding=SensorOutputBindingField(SensorOutputBinding.report.value), + ) + ) + reset_message = BindSensorOutputRequest( + payload=BindSensorOutputRequestPayload( + sensor=SensorTypeField(SensorType.capacitive), + binding=SensorOutputBindingField(SensorOutputBinding.none.value), + ) + ) + async with subject.capture_output( + utils.SensorInformation( + sensor_type=SensorType.capacitive, node_id=NodeId.pipette_left + ), + mock_messenger, + ) as output_queue: + mock_messenger.send.assert_called_with( + node_id=NodeId.pipette_left, message=stim_message + ) + for i in range(10): + can_message_notifier.notify( + ReadFromSensorResponse( + payload=ReadFromSensorResponsePayload( + sensor=SensorTypeField(SensorType.capacitive.value), + sensor_data=Int32Field(i << 16), + ) + ), + ArbitrationId( + parts=ArbitrationIdParts( + message_id=ReadFromSensorResponse.message_id, + node_id=NodeId.host, + originating_node_id=NodeId.pipette_left, + function_code=0, + ) + ), + ) + mock_messenger.send.assert_called_with( + node_id=NodeId.pipette_left, message=reset_message + ) + + def _drain() -> Iterator[float]: + while True: + try: + yield output_queue.get_nowait() + except asyncio.QueueEmpty: + break + + for index, value in enumerate(_drain()): + assert value == index