Skip to content

Commit

Permalink
add command and wait
Browse files Browse the repository at this point in the history
  • Loading branch information
TamarZanzouri committed May 29, 2024
1 parent 43d82e8 commit 31e0051
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 143 deletions.
49 changes: 30 additions & 19 deletions api/src/opentrons/protocol_runner/run_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
from __future__ import annotations
from typing import Optional, Union

from anyio import move_on_after

from . import protocol_runner, AnyRunner
from ..hardware_control import HardwareControlAPI
from ..protocol_engine import ProtocolEngine
from ..protocol_engine import ProtocolEngine, CommandCreate, Command
from ..protocol_engine.types import PostRunHardwareState
from ..protocol_reader import JsonProtocolConfig, PythonProtocolConfig

Expand All @@ -24,15 +26,15 @@ class RunOrchestrator:
_protocol_engine: ProtocolEngine

def __init__(
self,
protocol_engine: ProtocolEngine,
hardware_api: HardwareControlAPI,
fixit_runner: protocol_runner.LiveRunner,
setup_runner: protocol_runner.LiveRunner,
json_or_python_protocol_runner: Optional[
Union[protocol_runner.PythonAndLegacyRunner, protocol_runner.JsonRunner]
] = None,
run_id: Optional[str] = None,
self,
protocol_engine: ProtocolEngine,
hardware_api: HardwareControlAPI,
fixit_runner: protocol_runner.LiveRunner,
setup_runner: protocol_runner.LiveRunner,
json_or_python_protocol_runner: Optional[
Union[protocol_runner.PythonAndLegacyRunner, protocol_runner.JsonRunner]
] = None,
run_id: Optional[str] = None,
) -> None:
"""Initialize a run orchestrator interface.
Expand Down Expand Up @@ -63,15 +65,15 @@ def runner(self) -> AnyRunner:

@classmethod
def build_orchestrator(
cls,
protocol_engine: ProtocolEngine,
hardware_api: HardwareControlAPI,
protocol_config: Optional[
Union[JsonProtocolConfig, PythonProtocolConfig]
] = None,
post_run_hardware_state: PostRunHardwareState = PostRunHardwareState.HOME_AND_STAY_ENGAGED,
drop_tips_after_run: bool = True,
run_id: Optional[str] = None,
cls,
protocol_engine: ProtocolEngine,
hardware_api: HardwareControlAPI,
protocol_config: Optional[
Union[JsonProtocolConfig, PythonProtocolConfig]
] = None,
post_run_hardware_state: PostRunHardwareState = PostRunHardwareState.HOME_AND_STAY_ENGAGED,
drop_tips_after_run: bool = True,
run_id: Optional[str] = None,
) -> "RunOrchestrator":
"""Build a RunOrchestrator provider."""
setup_runner = protocol_runner.LiveRunner(
Expand Down Expand Up @@ -99,3 +101,12 @@ def build_orchestrator(
hardware_api=hardware_api,
protocol_engine=protocol_engine,
)

def add_command_and_wait_for_interval(self, command: CommandCreate, wait_until_complete: bool,
timeout: Optional[int] = None, failed_command_id: Optional[str] = None) -> Command:
added_command = self._protocol_engine.add_command(request=command, failed_command_id=failed_command_id)
if waitUntilComplete:
timeout_sec = None if timeout is None else timeout / 1000.0
with move_on_after(timeout_sec):
await self._protocol_engine.wait_for_command(added_command.id)
return added_command
24 changes: 24 additions & 0 deletions robot-server/robot_server/runs/engine_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
CommandSlice,
CommandPointer,
Command,
CommandCreate,
LabwareOffset,
LabwareOffsetCreate,
)

from robot_server.protocols.protocol_store import ProtocolResource
Expand All @@ -45,6 +48,7 @@
RunTimeParamValuesType,
EngineStatus,
)
from opentrons_shared_data.labware.dev_types import LabwareUri


_log = logging.getLogger(__name__)
Expand Down Expand Up @@ -382,3 +386,23 @@ def get_is_run_terminal(self) -> bool:

def run_was_started(self) -> bool:
return self._run_orchestrator.runner.was_started()

def add_labware_offset(self, request: LabwareOffsetCreate) -> LabwareOffset:
return self._run_orchestrator.engine.add_labware_offset(request)

def add_labware_definition(self, definition: LabwareDefinition) -> LabwareUri:
return self._run_orchestrator.engine.add_labware_definition(definition)

def add_command_and_wait_for_interval(
self,
request: CommandCreate,
wait_until_complete: bool = False,
timeout: Optional[int] = None,
failed_command_id: Optional[str] = None,
) -> Command:
return self._run_orchestrator.add_command_and_wait_for_interval(
command=request,
failed_command_id=failed_command_id,
wait_until_complete=wait_until_complete,
timeout=timeout,
)
64 changes: 31 additions & 33 deletions robot-server/robot_server/runs/router/commands_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,29 +77,29 @@ class PreSerializedCommandsNotAvailable(ErrorDetails):
)


async def get_current_run_engine_from_url(
runId: str,
engine_store: EngineStore = Depends(get_engine_store),
run_store: RunStore = Depends(get_run_store),
) -> ProtocolEngine:
"""Get run protocol engine.
Args:
runId: Run ID to associate the command with.
engine_store: Engine store to pull current run ProtocolEngine.
run_store: Run data storage.
"""
if not run_store.has(runId):
raise RunNotFound(detail=f"Run {runId} not found.").as_error(
status.HTTP_404_NOT_FOUND
)

if runId != engine_store.current_run_id:
raise RunStopped(detail=f"Run {runId} is not the current run").as_error(
status.HTTP_409_CONFLICT
)

return engine_store.engine
# async def get_current_run_engine_from_url(
# runId: str,
# engine_store: EngineStore = Depends(get_engine_store),
# run_store: RunStore = Depends(get_run_store),
# ) -> ProtocolEngine:
# """Get run protocol engine.
#
# Args:
# runId: Run ID to associate the command with.
# engine_store: Engine store to pull current run ProtocolEngine.
# run_store: Run data storage.
# """
# if not run_store.has(runId):
# raise RunNotFound(detail=f"Run {runId} not found.").as_error(
# status.HTTP_404_NOT_FOUND
# )
#
# if runId != engine_store.current_run_id:
# raise RunStopped(detail=f"Run {runId} is not the current run").as_error(
# status.HTTP_409_CONFLICT
# )
#
# return engine_store.engine


