Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api,app): Allow blowout and droptip when unhomed #15816

Merged
merged 8 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ async def reset_tip_detectors(

@ExecutionManagerProvider.wait_for_running
async def _update_position_estimation(
self, axes: Optional[List[Axis]] = None
self, axes: Optional[Sequence[Axis]] = None
) -> None:
"""
Function to update motor estimation for a set of axes
Expand Down Expand Up @@ -1141,6 +1141,12 @@ async def gantry_position(
z=cur_pos[Axis.by_mount(realmount)],
)

async def update_axis_position_estimations(self, axes: Sequence[Axis]) -> None:
"""Update specified axes position estimators from their encoders."""
await self._update_position_estimation(axes)
await self._cache_current_position()
await self._cache_encoder_position()

async def move_to(
self,
mount: Union[top_types.Mount, OT3Mount],
Expand Down
10 changes: 2 additions & 8 deletions api/src/opentrons/hardware_control/protocols/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
"""Typing protocols describing a hardware controller."""
from typing_extensions import Protocol, Type

from opentrons.hardware_control.types import Axis

from .module_provider import ModuleProvider
from .hardware_manager import HardwareManager
from .chassis_accessory_manager import ChassisAccessoryManager
Expand All @@ -20,6 +18,7 @@
from .gripper_controller import GripperController
from .flex_calibratable import FlexCalibratable
from .flex_instrument_configurer import FlexInstrumentConfigurer
from .position_estimator import PositionEstimator

from .types import (
CalibrationType,
Expand Down Expand Up @@ -64,6 +63,7 @@ def cache_tip(self, mount: MountArgType, tip_length: float) -> None:


class FlexHardwareControlInterface(
PositionEstimator,
ModuleProvider,
ExecutionControllable,
LiquidHandler[CalibrationType, MountArgType, ConfigType],
Expand All @@ -87,12 +87,6 @@ class FlexHardwareControlInterface(
def get_robot_type(self) -> Type[FlexRobotType]:
return FlexRobotType

def motor_status_ok(self, axis: Axis) -> bool:
...

def encoder_status_ok(self, axis: Axis) -> bool:
...

def cache_tip(self, mount: MountArgType, tip_length: float) -> None:
...

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import Protocol, Sequence

from ..types import Axis


class PositionEstimator(Protocol):
"""Position-control extensions for harwdare with encoders."""

async def update_axis_position_estimations(self, axes: Sequence[Axis]) -> None:
"""Update the specified axes' position estimators from their encoders.

This will allow these axes to make a non-home move even if they do not currently have
a position estimation (unless there is no tracked poition from the encoders, as would be
true immediately after boot).

Axis encoders have less precision than their position estimators. Calling this function will
cause absolute position drift. After this function is called, the axis should be homed before
it is relied upon for accurate motion.

This function updates only the requested axes. If other axes have bad position estimation,
moves that require those axes or attempts to get the position of those axes will still fail.
"""
...

def motor_status_ok(self, axis: Axis) -> bool:
"""Return whether an axis' position estimator is healthy.

The position estimator is healthy if the axis has
1) been homed
2) not suffered a loss-of-positioning (from a cancel or stall, for instance) since being homed

If this function returns false, getting the position of this axis or asking it to move will fail.
"""
...

def encoder_status_ok(self, axis: Axis) -> bool:
"""Return whether an axis' position encoder tracking is healthy.

The encoder status is healthy if the axis has been homed since booting up.

If this function returns false, updating the estimator from the encoder will fail.
"""
...
3 changes: 3 additions & 0 deletions api/src/opentrons/protocol_engine/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from . import temperature_module
from . import thermocycler
from . import calibration
from . import unsafe

from .hash_command_params import hash_protocol_command_params
from .generate_command_schema import generate_command_schema
Expand Down Expand Up @@ -548,6 +549,8 @@
"thermocycler",
# calibration command bundle
"calibration",
# unsafe command bundle
"unsafe",
# configure pipette volume command bundle
"ConfigureForVolume",
"ConfigureForVolumeCreate",
Expand Down
16 changes: 16 additions & 0 deletions api/src/opentrons/protocol_engine/commands/command_unions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from . import thermocycler

from . import calibration
from . import unsafe

from .set_rail_lights import (
SetRailLights,
Expand Down Expand Up @@ -387,6 +388,9 @@
calibration.CalibratePipette,
calibration.CalibrateModule,
calibration.MoveToMaintenancePosition,
unsafe.UnsafeBlowOutInPlace,
unsafe.UnsafeDropTipInPlace,
unsafe.UpdatePositionEstimators,
],
Field(discriminator="commandType"),
]
Expand Down Expand Up @@ -456,6 +460,9 @@
calibration.CalibratePipetteParams,
calibration.CalibrateModuleParams,
calibration.MoveToMaintenancePositionParams,
unsafe.UnsafeBlowOutInPlaceParams,
unsafe.UnsafeDropTipInPlaceParams,
unsafe.UpdatePositionEstimatorsParams,
]

CommandType = Union[
Expand Down Expand Up @@ -523,6 +530,9 @@
calibration.CalibratePipetteCommandType,
calibration.CalibrateModuleCommandType,
calibration.MoveToMaintenancePositionCommandType,
unsafe.UnsafeBlowOutInPlaceCommandType,
unsafe.UnsafeDropTipInPlaceCommandType,
unsafe.UpdatePositionEstimatorsCommandType,
]

CommandCreate = Annotated[
Expand Down Expand Up @@ -591,6 +601,9 @@
calibration.CalibratePipetteCreate,
calibration.CalibrateModuleCreate,
calibration.MoveToMaintenancePositionCreate,
unsafe.UnsafeBlowOutInPlaceCreate,
unsafe.UnsafeDropTipInPlaceCreate,
unsafe.UpdatePositionEstimatorsCreate,
],
Field(discriminator="commandType"),
]
Expand Down Expand Up @@ -660,6 +673,9 @@
calibration.CalibratePipetteResult,
calibration.CalibrateModuleResult,
calibration.MoveToMaintenancePositionResult,
unsafe.UnsafeBlowOutInPlaceResult,
unsafe.UnsafeDropTipInPlaceResult,
unsafe.UpdatePositionEstimatorsResult,
]

# todo(mm, 2024-06-12): Ideally, command return types would have specific
Expand Down
45 changes: 45 additions & 0 deletions api/src/opentrons/protocol_engine/commands/unsafe/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Commands that will cause inaccuracy or incorrect behavior but are still necessary."""

