Skip to content

Commit

Permalink
refactor(robot-server): deck cal - jog to deck and calibration points (
Browse files Browse the repository at this point in the history
…#6389)

* refactor(robot-server): deck cal - jog to deck and calibration points

closes #6349, closes #6351

* create TypedDicts, update tests

* uncomment tavern test, fix lint error
  • Loading branch information
ahiuchingau authored Aug 24, 2020
1 parent 7fff515 commit f7da768
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 12 deletions.
25 changes: 24 additions & 1 deletion robot-server/robot_server/robot/calibration/deck/constants.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
from __future__ import annotations

from enum import Enum
from typing import TYPE_CHECKING
from opentrons.types import Point
from robot_server.robot.calibration.constants import STATE_WILDCARD

if TYPE_CHECKING:
from typing_extensions import Final
from .dev_types import StatePointMap


class DeckCalibrationState(str, Enum):
sessionStarted = "sessionStarted"
Expand All @@ -17,6 +24,22 @@ class DeckCalibrationState(str, Enum):
WILDCARD = STATE_WILDCARD


POINT_ONE_ID: Final = '1BLC'
POINT_TWO_ID: Final = '3BRC'
POINT_THREE_ID: Final = '7TLC'
JOG_TO_DECK_SLOT = '5'
TIP_RACK_SLOT = '8'

MOVE_TO_DECK_SAFETY_BUFFER = Point(0, -10, 5)
MOVE_TO_TIP_RACK_SAFETY_BUFFER = Point(0, 0, 10)
MOVE_TO_POINT_SAFETY_BUFFER = Point(0, 0, 5)

MOVE_POINT_STATE_MAP: StatePointMap = {
DeckCalibrationState.joggingToDeck: POINT_ONE_ID,
DeckCalibrationState.savingPointOne: POINT_TWO_ID,
DeckCalibrationState.savingPointTwo: POINT_THREE_ID
}
SAVE_POINT_STATE_MAP: StatePointMap = {
DeckCalibrationState.savingPointOne: POINT_ONE_ID,
DeckCalibrationState.savingPointTwo: POINT_TWO_ID,
DeckCalibrationState.savingPointThree: POINT_THREE_ID
}
26 changes: 26 additions & 0 deletions robot-server/robot_server/robot/calibration/deck/dev_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from typing import Dict, Union
from typing_extensions import TypedDict, Literal
from opentrons.types import Point

from .constants import DeckCalibrationState

SavedPoints = TypedDict(
'SavedPoints',
{
'1BLC': Point,
'3BRC': Point,
'7TLC': Point,
},
total=False
)

ExpectedPoints = TypedDict(
'ExpectedPoints',
{
'1BLC': Point,
'3BRC': Point,
'7TLC': Point,
})

StatePointMap = Dict[
DeckCalibrationState, Union[Literal['1BLC', '3BRC', '7TLC']]]
95 changes: 85 additions & 10 deletions robot-server/robot_server/robot/calibration/deck/user_flow.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
from __future__ import annotations

import logging
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple
from typing import (
Any, Awaitable, Callable, Dict, List, Optional, Tuple,
Union, TYPE_CHECKING)

from opentrons.calibration_storage import get
from opentrons.calibration_storage.types import TipLengthCalNotFound
from opentrons.calibration_storage import helpers
from opentrons.config import feature_flags as ff
from opentrons.hardware_control import robot_calibration as robot_cal
from opentrons.hardware_control import ThreadManager, CriticalPoint
from opentrons.hardware_control.pipette import Pipette
from opentrons.protocol_api import geometry, labware
Expand All @@ -12,22 +18,34 @@
from robot_server.robot.calibration.constants import \
TIP_RACK_LOOKUP_BY_MAX_VOL
from robot_server.service.errors import RobotServerError
from opentrons.util import linal

from robot_server.service.session.models import (
CalibrationCommand, DeckCalibrationCommand)
from robot_server.robot.calibration.constants import (
SHORT_TRASH_DECK, STANDARD_DECK)
import robot_server.robot.calibration.util as uf
from .constants import (
DeckCalibrationState as State,
JOG_TO_DECK_SLOT,
POINT_ONE_ID,
POINT_TWO_ID,
POINT_THREE_ID,
TIP_RACK_SLOT,
MOVE_TO_TIP_RACK_SAFETY_BUFFER)
MOVE_TO_DECK_SAFETY_BUFFER,
MOVE_TO_TIP_RACK_SAFETY_BUFFER,
MOVE_POINT_STATE_MAP,
SAVE_POINT_STATE_MAP)
from .state_machine import DeckCalibrationStateMachine
# TODO: uncomment the following to raise deck cal errors
from ..errors import CalibrationError
from ..helper_classes import (
RequiredLabware,
AttachedPipette)

