Skip to content

Commit

Permalink
feat(api): Support the new pipette configurations and 96 channel in t…
Browse files Browse the repository at this point in the history
…he hardware controller (#11830)

* feat(api): change pipette functions based on the tip working volume 
* feat(api): do not use "pipette name" and "pipette model" to look up pipettes via configurations
  • Loading branch information
Laura-Danielle committed Dec 27, 2022
1 parent df16d25 commit 65a9940
Show file tree
Hide file tree
Showing 45 changed files with 2,316 additions and 310 deletions.
10 changes: 5 additions & 5 deletions api/src/opentrons/config/defaults_ot3.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
OT3AxisKind.X: 500,
OT3AxisKind.Y: 500,
OT3AxisKind.Z: 35,
OT3AxisKind.P: 45,
OT3AxisKind.P: 5,
},
low_throughput={
OT3AxisKind.X: 500,
Expand All @@ -100,7 +100,7 @@
OT3AxisKind.X: 1000,
OT3AxisKind.Y: 1000,
OT3AxisKind.Z: 100,
OT3AxisKind.P: 50,
OT3AxisKind.P: 10,
},
low_throughput={
OT3AxisKind.X: 1000,
Expand All @@ -125,7 +125,7 @@
OT3AxisKind.Y: 10,
OT3AxisKind.Z: 10,
OT3AxisKind.Z_G: 15,
OT3AxisKind.P: 10,
OT3AxisKind.P: 5,
},
high_throughput={
OT3AxisKind.X: 10,
Expand Down Expand Up @@ -190,7 +190,7 @@
high_throughput={
OT3AxisKind.X: 0.5,
OT3AxisKind.Y: 0.5,
OT3AxisKind.Z: 0.1,
OT3AxisKind.Z: 0.8,
OT3AxisKind.P: 0.3,
},
low_throughput={
Expand Down Expand Up @@ -220,7 +220,7 @@
OT3AxisKind.X: 1.4,
OT3AxisKind.Y: 1.4,
OT3AxisKind.Z: 1.4,
OT3AxisKind.P: 1.0,
OT3AxisKind.P: 2.0,
},
low_throughput={
OT3AxisKind.X: 1.4,
Expand Down
199 changes: 192 additions & 7 deletions api/src/opentrons/config/ot3_pipette_config.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,206 @@
import re
from typing import List, Optional, Union, cast
from dataclasses import dataclass
from opentrons_shared_data.pipette import load_data
from opentrons_shared_data.pipette.dev_types import PipetteModel, PipetteName
from opentrons_shared_data.pipette.pipette_definition import (
PipetteChannelType,
PipetteModelType,
PipetteVersionType,
PipetteGenerationType,
PipetteConfigurations,
PIPETTE_AVAILABLE_TYPES,
PIPETTE_CHANNELS_INTS,
PipetteModelMajorVersionType,
PipetteModelMinorVersionType,
)

DEFAULT_CALIBRATION_OFFSET = [0.0, 0.0, 0.0]
DEFAULT_MODEL = PipetteModelType.p1000
DEFAULT_CHANNELS = PipetteChannelType.SINGLE_CHANNEL
DEFAULT_MODEL_VERSION = PipetteVersionType(major=1, minor=0)


def load_ot3_pipette(
pipette_model: str, number_of_channels: int, version: float
) -> PipetteConfigurations:
requested_model = PipetteModelType(pipette_model)
requested_channels = PipetteChannelType(number_of_channels)
requested_version = PipetteVersionType.convert_from_float(version)
# TODO (lc 12-5-2022) Ideally we can deprecate this
# at somepoint once we load pipettes by channels and type
@dataclass
class PipetteNameType:
pipette_type: PipetteModelType
pipette_channels: PipetteChannelType
pipette_generation: PipetteGenerationType

def __repr__(self) -> str:
base_name = f"{self.pipette_type.name}_{self.pipette_channels}"
if self.pipette_generation == PipetteGenerationType.GEN1:
return base_name
elif self.pipette_channels == PipetteChannelType.NINETY_SIX_CHANNEL:
return base_name
else:
return f"{base_name}_{self.pipette_generation.name.lower()}"


@dataclass
class PipetteModelVersionType:
pipette_type: PipetteModelType
pipette_channels: PipetteChannelType
pipette_version: PipetteVersionType

def __repr__(self) -> str:
base_name = f"{self.pipette_type.name}_{self.pipette_channels}"

return f"{base_name}_v{self.pipette_version}"


def channels_from_string(channels: str) -> PipetteChannelType:
"""Convert channels from a string.
With both `py:data:PipetteName` and `py:data:PipetteObject`, we refer to channel types
as `single`, `multi` or `96`.
Args:
channels (str): The channel string we wish to convert.
Returns:
PipetteChannelType: A `py:obj:PipetteChannelType`
representing the number of channels on a pipette.
"""
if channels == "96":
return PipetteChannelType.NINETY_SIX_CHANNEL
elif channels == "multi":
return PipetteChannelType.EIGHT_CHANNEL
else:
return PipetteChannelType.SINGLE_CHANNEL


def version_from_string(version: str) -> PipetteVersionType:
"""Convert a version string to a py:obj:PipetteVersionType.
The version string will either be in the format of `int.int` or `vint.int`.
Args:
version (str): The string version we wish to convert.
Returns:
PipetteVersionType: A pipette version object.
"""
version_list = [v for v in re.split("\\.|[v]", version) if v]
major = cast(PipetteModelMajorVersionType, int(version_list[0]))
minor = cast(PipetteModelMinorVersionType, int(version_list[1]))
return PipetteVersionType(major, minor)


def version_from_generation(pipette_name_list: List[str]) -> PipetteVersionType:
"""Convert a string generation name to a py:obj:PipetteVersionType.
Pipette generations are strings in the format of "gen1" or "gen2", and
usually associated withe :py:data:PipetteName.
Args:
pipette_name_list (List[str]): A list of strings from the separated by `_`
py:data:PipetteName.
Returns:
PipetteVersionType: A pipette version object.
"""
if "gen3" in pipette_name_list:
return PipetteVersionType(3, 0)
elif "gen2" in pipette_name_list:
return PipetteVersionType(2, 0)
else:
return PipetteVersionType(1, 0)


def convert_pipette_name(
name: PipetteName, provided_version: Optional[str] = None
) -> PipetteModelVersionType:
"""Convert the py:data:PipetteName to a py:obj:PipetteModelVersionType.
`PipetteNames` are in the format of "p300_single" or "p300_single_gen1".
Args:
name (PipetteName): The pipette name we want to convert.
Returns:
PipetteModelVersionType: An object representing a broken out PipetteName
string.
"""
split_pipette_name = name.split("_")
channels = channels_from_string(split_pipette_name[1])
if provided_version:
version = version_from_string(provided_version)
else:
version = version_from_generation(split_pipette_name)

pipette_type = PipetteModelType[split_pipette_name[0]]

return PipetteModelVersionType(pipette_type, channels, version)


def convert_pipette_model(
model: Optional[PipetteModel], provided_version: Optional[str] = ""
) -> PipetteModelVersionType:
"""Convert the py:data:PipetteModel to a py:obj:PipetteModelVersionType.
`PipetteModel` are in the format of "p300_single_v1.0" or "p300_single_v3.3".
Sometimes, models may not have a version, in which case the `provided_version` arg
allows you to specify a version to search for.
Args:
model (PipetteModel): The pipette model we want to convert.
provided_version (str, Optional): The provided version we'd like to look for.
Returns:
PipetteModelVersionType: An object representing a broken out PipetteName
string.
"""
# TODO (lc 12-5-2022) This helper function is needed
# until we stop using "name" and "model" to refer
# to attached pipettes.
# We need to figure out how to default the pipette model as well
# rather than returning a p1000
if model and not provided_version:
pipette_type, parsed_channels, parsed_version = model.split("_")
channels = channels_from_string(parsed_channels)
version = version_from_string(parsed_version)
elif model and provided_version:
pipette_type, parsed_channels = model.split("_")
channels = channels_from_string(parsed_channels)
version = version_from_string(provided_version)
else:
pipette_type = DEFAULT_MODEL.value
channels = DEFAULT_CHANNELS
version = DEFAULT_MODEL_VERSION
return PipetteModelVersionType(PipetteModelType[pipette_type], channels, version)


def supported_pipette(model_or_name: Union[PipetteName, PipetteModel, None]) -> bool:
"""Determine if a pipette type is supported.
Args:
model_or_name (Union[PipetteName, PipetteModel, None]): The pipette we want to check.
Returns:
bool: Whether or not the given pipette name or model is supported.
"""
if not model_or_name:
return False
split_model_or_name = model_or_name.split("_")
channels_as_int = channels_from_string(split_model_or_name[1]).as_int
if (
split_model_or_name[0] in PIPETTE_AVAILABLE_TYPES
or channels_as_int in PIPETTE_CHANNELS_INTS
):
return True
return False


def load_ot3_pipette(model_type: PipetteModelVersionType) -> PipetteConfigurations:
return load_data.load_definition(
requested_model, requested_channels, requested_version
model_type.pipette_type, model_type.pipette_channels, model_type.pipette_version
)
7 changes: 6 additions & 1 deletion api/src/opentrons/hardware_control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
from .execution_manager import ExecutionManager
from .threaded_async_lock import ThreadedAsyncLock, ThreadedAsyncForbidden
from .protocols import HardwareControlAPI
from .instruments import AbstractInstrument, Pipette, Gripper
from .instruments import AbstractInstrument, Gripper

# TODO (lc 12-05-2022) We should 1. figure out if we need
# to globally export a class that is strictly used in the hardware controller
# and 2. how to properly export an ot2 and ot3 pipette.
from .instruments.ot2.pipette import Pipette


ThreadManagedHardware = ThreadManager[HardwareControlAPI]
Expand Down
58 changes: 39 additions & 19 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
Iterator,
)
from opentrons.config.types import OT3Config, GantryLoad
from opentrons.config import pipette_config, gripper_config
from opentrons.config import ot3_pipette_config, gripper_config
from .ot3utils import (
axis_convert,
create_move_group,
Expand Down Expand Up @@ -94,14 +94,13 @@
capacitive_pass,
)
from opentrons_hardware.drivers.gpio import OT3GPIO
from opentrons_shared_data.pipette.dev_types import PipetteName

if TYPE_CHECKING:
from opentrons_shared_data.pipette.dev_types import PipetteName, PipetteModel
from ..dev_types import (
AttachedPipette,
OT3AttachedPipette,
AttachedGripper,
OT3AttachedInstruments,
InstrumentHardwareConfigs,
)

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -415,21 +414,43 @@ async def gripper_home_jaw(self) -> None:
self._handle_motor_status_response(positions)

@staticmethod
def _synthesize_model_name(name: FirmwarePipetteName, model: str) -> "PipetteModel":
return cast("PipetteModel", f"{name.name}_v{model}")
def _lookup_serial_key(pipette_name: FirmwarePipetteName) -> str:
lookup_name = {
FirmwarePipetteName.p1000_single: "P1KS",
FirmwarePipetteName.p1000_multi: "P1KM",
FirmwarePipetteName.p50_single: "P50S",
FirmwarePipetteName.p50_multi: "P50M",
FirmwarePipetteName.p1000_96: "P1KH",
FirmwarePipetteName.p50_96: "P50H",
}
return lookup_name[pipette_name]

@staticmethod
def _combine_serial_number(pipette_info: ohc_tool_types.PipetteInformation) -> str:
serialized_name = OT3Controller._lookup_serial_key(pipette_info.name)
version = ot3_pipette_config.version_from_string(pipette_info.model)
return f"{serialized_name}V{version.major}{version.minor}{pipette_info.serial}"

@staticmethod
def _build_attached_pip(
attached: ohc_tool_types.PipetteInformation, mount: OT3Mount
) -> AttachedPipette:
) -> OT3AttachedPipette:
if attached.name == FirmwarePipetteName.unknown:
raise InvalidPipetteName(name=attached.name_int, mount=mount)
try:
# TODO (lc 12-8-2022) We should return model as an int rather than
# a string.
# TODO (lc 12-6-2022) We should also provide the full serial number
# for PipetteInformation.serial so we don't have to use
# helper methods to convert the serial back to what was flashed
# on the eeprom.
return {
"config": pipette_config.load(
OT3Controller._synthesize_model_name(attached.name, attached.model)
"config": ot3_pipette_config.load_ot3_pipette(
ot3_pipette_config.convert_pipette_name(
cast(PipetteName, attached.name.name), attached.model
)
),
"id": attached.serial,
"id": OT3Controller._combine_serial_number(attached),
}
except KeyError:
raise InvalidPipetteModel(
Expand All @@ -444,7 +465,7 @@ def _build_attached_gripper(
serial = attached.serial
return {
"config": gripper_config.load(model, serial),
"id": serial,
"id": f"GRPV{attached.model}{serial}",
}

@staticmethod
Expand Down Expand Up @@ -672,12 +693,6 @@ async def clean_up(self) -> None:
self._event_watcher.close()
return None

async def configure_mount(
self, mount: OT3Mount, config: InstrumentHardwareConfigs
) -> None:
"""Configure a mount."""
return None

@staticmethod
def _get_home_position() -> Dict[NodeId, float]:
return {
Expand Down Expand Up @@ -765,9 +780,13 @@ async def probe_network(self, timeout: float = 5.0) -> None:
# when that method actually does canbus stuff
instrs = await self.get_attached_instruments({})
expected = {NodeId.gantry_x, NodeId.gantry_y, NodeId.head}
if instrs.get(OT3Mount.LEFT, cast("AttachedPipette", {})).get("config", None):
if instrs.get(OT3Mount.LEFT, cast("OT3AttachedPipette", {})).get(
"config", None
):
expected.add(NodeId.pipette_left)
if instrs.get(OT3Mount.RIGHT, cast("AttachedPipette", {})).get("config", None):
if instrs.get(OT3Mount.RIGHT, cast("OT3AttachedPipette", {})).get(
"config", None
):
expected.add(NodeId.pipette_right)
if instrs.get(OT3Mount.GRIPPER, cast("AttachedGripper", {})).get(
"config", None
Expand All @@ -777,6 +796,7 @@ async def probe_network(self, timeout: float = 5.0) -> None:
self._present_nodes = self._replace_gripper_node(
self._replace_head_node(present)
)
log.info(f"The present nodes are now {self._present_nodes}")

def _axis_is_present(self, axis: OT3Axis) -> bool:
try:
Expand Down
Loading

0 comments on commit 65a9940

Please sign in to comment.