Skip to content

Commit

Permalink
OT3 backend will always return a valid estop state machine
Browse files Browse the repository at this point in the history
  • Loading branch information
fsinapi committed Jul 21, 2023
1 parent e23f040 commit f826a1b
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 30 deletions.
8 changes: 4 additions & 4 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def __init__(
FirmwareUpdate(),
)
self._estop_detector: Optional[EstopDetector] = None
self._estop_state_machine: Optional[EstopStateMachine] = None
self._estop_state_machine = EstopStateMachine(detector=None)
self._position = self._get_home_position()
self._encoder_position = self._get_home_position()
self._motor_status = {}
Expand Down Expand Up @@ -1170,17 +1170,17 @@ def _door_listener(msg: BinaryMessageDefinition) -> None:
def status_bar_interface(self) -> status_bar.StatusBar:
return self._status_bar

async def build_estop_state_machine(self) -> bool:
async def build_estop_detector(self) -> bool:
"""Must be called to set up the estop detector & state machine."""
if self._drivers.usb_messenger is None:
return False
self._estop_detector = await EstopDetector.build(
usb_messenger=self._drivers.usb_messenger
)
self._estop_state_machine = EstopStateMachine(self._estop_detector)
self._estop_state_machine.subscribe_to_detector(self._estop_detector)
return True

