Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(robot-server): implement the load labware function to accept tipracks from the client #7131

Merged
merged 11 commits into from
Jan 5, 2021
2 changes: 2 additions & 0 deletions robot-server/robot_server/robot/calibration/check/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class CalibrationCheckSessionStatus(BaseModel):
comparisonsByPipette: ComparisonStatePerPipette
labware: List[RequiredLabware]
activeTipRack: RequiredLabware
supportedCommands: List[Optional[str]] = Field(
..., description="A list of supported commands for this user flow")

class Config:
arbitrary_types_allowed = True
Expand Down
27 changes: 20 additions & 7 deletions robot-server/robot_server/robot/calibration/check/user_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import robot_server.robot.calibration.util as uf
from robot_server.robot.calibration.helper_classes import (
RobotHealthCheck, PipetteRank, PipetteInfo,
RequiredLabware)
RequiredLabware, SupportedCommands)

from robot_server.service.session.models.command_definitions import \
CalibrationCommand, DeckCalibrationCommand, CheckCalibrationCommand
Expand Down Expand Up @@ -115,6 +115,7 @@ def __init__(
CalibrationCommand.invalidate_last_action: self.invalidate_last_action, # noqa: E501
CalibrationCommand.exit: self.exit_session,
}
self._supported_commands = SupportedCommands(namespace='calibration')

@property
def deck(self) -> Deck:
Expand Down Expand Up @@ -163,7 +164,11 @@ def active_tiprack(self) -> labware.Labware:
def hw_pipette(self) -> Pipette:
return self._get_hw_pipettes()[0]

async def transition(self):
@property
def supported_commands(self) -> List:
return self._supported_commands.supported()

async def transition(self, tiprackDefinition: Optional[dict] = None):
pass

async def change_active_pipette(self):
Expand Down Expand Up @@ -247,7 +252,9 @@ def _select_starting_pipette(
max_volume=pip.config.max_volume,
mount=mount,
tip_rack=self._get_tiprack_by_pipette_volume(
pip.config.max_volume, pip_calibration))
pip.config.max_volume, pip_calibration),
default_tipracks=uf.get_default_tipracks(
pip.config.default_tipracks))
return info, [info]

right_pip = pips[Mount.RIGHT]
Expand All @@ -262,14 +269,18 @@ def _select_starting_pipette(
rank=PipetteRank.first,
mount=Mount.RIGHT,
tip_rack=self._get_tiprack_by_pipette_volume(
right_pip.config.max_volume, r_calibration))
right_pip.config.max_volume, r_calibration),
default_tipracks=uf.get_default_tipracks(
right_pip.config.default_tipracks))
l_info = PipetteInfo(
channels=left_pip.config.channels,
max_volume=left_pip.config.max_volume,
rank=PipetteRank.first,
mount=Mount.LEFT,
tip_rack=self._get_tiprack_by_pipette_volume(
left_pip.config.max_volume, l_calibration))
left_pip.config.max_volume, l_calibration),
default_tipracks=uf.get_default_tipracks(
left_pip.config.default_tipracks))
if left_pip.config.max_volume > right_pip.config.max_volume or \
right_pip.config.channels > left_pip.config.channels:
r_info.rank = PipetteRank.second
Expand Down Expand Up @@ -475,7 +486,8 @@ def get_instruments(self) -> List[CheckAttachedPipette]:
tipRackUri=info_pip.tip_rack.uri,
rank=info_pip.rank.value,
mount=str(info_pip.mount),
serial=hw_pip.pipette_id) # type: ignore[arg-type]
serial=hw_pip.pipette_id, # type: ignore[arg-type]
defaultTipracks=info_pip.default_tipracks) # type: ignore[arg-type] # noqa: E501
for hw_pip, info_pip in zip(hw_pips, info_pips)]

def get_active_pipette(self) -> CheckAttachedPipette:
Expand All @@ -494,7 +506,8 @@ def get_active_pipette(self) -> CheckAttachedPipette:
tipRackUri=self.active_pipette.tip_rack.uri,
rank=self.active_pipette.rank.value,
mount=str(self.mount),
serial=self.hw_pipette.pipette_id) # type: ignore[arg-type]
serial=self.hw_pipette.pipette_id, # type: ignore[arg-type]
defaultTipracks=self.active_pipette.default_tipracks) # type: ignore[arg-type] # noqa: E501

def _determine_threshold(self) -> Point:
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pydantic import BaseModel, Field
from typing import List
from typing import List, Optional

from ..helper_classes import AttachedPipette, RequiredLabware

