Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(robot-server, api): add router for creating run error recovery policies #15812

Merged
4 changes: 4 additions & 0 deletions api/src/opentrons/protocol_engine/protocol_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,10 @@ def set_and_start_queue_worker(
)
self._queue_worker.start()

async def set_error_recovery_policy(self, policy: ErrorRecoveryPolicy) -> None:
"""Set error recovery policy for run."""
raise NotImplementedError("set_error_recovery_policy is not implemented yet")

TamarZanzouri marked this conversation as resolved.
Show resolved Hide resolved

# TODO(tz, 7-12-23): move this to shared data when we dont relay on ErrorOccurrence
def code_in_error_tree(
Expand Down
6 changes: 6 additions & 0 deletions api/src/opentrons/protocol_runner/run_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
RunTimeParameter,
RunTimeParamValuesType,
)
from ..protocol_engine.error_recovery_policy import ErrorRecoveryPolicy

from ..protocol_reader import JsonProtocolConfig, PythonProtocolConfig, ProtocolSource
from ..protocols.parse import PythonParseMode

Expand Down Expand Up @@ -357,6 +359,10 @@ def get_deck_type(self) -> DeckType:
"""Get engine deck type."""
return self._protocol_engine.state_view.config.deck_type

async def create_error_recovery_policy(self, policy: ErrorRecoveryPolicy) -> None:
"""Create error recovery policy for the run."""
await self._protocol_engine.set_error_recovery_policy(policy)

TamarZanzouri marked this conversation as resolved.
Show resolved Hide resolved
async def command_generator(self) -> AsyncGenerator[str, None]:
"""Yield next command to execute."""
while True:
Expand Down
12 changes: 12 additions & 0 deletions api/tests/opentrons/protocol_runner/test_run_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from decoy import Decoy
from typing import Union, Generator

from opentrons.protocol_engine.error_recovery_policy import ErrorRecoveryPolicy
from opentrons.protocol_engine.errors import RunStoppedError
from opentrons.protocol_engine.state import StateStore
from opentrons.protocols.api_support.types import APIVersion
Expand Down Expand Up @@ -518,3 +519,14 @@ def get_next_to_execute() -> Generator[str, None, None]:
async for command in live_protocol_subject.command_generator():
assert command == f"command-id-{index}"
index = index + 1


async def test_create_error_recovery_policy(
decoy: Decoy,
mock_protocol_engine: ProtocolEngine,
live_protocol_subject: RunOrchestrator,
) -> None:
"""Should call PE set_error_recovery_policy."""
policy = decoy.mock(cls=ErrorRecoveryPolicy)
await live_protocol_subject.create_error_recovery_policy(policy)
decoy.verify(await mock_protocol_engine.set_error_recovery_policy(policy))
2 changes: 1 addition & 1 deletion robot-server/robot_server/runs/error_recovery_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def _policy(

if command_type_matches and error_type_matches:
if rule.ifMatch == ReactionIfMatch.IGNORE_AND_CONTINUE:
raise NotImplementedError # No protocol engine support for this yet. It's in EXEC-302.
return ErrorRecoveryType.IGNORE_AND_CONTINUE
elif rule.ifMatch == ReactionIfMatch.FAIL_RUN:
return ErrorRecoveryType.FAIL_RUN
elif rule.ifMatch == ReactionIfMatch.WAIT_FOR_RECOVERY:
Expand Down
43 changes: 42 additions & 1 deletion robot-server/robot_server/runs/router/base_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
from datetime import datetime
from textwrap import dedent
from typing import Optional, Union, Callable
from typing import Optional, Union, Callable, List
from typing_extensions import Literal

from fastapi import APIRouter, Depends, status, Query
Expand Down Expand Up @@ -45,6 +45,7 @@
get_run_auto_deleter,
get_quick_transfer_run_auto_deleter,
)
from ..error_recovery_models import ErrorRecoveryRule

