Skip to content

Commit

Permalink
feat(api): add option to ignore different tip presence states (#14980)
Browse files Browse the repository at this point in the history
## Overview
This code adds an argument called `ht_operational_sensor` to
`get_tip_presence_status`, that when used tells the api to only return
the tip presence state of the instrument probe type specified. This
allows calibration and partial tip flows to execute and check against
their expected tip status without failing.

## TODO
A follow-up pr will go up using this parameter for the
`get_tip_presence` call in the calibration flow.

## Review Requests
I'll most likely address any non-blocking change requests in a follow-up
pr so we can cut the internal release as fast as possible, but let me
know if:

- `ht_operational_sensor` makes sense or if we can think of a better
name
- we should otherwise go about anything differently here.
  • Loading branch information
caila-marashaj authored Apr 23, 2024
1 parent 4794f55 commit cfefcbc
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 12 deletions.
4 changes: 3 additions & 1 deletion api/src/opentrons/hardware_control/backends/flex_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,9 @@ async def capacitive_pass(
def subsystems(self) -> Dict[SubSystem, SubSystemState]:
...

async def get_tip_status(self, mount: OT3Mount) -> TipStateType:
async def get_tip_status(
self, mount: OT3Mount, ht_operation_sensor: Optional[InstrumentProbeType] = None
) -> TipStateType:
...

def current_tip_state(self, mount: OT3Mount) -> Optional[bool]:
Expand Down
10 changes: 8 additions & 2 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -1521,8 +1521,14 @@ async def update_tip_detector(self, mount: OT3Mount, sensor_count: int) -> None:
async def teardown_tip_detector(self, mount: OT3Mount) -> None:
await self._tip_presence_manager.clear_detector(mount)

async def get_tip_status(self, mount: OT3Mount) -> TipStateType:
return await self.tip_presence_manager.get_tip_status(mount)
async def get_tip_status(
self,
mount: OT3Mount,
ht_operational_sensor: Optional[InstrumentProbeType] = None,
) -> TipStateType:
return await self.tip_presence_manager.get_tip_status(
mount, ht_operational_sensor
)

def current_tip_state(self, mount: OT3Mount) -> Optional[bool]:
return self.tip_presence_manager.current_tip_state(mount)
Expand Down
6 changes: 5 additions & 1 deletion api/src/opentrons/hardware_control/backends/ot3simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,11 @@ def subsystems(self) -> Dict[SubSystem, SubSystemState]:
for axis in self._present_axes
}

async def get_tip_status(self, mount: OT3Mount) -> TipStateType:
async def get_tip_status(
self,
mount: OT3Mount,
ht_operational_sensor: Optional[InstrumentProbeType] = None,
) -> TipStateType:
return TipStateType(self._sim_tip_state[mount])

def current_tip_state(self, mount: OT3Mount) -> Optional[bool]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import cast, Callable, Optional, List, Set
from typing_extensions import TypedDict, Literal

from opentrons.hardware_control.types import TipStateType, OT3Mount
from opentrons.hardware_control.types import TipStateType, OT3Mount, InstrumentProbeType

from opentrons_hardware.drivers.can_bus import CanMessenger
from opentrons_hardware.firmware_bindings.constants import NodeId
Expand All @@ -14,8 +14,11 @@
from opentrons_shared_data.errors.exceptions import (
TipDetectorNotFound,
UnmatchedTipPresenceStates,
GeneralError,
)

from .ot3utils import sensor_id_for_instrument

log = logging.getLogger(__name__)

TipListener = Callable[[OT3Mount, bool], None]
Expand Down Expand Up @@ -111,17 +114,40 @@ def current_tip_state(self, mount: OT3Mount) -> Optional[bool]:
return state

@staticmethod
def _get_tip_presence(results: List[tip_types.TipNotification]) -> TipStateType:
def _get_tip_presence(
results: List[tip_types.TipNotification],
ht_operational_sensor: Optional[InstrumentProbeType] = None,
) -> TipStateType:
"""
We can use ht_operational_sensor used to specify that we only care
about the status of one tip presence sensor on a high throughput
pipette, and the other is allowed to be different.
"""
if ht_operational_sensor:
target_sensor_id = sensor_id_for_instrument(ht_operational_sensor)
for r in results:
if r.sensor == target_sensor_id:
return TipStateType(r.presence)
# raise an error if requested sensor response isn't found
raise GeneralError(
message=f"Requested status for sensor {ht_operational_sensor} not found."
)
# more than one sensor reported, we have to check if their states match
if len(set(r.presence for r in results)) > 1:
raise UnmatchedTipPresenceStates(
{int(r.sensor): int(r.presence) for r in results}
)
return TipStateType(results[0].presence)

async def get_tip_status(self, mount: OT3Mount) -> TipStateType:
async def get_tip_status(
self,
mount: OT3Mount,
ht_operational_sensor: Optional[InstrumentProbeType] = None,
) -> TipStateType:
detector = self.get_detector(mount)
return self._get_tip_presence(await detector.request_tip_status())
return self._get_tip_presence(
await detector.request_tip_status(), ht_operational_sensor
)

def get_detector(self, mount: OT3Mount) -> TipDetector:
detector = self._detectors[self._get_key(mount)]
Expand Down
12 changes: 9 additions & 3 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2072,6 +2072,7 @@ async def _high_throughput_check_tip(self) -> AsyncIterator[None]:
async def get_tip_presence_status(
self,
mount: Union[top_types.Mount, OT3Mount],
ht_operational_sensor: Optional[InstrumentProbeType] = None,
) -> TipStateType:
"""
Check tip presence status. If a high throughput pipette is present,
Expand All @@ -2085,14 +2086,19 @@ async def get_tip_presence_status(
and self._gantry_load == GantryLoad.HIGH_THROUGHPUT
):
await stack.enter_async_context(self._high_throughput_check_tip())
result = await self._backend.get_tip_status(real_mount)
result = await self._backend.get_tip_status(
real_mount, ht_operational_sensor
)
return result

async def verify_tip_presence(
self, mount: Union[top_types.Mount, OT3Mount], expected: TipStateType
self,
mount: Union[top_types.Mount, OT3Mount],
expected: TipStateType,
ht_operational_sensor: Optional[InstrumentProbeType] = None,
) -> None:
real_mount = OT3Mount.from_mount(mount)
status = await self.get_tip_presence_status(real_mount)
status = await self.get_tip_presence_status(real_mount, ht_operational_sensor)
if status != expected:
raise FailedTipStateCheck(expected, status.value)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import AsyncIterator, Dict
from decoy import Decoy

from opentrons.hardware_control.types import OT3Mount, TipStateType
from opentrons.hardware_control.types import OT3Mount, TipStateType, InstrumentProbeType
from opentrons.hardware_control.backends.tip_presence_manager import TipPresenceManager
from opentrons_hardware.hardware_control.tip_presence import (
TipDetector,
Expand Down Expand Up @@ -110,6 +110,51 @@ async def test_get_tip_status_for_high_throughput(
result == expected_type


@pytest.mark.parametrize(
"tip_presence,expected_type,sensor_to_look_at",
[
(
{SensorId.S0: False, SensorId.S1: False},
TipStateType.ABSENT,
InstrumentProbeType.PRIMARY,
),
(
{SensorId.S0: True, SensorId.S1: True},
TipStateType.PRESENT,
InstrumentProbeType.SECONDARY,
),
(
{SensorId.S0: False, SensorId.S1: True},
TipStateType.ABSENT,
InstrumentProbeType.PRIMARY,
),
(
{SensorId.S0: False, SensorId.S1: True},
TipStateType.PRESENT,
InstrumentProbeType.SECONDARY,
),
],
)
async def test_allow_different_tip_states_ht(
subject: TipPresenceManager,
tip_detector_controller: TipDetectorController,
tip_presence: Dict[SensorId, bool],
expected_type: TipStateType,
sensor_to_look_at: InstrumentProbeType,
) -> None:
mount = OT3Mount.LEFT
await tip_detector_controller.retrieve_tip_status_highthroughput(tip_presence)

result = await subject.get_tip_status(mount, sensor_to_look_at)
result == expected_type

# if sensor_to_look_at is not used, different tip states
# should result in an UnmatchedTipStates error
if len(set(tip_presence[t] for t in tip_presence)) > 1:
with pytest.raises(UnmatchedTipPresenceStates):
result = await subject.get_tip_status(mount)


@pytest.mark.parametrize(
"tip_presence",
[
Expand Down

0 comments on commit cfefcbc

Please sign in to comment.