Expand All @@ -11,6 +11,8 @@ class DeckCalibrationSessionStatus(BaseModel):
...,
description="Current step of deck calibration user flow")
labware: List[RequiredLabware]
supportedCommands: List[Optional[str]] = Field(
..., description="A list of supported commands for this user flow")

class Config:
schema_extra = {
Expand Down
53 changes: 37 additions & 16 deletions robot-server/robot_server/robot/calibration/deck/user_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
from ..errors import CalibrationError
from ..helper_classes import (
RequiredLabware,
AttachedPipette)
AttachedPipette,
SupportedCommands)

if TYPE_CHECKING:
from .dev_types import SavedPoints, ExpectedPoints
from opentrons_shared_data.labware import LabwareDefinition


MODULE_LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -71,6 +73,7 @@ def __init__(self,
hardware: ThreadManager):
self._hardware = hardware
self._hw_pipette, self._mount = self._select_target_pipette()
self._default_tipracks = self._get_default_tipracks()

deck_load_name = SHORT_TRASH_DECK if ff.short_fixed_trash() \
else STANDARD_DECK
Expand Down Expand Up @@ -104,6 +107,8 @@ def __init__(self,
robot_cal.build_temporary_identity_calibration())
self._hw_pipette.update_pipette_offset(
robot_cal.load_pipette_offset(pip_id=None, mount=self._mount))
self._supported_commands = SupportedCommands(namespace='calibration')
self._supported_commands.loadLabware = True

@property
def deck(self) -> Deck:
Expand Down Expand Up @@ -136,6 +141,10 @@ def tip_origin(self, new_val: Optional[Point]):
def reset_tip_origin(self):
self._tip_origin_pt = None

@property
def supported_commands(self) -> List:
return self._supported_commands.supported()

@property
def current_state(self) -> State:
return self._current_state
Expand All @@ -144,16 +153,16 @@ def get_pipette(self) -> Optional[AttachedPipette]:
# TODO(mc, 2020-09-17): s/tip_length/tipLength
# TODO(mc, 2020-09-17): type of pipette_id does not match expected
# type of AttachedPipette.serial
return AttachedPipette( # type: ignore[call-arg]
return AttachedPipette(
model=self._hw_pipette.model,
name=self._hw_pipette.name,
tip_length=self._hw_pipette.config.tip_length,
tipLength=self._hw_pipette.config.tip_length,
mount=str(self._mount),
serial=self._hw_pipette.pipette_id) # type: ignore[arg-type]
serial=self._hw_pipette.pipette_id, # type: ignore[arg-type]
defaultTipracks=self._default_tipracks)

def get_required_labware(self) -> List[RequiredLabware]:
lw = self._get_tip_rack_lw()
return [RequiredLabware.from_lw(lw)]
return [RequiredLabware.from_lw(self._tip_rack)]

def _set_current_state(self, to_state: State):
self._current_state = to_state
Expand Down Expand Up @@ -190,15 +199,20 @@ def _select_target_pipette(self) -> Tuple[Pipette, Mount]:
[(right_pip, Mount.RIGHT), (left_pip, Mount.LEFT)],
key=lambda p_m: p_m[0].config.max_volume)[0]

def _get_tip_rack_lw(self) -> labware.Labware:
pip_vol = self._hw_pipette.config.max_volume
lw_load_name = TIP_RACK_LOOKUP_BY_MAX_VOL[str(pip_vol)].load_name
return labware.load(
lw_load_name, self._deck.position_for(TIP_RACK_SLOT))
def _get_tip_rack_lw(
self, tiprack_definition: Optional['LabwareDefinition'] = None
) -> labware.Labware:
if tiprack_definition:
return labware.load_from_definition(
tiprack_definition, self._deck.position_for(TIP_RACK_SLOT))
else:
pip_vol = self._hw_pipette.config.max_volume
lw_load_name = TIP_RACK_LOOKUP_BY_MAX_VOL[str(pip_vol)].load_name
return labware.load(
lw_load_name, self._deck.position_for(TIP_RACK_SLOT))

def _initialize_deck(self):
tip_rack_lw = self._get_tip_rack_lw()
self._deck[TIP_RACK_SLOT] = tip_rack_lw
def _get_default_tipracks(self):
return uf.get_default_tipracks(self.hw_pipette.config.default_tipracks)

