Skip to content

Commit

Permalink
feat(app, api): map PAPIv2 liquid handling commands to JSON v6 comman…
Browse files Browse the repository at this point in the history
…ds (#11296)

closes #11187 & RSS-61

Co-authored-by: Brian Cooper <[email protected]>
Co-authored-by: Sakib Hossain <[email protected]>
Co-authored-by: smb2268 <[email protected]>
  • Loading branch information
4 people authored and sfoster1 committed Oct 21, 2022
1 parent f32ef20 commit a19104f
Show file tree
Hide file tree
Showing 3 changed files with 667 additions and 81 deletions.
283 changes: 231 additions & 52 deletions api/src/opentrons/protocol_runner/legacy_command_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Union

from opentrons_shared_data.pipette.dev_types import PipetteNameType
from opentrons.types import MountType, DeckSlotName, Location
Expand Down Expand Up @@ -82,6 +82,7 @@ def __init__(
self._labware_id_by_slot: Dict[DeckSlotName, str] = {
DeckSlotName.FIXED_TRASH: FIXED_TRASH_ID
}
self._labware_id_by_module_id: Dict[str, str] = {}
self._pipette_id_by_mount: Dict[MountType, str] = {}
self._module_id_by_slot: Dict[DeckSlotName, str] = {}

Expand All @@ -91,7 +92,7 @@ def __init__(
] = {}
self._module_data_provider = module_data_provider or ModuleDataProvider()

def map_command(
def map_command( # noqa: C901
self,
command: legacy_command_types.CommandMessage,
) -> List[pe_actions.Action]:
Expand Down Expand Up @@ -150,6 +151,42 @@ def map_command(
"completedAt": now,
}
)
elif isinstance(running_command, pe_commands.Aspirate):
completed_command = running_command.copy(
update={
"result": pe_commands.AspirateResult.construct(
volume=running_command.params.volume
),
"status": pe_commands.CommandStatus.SUCCEEDED,
"completedAt": now,
}
)
elif isinstance(running_command, pe_commands.Dispense):
completed_command = running_command.copy(
update={
"result": pe_commands.DispenseResult.construct(
volume=running_command.params.volume
),
"status": pe_commands.CommandStatus.SUCCEEDED,
"completedAt": now,
}
)
elif isinstance(running_command, pe_commands.BlowOut):
completed_command = running_command.copy(
update={
"result": pe_commands.BlowOutResult.construct(),
"status": pe_commands.CommandStatus.SUCCEEDED,
"completedAt": now,
}
)
elif isinstance(running_command, pe_commands.Custom):
completed_command = running_command.copy(
update={
"result": pe_commands.CustomResult.construct(),
"status": pe_commands.CommandStatus.SUCCEEDED,
"completedAt": now,
}
)
else:
completed_command = running_command.copy(
update={
Expand Down Expand Up @@ -192,78 +229,222 @@ def _build_initial_command(
now: datetime,
) -> pe_commands.Command:
engine_command: pe_commands.Command
well: LegacyWell
if (
command["name"] == legacy_command_types.PICK_UP_TIP
and "instrument" in command["payload"]
and "location" in command["payload"]
and isinstance(command["payload"]["location"], LegacyWell)
):
pipette: LegacyPipetteContext = command["payload"]["instrument"]
_well = command["payload"].get("location")
if isinstance(_well, LegacyWell):
well = _well
else:
raise Exception("Unknown pick_up_tip location.")
mount = MountType(pipette.mount)
slot = DeckSlotName.from_primitive(well.parent.parent) # type: ignore[arg-type]
well_name = well.well_name
labware_id = self._labware_id_by_slot[slot]
pipette_id = self._pipette_id_by_mount[mount]
if command["name"] == legacy_command_types.PICK_UP_TIP:
engine_command = self._build_pick_up_tip_command(
command=command, command_id=command_id, now=now
)
elif command["name"] == legacy_command_types.DROP_TIP:
engine_command = self._build_drop_tip_command(
command=command, command_id=command_id, now=now
)

engine_command = pe_commands.PickUpTip.construct(
elif (
command["name"] == legacy_command_types.ASPIRATE
or command["name"] == legacy_command_types.DISPENSE
):
engine_command = self._build_liquid_handling_commands(
command=command, command_id=command_id, now=now
)
elif command["name"] == legacy_command_types.BLOW_OUT:
engine_command = self._build_blow_out_command(
command=command, command_id=command_id, now=now
)
elif command["name"] == legacy_command_types.PAUSE:
engine_command = pe_commands.WaitForResume.construct(
id=command_id,
key=command_id,
status=pe_commands.CommandStatus.RUNNING,
createdAt=now,
startedAt=now,
params=pe_commands.PickUpTipParams.construct(
pipetteId=pipette_id,
labwareId=labware_id,
wellName=well_name,
params=pe_commands.WaitForResumeParams.construct(
message=command["payload"]["userMessage"],
),
)
elif (
command["name"] == legacy_command_types.DROP_TIP
and "instrument" in command["payload"]
and "location" in command["payload"]
):
pipette: LegacyPipetteContext = command["payload"]["instrument"] # type: ignore
location = command["payload"].get("location")
if isinstance(location, Location):
well = location.labware.as_well()
else:
raise Exception("Unknown drop_tip location.")
else:
engine_command = pe_commands.Custom.construct(
id=command_id,
key=command_id,
status=pe_commands.CommandStatus.RUNNING,
createdAt=now,
startedAt=now,
params=LegacyCommandParams.construct(
legacyCommandType=command["name"],
legacyCommandText=command["payload"]["text"],
),
)

return engine_command

def _build_drop_tip_command(
self,
command: legacy_command_types.DropTipMessage,
command_id: str,
now: datetime,
) -> pe_commands.Command:
pipette: LegacyPipetteContext = command["payload"]["instrument"]
location = command["payload"]["location"]
assert location.labware.is_well
well = location.labware.as_well()
mount = MountType(pipette.mount)
# the following type checking suppression assumes the tiprack is not loaded on top of a module
slot = DeckSlotName.from_primitive(well.parent.parent) # type: ignore[arg-type]
well_name = well.well_name
labware_id = self._labware_id_by_slot[slot]
pipette_id = self._pipette_id_by_mount[mount]
return pe_commands.DropTip.construct(
id=command_id,
key=command_id,
status=pe_commands.CommandStatus.RUNNING,
createdAt=now,
startedAt=now,
params=pe_commands.DropTipParams.construct(
pipetteId=pipette_id,
labwareId=labware_id,
wellName=well_name,
),
)

def _build_pick_up_tip_command(
self,
command: legacy_command_types.PickUpTipMessage,
command_id: str,
now: datetime,
) -> pe_commands.Command:
pipette: LegacyPipetteContext = command["payload"]["instrument"]
location = command["payload"]["location"]
assert isinstance(location, LegacyWell)
well = location
mount = MountType(pipette.mount)
# the following type checking suppression assumes the tiprack is not loaded on top of a module
slot = DeckSlotName.from_primitive(well.parent.parent) # type: ignore[arg-type]
well_name = well.well_name
labware_id = self._labware_id_by_slot[slot]
pipette_id = self._pipette_id_by_mount[mount]

return pe_commands.PickUpTip.construct(
id=command_id,
key=command_id,
status=pe_commands.CommandStatus.RUNNING,
createdAt=now,
startedAt=now,
params=pe_commands.PickUpTipParams.construct(
pipetteId=pipette_id,
labwareId=labware_id,
wellName=well_name,
),
)

def _build_liquid_handling_commands(
self,
command: Union[
legacy_command_types.AspirateMessage, legacy_command_types.DispenseMessage
],
command_id: str,
now: datetime,
) -> pe_commands.Command:
pipette: LegacyPipetteContext = command["payload"]["instrument"]
location = command["payload"]["location"]
volume = command["payload"]["volume"]
# TODO:(jr, 15.08.2022): aspirate and dispense commands with no specified labware
# get filtered into custom. Refactor this in followup legacy command mapping
if isinstance(location, Location) and location.labware.is_well:
well = location.labware.as_well()
slot = DeckSlotName(location.labware.first_parent())
parent_module_id = self._module_id_by_slot.get(slot)
labware_id = (
self._labware_id_by_module_id[parent_module_id]
if parent_module_id is not None
else self._labware_id_by_slot[slot]
)
mount = MountType(pipette.mount)
slot = DeckSlotName.from_primitive(well.parent.parent) # type: ignore[arg-type]
well_name = well.well_name
labware_id = self._labware_id_by_slot[slot]
pipette_id = self._pipette_id_by_mount[mount]
engine_command = pe_commands.DropTip.construct(

if command["name"] == legacy_command_types.ASPIRATE:
flow_rate = command["payload"]["rate"] * pipette.flow_rate.aspirate
return pe_commands.Aspirate.construct(
id=command_id,
key=command_id,
status=pe_commands.CommandStatus.RUNNING,
createdAt=now,
startedAt=now,
params=pe_commands.AspirateParams.construct(
pipetteId=pipette_id,
labwareId=labware_id,
wellName=well_name,
volume=volume,
flowRate=flow_rate,
),
)
else:
flow_rate = command["payload"]["rate"] * pipette.flow_rate.dispense
return pe_commands.Dispense.construct(
id=command_id,
key=command_id,
status=pe_commands.CommandStatus.RUNNING,
createdAt=now,
startedAt=now,
params=pe_commands.DispenseParams.construct(
pipetteId=pipette_id,
labwareId=labware_id,
wellName=well_name,
volume=volume,
flowRate=flow_rate,
),
)
else:
return pe_commands.Custom.construct(
id=command_id,
key=command_id,
status=pe_commands.CommandStatus.RUNNING,
createdAt=now,
startedAt=now,
params=pe_commands.DropTipParams.construct(
pipetteId=pipette_id,
labwareId=labware_id,
wellName=well_name,
params=LegacyCommandParams.construct(
legacyCommandType=command["name"],
legacyCommandText=command["payload"]["text"],
),
)
elif command["name"] == legacy_command_types.PAUSE:
engine_command = pe_commands.WaitForResume.construct(

def _build_blow_out_command(
self,
command: legacy_command_types.BlowOutMessage,
command_id: str,
now: datetime,
) -> pe_commands.Command:
pipette: LegacyPipetteContext = command["payload"]["instrument"]
location = command["payload"]["location"]
flow_rate = pipette.flow_rate.blow_out
# TODO:(jr, 15.08.2022): blow_out commands with no specified labware get filtered
# into custom. Remove location.labware.is_empty is False when refactor is complete
if isinstance(location, Location) and location.labware.is_well:
well = location.labware.as_well()
slot = DeckSlotName(location.labware.first_parent())
parent_module_id = self._module_id_by_slot.get(slot)
labware_id = (
self._labware_id_by_module_id[parent_module_id]
if parent_module_id is not None
else self._labware_id_by_slot[slot]
)
mount = MountType(pipette.mount)
well_name = well.well_name
pipette_id = self._pipette_id_by_mount[mount]
return pe_commands.BlowOut.construct(
id=command_id,
key=command_id,
status=pe_commands.CommandStatus.RUNNING,
createdAt=now,
startedAt=now,
params=pe_commands.WaitForResumeParams.construct(
message=command["payload"]["userMessage"],
params=pe_commands.BlowOutParams.construct(
pipetteId=pipette_id,
labwareId=labware_id,
wellName=well_name,
flowRate=flow_rate,
),
)
# TODO:(jr, 15.08.2022): blow_out commands with no specified labware get filtered
# into custom. Refactor this in followup legacy command mapping
else:
engine_command = pe_commands.Custom.construct(
return pe_commands.Custom.construct(
id=command_id,
key=command_id,
status=pe_commands.CommandStatus.RUNNING,
Expand All @@ -275,8 +456,6 @@ def _build_initial_command(
),
)

return engine_command

def _map_labware_load(
self, labware_load_info: LegacyLabwareLoadInfo
) -> pe_commands.Command:
Expand Down Expand Up @@ -320,9 +499,9 @@ def _map_labware_load(

self._command_count["LOAD_LABWARE"] = count + 1
if isinstance(location, pe_types.DeckSlotLocation):
# TODO (spp, 2021-11-16): Account for labware on modules when mapping legacy
# pipetting commands; either in self._labware_id_by_slot or something else
self._labware_id_by_slot[location.slotName] = labware_id
elif isinstance(location, pe_types.ModuleLocation):
self._labware_id_by_module_id[location.moduleId] = labware_id
return load_labware_command

def _map_instrument_load(
Expand Down
Loading

0 comments on commit a19104f

Please sign in to comment.