from robot_server.deck_configuration.fastapi_dependencies import (
get_deck_configuration_store,
Expand Down Expand Up @@ -362,3 +363,43 @@ async def update_run(
content=SimpleBody.construct(data=run_data),
status_code=status.HTTP_200_OK,
)


@PydanticResponse.wrap_route(
base_router.post,
TamarZanzouri marked this conversation as resolved.
Show resolved Hide resolved
path="/runs/{runId}/policies",
summary="Create run policies",
TamarZanzouri marked this conversation as resolved.
Show resolved Hide resolved
description=dedent(
"""
Create run policies for error recovery.
"""
TamarZanzouri marked this conversation as resolved.
Show resolved Hide resolved
),
status_code=status.HTTP_201_CREATED,
responses={
status.HTTP_201_CREATED: {"model": SimpleBody[Run]},
status.HTTP_409_CONFLICT: {"model": ErrorBody[RunStopped]},
},
TamarZanzouri marked this conversation as resolved.
Show resolved Hide resolved
)
async def create_run_policies(
runId: str,
request_body: Optional[RequestModel[List[ErrorRecoveryRule]]] = None,
TamarZanzouri marked this conversation as resolved.
Show resolved Hide resolved
TamarZanzouri marked this conversation as resolved.
Show resolved Hide resolved
run_data_manager: RunDataManager = Depends(get_run_data_manager),
) -> PydanticResponse[SimpleEmptyBody]:
"""Create run polices.

Arguments:
runId: Run ID pulled from URL.
request_body: Optional request body with run creation data.
run_data_manager: Current and historical run data management.
"""
policies = request_body.data if request_body is not None else None
if policies:
try:
await run_data_manager.create_policies(run_id=runId, policies=policies)
except RunNotCurrentError as e:
raise RunStopped(detail=str(e)).as_error(status.HTTP_409_CONFLICT) from e

return await PydanticResponse.create(
content=SimpleEmptyBody.construct(),
status_code=status.HTTP_200_OK,
)
15 changes: 15 additions & 0 deletions robot-server/robot_server/runs/run_data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from robot_server.protocols.protocol_store import ProtocolResource
from robot_server.service.task_runner import TaskRunner
from robot_server.service.notifications import RunsPublisher
from . import error_recovery_mapping
from .error_recovery_models import ErrorRecoveryRule

from .run_orchestrator_store import RunOrchestratorStore
from .run_store import RunResource, RunStore, BadRunResource, BadStateSummary
Expand Down Expand Up @@ -433,6 +435,19 @@ def get_all_commands_as_preserialized_list(self, run_id: str) -> List[str]:
)
return self._run_store.get_all_commands_as_preserialized_list(run_id)

async def create_policies(
self, run_id: str, policies: List[ErrorRecoveryRule]
) -> None:
"""Create run policy rules for error recovery."""
if run_id != self._run_orchestrator_store.current_run_id:
raise RunNotCurrentError(
f"Cannot update {run_id} because it is not the current run."
)
policy = error_recovery_mapping.create_error_recovery_policy_from_rules(
policies
)
await self._run_orchestrator_store.create_error_recovery_policy(policy=policy)

def _get_state_summary(self, run_id: str) -> Union[StateSummary, BadStateSummary]:
if run_id == self._run_orchestrator_store.current_run_id:
return self._run_orchestrator_store.get_state_summary()
Expand Down
5 changes: 5 additions & 0 deletions robot-server/robot_server/runs/run_orchestrator_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
)
from opentrons_shared_data.labware.types import LabwareUri

from opentrons.protocol_engine.error_recovery_policy import ErrorRecoveryPolicy

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -359,6 +360,10 @@ def add_labware_definition(self, definition: LabwareDefinition) -> LabwareUri:
"""Add a new labware definition to state."""
return self.run_orchestrator.add_labware_definition(definition)

async def create_error_recovery_policy(self, policy: ErrorRecoveryPolicy) -> None:
"""Create run policy rules for error recovery."""
await self.run_orchestrator.create_error_recovery_policy(policy)

