Skip to content

Commit

Permalink
added operations_since_measurement to LiquidHeightInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
pmoegenburg committed Oct 18, 2024
1 parent 5959370 commit 661e96a
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 14 deletions.
4 changes: 3 additions & 1 deletion api/src/opentrons/protocol_engine/state/geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1437,7 +1437,9 @@ def get_well_height_after_volume(
This is given an initial handling height, with reference to the well bottom.
"""
well_geometry = self._labware.get_well_geometry(labware_id=labware_id, well_name=well_name)
well_geometry = self._labware.get_well_geometry(
labware_id=labware_id, well_name=well_name
)
initial_volume = find_volume_at_well_height(
target_height=initial_height, well_geometry=well_geometry
)
Expand Down
52 changes: 39 additions & 13 deletions api/src/opentrons/protocol_engine/state/wells.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,33 +48,35 @@ def _handle_succeeded_command(self, command: Command) -> None:
labware_id=command.params.labwareId,
well_name=command.params.wellName,
height=command.result.z_position,
time=command.completedAt,
time=command.completedAt
if command.completedAt is not None
else command.createdAt,
)
if isinstance(command.result, LoadLiquidResult):
self._set_liquid_height_after_load(
labware_id=command.params.labwareId,
well_name=next(iter(command.params.volumeByWell)),
volume=next(iter(command.params.volumeByWell.values())),
time=command.completedAt,
time=command.completedAt
if command.completedAt is not None
else command.createdAt,
)
if isinstance(command.result, AspirateResult):
self._update_liquid_height_after_operation(
labware_id=command.params.labwareId,
well_name=command.params.wellName,
volume=-command.result.volume,
time=command.completedAt,
)
if isinstance(command.result, DispenseResult):
self._update_liquid_height_after_operation(
labware_id=command.params.labwareId,
well_name=command.params.wellName,
volume=command.result.volume,
time=command.completedAt,
)

def _handle_failed_command(self, action: FailCommandAction) -> None:
if isinstance(action.error, LiquidNotFoundError):
self._set_liquid_height_from_probe(
self._set_liquid_height_after_probe(
labware_id=action.error.private.labware_id,
well_name=action.error.private.well_name,
height=None,
Expand All @@ -85,7 +87,9 @@ def _set_liquid_height_after_probe(
self, labware_id: str, well_name: str, height: float, time: datetime
) -> None:
"""Set the liquid height of the well from a LiquidProbe command."""
lhi = LiquidHeightInfo(height=height, last_measured=time)
lhi = LiquidHeightInfo(
height=height, last_measured=time, operations_since_measurement=0
)
if labware_id not in self._state.measured_liquid_heights:
self._state.measured_liquid_heights[labware_id] = {}
self._state.measured_liquid_heights[labware_id][well_name] = lhi
Expand All @@ -94,19 +98,41 @@ def _set_liquid_height_after_load(
self, labware_id: str, well_name: str, volume: float, time: datetime
) -> None:
"""Set the liquid height of the well from a LoadLiquid command."""
height = get_well_height_at_volume(labware_id=labware_id, well_name=well_name, volume=volume)
lhi = LiquidHeightInfo(height=height, last_measured=time)
height = get_well_height_at_volume(
labware_id=labware_id, well_name=well_name, volume=volume
)
lhi = LiquidHeightInfo(
height=height, last_measured=time, operations_since_measurement=0
)
if labware_id not in self._state.measured_liquid_heights:
self._state.measured_liquid_heights[labware_id] = {}
self._state.measured_liquid_heights[labware_id][well_name] = lhi

def _update_liquid_height_after_operation(
self, labware_id: str, well_name: str, volume: float, time: datetime
self, labware_id: str, well_name: str, volume: float
) -> None:
"""Set the liquid height of the well from a LoadLiquid command."""
initial_height = self._state.measured_liquid_heights[labware_id][well_name]
height = get_well_height_after_volume(labware_id=labware_id, well_name=well_name, initial_height=initial_height, volume=volume)
lhi = LiquidHeightInfo(height=height, last_measured=time)
"""Update the liquid height of the well after an Aspirate or Dispense command."""
time = self._state.measured_liquid_heights[labware_id][well_name].last_measured
operations_since_measurement = (
self._state.measured_liquid_heights[labware_id][
well_name
].operations_since_measurement
+ 1
)
initial_height = self._state.measured_liquid_heights[labware_id][
well_name
].height
height = get_well_height_after_volume(
labware_id=labware_id,
well_name=well_name,
initial_height=initial_height,
volume=volume,
)
lhi = LiquidHeightInfo(
height=height,
last_measured=time,
operations_since_measurement=operations_since_measurement,
)
if labware_id not in self._state.measured_liquid_heights:
self._state.measured_liquid_heights[labware_id] = {}
self._state.measured_liquid_heights[labware_id][well_name] = lhi
Expand Down
1 change: 1 addition & 0 deletions api/src/opentrons/protocol_engine/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ class LiquidHeightInfo(BaseModel):

height: float
last_measured: datetime
operations_since_measurement: int


class LiquidHeightSummary(BaseModel):
Expand Down

0 comments on commit 661e96a

Please sign in to comment.