Skip to content

Commit

Permalink
refactor(robot-server): redirect maintenance engine/runner calls via …
Browse files Browse the repository at this point in the history
…run orchestrator (#15397)
  • Loading branch information
TamarZanzouri authored and aaron-kulkarni committed Jun 13, 2024
1 parent ffa04d5 commit c9184fb
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 168 deletions.
5 changes: 5 additions & 0 deletions api/src/opentrons/protocol_runner/run_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
StateSummary,
CommandPointer,
CommandSlice,
DeckType,
)
from ..protocol_engine.types import (
PostRunHardwareState,
Expand Down Expand Up @@ -329,3 +330,7 @@ def prepare(self) -> None:
def get_robot_type(self) -> RobotType:
    """Return the robot type stored in the protocol engine's config.

    The value is read from ``state_view.config.robot_type`` of the
    protocol engine held by this object.
    """
    engine_config = self._protocol_engine.state_view.config
    return engine_config.robot_type

def get_deck_type(self) -> DeckType:
    """Return the deck type stored in the protocol engine's config.

    The value is read from ``state_view.config.deck_type`` of the
    protocol engine held by this object.
    """
    engine_config = self._protocol_engine.state_view.config
    return engine_config.deck_type
159 changes: 95 additions & 64 deletions robot-server/robot_server/maintenance_runs/maintenance_engine_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,37 @@
import asyncio
import logging
from datetime import datetime
from typing import List, NamedTuple, Optional, Callable
from typing import List, Optional, Callable

from opentrons.protocol_engine.errors.exceptions import EStopActivatedError
from opentrons.protocol_engine.types import PostRunHardwareState
from opentrons_shared_data.robot.dev_types import RobotType
from opentrons_shared_data.robot.dev_types import RobotTypeEnum
from opentrons.protocol_engine.types import PostRunHardwareState, DeckConfigurationType
from opentrons.protocol_engine import (
Config as ProtocolEngineConfig,
DeckType,
LabwareOffsetCreate,
StateSummary,
create_protocol_engine,
CommandSlice,
CommandPointer,
Command,
CommandCreate,
LabwareOffset,
)
from opentrons.protocol_runner import RunResult, RunOrchestrator

from opentrons.config import feature_flags

from opentrons.hardware_control import HardwareControlAPI
from opentrons.hardware_control.types import (
EstopState,
HardwareEvent,
EstopStateNotification,
HardwareEventHandler,
)
from opentrons.protocol_runner import LiveRunner, RunResult
from opentrons.protocol_engine import (
Config as ProtocolEngineConfig,
DeckType,
LabwareOffsetCreate,
ProtocolEngine,
StateSummary,
create_protocol_engine,
)

from opentrons.protocol_engine.types import DeckConfigurationType

from opentrons_shared_data.robot.dev_types import RobotType, RobotTypeEnum
from opentrons_shared_data.labware.dev_types import LabwareUri
from opentrons_shared_data.labware.labware_definition import LabwareDefinition

_log = logging.getLogger(__name__)

Expand All @@ -45,13 +49,8 @@ class NoRunnerEnginePairError(RuntimeError):
"""Raised if you try to get the current engine or runner while there is none."""


class RunnerEnginePair(NamedTuple):
"""A stored ProtocolRunner/ProtocolEngine pair."""

run_id: str
created_at: datetime
runner: LiveRunner
engine: ProtocolEngine
class NoRunOrchestrator(RuntimeError):
    """Error for requesting the current run orchestrator when none exists."""


async def handle_estop_event(
Expand All @@ -72,8 +71,8 @@ async def handle_estop_event(
return
# todo(mm, 2024-04-17): This estop teardown sequencing belongs in the
# runner layer.
engine_store.engine.estop()
await engine_store.engine.finish(error=EStopActivatedError())
engine_store.run_orchestrator.estop()
await engine_store.run_orchestrator.finish(error=EStopActivatedError())
except Exception:
# This is a background task kicked off by a hardware event,
# so there's no one to propagate this exception to.
Expand All @@ -100,6 +99,9 @@ def run_handler_in_engine_thread_from_hardware_thread(
class MaintenanceEngineStore:
"""Factory and in-memory storage for ProtocolEngine."""

_run_orchestrator: Optional[RunOrchestrator] = None
_created_at: Optional[datetime] = None

def __init__(
self,
hardware_api: HardwareControlAPI,
Expand All @@ -117,37 +119,27 @@ def __init__(
self._hardware_api = hardware_api
self._robot_type = robot_type
self._deck_type = deck_type
self._runner_engine_pair: Optional[RunnerEnginePair] = None
hardware_api.register_callback(_get_estop_listener(self))

@property
def engine(self) -> ProtocolEngine:
"""Get the "current" ProtocolEngine."""
if self._runner_engine_pair is None:
raise NoRunnerEnginePairError()
return self._runner_engine_pair.engine

@property
def runner(self) -> LiveRunner:
"""Get the "current" ProtocolRunner."""
if self._runner_engine_pair is None:
raise NoRunnerEnginePairError()
return self._runner_engine_pair.runner
def run_orchestrator(self) -> RunOrchestrator:
"""Get the "current" RunOrchestrator."""
if self._run_orchestrator is None:
raise NoRunOrchestrator()
return self._run_orchestrator

@property
def current_run_id(self) -> Optional[str]:
"""Get the run identifier associated with the current engine/runner pair."""
"""Get the run identifier associated with the current engine."""
return (
self._runner_engine_pair.run_id
if self._runner_engine_pair is not None
else None
self.run_orchestrator.run_id if self._run_orchestrator is not None else None
)

@property
def current_run_created_at(self) -> datetime:
"""Get the run creation datetime."""
assert self._runner_engine_pair is not None, "Run not yet created."
return self._runner_engine_pair.created_at
assert self._created_at is not None, "Run not yet created."
return self._created_at

async def create(
self,
Expand All @@ -171,7 +163,7 @@ async def create(
# Because we will be clearing engine store before creating a new one,
# the runner-engine pair should be None at this point.
assert (
self._runner_engine_pair is None
self._run_orchestrator is None
), "There is an active maintenance run that was not cleared correctly."
engine = await create_protocol_engine(
hardware_api=self._hardware_api,
Expand All @@ -186,23 +178,16 @@ async def create(
notify_publishers=notify_publishers,
)

# Using LiveRunner as the runner to allow for future refactor of maintenance runs
# See https://opentrons.atlassian.net/browse/RSS-226
runner = LiveRunner(protocol_engine=engine, hardware_api=self._hardware_api)

# TODO (spp): set live runner start func

for offset in labware_offsets:
engine.add_labware_offset(offset)

self._runner_engine_pair = RunnerEnginePair(
run_id=run_id,
created_at=created_at,
runner=runner,
engine=engine,
self._run_orchestrator = RunOrchestrator.build_orchestrator(
run_id=run_id, protocol_engine=engine, hardware_api=self._hardware_api
)

return engine.state_view.get_summary()
self._created_at = created_at

return self._run_orchestrator.get_state_summary()

async def clear(self) -> RunResult:
"""Remove the ProtocolEngine.
Expand All @@ -211,20 +196,66 @@ async def clear(self) -> RunResult:
EngineConflictError: The current runner/engine pair is not idle, so
they cannot be cleared.
"""
engine = self.engine
state_view = engine.state_view

if state_view.commands.get_is_okay_to_clear():
await engine.finish(
if self.run_orchestrator.get_is_okay_to_clear():
await self.run_orchestrator.finish(
drop_tips_after_run=False,
set_run_status=False,
post_run_hardware_state=PostRunHardwareState.STAY_ENGAGED_IN_PLACE,
)
else:
raise EngineConflictError("Current run is not idle or stopped.")

run_data = state_view.get_summary()
commands = state_view.commands.get_all()
self._runner_engine_pair = None
run_data = self.run_orchestrator.get_state_summary()
commands = self.run_orchestrator.get_all_commands()
self._run_orchestrator = None
self._created_at = None

return RunResult(state_summary=run_data, commands=commands, parameters=[])

def get_command_slice(
    self,
    cursor: Optional[int],
    length: int,
) -> CommandSlice:
    """Return a slice of the current run's commands.

    Delegates to the current run orchestrator.

    Args:
        cursor: Requested index of the first command in the returned slice.
        length: Length of the slice to return.
    """
    orchestrator = self.run_orchestrator
    return orchestrator.get_command_slice(cursor=cursor, length=length)

def get_current_command(self) -> Optional[CommandPointer]:
    """Return the current run orchestrator's pointer to the running command."""
    orchestrator = self.run_orchestrator
    return orchestrator.get_current_command()

def get_command_recovery_target(self) -> Optional[CommandPointer]:
    """Return the current run orchestrator's error recovery target."""
    orchestrator = self.run_orchestrator
    return orchestrator.get_command_recovery_target()

def get_command(self, command_id: str) -> Command:
    """Look up one of the current run's commands by its ID.

    Args:
        command_id: Identifier of the command to fetch.
    """
    orchestrator = self.run_orchestrator
    return orchestrator.get_command(command_id=command_id)

def get_state_summary(self) -> StateSummary:
    """Return the state summary reported by the current run orchestrator."""
    orchestrator = self.run_orchestrator
    return orchestrator.get_state_summary()

async def add_command_and_wait_for_interval(
    self,
    request: CommandCreate,
    wait_until_complete: bool = False,
    timeout: Optional[int] = None,
) -> Command:
    """Enqueue a command on the current run, optionally waiting for it.

    All arguments are forwarded unchanged to the current run orchestrator;
    `request` is passed as its `command` keyword argument.

    Args:
        request: The command to add.
        wait_until_complete: Whether to wait for the command to complete.
        timeout: Wait limit handed to the orchestrator as-is.
            NOTE(review): units are defined by the orchestrator — confirm.
    """
    orchestrator = self.run_orchestrator
    command = await orchestrator.add_command_and_wait_for_interval(
        command=request,
        wait_until_complete=wait_until_complete,
        timeout=timeout,
    )
    return command

def add_labware_offset(self, request: LabwareOffsetCreate) -> LabwareOffset:
    """Register a new labware offset through the current run orchestrator.

    Args:
        request: The labware offset to add.
    """
    orchestrator = self.run_orchestrator
    return orchestrator.add_labware_offset(request)

def add_labware_definition(self, definition: LabwareDefinition) -> LabwareUri:
    """Register a new labware definition through the current run orchestrator.

    Args:
        definition: The labware definition to add.
    """
    orchestrator = self.run_orchestrator
    return orchestrator.add_labware_definition(definition)
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,7 @@ def get_commands_slice(
"""
if run_id != self._engine_store.current_run_id:
raise MaintenanceRunNotFoundError(run_id=run_id)
the_slice = self._engine_store.engine.state_view.commands.get_slice(
cursor=cursor, length=length
)
the_slice = self._engine_store.get_command_slice(cursor=cursor, length=length)
return the_slice

def get_current_command(self, run_id: str) -> Optional[CommandPointer]:
Expand All @@ -193,7 +191,7 @@ def get_current_command(self, run_id: str) -> Optional[CommandPointer]:
run_id: ID of the run.
"""
if self._engine_store.current_run_id == run_id:
return self._engine_store.engine.state_view.commands.get_current()
return self._engine_store.get_current_command()
else:
# todo(mm, 2024-05-20):
# For historical runs to behave consistently with the current run,
Expand All @@ -209,7 +207,7 @@ def get_recovery_target_command(self, run_id: str) -> Optional[CommandPointer]:
run_id: ID of the run.
"""
if self._engine_store.current_run_id == run_id:
return self._engine_store.engine.state_view.commands.get_recovery_target()
return self._engine_store.get_command_recovery_target()
else:
# Historical runs can't have any ongoing error recovery.
return None
Expand All @@ -227,7 +225,7 @@ def get_command(self, run_id: str, command_id: str) -> Command:
"""
if run_id != self._engine_store.current_run_id:
raise MaintenanceRunNotFoundError(run_id=run_id)
return self._engine_store.engine.state_view.commands.get(command_id=command_id)
return self._engine_store.get_command(command_id=command_id)

def _get_state_summary(self, run_id: str) -> Optional[StateSummary]:
return self._engine_store.engine.state_view.get_summary()
return self._engine_store.get_state_summary()
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
from typing import Optional, Union
from typing_extensions import Final, Literal

from anyio import move_on_after
from fastapi import APIRouter, Depends, Query, status

from opentrons.protocol_engine import (
CommandPointer,
ProtocolEngine,
commands as pe_commands,
)
from opentrons.protocol_engine.errors import CommandDoesNotExistError
Expand Down Expand Up @@ -60,11 +58,11 @@ class CommandNotAllowed(ErrorDetails):
title: str = "Setup Command Not Allowed"


async def get_current_run_engine_from_url(
async def get_current_run_from_url(
runId: str,
engine_store: MaintenanceEngineStore = Depends(get_maintenance_engine_store),
) -> ProtocolEngine:
"""Get current run protocol engine.
) -> str:
"""Get run from url.
Args:
runId: Run ID to associate the command with.
Expand All @@ -76,7 +74,7 @@ async def get_current_run_engine_from_url(
f"Note that only one maintenance run can exist at a time."
).as_error(status.HTTP_404_NOT_FOUND)

return engine_store.engine
return runId


@PydanticResponse.wrap_route(
Expand Down Expand Up @@ -110,6 +108,7 @@ async def create_run_command(
" or when the timeout is reached. See the `timeout` query parameter."
),
),
engine_store: MaintenanceEngineStore = Depends(get_maintenance_engine_store),
timeout: Optional[int] = Query(
default=None,
gt=0,
Expand All @@ -130,7 +129,7 @@ async def create_run_command(
" the default was 30 seconds, not infinite."
),
),
protocol_engine: ProtocolEngine = Depends(get_current_run_engine_from_url),
run_id: str = Depends(get_current_run_from_url),
check_estop: bool = Depends(require_estop_in_good_state),
) -> PydanticResponse[SimpleBody[pe_commands.Command]]:
"""Enqueue a protocol command.
Expand All @@ -142,9 +141,10 @@ async def create_run_command(
Else, return immediately. Comes from a query parameter in the URL.
timeout: The maximum time, in seconds, to wait before returning.
Comes from a query parameter in the URL.
protocol_engine: The run's `ProtocolEngine` on which the new
engine_store: The run's `EngineStore` on which the new
command will be enqueued.
check_estop: Dependency to verify the estop is in a valid state.
run_id: Run identification to attach command to.
"""
# TODO(mc, 2022-05-26): increment the HTTP API version so that default
# behavior is to pass through `command_intent` without overriding it
Expand All @@ -153,14 +153,11 @@ async def create_run_command(

# TODO (spp): re-add `RunStoppedError` exception catching if/when maintenance runs
# have actions.
command = protocol_engine.add_command(command_create)

if waitUntilComplete:
timeout_sec = None if timeout is None else timeout / 1000.0
with move_on_after(timeout_sec):
await protocol_engine.wait_for_command(command.id)
command = await engine_store.add_command_and_wait_for_interval(
request=command_create, wait_until_complete=waitUntilComplete, timeout=timeout
)

response_data = protocol_engine.state_view.commands.get(command.id)
response_data = engine_store.get_command(command.id)

return await PydanticResponse.create(
content=SimpleBody.construct(data=response_data),
Expand Down
Loading

0 comments on commit c9184fb

Please sign in to comment.