async def add_command_and_wait_for_interval(
self,
request: CommandCreate,
Expand Down
49 changes: 49 additions & 0 deletions robot-server/tests/runs/router/test_base_router.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""Tests for base /runs routes."""
from typing import List

import pytest
from datetime import datetime
from decoy import Decoy
Expand All @@ -9,6 +11,7 @@
from opentrons.protocol_reader import ProtocolSource, JsonProtocolConfig

from robot_server.errors.error_responses import ApiError
from robot_server.runs.error_recovery_models import ErrorRecoveryRule
from robot_server.service.json_api import (
RequestModel,
SimpleBody,
Expand Down Expand Up @@ -37,6 +40,7 @@
get_runs,
remove_run,
update_run,
create_run_policies,
)

from robot_server.deck_configuration.store import DeckConfigurationStore
Expand Down Expand Up @@ -569,3 +573,48 @@ async def test_update_to_current_missing(

assert exc_info.value.status_code == 404
assert exc_info.value.content["errors"][0]["id"] == "RunNotFound"


async def test_create_policies(
decoy: Decoy, mock_run_data_manager: RunDataManager
) -> None:
"""It should call RunDataManager create run policies."""
policies = decoy.mock(cls=List[ErrorRecoveryRule])
await create_run_policies(
runId="rud-id",
run_data_manager=mock_run_data_manager,
request_body=RequestModel(data=policies),
)
decoy.verify(
await mock_run_data_manager.create_policies(run_id="rud-id", policies=policies)
)


# async def test_create_policies_raises_no_policies(
# decoy: Decoy, mock_run_data_manager: RunDataManager
# ) -> None:
# """It should raise that no policies were accepted."""
# policies = decoy.mock(cls=List[ErrorRecoveryRule])
# await create_run_policies(runId="rud-id", request_body=RequestModel(data=policies))
# decoy.verify(
# mock_run_data_manager.create_policies(run_id="rud-id", policies=policies)
# )
TamarZanzouri marked this conversation as resolved.
Show resolved Hide resolved


async def test_create_policies_raises_not_active_run(
decoy: Decoy, mock_run_data_manager: RunDataManager
) -> None:
"""It should raise that the run is not current."""
policies = decoy.mock(cls=List[ErrorRecoveryRule])
decoy.when(
await mock_run_data_manager.create_policies(run_id="rud-id", policies=policies)
).then_raise(RunNotCurrentError())
with pytest.raises(ApiError) as exc_info:
await create_run_policies(
runId="rud-id",
run_data_manager=mock_run_data_manager,
request_body=RequestModel(data=policies),
)

assert exc_info.value.status_code == 409
assert exc_info.value.content["errors"][0]["id"] == "RunStopped"
71 changes: 57 additions & 14 deletions robot-server/tests/runs/test_run_data_manager.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
"""Tests for RunDataManager."""
from datetime import datetime
from typing import Optional, List

import pytest
from datetime import datetime
from decoy import Decoy, matchers

from opentrons.types import DeckSlotName
from opentrons.protocol_runner import RunResult
from opentrons.protocol_engine import (
EngineStatus,
StateSummary,
Expand All @@ -20,31 +17,34 @@
LoadedModule,
LabwareOffset,
)
from opentrons.protocol_engine import Liquid
from opentrons.protocol_engine.error_recovery_policy import ErrorRecoveryPolicy
from opentrons.protocol_runner import RunResult
from opentrons.types import DeckSlotName

from opentrons_shared_data.errors.exceptions import InvalidStoredData
from opentrons_shared_data.labware.labware_definition import LabwareDefinition
from robot_server.protocols.protocol_store import ProtocolResource
from robot_server.runs.run_orchestrator_store import (
RunOrchestratorStore,
RunConflictError,
)
from robot_server.runs import error_recovery_mapping
from robot_server.runs.error_recovery_models import ErrorRecoveryRule
from robot_server.runs.run_data_manager import (
RunDataManager,
RunNotCurrentError,
PreSerializedCommandsNotAvailableError,
)
from robot_server.runs.run_models import Run, BadRun, RunNotFoundError, RunDataError
from robot_server.runs.run_orchestrator_store import (
RunOrchestratorStore,
RunConflictError,
)
from robot_server.runs.run_store import (
RunStore,
RunResource,
CommandNotFoundError,
BadStateSummary,
)
from robot_server.service.task_runner import TaskRunner
from robot_server.service.notifications import RunsPublisher

from opentrons.protocol_engine import Liquid

from opentrons_shared_data.labware.labware_definition import LabwareDefinition
from opentrons_shared_data.errors.exceptions import InvalidStoredData
from robot_server.service.task_runner import TaskRunner


def mock_notify_publishers() -> None:
Expand Down Expand Up @@ -1001,3 +1001,46 @@ async def test_get_current_run_labware_definition(
LabwareDefinition.construct(namespace="test_1"), # type: ignore[call-arg]
LabwareDefinition.construct(namespace="test_2"), # type: ignore[call-arg]
]


async def test_create_policies_raises_run_not_current(
decoy: Decoy,
mock_run_orchestrator_store: RunOrchestratorStore,
subject: RunDataManager,
) -> None:
"""Should raise run not current."""
decoy.when(mock_run_orchestrator_store.current_run_id).then_return(
"not-current-run-id"
)
with pytest.raises(RunNotCurrentError):
await subject.create_policies(
run_id="run-id", policies=decoy.mock(cls=List[ErrorRecoveryRule])
)


async def test_create_policies_translates_and_calls_orchestrator(
decoy: Decoy,
monkeypatch: pytest.MonkeyPatch,
mock_run_orchestrator_store: RunOrchestratorStore,
subject: RunDataManager,
) -> None:
"""Should translate rules into policy and call orchestrator."""
monkeypatch.setattr(
error_recovery_mapping,
"create_error_recovery_policy_from_rules",
decoy.mock(
func=decoy.mock(
func=error_recovery_mapping.create_error_recovery_policy_from_rules
)
),
)
input_rules = decoy.mock(cls=List[ErrorRecoveryRule])
expected_output = decoy.mock(cls=ErrorRecoveryPolicy)
decoy.when(
error_recovery_mapping.create_error_recovery_policy_from_rules(input_rules)
).then_return(expected_output)
decoy.when(mock_run_orchestrator_store.current_run_id).then_return("run-id")
await subject.create_policies(run_id="run-id", policies=input_rules)
decoy.verify(
await mock_run_orchestrator_store.create_error_recovery_policy(expected_output)
)
Loading