Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(api): Port tip consumption to StateUpdate #16469

Merged
merged 9 commits into from
Oct 11, 2024
4 changes: 3 additions & 1 deletion api/src/opentrons/protocol_engine/commands/drop_tip.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ async def execute(self, params: DropTipParams) -> SuccessData[DropTipResult, Non

await self._tip_handler.drop_tip(pipette_id=pipette_id, home_after=home_after)

state_update.update_tip_state(pipette_id=params.pipetteId, tip_geometry=None)
state_update.update_pipette_tip_state(
pipette_id=params.pipetteId, tip_geometry=None
)

return SuccessData(
public=DropTipResult(position=deck_point),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ async def execute(

state_update = update_types.StateUpdate()

state_update.update_tip_state(pipette_id=params.pipetteId, tip_geometry=None)
state_update.update_pipette_tip_state(
pipette_id=params.pipetteId, tip_geometry=None
)

return SuccessData(
public=DropTipInPlaceResult(), private=None, state_update=state_update
Expand Down
18 changes: 14 additions & 4 deletions api/src/opentrons/protocol_engine/commands/pick_up_tip.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class TipPhysicallyMissingError(ErrorOccurrence):
of the pipette.
"""

# The thing above about marking the tips as used makes it so that
# when the protocol is resumed and the Python Protocol API calls
# `get_next_tip()`, we'll move on to other tips as expected.

isDefined: bool = True
errorType: Literal["tipPhysicallyMissing"] = "tipPhysicallyMissing"
errorCode: str = ErrorCodes.TIP_PICKUP_FAILED.value.code
Expand Down Expand Up @@ -130,11 +134,10 @@ async def execute(
labware_id=labware_id,
well_name=well_name,
)
state_update.update_tip_state(
pipette_id=pipette_id,
tip_geometry=tip_geometry,
)
except TipNotAttachedError as e:
state_update.mark_tips_as_used(
pipette_id=pipette_id, labware_id=labware_id, well_name=well_name
)
return DefinedErrorData(
public=TipPhysicallyMissingError(
id=self._model_utils.generate_id(),
Expand All @@ -150,6 +153,13 @@ async def execute(
state_update=state_update,
)
else:
state_update.update_pipette_tip_state(
pipette_id=pipette_id,
tip_geometry=tip_geometry,
)
state_update.mark_tips_as_used(
pipette_id=pipette_id, labware_id=labware_id, well_name=well_name
)
return SuccessData(
public=PickUpTipResult(
tipVolume=tip_geometry.volume,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ async def execute(
)

state_update = StateUpdate()
state_update.update_tip_state(pipette_id=params.pipetteId, tip_geometry=None)
state_update.update_pipette_tip_state(
pipette_id=params.pipetteId, tip_geometry=None
)

return SuccessData(
public=UnsafeDropTipInPlaceResult(), private=None, state_update=state_update
Expand Down
120 changes: 46 additions & 74 deletions api/src/opentrons/protocol_engine/state/tips.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,18 @@
from enum import Enum
from typing import Dict, Optional, List, Union

from opentrons.protocol_engine.state import update_types

from ._abstract_store import HasState, HandlesActions
from ..actions import (
Action,
SucceedCommandAction,
FailCommandAction,
ResetTipsAction,
)
from ..actions import Action, SucceedCommandAction, ResetTipsAction, get_state_update
from ..commands import (
Command,
LoadLabwareResult,
PickUpTip,
PickUpTipResult,
DropTipResult,
DropTipInPlaceResult,
unsafe,
)
from ..commands.configuring_common import (
PipetteConfigUpdateResultMixin,
PipetteNozzleLayoutResultMixin,
)
from ..error_recovery_policy import ErrorRecoveryType

from opentrons.hardware_control.nozzle_manager import NozzleMap

Expand All @@ -38,16 +29,26 @@ class TipRackWellState(Enum):
TipRackStateByWellName = Dict[str, TipRackWellState]


# todo(mm, 2024-10-10): This info is duplicated between here and PipetteState because
# TipStore is using it to compute which tips a PickUpTip removes from the tip rack,
# given the pipette's current nozzle map. We could avoid this duplication by moving the
# computation to TipView, calling it from PickUpTipImplementation, and passing the
# precomputed list of wells to TipStore.
@dataclass
class _PipetteInfo:
channels: int
active_channels: int
nozzle_map: NozzleMap
Comment on lines +32 to +41
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this seem like a reasonable plan?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, definitely



@dataclass
class TipState:
"""State of all tips."""

tips_by_labware_id: Dict[str, TipRackStateByWellName]
column_by_labware_id: Dict[str, List[List[str]]]

channels_by_pipette_id: Dict[str, int]
active_channels_by_pipette_id: Dict[str, int]
nozzle_map_by_pipette_id: Dict[str, NozzleMap]
pipette_info_by_pipette_id: Dict[str, _PipetteInfo]


class TipStore(HasState[TipState], HandlesActions):
Expand All @@ -60,37 +61,33 @@ def __init__(self) -> None:
self._state = TipState(
tips_by_labware_id={},
column_by_labware_id={},
channels_by_pipette_id={},
active_channels_by_pipette_id={},
nozzle_map_by_pipette_id={},
pipette_info_by_pipette_id={},
)

def handle_action(self, action: Action) -> None:
"""Modify state in reaction to an action."""
state_update = get_state_update(action)
if state_update is not None:
self._handle_state_update(state_update)

if isinstance(action, SucceedCommandAction):
if isinstance(action.private_result, PipetteConfigUpdateResultMixin):
pipette_id = action.private_result.pipette_id
config = action.private_result.config
self._state.channels_by_pipette_id[pipette_id] = config.channels
self._state.active_channels_by_pipette_id[pipette_id] = config.channels
self._state.nozzle_map_by_pipette_id[pipette_id] = config.nozzle_map
self._state.pipette_info_by_pipette_id[pipette_id] = _PipetteInfo(
channels=config.channels,
active_channels=config.channels,
nozzle_map=config.nozzle_map,
)

self._handle_succeeded_command(action.command)

if isinstance(action.private_result, PipetteNozzleLayoutResultMixin):
pipette_id = action.private_result.pipette_id
nozzle_map = action.private_result.nozzle_map
if nozzle_map:
self._state.active_channels_by_pipette_id[
pipette_id
] = nozzle_map.tip_count
self._state.nozzle_map_by_pipette_id[pipette_id] = nozzle_map
else:
self._state.active_channels_by_pipette_id[
pipette_id
] = self._state.channels_by_pipette_id[pipette_id]

elif isinstance(action, FailCommandAction):
self._handle_failed_command(action)
Comment on lines -82 to -93
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh nice! Is NozzleMap always available now? I think there was some state that had nozzle map optional even though I think the map was indeed available.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh nice! Is NozzleMap always available now?

Yep, for the purposes of TipStore, at least.

I think there was some state that had nozzle map optional even though I think the map was indeed available.

Good memory. I think that's this, over in PipetteStore. I'll play around with that and see if we can simplify it now. #14529 (comment)

pipette_info = self._state.pipette_info_by_pipette_id[pipette_id]
pipette_info.active_channels = nozzle_map.tip_count
pipette_info.nozzle_map = nozzle_map

elif isinstance(action, ResetTipsAction):
labware_id = action.labware_id
Expand All @@ -116,48 +113,20 @@ def _handle_succeeded_command(self, command: Command) -> None:
column for column in definition.ordering
]

elif isinstance(command.result, PickUpTipResult):
labware_id = command.params.labwareId
well_name = command.params.wellName
pipette_id = command.params.pipetteId
self._set_used_tips(
pipette_id=pipette_id, well_name=well_name, labware_id=labware_id
)

elif isinstance(
command.result,
(DropTipResult, DropTipInPlaceResult, unsafe.UnsafeDropTipInPlaceResult),
):
pipette_id = command.params.pipetteId

def _handle_failed_command(
self,
action: FailCommandAction,
) -> None:
# If a pickUpTip command fails recoverably, mark the tips as used. This way,
# when the protocol is resumed and the Python Protocol API calls
# `get_next_tip()`, we'll move on to other tips as expected.
#
# We don't attempt this for nonrecoverable errors because maybe the failure
# was due to a bad labware ID or well name.
if (
isinstance(action.running_command, PickUpTip)
and action.type != ErrorRecoveryType.FAIL_RUN
):
def _handle_state_update(self, state_update: update_types.StateUpdate) -> None:
if state_update.tips_used != update_types.NO_CHANGE:
self._set_used_tips(
pipette_id=action.running_command.params.pipetteId,
labware_id=action.running_command.params.labwareId,
well_name=action.running_command.params.wellName,
pipette_id=state_update.tips_used.pipette_id,
labware_id=state_update.tips_used.labware_id,
well_name=state_update.tips_used.well_name,
)
# Note: We're logically removing the tip from the tip rack,
# but we're not logically updating the pipette to have that tip on it.

def _set_used_tips( # noqa: C901
self, pipette_id: str, well_name: str, labware_id: str
) -> None:
columns = self._state.column_by_labware_id.get(labware_id, [])
wells = self._state.tips_by_labware_id.get(labware_id, {})
nozzle_map = self._state.nozzle_map_by_pipette_id[pipette_id]
nozzle_map = self._state.pipette_info_by_pipette_id[pipette_id].nozzle_map

# TODO (cb, 02-28-2024): Transition from using partial nozzle map to full instrument map for the set used logic
num_nozzle_cols = len(nozzle_map.columns)
Expand Down Expand Up @@ -225,7 +194,7 @@ def _identify_tip_cluster(
critical_row: int,
entry_well: str,
) -> Optional[List[str]]:
tip_cluster = []
tip_cluster: list[str] = []
for i in range(active_columns):
if entry_well == "A1" or entry_well == "H1":
if critical_column - i >= 0:
Expand Down Expand Up @@ -276,12 +245,12 @@ def _validate_tip_cluster(

# In the case of a 96ch we can attempt to index in by singular rows and columns assuming that indexed direction is safe
# The tip cluster list is ordered: Each row from a column in order by columns
tip_cluster_final_column = []
tip_cluster_final_column: list[str] = []
for i in range(active_rows):
tip_cluster_final_column.append(
tip_cluster[((active_columns * active_rows) - 1) - i]
)
tip_cluster_final_row = []
tip_cluster_final_row: list[str] = []
for i in range(active_columns):
tip_cluster_final_row.append(
tip_cluster[(active_rows - 1) + (i * active_rows)]
Expand Down Expand Up @@ -472,19 +441,22 @@ def _cluster_search_H12(active_columns: int, active_rows: int) -> Optional[str]:

def get_pipette_channels(self, pipette_id: str) -> int:
"""Return the given pipette's number of channels."""
return self._state.channels_by_pipette_id[pipette_id]
return self._state.pipette_info_by_pipette_id[pipette_id].channels

def get_pipette_active_channels(self, pipette_id: str) -> int:
"""Get the number of channels being used in the given pipette's configuration."""
return self._state.active_channels_by_pipette_id[pipette_id]
return self._state.pipette_info_by_pipette_id[pipette_id].active_channels

def get_pipette_nozzle_map(self, pipette_id: str) -> NozzleMap:
"""Get the current nozzle map the given pipette's configuration."""
return self._state.nozzle_map_by_pipette_id[pipette_id]
return self._state.pipette_info_by_pipette_id[pipette_id].nozzle_map

def get_pipette_nozzle_maps(self) -> Dict[str, NozzleMap]:
"""Get current nozzle maps keyed by pipette id."""
return self._state.nozzle_map_by_pipette_id
return {
pipette_id: pipette_info.nozzle_map
for pipette_id, pipette_info in self._state.pipette_info_by_pipette_id.items()
}

def has_clean_tip(self, labware_id: str, well_name: str) -> bool:
"""Get whether a well in a labware has a clean tip.
Expand Down
39 changes: 33 additions & 6 deletions api/src/opentrons/protocol_engine/state/update_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,23 @@ class PipetteTipStateUpdate:
tip_geometry: typing.Optional[TipGeometry]


@dataclasses.dataclass
class TipsUsedUpdate:
"""Represents an update that marks tips in a tip rack as used."""

pipette_id: str
"""The pipette that did the tip pickup."""

labware_id: str

well_name: str
"""The well that the pipette's primary nozzle targeted.

Wells in addition to this one will also be marked as used, depending on the
pipette's nozzle layout.
"""


@dataclasses.dataclass
class StateUpdate:
"""Represents an update to perform on engine state."""
Expand All @@ -154,8 +171,10 @@ class StateUpdate:

loaded_labware: LoadedLabwareUpdate | NoChangeType = NO_CHANGE

tips_used: TipsUsedUpdate | NoChangeType = NO_CHANGE

# These convenience functions let the caller avoid the boilerplate of constructing a
# complicated dataclass tree, and they give us a
# complicated dataclass tree.

@typing.overload
def set_pipette_location(
Expand Down Expand Up @@ -207,6 +226,10 @@ def set_pipette_location( # noqa: D102
new_deck_point=new_deck_point,
)

def clear_all_pipette_locations(self) -> None:
"""Mark all pipettes as having an unknown location."""
self.pipette_location = CLEAR

def set_labware_location(
self,
*,
Expand Down Expand Up @@ -238,10 +261,6 @@ def set_loaded_labware(
display_name=display_name,
)

def clear_all_pipette_locations(self) -> None:
"""Mark all pipettes as having an unknown location."""
self.pipette_location = CLEAR

def set_load_pipette(
self,
pipette_id: str,
Expand Down Expand Up @@ -274,10 +293,18 @@ def update_pipette_nozzle(self, pipette_id: str, nozzle_map: NozzleMap) -> None:
pipette_id=pipette_id, nozzle_map=nozzle_map
)

def update_tip_state(
def update_pipette_tip_state(
self, pipette_id: str, tip_geometry: typing.Optional[TipGeometry]
) -> None:
"""Update tip state."""
self.pipette_tip_state = PipetteTipStateUpdate(
pipette_id=pipette_id, tip_geometry=tip_geometry
)

def mark_tips_as_used(
self, pipette_id: str, labware_id: str, well_name: str
) -> None:
"""Mark tips in a tip rack as used. See `MarkTipsUsedState`."""
self.tips_used = TipsUsedUpdate(
pipette_id=pipette_id, labware_id=labware_id, well_name=well_name
)
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ async def test_success(
pipette_id="pipette-id",
tip_geometry=TipGeometry(length=42, diameter=5, volume=300),
),
tips_used=update_types.TipsUsedUpdate(
pipette_id="pipette-id", labware_id="labware-id", well_name="A3"
),
),
)

Expand Down Expand Up @@ -139,6 +142,9 @@ async def test_tip_physically_missing_error(
labware_id="labware-id", well_name="well-name"
),
new_deck_point=DeckPoint(x=111, y=222, z=333),
)
),
tips_used=update_types.TipsUsedUpdate(
pipette_id="pipette-id", labware_id="labware-id", well_name="well-name"
),
),
)
Loading
Loading