@property
def estop_state_machine(self) -> Optional[EstopStateMachine]:
def estop_state_machine(self) -> EstopStateMachine:
"""Accessor for the API to get the state machine, if it exists."""
return self._estop_state_machine
8 changes: 5 additions & 3 deletions api/src/opentrons/hardware_control/backends/ot3simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
Move,
Coordinates,
)
from opentrons.hardware_control.estop_state import EstopStateMachine
from opentrons_hardware.drivers.eeprom import EEPROMData
from opentrons.hardware_control.module_control import AttachedModulesControl
from opentrons.hardware_control import modules
Expand Down Expand Up @@ -137,6 +138,7 @@ def __init__(
self._update_required = False
self._initialized = False
self._lights = {"button": False, "rails": False}
self._estop_state_machine = EstopStateMachine(detector=None)

def _sanitize_attached_instrument(
mount: OT3Mount, passed_ai: Optional[Dict[str, Optional[str]]] = None
Expand Down Expand Up @@ -690,6 +692,6 @@ def subsystems(self) -> Dict[SubSystem, SubSystemState]:
}

@property
def estop_state_machine(self) -> None:
"""Simulator will not return an estop state machine."""
return None
def estop_state_machine(self) -> EstopStateMachine:
"""Return an estop state machine locked in the "disengaged" state."""
return self._estop_state_machine
36 changes: 28 additions & 8 deletions api/src/opentrons/hardware_control/estop_state.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""opentrons.hardware_control.estop_state: module to manage estop state machine on OT-3."""

from typing import List
from typing import List, Optional
from opentrons_hardware.hardware_control.estop.detector import (
EstopSummary,
EstopDetector,
Expand All @@ -18,17 +18,37 @@
class EstopStateMachine:
"""Class to manage global Estop state."""

def __init__(self, detector: EstopDetector) -> None:
"""Create a new EstopStateMachine."""
self._detector = detector
def __init__(self, detector: Optional[EstopDetector]) -> None:
"""Create a new EstopStateMachine.
If detector is None, the state machine will be initialized in
a happy state (Disengaged, both estops detected) until it is
hooked up to a valid detector.
"""
self._detector: Optional[EstopDetector] = None
self._state: EstopState = EstopState.DISENGAGED
self._summary = detector.status
self._transition_from_disengaged()
detector.add_listener(self.detector_listener)
# Start off in a happy state until a detector is added
self._summary = EstopSummary(
left_detected=True, right_detected=True, engaged=False
)
if detector is not None:
self.subscribe_to_detector(detector=detector)
self._listeners: List[HardwareEventHandler] = []

def subscribe_to_detector(self, detector: EstopDetector) -> None:
"""Configure the estop state machine to listen to a detector.
This function will also transition the state based on the current
status of the detector."""
if self._detector is not None:
self._detector.remove_listener(self.detector_listener)
self._detector = detector
detector.add_listener(listener=self.detector_listener)
self._handle_state_transition(new_summary=detector.status)

def __del__(self) -> None:
self._detector.remove_listener(self.detector_listener)
if self._detector is not None:
self._detector.remove_listener(self.detector_listener)

def add_listener(self, listener: HardwareEventHandler) -> None:
"""Add a hardware event listener for estop event changes."""
Expand Down
13 changes: 2 additions & 11 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@
TipStateType,
EstopOverallStatus,
EstopAttachLocation,
EstopState,
EstopPhysicalStatus,
)
from .errors import (
MustHomeError,
Expand Down Expand Up @@ -321,7 +319,7 @@ async def build_hardware_controller(
api_instance, board_revision=backend.board_revision
)
backend.module_controls = module_controls
await backend.build_estop_state_machine()
await backend.build_estop_detector()
door_state = await backend.door_state()
api_instance._update_door_state(door_state)
backend.add_door_state_listener(api_instance._update_door_state)
Expand Down Expand Up @@ -2256,12 +2254,6 @@ def attached_subsystems(self) -> Dict[SubSystem, SubSystemState]:

@property
def estop_status(self) -> EstopOverallStatus:
if self._backend.estop_state_machine is None:
return EstopOverallStatus(
state=EstopState.DISENGAGED,
left_physical_state=EstopPhysicalStatus.DISENGAGED,
right_physical_state=EstopPhysicalStatus.DISENGAGED,
)
return EstopOverallStatus(
state=self._backend.estop_state_machine.state,
left_physical_state=self._backend.estop_state_machine.get_physical_status(
Expand All @@ -2276,6 +2268,5 @@ def estop_acknowledge_and_clear(self) -> EstopOverallStatus:
"""Attempt to acknowledge an Estop event and clear the status.
Returns the estop status after clearing the status."""
if self._backend.estop_state_machine is not None:
self._backend.estop_state_machine.acknowledge_and_clear()
self._backend.estop_state_machine.acknowledge_and_clear()
return self.estop_status
45 changes: 44 additions & 1 deletion api/tests/opentrons/hardware_control/test_estop_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,53 @@ def mock_estop_detector(decoy: Decoy, initial_state: EstopSummary) -> EstopDetec


@pytest.fixture
def subject(mock_estop_detector: EstopDetector, decoy: Decoy) -> EstopStateMachine:
def subject(mock_estop_detector: EstopDetector) -> EstopStateMachine:
return EstopStateMachine(detector=mock_estop_detector)


async def test_estop_state_no_detector(
mock_estop_detector: EstopDetector, decoy: Decoy
) -> None:
"""Test that the estop state machine works without a detector."""
subject = EstopStateMachine(detector=None)
assert subject.state == EstopState.DISENGAGED
assert (
subject.get_physical_status(EstopAttachLocation.LEFT)
== EstopPhysicalStatus.DISENGAGED
)
assert (
subject.get_physical_status(EstopAttachLocation.RIGHT)
== EstopPhysicalStatus.DISENGAGED
)

decoy.when(mock_estop_detector.status).then_return(
EstopSummary(left_detected=False, right_detected=True, engaged=True)
)

subject.subscribe_to_detector(detector=mock_estop_detector)

assert subject.state == EstopState.PHYSICALLY_ENGAGED
assert (
subject.get_physical_status(EstopAttachLocation.LEFT)
== EstopPhysicalStatus.NOT_PRESENT
)
assert (
subject.get_physical_status(EstopAttachLocation.RIGHT)
== EstopPhysicalStatus.ENGAGED
)

# Check that adding a second listener will wipe out the first one
subject.subscribe_to_detector(detector=mock_estop_detector)

decoy.verify(
[
mock_estop_detector.add_listener(subject.detector_listener),
mock_estop_detector.remove_listener(subject.detector_listener),
mock_estop_detector.add_listener(subject.detector_listener),
]
)


async def test_estop_state_listener(
subject: EstopStateMachine, mock_estop_detector: EstopDetector, decoy: Decoy
) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Detector for estop status messages."""
from opentrons_hardware.drivers.binary_usb import BinaryMessenger

from typing import List, Callable
from typing import List, Callable, Optional

from dataclasses import dataclass

Expand Down Expand Up @@ -40,7 +40,7 @@ class EstopDetector:
"""

def __init__(
self, usb_messenger: BinaryMessenger, initial_state: EstopSummary
self, usb_messenger: Optional[BinaryMessenger], initial_state: EstopSummary
) -> None:
"""Create a new EstopDetector."""
self._usb_messenger = usb_messenger
Expand All @@ -59,7 +59,8 @@ def __init__(
)

def __del__(self) -> None:
self._usb_messenger.remove_listener(self._estop_connected_listener)
if self._usb_messenger is not None:
self._usb_messenger.remove_listener(self._estop_connected_listener)

@staticmethod
async def build(usb_messenger: BinaryMessenger) -> "EstopDetector":
Expand Down

0 comments on commit f826a1b

Please sign in to comment.