if TYPE_CHECKING:
from .dev_types import SavedPoints, ExpectedPoints


MODULE_LOG = logging.getLogger(__name__)

Expand All @@ -43,6 +61,15 @@
COMMAND_MAP = Dict[str, COMMAND_HANDLER]


def tuplefy_cal_point_dicts(
pt_dicts: Union[ExpectedPoints, SavedPoints]) -> linal.SolvePoints:
return (
tuple(pt_dicts[POINT_ONE_ID]), # type: ignore
tuple(pt_dicts[POINT_TWO_ID]),
tuple(pt_dicts[POINT_THREE_ID])
)


class DeckCalibrationUserFlow:
def __init__(self,
hardware: ThreadManager):
Expand All @@ -59,6 +86,9 @@ def __init__(self,
self._state_machine = DeckCalibrationStateMachine()

self._tip_origin_pt: Optional[Point] = None
self._z_height_reference: Optional[float] = None
self._expected_points = self._build_expected_points_dict()
self._saved_points: SavedPoints = {}

self._command_map: COMMAND_MAP = {
CalibrationCommand.load_labware: self.load_labware,
Expand Down Expand Up @@ -132,6 +162,21 @@ def _get_tip_rack_lw(self) -> labware.Labware:
return labware.load(
lw_load_name, self._deck.position_for(TIP_RACK_SLOT))

def _initialize_deck(self):
tip_rack_lw = self._get_tip_rack_lw()
self._deck[TIP_RACK_SLOT] = tip_rack_lw

def _build_expected_points_dict(self) -> ExpectedPoints:
pos_1 = self._deck.get_calibration_position(POINT_ONE_ID).position
pos_2 = self._deck.get_calibration_position(POINT_TWO_ID).position
pos_3 = self._deck.get_calibration_position(POINT_THREE_ID).position
exp_pt: ExpectedPoints = {
POINT_ONE_ID: Point(*pos_1),
POINT_TWO_ID: Point(*pos_2),
POINT_THREE_ID: Point(*pos_3)
}
return exp_pt

async def handle_command(self,
name: Any,
data: Dict[Any, Any]):
Expand Down Expand Up @@ -162,9 +207,7 @@ def _get_critical_point(self) -> CriticalPoint:
# self._hw_pipette.critical_point)

async def _get_current_point(self) -> Point:
cp = self._get_critical_point()
return await self._hardware.gantry_position(self._mount,
critical_point=cp)
return await self._hardware.gantry_position(self._mount)

async def load_labware(self):
pass
Expand All @@ -180,19 +223,51 @@ async def move_to_tip_rack(self):
await self._move(to_loc)

async def move_to_deck(self):
pass
deck_pt = self._deck.get_slot_center(JOG_TO_DECK_SLOT)
ydim = self._deck.get_slot_definition(
JOG_TO_DECK_SLOT)['boundingBox']['yDimension']
new_pt = deck_pt + Point(0, (ydim/2), 0) + \
MOVE_TO_DECK_SAFETY_BUFFER
to_loc = Location(new_pt, None)
await self._move(to_loc)

def _get_move_to_point_loc_by_state(self) -> Location:
assert self._z_height_reference
pt_id = MOVE_POINT_STATE_MAP[self._current_state]
coords = self._deck.get_calibration_position(pt_id).position
loc = Location(Point(*coords), None)
return loc.move(point=Point(0, 0, self._z_height_reference))

async def move_to_point_one(self):
pass
await self._move(self._get_move_to_point_loc_by_state())

async def move_to_point_two(self):
pass
await self._move(self._get_move_to_point_loc_by_state())

async def move_to_point_three(self):
pass
await self._move(self._get_move_to_point_loc_by_state())

async def save_offset(self):
pass
cur_pt = await self._get_current_point()
if self.current_state == State.joggingToDeck:
self._z_height_reference = cur_pt.z
else:
pt_id = SAVE_POINT_STATE_MAP[self._current_state]
self._saved_points[pt_id] = cur_pt

if self._current_state == State.savingPointThree:
self._save_attitude_matrix()

def _save_attitude_matrix(self):
e = tuplefy_cal_point_dicts(self._expected_points)
a = tuplefy_cal_point_dicts(self._saved_points)
tiprack_hash = helpers.hash_labware_def(
self._deck[TIP_RACK_SLOT]._definition) + ''
robot_cal.save_attitude_matrix(
expected=e,
actual=a,
pipette_id=self._hw_pipette.pipette_id,
tiprack_hash=tiprack_hash)

def _get_tip_length(self) -> float:
try:
Expand Down
77 changes: 76 additions & 1 deletion robot-server/tests/robot/calibration/deck/test_user_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
from opentrons.types import Mount, Point
from opentrons.hardware_control import pipette
from robot_server.robot.calibration.deck.user_flow import \
DeckCalibrationUserFlow
DeckCalibrationUserFlow, tuplefy_cal_point_dicts
from robot_server.robot.calibration.deck.constants import \
POINT_ONE_ID, POINT_TWO_ID, POINT_THREE_ID, DeckCalibrationState


@pytest.fixture
Expand Down Expand Up @@ -130,3 +132,76 @@ async def test_return_tip(mock_user_flow):
]
uf._hardware.move_to.assert_has_calls(move_calls)
uf._hardware.drop_tip.assert_called()


async def test_jog(mock_user_flow):
uf = mock_user_flow
await uf.jog(vector=(0, 0, 0.1))
assert await uf._get_current_point() == Point(0, 0, 0.1)
await uf.jog(vector=(1, 0, 0))
assert await uf._get_current_point() == Point(1, 0, 0.1)


@pytest.mark.parametrize(
"state,point_id", [
(DeckCalibrationState.joggingToDeck, POINT_ONE_ID),
(DeckCalibrationState.savingPointOne, POINT_TWO_ID),
(DeckCalibrationState.savingPointTwo, POINT_THREE_ID)])
async def test_get_move_to_cal_point_location(mock_user_flow,
state, point_id):
uf = mock_user_flow
uf._z_height_reference = 30

pt_list = uf._deck.get_calibration_position(point_id).position
exp = Point(pt_list[0], pt_list[1], 30)

uf._current_state = state
assert uf._get_move_to_point_loc_by_state().point == exp


async def test_save_z_height(mock_user_flow):
uf = mock_user_flow
uf._current_state = DeckCalibrationState.joggingToDeck
assert uf._z_height_reference is None

await uf._hardware.move_to(
mount=uf._mount,
abs_position=Point(x=10, y=10, z=10),
critical_point=uf._hw_pipette.critical_point
)
await uf.save_offset()
assert uf._z_height_reference == 10


@pytest.mark.parametrize(
"state,point_id", [
(DeckCalibrationState.savingPointOne, POINT_ONE_ID),
(DeckCalibrationState.savingPointTwo, POINT_TWO_ID),
(DeckCalibrationState.savingPointThree, POINT_THREE_ID)])
async def test_save_cal_point_offsets(mock_user_flow, state, point_id):
uf = mock_user_flow

def mock_save_attitude_matrix(*args, **kwargs):
pass

uf._save_attitude_matrix = mock_save_attitude_matrix
uf._current_state = state
await uf._hardware.move_to(
mount=uf._mount,
abs_position=Point(x=10, y=10, z=10),
critical_point=uf._hw_pipette.critical_point
)

assert state not in uf._saved_points
await uf.save_offset()
assert uf._saved_points[point_id] == Point(10, 10, 10)


def test_tuplefy_cal_point_dicts():
saved_points = {
'1BLC': Point(1, 1, 3),
'3BRC': Point(2, 2, 2),
'7TLC': Point(1, 2, 1)}

a = tuplefy_cal_point_dicts(saved_points)
assert a == ((1, 1, 3), (2, 2, 2), (1, 2, 1))

0 comments on commit f7da768

Please sign in to comment.