from .unsafe_blow_out_in_place import (
UnsafeBlowOutInPlaceCommandType,
UnsafeBlowOutInPlaceParams,
UnsafeBlowOutInPlaceResult,
UnsafeBlowOutInPlace,
UnsafeBlowOutInPlaceCreate,
)
from .unsafe_drop_tip_in_place import (
UnsafeDropTipInPlaceCommandType,
UnsafeDropTipInPlaceParams,
UnsafeDropTipInPlaceResult,
UnsafeDropTipInPlace,
UnsafeDropTipInPlaceCreate,
)

from .update_position_estimators import (
UpdatePositionEstimatorsCommandType,
UpdatePositionEstimatorsParams,
UpdatePositionEstimatorsResult,
UpdatePositionEstimators,
UpdatePositionEstimatorsCreate,
)

__all__ = [
# Unsafe blow-out-in-place command models
"UnsafeBlowOutInPlaceCommandType",
"UnsafeBlowOutInPlaceParams",
"UnsafeBlowOutInPlaceResult",
"UnsafeBlowOutInPlace",
"UnsafeBlowOutInPlaceCreate",
# Unsafe drop-tip command models
"UnsafeDropTipInPlaceCommandType",
"UnsafeDropTipInPlaceParams",
"UnsafeDropTipInPlaceResult",
"UnsafeDropTipInPlace",
"UnsafeDropTipInPlaceCreate",
# Update position estimate command models
"UpdatePositionEstimatorsCommandType",
"UpdatePositionEstimatorsParams",
"UpdatePositionEstimatorsResult",
"UpdatePositionEstimators",
"UpdatePositionEstimatorsCreate",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""Command models to blow out in place while plunger positions are unknown."""

from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Type
from typing_extensions import Literal

from pydantic import BaseModel

from ..command import AbstractCommandImpl, BaseCommand, BaseCommandCreate, SuccessData
from ..pipetting_common import PipetteIdMixin, FlowRateMixin
from ...resources import ensure_ot3_hardware
from ...errors.error_occurrence import ErrorOccurrence

from opentrons.hardware_control import HardwareControlAPI
from opentrons.hardware_control.types import Axis

if TYPE_CHECKING:
from ...execution import PipettingHandler
from ...state import StateView


UnsafeBlowOutInPlaceCommandType = Literal["unsafe/blowOutInPlace"]


class UnsafeBlowOutInPlaceParams(PipetteIdMixin, FlowRateMixin):
"""Payload required to blow-out in place while position is unknown."""

pass


class UnsafeBlowOutInPlaceResult(BaseModel):
"""Result data from an UnsafeBlowOutInPlace command."""

pass


class UnsafeBlowOutInPlaceImplementation(
AbstractCommandImpl[
UnsafeBlowOutInPlaceParams, SuccessData[UnsafeBlowOutInPlaceResult, None]
]
):
"""UnsafeBlowOutInPlace command implementation."""

def __init__(
self,
pipetting: PipettingHandler,
state_view: StateView,
hardware_api: HardwareControlAPI,
**kwargs: object,
) -> None:
self._pipetting = pipetting
self._state_view = state_view
self._hardware_api = hardware_api

async def execute(
self, params: UnsafeBlowOutInPlaceParams
) -> SuccessData[UnsafeBlowOutInPlaceResult, None]:
"""Blow-out without moving the pipette even when position is unknown."""
ot3_hardware_api = ensure_ot3_hardware(self._hardware_api)
pipette_location = self._state_view.motion.get_pipette_location(
params.pipetteId
)
await ot3_hardware_api.update_axis_position_estimations(
[Axis.of_main_tool_actuator(pipette_location.mount.to_hw_mount())]
)
await self._pipetting.blow_out_in_place(
pipette_id=params.pipetteId, flow_rate=params.flowRate
)

return SuccessData(public=UnsafeBlowOutInPlaceResult(), private=None)


class UnsafeBlowOutInPlace(
BaseCommand[UnsafeBlowOutInPlaceParams, UnsafeBlowOutInPlaceResult, ErrorOccurrence]
):
"""UnsafeBlowOutInPlace command model."""

commandType: UnsafeBlowOutInPlaceCommandType = "unsafe/blowOutInPlace"
params: UnsafeBlowOutInPlaceParams
result: Optional[UnsafeBlowOutInPlaceResult]

_ImplementationCls: Type[
UnsafeBlowOutInPlaceImplementation
] = UnsafeBlowOutInPlaceImplementation


class UnsafeBlowOutInPlaceCreate(BaseCommandCreate[UnsafeBlowOutInPlaceParams]):
"""UnsafeBlowOutInPlace command request model."""

commandType: UnsafeBlowOutInPlaceCommandType = "unsafe/blowOutInPlace"
params: UnsafeBlowOutInPlaceParams

_CommandCls: Type[UnsafeBlowOutInPlace] = UnsafeBlowOutInPlace
Loading
Loading