Skip to content

Commit

Permalink
feat(robot-server): status bar animation during firmware updates (#12954
Browse files Browse the repository at this point in the history
)

* Run animations for subsystem updates
* Made the activation animation a little smoother
* Added tests for animations during updates
  • Loading branch information
fsinapi authored Jun 26, 2023
1 parent 7322ac8 commit 7ae3efc
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 8 deletions.
3 changes: 0 additions & 3 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,6 @@ async def build_hardware_controller(
api_instance = cls(backend, loop=checked_loop, config=checked_config)

await api_instance.set_status_bar_enabled(status_bar_enabled)
# TODO: Remove this line once the robot server runs the startup
# animation after initialization!
await api_instance.set_status_bar_state(StatusBarState.IDLE)
module_controls = await AttachedModulesControl.build(
api_instance, board_revision=backend.board_revision
)
Expand Down
3 changes: 2 additions & 1 deletion api/src/opentrons/hardware_control/status_bar_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ async def _status_bar_activation(self) -> None:
# This animation uses an intermediate color between the blue and the white.
# This results in a sort of light-blue effect.
steps: List[status_bar.ColorStep] = [
status_bar.ColorStep(LightTransitionType.instant, 1000, status_bar.OFF),
status_bar.ColorStep(LightTransitionType.linear, 250, status_bar.OFF),
status_bar.ColorStep(LightTransitionType.linear, 750, status_bar.OFF),
status_bar.ColorStep(LightTransitionType.linear, 1000, status_bar.BLUE),
status_bar.ColorStep(
LightTransitionType.linear,
Expand Down
8 changes: 7 additions & 1 deletion robot-server/robot_server/hardware.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
from opentrons.util.helpers import utc_now
from opentrons.hardware_control import ThreadManagedHardware, HardwareControlAPI
from opentrons.hardware_control.simulator_setup import load_simulator_thread_manager
from opentrons.hardware_control.types import HardwareEvent, DoorStateNotification
from opentrons.hardware_control.types import (
HardwareEvent,
DoorStateNotification,
StatusBarState,
)
from opentrons.protocols.api_support.deck_type import (
guess_from_global_config as guess_deck_type_from_global_config,
)
Expand Down Expand Up @@ -306,6 +310,8 @@ async def _postinit_ot3_tasks(
await _do_updates(hardware, update_manager)
await hardware.cache_instruments()
await _home_on_boot(hardware)
update_manager.mark_initialized()
await hardware.set_status_bar_state(StatusBarState.ACTIVATION)
except Exception:
log.exception("Hardware initialization failure")
raise
Expand Down
102 changes: 101 additions & 1 deletion robot-server/robot_server/subsystems/firmware_update_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
Callable,
Awaitable,
List,
Set,
)

from opentrons.hardware_control.types import (
SubSystem as HWSubSystem,
StatusBarState,
)
from opentrons.hardware_control.errors import UpdateOngoingError

Expand Down Expand Up @@ -244,6 +246,84 @@ def _drain_queue_provide_last(self) -> Optional[UpdateProgress]:
return packet


class AnimationHandler:
"""Interface to manage animations to represent firmware updates.
The status bar should be set to the updating animation whenever an update
is in progress, but this is complicated by a couple of factors:
- When the rear panel firmware is being updated, the status bar will be stuck
off because it is controlled by that MCU
- When all updates are finished, the next state depends on whether the updates
are automatic updates on startup (which should lead into the off status) or
the updates are manually requested by a client (in which case we should
transition back to the Idle status).
This class is left public to enable mocking for testing.
"""

def __init__(self, hw_handle: "OT3API") -> None:
self._hardware_handle = hw_handle
self._initialized = False
self._in_progress: Set[SubSystem] = set()
self._lock = Lock()

def mark_initialized(self) -> None:
"""Mark that the on-startup updates have finished.
Once this has been called, update completion will rever the status bar
to `IDLE` instead of `OFF`.
"""
self._initialized = True

async def update_started(self, subsystem: SubSystem) -> None:
"""Update status bar when a new update is started.
If this is the only running update, start the `update` animation ONLY if
the rear panel is not currently updating.
"""
async with self._lock:
if subsystem in self._in_progress:
return
self._in_progress.add(subsystem)

if SubSystem.rear_panel in self._in_progress:
# We can't control the status bar
return
if len(self._in_progress) == 1:
# This is the only update running, update the status bar
await self._hardware_handle.set_status_bar_state(
StatusBarState.UPDATING
)

async def update_complete(self, subsystem: SubSystem) -> None:
"""Update status bar when an update finishes (succesfully or not).
- If the rear panel just finished AND there are other updates running, start
the update animation
- If the last remaining update just finished, set the status to IDLE or OFF
(based on whether the `mark_initialized` function has been called yet).
- Otherwise, do nothing.
"""
async with self._lock:
if subsystem not in self._in_progress:
return
self._in_progress.remove(subsystem)

if SubSystem.rear_panel in self._in_progress:
# We can't control the status bar
return
if len(self._in_progress) > 0 and subsystem == SubSystem.rear_panel:
# The rear panel just finished AND we're still updating, so we
# have to set the rear panel to go back to blinking
await self._hardware_handle.set_status_bar_state(
StatusBarState.UPDATING
)
elif len(self._in_progress) == 0:
# There are no more updates.
state = StatusBarState.IDLE if self._initialized else StatusBarState.OFF
await self._hardware_handle.set_status_bar_state(state)


class UpdateProcessHandle:
"""The external interface to get status notifications from the update process."""

Expand Down Expand Up @@ -307,12 +387,22 @@ class FirmwareUpdateManager:
_task_runner: TaskRunner
_hardware_handle: "OT3API"

def __init__(self, task_runner: TaskRunner, hw_handle: "OT3API") -> None:
_animation_handler: AnimationHandler

def __init__(
self,
task_runner: TaskRunner,
hw_handle: "OT3API",
animation_handler: Optional[AnimationHandler] = None,
) -> None:
self._all_updates_by_id = {}
self._running_updates_by_subsystem = {}
self._task_runner = task_runner
self._management_lock = Lock()
self._hardware_handle = hw_handle
self._animation_handler = animation_handler or AnimationHandler(
hw_handle=hw_handle
)

async def _get_by_id(self, update_id: str) -> _UpdateProcess:
async with self._management_lock:
Expand Down Expand Up @@ -342,6 +432,7 @@ async def _complete() -> None:
# make sure this process gets its progress updated since nothing may
# update it from the route handler after this
await process.provide_latest_progress()
await self._animation_handler.update_complete(subsystem)
except KeyError:
log.exception(f"Double pop for update on {subsystem}")

Expand All @@ -351,6 +442,7 @@ async def _complete() -> None:
self._running_updates_by_subsystem[hw_subsystem] = self._all_updates_by_id[
update_id
]
await self._animation_handler.update_started(subsystem=subsystem)
self._task_runner.run(self._all_updates_by_id[update_id]._update_task)
return self._all_updates_by_id[update_id]

Expand Down Expand Up @@ -415,3 +507,11 @@ async def start_update_process(
process = await self._emplace(update_id, subsystem, created_at)
await process.provide_latest_progress()
return process.get_handle()

def mark_initialized(self) -> None:
"""Mark that the on-startup updates are complete.
This is used by the animation handler to dictate what the status bar should do
when updates finish, which depends on whether the robot is initializing or not.
"""
self._animation_handler.mark_initialized()
59 changes: 57 additions & 2 deletions robot-server/tests/subsystems/test_firmware_update_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
SubSystem as HWSubSystem,
UpdateStatus as HWUpdateStatus,
SubSystemState,
StatusBarState,
)
from opentrons.hardware_control.errors import UpdateOngoingError as HWUpdateOngoingError

Expand All @@ -30,6 +31,7 @@
UpdateInProgress,
SubsystemNotFound,
NoOngoingUpdate,
AnimationHandler,
)

from robot_server.subsystems.models import UpdateState, SubSystem
Expand All @@ -44,6 +46,12 @@ def decoy_task_runner(decoy: Decoy) -> TaskRunner:
return decoy.mock(cls=TaskRunner)


@pytest.fixture
def decoy_animation_handler(decoy: Decoy) -> AnimationHandler:
"""Get a mocked out AnimationHandler."""
return decoy.mock(cls=AnimationHandler)


@pytest.fixture
async def task_runner() -> AsyncIterator[TaskRunner]:
"""Get a real task runner that will be cleaned up properly."""
Expand All @@ -66,9 +74,13 @@ def ot3_hardware_api(decoy: Decoy) -> OT3API:


@pytest.fixture
def subject(task_runner: TaskRunner, ot3_hardware_api: OT3API) -> FirmwareUpdateManager:
def subject(
task_runner: TaskRunner,
ot3_hardware_api: OT3API,
decoy_animation_handler: AnimationHandler,
) -> FirmwareUpdateManager:
"""Get a FirmwareUpdateManager to test."""
return FirmwareUpdateManager(task_runner, ot3_hardware_api)
return FirmwareUpdateManager(task_runner, ot3_hardware_api, decoy_animation_handler)


def _build_attached_subsystem(
Expand Down Expand Up @@ -270,6 +282,7 @@ async def test_complete_updates_leave_ongoing(
updater: MockUpdater,
subject: FirmwareUpdateManager,
ot3_hardware_api: OT3API,
decoy_animation_handler: AnimationHandler,
decoy: Decoy,
) -> None:
"""It should move completed updates out of ongoing whether they succeed or fail."""
Expand All @@ -291,10 +304,52 @@ async def test_complete_updates_leave_ongoing(
with pytest.raises(NoOngoingUpdate):
await subject.get_ongoing_update_process_handle_by_subsystem(SubSystem.gantry_x)
assert subject.get_update_process_handle_by_id("some-id") == proc
decoy.verify(
[
await decoy_animation_handler.update_started(subsystem=SubSystem.gantry_x),
await decoy_animation_handler.update_complete(subsystem=SubSystem.gantry_x),
],
times=1,
)


@pytest.mark.ot3_only
async def test_correct_exception_for_wrong_id(subject: FirmwareUpdateManager) -> None:
"""It uses a custom exception for incorrect ids."""
with pytest.raises(UpdateIdNotFound):
subject.get_update_process_handle_by_id("blahblah")


@pytest.mark.ot3_only
async def test_animation_handler(ot3_hardware_api: OT3API, decoy: Decoy) -> None:
"""It sets the lights accordingly."""
subject = AnimationHandler(hw_handle=ot3_hardware_api)

# First group of updates:
# - UPDATING
# - UPDATING again (once rear panel finishes)
# - OFF (once finished)
await subject.update_started(SubSystem.gantry_x)
await subject.update_started(SubSystem.rear_panel)
await subject.update_started(SubSystem.gantry_y)

await subject.update_complete(SubSystem.gantry_x)
await subject.update_complete(SubSystem.rear_panel)
await subject.update_complete(SubSystem.gantry_y)

# Second group of updates - UPDATING and then IDLE
subject.mark_initialized()
await subject.update_started(SubSystem.head)
await subject.update_complete(SubSystem.head)

decoy.verify(
[
# First group
await ot3_hardware_api.set_status_bar_state(StatusBarState.UPDATING),
await ot3_hardware_api.set_status_bar_state(StatusBarState.UPDATING),
await ot3_hardware_api.set_status_bar_state(StatusBarState.OFF),
await ot3_hardware_api.set_status_bar_state(StatusBarState.UPDATING),
await ot3_hardware_api.set_status_bar_state(StatusBarState.IDLE),
],
times=1,
)

0 comments on commit 7ae3efc

Please sign in to comment.