def _build_expected_points_dict(self) -> ExpectedPoints:
pos_1 = self._deck.get_calibration_position(POINT_ONE_ID).position
Expand Down Expand Up @@ -242,8 +256,15 @@ async def get_current_point(
return await self._hardware.gantry_position(self._mount,
critical_point)

async def load_labware(self):
pass
async def load_labware(self, tiprackDefinition: dict):
self._supported_commands.loadLabware = False
if tiprackDefinition:
verified_definition = labware.verify_definition(tiprackDefinition)
self._tip_rack = self._get_tip_rack_lw(
verified_definition)
if self._deck[TIP_RACK_SLOT]:
del self._deck[TIP_RACK_SLOT]
self._deck[TIP_RACK_SLOT] = self._tip_rack

async def jog(self, vector):
await self._hardware.move_rel(mount=self._mount,
Expand Down
26 changes: 25 additions & 1 deletion robot-server/robot_server/robot/calibration/helper_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

from opentrons.types import Mount
from enum import Enum
from dataclasses import dataclass
from dataclasses import dataclass, fields
from pydantic import BaseModel, Field
from opentrons.protocol_api import labware
from opentrons.types import DeckLocation


if typing.TYPE_CHECKING:
from opentrons.protocol_api.labware import Labware
from opentrons_shared_data.labware import LabwareDefinition


class RobotHealthCheck(Enum):
Expand Down Expand Up @@ -64,6 +65,27 @@ class PipetteInfo:
max_volume: int
channels: int
tip_rack: 'Labware'
default_tipracks: typing.List['LabwareDefinition']


@dataclass
class SupportedCommands:
"""
A class that allows you to set currently supported
commands depending on the current state.
"""
loadLabware: bool = False

def __init__(self, namespace: str):
self._namespace = namespace

def supported(self):
commands = []
for field in fields(self):
result = getattr(self, field.name)
if result:
commands.append(f"{self._namespace}.{field.name}")
return commands


# TODO: BC: the mount field here is typed as a string
Expand All @@ -87,6 +109,8 @@ class AttachedPipette(BaseModel):
Field(None, description="The mount this pipette attached to")
serial: str =\
Field(None, description="The serial number of the attached pipette")
defaultTipracks: typing.List[dict] =\
Field(None, description="A list of default tipracks for this pipette")


class RequiredLabware(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,7 @@
from typing_extensions import Final


class GenericState(str, Enum):
def __getattr__(self, name: str):
# We need to define this method
# to ensure that mypy will
# understand the defined attributes
# on the subclasses.
return getattr(self.__class__, name)


class PipetteOffsetCalibrationState(GenericState):
class PipetteOffsetCalibrationState(str, Enum):
sessionStarted = "sessionStarted"
labwareLoaded = "labwareLoaded"
preparingPipette = "preparingPipette"
Expand All @@ -29,7 +20,7 @@ class PipetteOffsetCalibrationState(GenericState):
WILDCARD = STATE_WILDCARD


class PipetteOffsetWithTipLengthCalibrationState(GenericState):
class PipetteOffsetWithTipLengthCalibrationState(str, Enum):
sessionStarted = "sessionStarted"
labwareLoaded = "labwareLoaded"
measuringNozzleOffset = "measuringNozzleOffset"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ class PipetteOffsetCalibrationSessionStatus(BaseModel):
shouldPerformTipLength: bool =\
Field(None, description="Does tip length calibration data exist for "
"this pipette and tip rack combination")
supportedCommands: List[Optional[str]] = Field(
..., description="A list of supported commands for this user flow")
nextSteps: Optional[NextSteps] =\
Field(None, description="Next Available Steps in Session")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict
from typing import Dict, Type

from robot_server.service.session.models.command_definitions import \
CommandDefinition, CalibrationCommand
Expand Down Expand Up @@ -108,6 +108,19 @@ def __init__(self):
states=set(s for s in POCState),
transitions=PIP_OFFSET_CAL_TRANSITIONS
)
self._state = POCState
self._current_state = POCState.sessionStarted

@property
def state(self) -> Type[POCState]:
return self._state

@property
def current_state(self) -> POCState:
return self._current_state

def set_state(self, state: POCState):
self._current_state = state

def get_next_state(self, from_state: POCState, command: CommandDefinition):
next_state = self._state_machine.get_next_state(from_state, command)
Expand All @@ -123,6 +136,19 @@ def __init__(self):
states=set(s for s in POWTState),
transitions=PIP_OFFSET_WITH_TL_TRANSITIONS,
)
self._state = POWTState
self._current_state = POWTState.sessionStarted

@property
def state(self) -> Type[POWTState]:
return self._state

@property
def current_state(self) -> POWTState:
return self._current_state

def set_state(self, state: POWTState):
self._current_state = state

def get_next_state(
self, from_state: POWTState, command: CommandDefinition):
Expand Down
Loading