@PydanticResponse.wrap_route(
Expand Down Expand Up @@ -185,7 +185,7 @@ async def create_run_command(
"FIXIT command use only. Reference of the failed command id we are trying to fix."
),
),
protocol_engine: ProtocolEngine = Depends(get_current_run_engine_from_url),
engine_store: EngineStore = Depends(get_engine_store),
check_estop: bool = Depends(require_estop_in_good_state),
) -> PydanticResponse[SimpleBody[pe_commands.Command]]:
"""Enqueue a protocol command.
Expand All @@ -199,7 +199,7 @@ async def create_run_command(
Comes from a query parameter in the URL.
failedCommandId: FIXIT command use only.
Reference of the failed command id we are trying to fix.
protocol_engine: The run's `ProtocolEngine` on which the new
engine_store: The run's `EngineStore` on which the new
command will be enqueued.
check_estop: Dependency to verify the estop is in a valid state.
"""
Expand All @@ -208,8 +208,11 @@ async def create_run_command(
command_intent = request_body.data.intent or pe_commands.CommandIntent.SETUP
command_create = request_body.data.copy(update={"intent": command_intent})
try:
command = protocol_engine.add_command(
request=command_create, failed_command_id=failedCommandId
command = engine_store.add_command_and_wait_for_interval(
request=command_create,
failed_command_id=failedCommandId,
wait_until_complete=waitUntilComplete,
timeout=timeout,
)

except pe_errors.SetupCommandNotAllowedError as e:
Expand All @@ -219,12 +222,7 @@ async def create_run_command(
except pe_errors.CommandNotAllowedError as e:
raise CommandNotAllowed.from_exc(e).as_error(status.HTTP_400_BAD_REQUEST)

if waitUntilComplete:
timeout_sec = None if timeout is None else timeout / 1000.0
with move_on_after(timeout_sec):
await protocol_engine.wait_for_command(command.id)

response_data = protocol_engine.state_view.commands.get(command.id)
response_data = engine_store.get_command(command.id)

return await PydanticResponse.create(
content=SimpleBody.construct(data=response_data),
Expand Down
Loading

0 comments on commit 31e0051

Please sign in to comment.