Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(robot-server): redirect maintenance engine/runner calls via run orchestrator #15397

Merged
merged 6 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/src/opentrons/protocol_runner/run_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
StateSummary,
CommandPointer,
CommandSlice,
DeckType,
)
from ..protocol_engine.types import (
PostRunHardwareState,
Expand Down Expand Up @@ -329,3 +330,7 @@ def prepare(self) -> None:
def get_robot_type(self) -> RobotType:
"""Get engine robot type."""
return self._protocol_engine.state_view.config.robot_type

def get_deck_type(self) -> DeckType:
"""Get engine deck type."""
return self._protocol_engine.state_view.config.deck_type
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,37 @@
import asyncio
import logging
from datetime import datetime
from typing import List, NamedTuple, Optional, Callable
from typing import List, Optional, Callable

from opentrons.protocol_engine.errors.exceptions import EStopActivatedError
from opentrons.protocol_engine.types import PostRunHardwareState
from opentrons_shared_data.robot.dev_types import RobotType
from opentrons_shared_data.robot.dev_types import RobotTypeEnum
from opentrons.protocol_engine.types import PostRunHardwareState, DeckConfigurationType
from opentrons.protocol_engine import (
Config as ProtocolEngineConfig,
DeckType,
LabwareOffsetCreate,
StateSummary,
create_protocol_engine,
CommandSlice,
CommandPointer,
Command,
CommandCreate,
LabwareOffset,
)
from opentrons.protocol_runner import RunResult, RunOrchestrator

from opentrons.config import feature_flags

from opentrons.hardware_control import HardwareControlAPI
from opentrons.hardware_control.types import (
EstopState,
HardwareEvent,
EstopStateNotification,
HardwareEventHandler,
)
from opentrons.protocol_runner import LiveRunner, RunResult
from opentrons.protocol_engine import (
Config as ProtocolEngineConfig,
DeckType,
LabwareOffsetCreate,
ProtocolEngine,
StateSummary,
create_protocol_engine,
)

from opentrons.protocol_engine.types import DeckConfigurationType

from opentrons_shared_data.robot.dev_types import RobotType, RobotTypeEnum
from opentrons_shared_data.labware.dev_types import LabwareUri
from opentrons_shared_data.labware.labware_definition import LabwareDefinition

_log = logging.getLogger(__name__)

Expand All @@ -45,13 +49,8 @@ class NoRunnerEnginePairError(RuntimeError):
"""Raised if you try to get the current engine or runner while there is none."""


class RunnerEnginePair(NamedTuple):
"""A stored ProtocolRunner/ProtocolEngine pair."""

run_id: str
created_at: datetime
runner: LiveRunner
engine: ProtocolEngine
class NoRunOrchestrator(RuntimeError):
"""Raised if you try to get the current run orchestrator while there is none."""


async def handle_estop_event(
Expand All @@ -72,8 +71,8 @@ async def handle_estop_event(
return
# todo(mm, 2024-04-17): This estop teardown sequencing belongs in the
# runner layer.
engine_store.engine.estop()
await engine_store.engine.finish(error=EStopActivatedError())
engine_store.run_orchestrator.estop()
await engine_store.run_orchestrator.finish(error=EStopActivatedError())
except Exception:
# This is a background task kicked off by a hardware event,
# so there's no one to propagate this exception to.
Expand All @@ -100,6 +99,8 @@ def run_handler_in_engine_thread_from_hardware_thread(
class MaintenanceEngineStore:
"""Factory and in-memory storage for ProtocolEngine."""

_run_orchestrator: Optional[RunOrchestrator] = None

def __init__(
self,
hardware_api: HardwareControlAPI,
Expand All @@ -117,37 +118,26 @@ def __init__(
self._hardware_api = hardware_api
self._robot_type = robot_type
self._deck_type = deck_type
self._runner_engine_pair: Optional[RunnerEnginePair] = None
hardware_api.register_callback(_get_estop_listener(self))

@property
def engine(self) -> ProtocolEngine:
"""Get the "current" ProtocolEngine."""
if self._runner_engine_pair is None:
raise NoRunnerEnginePairError()
return self._runner_engine_pair.engine

@property
def runner(self) -> LiveRunner:
"""Get the "current" ProtocolRunner."""
if self._runner_engine_pair is None:
raise NoRunnerEnginePairError()
return self._runner_engine_pair.runner
def run_orchestrator(self) -> RunOrchestrator:
"""Get the "current" RunOrchestrator."""
if self._run_orchestrator is None:
raise NoRunOrchestrator()
return self._run_orchestrator

@property
def current_run_id(self) -> Optional[str]:
"""Get the run identifier associated with the current engine/runner pair."""
"""Get the run identifier associated with the current engine."""
return (
self._runner_engine_pair.run_id
if self._runner_engine_pair is not None
else None
self.run_orchestrator.run_id if self._run_orchestrator is not None else None
)

@property
def current_run_created_at(self) -> datetime:
"""Get the run creation datetime."""
assert self._runner_engine_pair is not None, "Run not yet created."
return self._runner_engine_pair.created_at
raise NotImplementedError("run created at not implemented.")

async def create(
self,
Expand All @@ -171,7 +161,7 @@ async def create(
# Because we will be clearing engine store before creating a new one,
# the runner-engine pair should be None at this point.
assert (
self._runner_engine_pair is None
self._run_orchestrator is None
), "There is an active maintenance run that was not cleared correctly."
engine = await create_protocol_engine(
hardware_api=self._hardware_api,
Expand All @@ -186,23 +176,14 @@ async def create(
notify_publishers=notify_publishers,
)

# Using LiveRunner as the runner to allow for future refactor of maintenance runs
# See https://opentrons.atlassian.net/browse/RSS-226
runner = LiveRunner(protocol_engine=engine, hardware_api=self._hardware_api)

# TODO (spp): set live runner start func

for offset in labware_offsets:
engine.add_labware_offset(offset)

self._runner_engine_pair = RunnerEnginePair(
run_id=run_id,
created_at=created_at,
runner=runner,
engine=engine,
self._run_orchestrator = RunOrchestrator.build_orchestrator(
run_id=run_id, protocol_engine=engine, hardware_api=self._hardware_api
)

return engine.state_view.get_summary()
return self._run_orchestrator.get_state_summary()

async def clear(self) -> RunResult:
"""Remove the ProtocolEngine.
Expand All @@ -211,20 +192,65 @@ async def clear(self) -> RunResult:
EngineConflictError: The current runner/engine pair is not idle, so
they cannot be cleared.
"""
engine = self.engine
state_view = engine.state_view

if state_view.commands.get_is_okay_to_clear():
await engine.finish(
if self.run_orchestrator.get_is_okay_to_clear():
await self.run_orchestrator.finish(
drop_tips_after_run=False,
set_run_status=False,
post_run_hardware_state=PostRunHardwareState.STAY_ENGAGED_IN_PLACE,
)
else:
raise EngineConflictError("Current run is not idle or stopped.")

run_data = state_view.get_summary()
commands = state_view.commands.get_all()
self._runner_engine_pair = None
run_data = self.run_orchestrator.get_state_summary()
commands = self.run_orchestrator.get_all_commands()
self._run_orchestrator = None

return RunResult(state_summary=run_data, commands=commands, parameters=[])

def get_command_slice(
self,
cursor: Optional[int],
length: int,
) -> CommandSlice:
"""Get a slice of run commands.

Args:
cursor: Requested index of first command in the returned slice.
length: Length of slice to return.
"""
return self.run_orchestrator.get_command_slice(cursor=cursor, length=length)

def get_current_command(self) -> Optional[CommandPointer]:
"""Get the current running command."""
return self.run_orchestrator.get_current_command()

def get_command_recovery_target(self) -> Optional[CommandPointer]:
"""Get the current error recovery target."""
return self.run_orchestrator.get_command_recovery_target()

def get_command(self, command_id: str) -> Command:
"""Get a run's command by ID."""
return self.run_orchestrator.get_command(command_id=command_id)

def get_state_summary(self) -> StateSummary:
"""Get protocol run data."""
return self.run_orchestrator.get_state_summary()

async def add_command_and_wait_for_interval(
self,
request: CommandCreate,
wait_until_complete: bool = False,
timeout: Optional[int] = None,
) -> Command:
"""Add a new command to execute and wait for it to complete if needed."""
return await self.run_orchestrator.add_command_and_wait_for_interval(
command=request, wait_until_complete=wait_until_complete, timeout=timeout
)

def add_labware_offset(self, request: LabwareOffsetCreate) -> LabwareOffset:
"""Add a new labware offset to state."""
return self.run_orchestrator.add_labware_offset(request)

def add_labware_definition(self, definition: LabwareDefinition) -> LabwareUri:
"""Add a new labware definition to state."""
return self.run_orchestrator.add_labware_definition(definition)
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,7 @@ def get_commands_slice(
"""
if run_id != self._engine_store.current_run_id:
raise MaintenanceRunNotFoundError(run_id=run_id)
the_slice = self._engine_store.engine.state_view.commands.get_slice(
cursor=cursor, length=length
)
the_slice = self._engine_store.get_command_slice(cursor=cursor, length=length)
return the_slice

def get_current_command(self, run_id: str) -> Optional[CommandPointer]:
Expand All @@ -193,7 +191,7 @@ def get_current_command(self, run_id: str) -> Optional[CommandPointer]:
run_id: ID of the run.
"""
if self._engine_store.current_run_id == run_id:
return self._engine_store.engine.state_view.commands.get_current()
return self._engine_store.get_current_command()
else:
# todo(mm, 2024-05-20):
# For historical runs to behave consistently with the current run,
Expand All @@ -209,7 +207,7 @@ def get_recovery_target_command(self, run_id: str) -> Optional[CommandPointer]:
run_id: ID of the run.
"""
if self._engine_store.current_run_id == run_id:
return self._engine_store.engine.state_view.commands.get_recovery_target()
return self._engine_store.get_command_recovery_target()
else:
# Historical runs can't have any ongoing error recovery.
return None
Expand All @@ -227,7 +225,7 @@ def get_command(self, run_id: str, command_id: str) -> Command:
"""
if run_id != self._engine_store.current_run_id:
raise MaintenanceRunNotFoundError(run_id=run_id)
return self._engine_store.engine.state_view.commands.get(command_id=command_id)
return self._engine_store.get_command(command_id=command_id)

def _get_state_summary(self, run_id: str) -> Optional[StateSummary]:
return self._engine_store.engine.state_view.get_summary()
return self._engine_store.get_state_summary()
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
from typing import Optional, Union
from typing_extensions import Final, Literal

from anyio import move_on_after
from fastapi import APIRouter, Depends, Query, status

from opentrons.protocol_engine import (
CommandPointer,
ProtocolEngine,
commands as pe_commands,
)
from opentrons.protocol_engine.errors import CommandDoesNotExistError
Expand Down Expand Up @@ -60,11 +58,11 @@ class CommandNotAllowed(ErrorDetails):
title: str = "Setup Command Not Allowed"


async def get_current_run_engine_from_url(
async def get_current_run_from_url(
runId: str,
engine_store: MaintenanceEngineStore = Depends(get_maintenance_engine_store),
) -> ProtocolEngine:
"""Get current run protocol engine.
) -> str:
"""Get run from url.

Args:
runId: Run ID to associate the command with.
Expand All @@ -76,7 +74,7 @@ async def get_current_run_engine_from_url(
f"Note that only one maintenance run can exist at a time."
).as_error(status.HTTP_404_NOT_FOUND)

return engine_store.engine
return runId


@PydanticResponse.wrap_route(
Expand Down Expand Up @@ -110,6 +108,7 @@ async def create_run_command(
" or when the timeout is reached. See the `timeout` query parameter."
),
),
engine_store: MaintenanceEngineStore = Depends(get_maintenance_engine_store),
timeout: Optional[int] = Query(
default=None,
gt=0,
Expand All @@ -130,7 +129,7 @@ async def create_run_command(
" the default was 30 seconds, not infinite."
),
),
protocol_engine: ProtocolEngine = Depends(get_current_run_engine_from_url),
run_id: str = Depends(get_current_run_from_url),
check_estop: bool = Depends(require_estop_in_good_state),
) -> PydanticResponse[SimpleBody[pe_commands.Command]]:
"""Enqueue a protocol command.
Expand All @@ -142,9 +141,10 @@ async def create_run_command(
Else, return immediately. Comes from a query parameter in the URL.
timeout: The maximum time, in seconds, to wait before returning.
Comes from a query parameter in the URL.
protocol_engine: The run's `ProtocolEngine` on which the new
engine_store: The run's `EngineStore` on which the new
command will be enqueued.
check_estop: Dependency to verify the estop is in a valid state.
run_id: Run identification to attach command to.
"""
# TODO(mc, 2022-05-26): increment the HTTP API version so that default
# behavior is to pass through `command_intent` without overriding it
Expand All @@ -153,14 +153,11 @@ async def create_run_command(

# TODO (spp): re-add `RunStoppedError` exception catching if/when maintenance runs
# have actions.
command = protocol_engine.add_command(command_create)

if waitUntilComplete:
timeout_sec = None if timeout is None else timeout / 1000.0
with move_on_after(timeout_sec):
await protocol_engine.wait_for_command(command.id)
command = await engine_store.add_command_and_wait_for_interval(
request=command_create, wait_until_complete=waitUntilComplete, timeout=timeout
)

response_data = protocol_engine.state_view.commands.get(command.id)
response_data = engine_store.get_command(command.id)

return await PydanticResponse.create(
content=SimpleBody.construct(data=response_data),
Expand Down
Loading
Loading