Skip to content

Commit

Permalink
refactor(api, robot-server): add per revision configs (#7109)
Browse files Browse the repository at this point in the history
With the launch of the OT-2 refresh, we'll need to be able to express
some config elements (specifically, currents) differently based on the
hardware revision of the robot in question. This PR adds per-revision
concepts to the robot configs area, specifically around the currents,
which is a huge changeset.

It also has a bunch of refactors that were waiting a long time, like
- robot configs is a dataclass now
- A lot had to deal with that
- Removed the rest of the now-unused config elements.

Closes #7093
  • Loading branch information
sfoster1 authored Jan 4, 2021
1 parent 29fac8a commit 61d9b4f
Show file tree
Hide file tree
Showing 17 changed files with 465 additions and 224 deletions.
204 changes: 124 additions & 80 deletions api/src/opentrons/config/robot_configs.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
from collections import namedtuple
from copy import deepcopy
import json
import logging
import os
from dataclasses import asdict
from pathlib import Path

from typing import Any, Dict, List, Union, Optional
from typing import Any, Dict, List, Union, Optional, TypeVar, cast

from opentrons.config import CONFIG
from opentrons.hardware_control.types import BoardRevision
from .types import CurrentDict, RobotConfig, AxisDict

log = logging.getLogger(__name__)

ROBOT_CONFIG_VERSION = 3
ROBOT_CONFIG_VERSION = 4

PLUNGER_CURRENT_LOW = 0.05
PLUNGER_CURRENT_HIGH = 0.05
Expand All @@ -23,33 +27,66 @@
Y_CURRENT_LOW = 0.3
Y_CURRENT_HIGH = 1.25

XY_CURRENT_LOW_REFRESH = 0.7
MOUNT_CURRENT_HIGH_REFRESH = 0.5

Z_RETRACT_DISTANCE = 2

HIGH_CURRENT: Dict[str, float] = {
'X': X_CURRENT_HIGH,
'Y': Y_CURRENT_HIGH,
'Z': MOUNT_CURRENT_HIGH,
'A': MOUNT_CURRENT_HIGH,
'B': PLUNGER_CURRENT_HIGH,
'C': PLUNGER_CURRENT_HIGH
HIGH_CURRENT: CurrentDict = {
'default': {
'X': X_CURRENT_HIGH,
'Y': Y_CURRENT_HIGH,
'Z': MOUNT_CURRENT_HIGH_REFRESH,
'A': MOUNT_CURRENT_HIGH_REFRESH,
'B': PLUNGER_CURRENT_HIGH,
'C': PLUNGER_CURRENT_HIGH
},
'2.1': {
'X': X_CURRENT_HIGH,
'Y': Y_CURRENT_HIGH,
'Z': MOUNT_CURRENT_HIGH,
'A': MOUNT_CURRENT_HIGH,
'B': PLUNGER_CURRENT_HIGH,
'C': PLUNGER_CURRENT_HIGH
}
}

LOW_CURRENT: Dict[str, float] = {
'X': X_CURRENT_LOW,
'Y': Y_CURRENT_LOW,
'Z': MOUNT_CURRENT_LOW,
'A': MOUNT_CURRENT_LOW,
'B': PLUNGER_CURRENT_LOW,
'C': PLUNGER_CURRENT_LOW
LOW_CURRENT: CurrentDict = {
'default': {
'X': XY_CURRENT_LOW_REFRESH,
'Y': XY_CURRENT_LOW_REFRESH,
'Z': MOUNT_CURRENT_LOW,
'A': MOUNT_CURRENT_LOW,
'B': PLUNGER_CURRENT_LOW,
'C': PLUNGER_CURRENT_LOW
},
'2.1': {
'X': X_CURRENT_LOW,
'Y': Y_CURRENT_LOW,
'Z': MOUNT_CURRENT_LOW,
'A': MOUNT_CURRENT_LOW,
'B': PLUNGER_CURRENT_LOW,
'C': PLUNGER_CURRENT_LOW
}
}

DEFAULT_CURRENT: Dict[str, float] = {
'X': HIGH_CURRENT['X'],
'Y': HIGH_CURRENT['Y'],
'Z': HIGH_CURRENT['Z'],
'A': HIGH_CURRENT['A'],
'B': LOW_CURRENT['B'],
'C': LOW_CURRENT['C']
DEFAULT_CURRENT: CurrentDict = {
'default': {
'X': HIGH_CURRENT['default']['X'],
'Y': HIGH_CURRENT['default']['Y'],
'Z': HIGH_CURRENT['default']['Z'],
'A': HIGH_CURRENT['default']['A'],
'B': LOW_CURRENT['default']['B'],
'C': LOW_CURRENT['default']['C']
},
'2.1': {
'X': HIGH_CURRENT['2.1']['X'],
'Y': HIGH_CURRENT['2.1']['Y'],
'Z': HIGH_CURRENT['2.1']['Z'],
'A': HIGH_CURRENT['2.1']['A'],
'B': LOW_CURRENT['2.1']['B'],
'C': LOW_CURRENT['2.1']['C']
}
}

X_MAX_SPEED = 600
Expand All @@ -59,7 +96,7 @@
B_MAX_SPEED = 40
C_MAX_SPEED = 40

DEFAULT_MAX_SPEEDS: Dict[str, float] = {
DEFAULT_MAX_SPEEDS: AxisDict = {
'X': X_MAX_SPEED,
'Y': Y_MAX_SPEED,
'Z': Z_MAX_SPEED,
Expand Down Expand Up @@ -112,60 +149,67 @@
'A': 400
}

DEFAULT_STEPS_PER_MM = 'M92 X80.00 Y80.00 Z400 A400 B768 C768'

DEFAULT_MOUNT_OFFSET = [-34, 0, 0]
DEFAULT_INST_OFFSET = [0.0, 0.0, 0.0]
DEFAULT_PIPETTE_OFFSET = [0.0, 0.0, 0.0]
SERIAL_SPEED = 115200
DEFAULT_TIP_LENGTH_DICT = {'Pipette': 51.7}
DEFAULT_LOG_LEVEL = 'INFO'

robot_config = namedtuple(
'robot_config',
[
'name',
'version',
'steps_per_mm',
'gantry_steps_per_mm',
'acceleration',
'serial_speed',
'tip_length',
'default_current',
'low_current',
'high_current',
'default_max_speed',
'log_level',
'default_pipette_configs',
'z_retract_distance',
'left_mount_offset'
]
)


def _build_conf_dict(
from_conf: Union[Dict, str, None], default) -> Dict[str, float]:
if not from_conf or isinstance(from_conf, str):

def _build_hw_versioned_current_dict(
from_conf: Optional[Dict[str, Any]], default: CurrentDict) -> CurrentDict:
if not from_conf or not isinstance(from_conf, dict):
return default
# special case: if this is a valid old (i.e. not model-specific) current
# setup, migrate it.
if 'default' not in from_conf and not (set('XYZABC')-set(from_conf.keys())):
new_dct = deepcopy(default)
# Because there's no case in which a machine with a more recent revision
# than 2.1 should have a valid and edited robot config when updating
# to this code, we should default it to 2.1 to avoid breaking other
# robots
new_dct['2.1'] = cast(AxisDict, from_conf)
return new_dct
return cast(CurrentDict, from_conf)


DictType = TypeVar('DictType', bound=Dict)


def _build_dict_with_default(
from_conf: Union[DictType, str, None], default: DictType) -> DictType:
if not isinstance(from_conf, dict):
return default
else:
return from_conf
return cast(DictType, from_conf)


def current_for_revision(
current_dict: CurrentDict,
revision: BoardRevision) -> AxisDict:
if revision == BoardRevision.UNKNOWN:
return current_dict.get('2.1', current_dict['default'])
elif revision.real_name() in current_dict:
return current_dict[revision.real_name()] # type: ignore
else:
return current_dict['default']


def build_config(robot_settings: Dict[str, Any]) -> robot_config:
cfg = robot_config(
def build_config(robot_settings: Dict[str, Any]) -> RobotConfig:
return RobotConfig(
name=robot_settings.get('name', 'Ada Lovelace'),
version=int(robot_settings.get('version', ROBOT_CONFIG_VERSION)),
steps_per_mm=_build_conf_dict(
robot_settings.get('steps_per_mm'), DEFAULT_STEPS_PER_MM),
gantry_steps_per_mm=_build_conf_dict(
version=ROBOT_CONFIG_VERSION,
gantry_steps_per_mm=_build_dict_with_default(
robot_settings.get('steps_per_mm'), DEFAULT_GANTRY_STEPS_PER_MM),
acceleration=_build_conf_dict(
acceleration=_build_dict_with_default(
robot_settings.get('acceleration'), DEFAULT_ACCELERATION),
tip_length=robot_settings.get('tip_length', DEFAULT_TIP_LENGTH_DICT),
serial_speed=robot_settings.get('serial_speed', SERIAL_SPEED),
default_current=robot_settings.get('default_current', DEFAULT_CURRENT),
low_current=robot_settings.get('low_current', LOW_CURRENT),
high_current=robot_settings.get('high_current', HIGH_CURRENT),
default_current=_build_hw_versioned_current_dict(
robot_settings.get('default_current'), DEFAULT_CURRENT),
low_current=_build_hw_versioned_current_dict(
robot_settings.get('low_current'), LOW_CURRENT),
high_current=_build_hw_versioned_current_dict(
robot_settings.get('high_current'), HIGH_CURRENT),
default_max_speed=robot_settings.get(
'default_max_speed', DEFAULT_MAX_SPEEDS),
log_level=robot_settings.get('log_level', DEFAULT_LOG_LEVEL),
Expand All @@ -176,35 +220,36 @@ def build_config(robot_settings: Dict[str, Any]) -> robot_config:
left_mount_offset=robot_settings.get(
'left_mount_offset', DEFAULT_MOUNT_OFFSET),
)
return cfg


def config_to_save(
config: robot_config) -> Dict[str, Any]:
return dict(config._asdict())
config: RobotConfig) -> Dict[str, Any]:
return asdict(config)


def load():
def load() -> RobotConfig:
settings_file = CONFIG['robot_settings_file']
log.debug("Loading robot settings from {}".format(settings_file))
robot_settings = _load_json(settings_file) or {}
return build_config(robot_settings)


def save_robot_settings(config: robot_config, rs_filename=None, tag=None):
def save_robot_settings(config: RobotConfig,
rs_filename: str = None,
tag: str = None):
config_dict = config_to_save(config)

# Save everything else in a different file
rs_filename = rs_filename or CONFIG['robot_settings_file']
filename = rs_filename or CONFIG['robot_settings_file']
if tag:
root, ext = os.path.splitext(rs_filename)
rs_filename = "{}-{}{}".format(root, tag, ext)
_save_json(config_dict, filename=rs_filename)
root, ext = os.path.splitext(filename)
filename = "{}-{}{}".format(root, tag, ext)
_save_json(config_dict, filename=filename)

return config_dict


def backup_configuration(config: robot_config, tag=None):
def backup_configuration(config: RobotConfig, tag: str = None) -> None:
import time
if not tag:
tag = str(int(time.time() * 1000))
Expand All @@ -225,18 +270,18 @@ def get_legacy_gantry_calibration() -> Optional[List[List[float]]]:
return None


def clear():
def clear() -> None:
_clear_file(CONFIG['robot_settings_file'])


def _clear_file(filename):
def _clear_file(filename: Union[str, Path]) -> None:
log.debug('Deleting {}'.format(filename))
if os.path.exists(filename):
os.remove(filename)


# TODO: move to util (write a default load, save JSON function)
def _load_json(filename) -> dict:
def _load_json(filename: Union[str, Path]) -> Dict[str, Any]:
try:
with open(filename, 'r') as file:
res = json.load(file)
Expand All @@ -249,13 +294,12 @@ def _load_json(filename) -> dict:
return res


def _save_json(data, filename):
def _save_json(data: Dict[str, Any], filename: Union[str, Path]) -> None:
try:
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, 'w') as file:
json.dump(data, file, sort_keys=True, indent=4)
file.flush()
os.fsync(file.fileno())
return data
except OSError:
log.exception('Write failed with exception:')
46 changes: 46 additions & 0 deletions api/src/opentrons/config/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from dataclasses import dataclass
from typing import Dict, Tuple
from typing_extensions import TypedDict


class AxisDict(TypedDict):
X: float
Y: float
Z: float
A: float
B: float
C: float


class CurrentDictDefault(TypedDict):
default: AxisDict


CurrentDictModelEntries = TypedDict(
'CurrentDictModelEntries',
{'2.1': AxisDict,
'A': AxisDict,
'B': AxisDict,
'C': AxisDict},
total=False)


class CurrentDict(CurrentDictDefault, CurrentDictModelEntries):
pass


@dataclass
class RobotConfig:
name: str
version: int
gantry_steps_per_mm: Dict[str, float]
acceleration: Dict[str, float]
serial_speed: int
default_pipette_configs: Dict[str, float]
default_current: CurrentDict
low_current: CurrentDict
high_current: CurrentDict
default_max_speed: AxisDict
log_level: str
z_retract_distance: float
left_mount_offset: Tuple[float, float, float]
Loading

0 comments on commit 61d9b4f

Please sign in to comment.