diff --git a/api/src/opentrons/protocol_runner/run_orchestrator.py b/api/src/opentrons/protocol_runner/run_orchestrator.py index c693172a924..345e583afc9 100644 --- a/api/src/opentrons/protocol_runner/run_orchestrator.py +++ b/api/src/opentrons/protocol_runner/run_orchestrator.py @@ -158,9 +158,9 @@ def get_current_command(self) -> Optional[CommandPointer]: return self._protocol_engine.state_view.commands.get_current() def get_command_slice( - self, - cursor: Optional[int], - length: int, + self, + cursor: Optional[int], + length: int, ) -> CommandSlice: """Get a slice of run commands. @@ -186,7 +186,7 @@ def get_command(self, command_id: str) -> Command: command_id=command_id ) - def get_status(self) -> EngineStatus: + def get_run_status(self) -> EngineStatus: """Get the current execution status of the engine.""" return self._protocol_engine.state_view.commands.get_status() @@ -206,7 +206,8 @@ def add_labware_definition(self, definition: LabwareDefinition) -> LabwareUri: return self.run_orchestrator.engine.add_labware_definition(definition) async def add_command_and_wait_for_interval(self, command: CommandCreate, wait_until_complete: bool = False, - timeout: Optional[int] = None, failed_command_id: Optional[str] = None) -> Command: + timeout: Optional[int] = None, + failed_command_id: Optional[str] = None) -> Command: added_command = self._protocol_engine.add_command(request=command, failed_command_id=failed_command_id) if wait_until_complete: timeout_sec = None if timeout is None else timeout / 1000.0 @@ -217,5 +218,8 @@ async def add_command_and_wait_for_interval(self, command: CommandCreate, wait_u def estop(self) -> None: return self._protocol_engine.estop() - def use_attached_modules(self, modules_by_id: Dict[str, HardwareModuleAPI]) -> None: - self._protocol_engine.use_attached_modules(modules_by_id=modules_by_id) + async def use_attached_modules(self, modules_by_id: Dict[str, HardwareModuleAPI]) -> None: + await self._protocol_engine.use_attached_modules(modules_by_id=modules_by_id) + + def get_protocol_runner(self) -> None: + raise NotImplementedError() \ No newline at end of file diff --git a/robot-server/robot_server/commands/router.py b/robot-server/robot_server/commands/router.py index eb9155acd06..250e9c755d5 100644 --- a/robot-server/robot_server/commands/router.py +++ b/robot-server/robot_server/commands/router.py @@ -105,7 +105,7 @@ async def create_command( Else, return immediately. Comes from a query parameter in the URL. timeout: The maximum time, in seconds, to wait before returning. Comes from a query parameter in the URL. - engine: The `ProtocolEngine` on which the command will be enqueued. + orchestrator: The `RunOrchestrator` handling engine for command to be enqueued. """ command_create = request_body.data.copy(update={"intent": CommandIntent.SETUP}) command = await orchestrator.add_command_and_wait_for_interval( diff --git a/robot-server/robot_server/runs/engine_store.py b/robot-server/robot_server/runs/engine_store.py index ba61aea5f94..0e23ddc2384 100644 --- a/robot-server/robot_server/runs/engine_store.py +++ b/robot-server/robot_server/runs/engine_store.py @@ -238,17 +238,16 @@ async def create( drop_tips_after_run=drop_tips_after_run, ) + runner = self.run_orchestrator.get_protocol_runner() # FIXME(mm, 2022-12-21): These `await runner.load()`s introduce a # concurrency hazard. If two requests simultaneously call this method, # they will both "succeed" (with undefined results) instead of one # raising EngineConflictError. - if isinstance( - self.run_orchestrator.get_protocol_runner(), PythonAndLegacyRunner - ): + if isinstance(runner, PythonAndLegacyRunner): assert ( protocol is not None ), "A Python protocol should have a protocol source file." - await self.run_orchestrator.runner.load( + await self.run_orchestrator.load( protocol.source, # Conservatively assume that we're re-running a protocol that # was uploaded before we added stricter validation, and that @@ -256,18 +255,18 @@ async def create( python_parse_mode=PythonParseMode.ALLOW_LEGACY_METADATA_AND_REQUIREMENTS, run_time_param_values=run_time_param_values, ) - elif isinstance(self.run_orchestrator.runner, JsonRunner): + elif isinstance(runner, JsonRunner): assert ( protocol is not None ), "A JSON protocol shouZld have a protocol source file." - await self.run_orchestrator.runner.load(protocol.source) + await self.run_orchestrator.load(protocol.source) else: - self.run_orchestrator.runner.prepare() + self.run_orchestrator.prepare() for offset in labware_offsets: - self.run_orchestrator.engine.add_labware_offset(offset) + self.run_orchestrator.add_labware_offset(offset) - return self.run_orchestrator.engine.state_view.get_summary() + return self.run_orchestrator.get_state_summary() async def clear(self) -> RunResult: """Remove the persisted ProtocolEngine. @@ -276,11 +275,8 @@ async def clear(self) -> RunResult: EngineConflictError: The current runner/engine pair is not idle, so they cannot be cleared. """ - assert self.run_orchestrator is not None - engine = self.run_orchestrator.engine - runner = self.run_orchestrator.runner - if engine.state_view.commands.get_is_okay_to_clear(): - await engine.finish( + if self.run_orchestrator.get_is_okay_to_clear(): + await self.run_orchestrator.finish( drop_tips_after_run=False, set_run_status=False, post_run_hardware_state=PostRunHardwareState.STAY_ENGAGED_IN_PLACE, @@ -288,9 +284,9 @@ async def clear(self) -> RunResult: else: raise EngineConflictError("Current run is not idle or stopped.") - run_data = engine.state_view.get_summary() - commands = engine.state_view.commands.get_all() - run_time_parameters = runner.run_time_parameters if runner else [] + run_data = self.run_orchestrator.get_state_summary() + commands = self.run_orchestrator.get_all_commands() + run_time_parameters = self.run_orchestrator.get_run_time_parameters() self._run_orchestrator = None @@ -323,7 +319,7 @@ async def finish(self, error: Optional[Exception]) -> None: await self.run_orchestrator.finish(error=error) def get_state_summary(self) -> StateSummary: - return self.run_orchestrator.get_summary() + return self.run_orchestrator.get_state_summary() def get_loaded_labware_definitions(self) -> List[LabwareDefinition]: return self.run_orchestrator.get_loaded_labware_definitions() @@ -369,7 +365,7 @@ def get_is_run_terminal(self) -> bool: return self.run_orchestrator.get_is_run_terminal() def run_was_started(self) -> bool: - return self.run_orchestrator.was_run_started() + return self.run_orchestrator.run_has_started() def add_labware_offset(self, request: LabwareOffsetCreate) -> LabwareOffset: return self.run_orchestrator.add_labware_offset(request) diff --git a/robot-server/tests/commands/test_get_default_engine.py b/robot-server/tests/commands/test_get_default_engine.py index 542266d9dc2..c99b82e2c67 100644 --- a/robot-server/tests/commands/test_get_default_engine.py +++ b/robot-server/tests/commands/test_get_default_engine.py @@ -1,21 +1,21 @@ -"""Tests for robot_server.commands.get_default_engine.""" +"""Tests for robot_server.commands.get_default_orchestrator.""" import pytest from decoy import Decoy from opentrons.hardware_control import HardwareControlAPI from opentrons.hardware_control.modules import MagDeck, TempDeck -from opentrons.protocol_engine import ProtocolEngine +from opentrons.protocol_runner import RunOrchestrator from robot_server.errors.error_responses import ApiError from robot_server.runs.engine_store import EngineStore, EngineConflictError from robot_server.modules.module_identifier import ModuleIdentifier, ModuleIdentity -from robot_server.commands.get_default_orchestrator import get_default_engine +from robot_server.commands.get_default_orchestrator import get_default_orchestrator @pytest.fixture() -def protocol_engine(decoy: Decoy) -> ProtocolEngine: +def run_orchestrator(decoy: Decoy) -> RunOrchestrator: """Get a mocked out ProtocolEngine.""" - return decoy.mock(cls=ProtocolEngine) + return decoy.mock(cls=RunOrchestrator) @pytest.fixture() @@ -30,11 +30,11 @@ def module_identifier(decoy: Decoy) -> ModuleIdentifier: return decoy.mock(cls=ModuleIdentifier) -async def test_get_default_engine( +async def test_get_default_orchestrator( decoy: Decoy, engine_store: EngineStore, hardware_api: HardwareControlAPI, - protocol_engine: ProtocolEngine, + run_orchestrator: RunOrchestrator, module_identifier: ModuleIdentifier, ) -> None: """It should get a default engine with modules pre-loaded.""" @@ -63,30 +63,32 @@ async def test_get_default_engine( decoy.when(hardware_api.attached_modules).then_return([mod_1, mod_2]) - decoy.when(await engine_store.get_default_engine()).then_return(protocol_engine) + decoy.when(await engine_store.get_default_orchestrator()).then_return( + run_orchestrator + ) - result = await get_default_engine( + result = await get_default_orchestrator( engine_store=engine_store, hardware_api=hardware_api, module_identifier=module_identifier, ) - assert result is protocol_engine + assert result is run_orchestrator decoy.verify( - await protocol_engine.use_attached_modules({"mod-1": mod_1, "mod-2": mod_2}), + await run_orchestrator.use_attached_modules({"mod-1": mod_1, "mod-2": mod_2}), times=1, ) async def test_raises_conflict(decoy: Decoy, engine_store: EngineStore) -> None: """It should raise a 409 conflict if the default engine is not availble.""" - decoy.when(await engine_store.get_default_engine()).then_raise( + decoy.when(await engine_store.get_default_orchestrator()).then_raise( EngineConflictError("oh no") ) with pytest.raises(ApiError) as exc_info: - await get_default_engine(engine_store=engine_store) + await get_default_orchestrator(engine_store=engine_store) assert exc_info.value.status_code == 409 assert exc_info.value.content["errors"][0]["id"] == "RunActive" diff --git a/robot-server/tests/commands/test_router.py b/robot-server/tests/commands/test_router.py index 2d8dc6ac435..259af673fe9 100644 --- a/robot-server/tests/commands/test_router.py +++ b/robot-server/tests/commands/test_router.py @@ -4,12 +4,12 @@ from decoy import Decoy from opentrons.protocol_engine import ( - ProtocolEngine, CommandSlice, CommandPointer, commands as pe_commands, ) from opentrons.protocol_engine.errors import CommandDoesNotExistError +from opentrons.protocol_runner import RunOrchestrator from robot_server.service.json_api import MultiBodyMeta from robot_server.errors.error_responses import ApiError @@ -22,14 +22,14 @@ @pytest.fixture() -def protocol_engine(decoy: Decoy) -> ProtocolEngine: - """Get a mocked out ProtocolEngine.""" - return decoy.mock(cls=ProtocolEngine) +def run_orchestrator(decoy: Decoy) -> RunOrchestrator: + """Get a mocked out RunOrchestrator.""" + return decoy.mock(cls=RunOrchestrator) async def test_create_command( decoy: Decoy, - protocol_engine: ProtocolEngine, + run_orchestrator: RunOrchestrator, ) -> None: """It should be able to create a command.""" command_create = pe_commands.HomeCreate(params=pe_commands.HomeParams()) @@ -43,17 +43,17 @@ async def test_create_command( ) def _stub_queued_command_state(*_a: object, **_k: object) -> pe_commands.Command: - decoy.when(protocol_engine.state_view.commands.get("abc123")).then_return( - queued_command - ) + decoy.when(run_orchestrator.get_command("abc123")).then_return(queued_command) return queued_command decoy.when( - protocol_engine.add_command( - pe_commands.HomeCreate( + await run_orchestrator.add_command_and_wait_for_interval( + command=pe_commands.HomeCreate( params=pe_commands.HomeParams(), intent=pe_commands.CommandIntent.SETUP, - ) + ), + wait_until_complete=False, + timeout=42, ) ).then_do(_stub_queued_command_state) @@ -61,31 +61,23 @@ def _stub_queued_command_state(*_a: object, **_k: object) -> pe_commands.Command RequestModelWithStatelessCommandCreate(data=command_create), waitUntilComplete=False, timeout=42, - engine=protocol_engine, + orchestrator=run_orchestrator, ) assert result.content.data == queued_command assert result.status_code == 201 - decoy.verify(await protocol_engine.wait_for_command("abc123"), times=0) async def test_create_command_wait_for_complete( decoy: Decoy, - protocol_engine: ProtocolEngine, + run_orchestrator: RunOrchestrator, ) -> None: """It should be able to create a command.""" command_create = pe_commands.HomeCreate( params=pe_commands.HomeParams(), intent=pe_commands.CommandIntent.SETUP, ) - queued_command = pe_commands.Home( - id="abc123", - key="command-key", - createdAt=datetime(year=2021, month=1, day=1), - status=pe_commands.CommandStatus.QUEUED, - params=pe_commands.HomeParams(), - result=None, - ) + completed_command = pe_commands.Home( id="abc123", key="command-key", @@ -96,30 +88,21 @@ async def test_create_command_wait_for_complete( result=None, ) - def _stub_queued_command_state(*_a: object, **_k: object) -> pe_commands.Command: - decoy.when(protocol_engine.state_view.commands.get("abc123")).then_return( - queued_command + decoy.when( + await run_orchestrator.add_command_and_wait_for_interval( + command=command_create, + wait_until_complete=True, + timeout=42, ) - return queued_command + ).then_return(completed_command) - def _stub_completed_command_state(*_a: object, **_k: object) -> None: - decoy.when(protocol_engine.state_view.commands.get("abc123")).then_return( - completed_command - ) - - decoy.when(protocol_engine.add_command(command_create)).then_do( - _stub_queued_command_state - ) - - decoy.when(await protocol_engine.wait_for_command("abc123")).then_do( - _stub_completed_command_state - ) + decoy.when(run_orchestrator.get_command("abc123")).then_return(completed_command) result = await create_command( RequestModelWithStatelessCommandCreate(data=command_create), waitUntilComplete=True, timeout=42, - engine=protocol_engine, + orchestrator=run_orchestrator, ) assert result.content.data == completed_command @@ -128,7 +111,7 @@ def _stub_completed_command_state(*_a: object, **_k: object) -> None: async def test_get_commands_list( decoy: Decoy, - protocol_engine: ProtocolEngine, + run_orchestrator: RunOrchestrator, ) -> None: """It should get a list of commands.""" command_1 = pe_commands.Home( @@ -146,7 +129,7 @@ async def test_get_commands_list( params=pe_commands.HomeParams(), ) - decoy.when(protocol_engine.state_view.commands.get_current()).then_return( + decoy.when(run_orchestrator.get_current_command()).then_return( CommandPointer( command_id="abc123", command_key="command-key-1", @@ -154,14 +137,12 @@ async def test_get_commands_list( index=0, ) ) - decoy.when( - protocol_engine.state_view.commands.get_slice(cursor=1337, length=42) - ).then_return( + decoy.when(run_orchestrator.get_command_slice(cursor=1337, length=42)).then_return( CommandSlice(commands=[command_1, command_2], cursor=0, total_length=2) ) result = await get_commands_list( - engine=protocol_engine, + orchestrator=run_orchestrator, cursor=1337, pageLength=42, ) @@ -173,7 +154,7 @@ async def test_get_commands_list( async def test_get_command( decoy: Decoy, - protocol_engine: ProtocolEngine, + run_orchestrator: RunOrchestrator, ) -> None: """It should get a single command by ID.""" command_1 = pe_commands.Home( @@ -184,9 +165,9 @@ async def test_get_command( params=pe_commands.HomeParams(), ) - decoy.when(protocol_engine.state_view.commands.get("abc123")).then_return(command_1) + decoy.when(run_orchestrator.get_command("abc123")).then_return(command_1) - result = await get_command(commandId="abc123", engine=protocol_engine) + result = await get_command(commandId="abc123", orchestrator=run_orchestrator) assert result.content.data == command_1 assert result.status_code == 200 @@ -194,15 +175,15 @@ async def test_get_command( async def test_get_command_not_found( decoy: Decoy, - protocol_engine: ProtocolEngine, + run_orchestrator: RunOrchestrator, ) -> None: """It should raise a 404 if command is not found.""" - decoy.when(protocol_engine.state_view.commands.get("abc123")).then_raise( + decoy.when(run_orchestrator.get_command("abc123")).then_raise( CommandDoesNotExistError("oh no") ) with pytest.raises(ApiError) as exc_info: - await get_command(commandId="abc123", engine=protocol_engine) + await get_command(commandId="abc123", orchestrator=run_orchestrator) assert exc_info.value.status_code == 404 assert exc_info.value.content["errors"][0]["id"] == "StatelessCommandNotFound" diff --git a/robot-server/tests/runs/test_engine_store.py b/robot-server/tests/runs/test_engine_store.py index e75ba49c9ba..846bef77034 100644 --- a/robot-server/tests/runs/test_engine_store.py +++ b/robot-server/tests/runs/test_engine_store.py @@ -233,10 +233,10 @@ async def test_clear_idle_engine(subject: EngineStore) -> None: subject._run_orchestrator.runner -async def test_get_default_engine_idempotent(subject: EngineStore) -> None: +async def test_get_default_orchestrator_idempotent(subject: EngineStore) -> None: """It should create and retrieve the same default ProtocolEngine.""" - result = await subject.get_default_engine() - repeated_result = await subject.get_default_engine() + result = await subject.get_default_orchestrator() + repeated_result = await subject.get_default_orchestrator() assert isinstance(result, ProtocolEngine) assert repeated_result is result @@ -244,7 +244,7 @@ async def test_get_default_engine_idempotent(subject: EngineStore) -> None: @pytest.mark.parametrize("robot_type", ["OT-2 Standard", "OT-3 Standard"]) @pytest.mark.parametrize("deck_type", pe_types.DeckType) -async def test_get_default_engine_robot_type( +async def test_get_default_orchestrator_robot_type( decoy: Decoy, robot_type: RobotType, deck_type: pe_types.DeckType ) -> None: """It should create default ProtocolEngines with the given robot and deck type.""" @@ -257,12 +257,12 @@ async def test_get_default_engine_robot_type( deck_type=deck_type, ) - result = await subject.get_default_engine() + result = await subject.get_default_orchestrator() assert result.state_view.config.robot_type == robot_type -async def test_get_default_engine_current_unstarted(subject: EngineStore) -> None: +async def test_get_default_orchestrator_current_unstarted(subject: EngineStore) -> None: """It should allow a default engine if another engine current but unstarted.""" await subject.create( run_id="run-id", @@ -272,11 +272,11 @@ async def test_get_default_engine_current_unstarted(subject: EngineStore) -> Non notify_publishers=mock_notify_publishers, ) - result = await subject.get_default_engine() + result = await subject.get_default_orchestrator() assert isinstance(result, ProtocolEngine) -async def test_get_default_engine_conflict(subject: EngineStore) -> None: +async def test_get_default_orchestrator_conflict(subject: EngineStore) -> None: """It should not allow a default engine if another engine is executing commands.""" await subject.create( run_id="run-id", @@ -288,10 +288,10 @@ async def test_get_default_engine_conflict(subject: EngineStore) -> None: subject.play() with pytest.raises(EngineConflictError): - await subject.get_default_engine() + await subject.get_default_orchestrator() -async def test_get_default_engine_run_stopped(subject: EngineStore) -> None: +async def test_get_default_orchestrator_run_stopped(subject: EngineStore) -> None: """It allow a default engine if another engine is terminal.""" await subject.create( run_id="run-id", @@ -302,7 +302,7 @@ async def test_get_default_engine_run_stopped(subject: EngineStore) -> None: ) await subject.finish(error=None) - result = await subject.get_default_engine() + result = await subject.get_default_orchestrator() assert isinstance(result, ProtocolEngine)