Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(robot-server, api): add router for creating run error recovery policies #15812

Merged
4 changes: 4 additions & 0 deletions api/src/opentrons/protocol_engine/protocol_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,10 @@ def set_and_start_queue_worker(
)
self._queue_worker.start()

def set_error_recovery_policy(self, policy: ErrorRecoveryPolicy) -> None:
"""Set error recovery policy for run."""
raise NotImplementedError("set_error_recovery_policy is not implemented yet")


# TODO(tz, 7-12-23): move this to shared data when we dont relay on ErrorOccurrence
def code_in_error_tree(
Expand Down
6 changes: 6 additions & 0 deletions api/src/opentrons/protocol_runner/run_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
RunTimeParameter,
RunTimeParamValuesType,
)
from ..protocol_engine.error_recovery_policy import ErrorRecoveryPolicy

from ..protocol_reader import JsonProtocolConfig, PythonProtocolConfig, ProtocolSource
from ..protocols.parse import PythonParseMode

Expand Down Expand Up @@ -357,6 +359,10 @@ def get_deck_type(self) -> DeckType:
"""Get engine deck type."""
return self._protocol_engine.state_view.config.deck_type

def set_error_recovery_policy(self, policy: ErrorRecoveryPolicy) -> None:
"""Create error recovery policy for the run."""
self._protocol_engine.set_error_recovery_policy(policy)

async def command_generator(self) -> AsyncGenerator[str, None]:
"""Yield next command to execute."""
while True:
Expand Down
12 changes: 12 additions & 0 deletions api/tests/opentrons/protocol_runner/test_run_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from decoy import Decoy
from typing import Union, Generator

from opentrons.protocol_engine.error_recovery_policy import ErrorRecoveryPolicy
from opentrons.protocol_engine.errors import RunStoppedError
from opentrons.protocol_engine.state import StateStore
from opentrons.protocols.api_support.types import APIVersion
Expand Down Expand Up @@ -518,3 +519,14 @@ def get_next_to_execute() -> Generator[str, None, None]:
async for command in live_protocol_subject.command_generator():
assert command == f"command-id-{index}"
index = index + 1


async def test_create_error_recovery_policy(
decoy: Decoy,
mock_protocol_engine: ProtocolEngine,
live_protocol_subject: RunOrchestrator,
) -> None:
"""Should call PE set_error_recovery_policy."""
policy = decoy.mock(cls=ErrorRecoveryPolicy)
live_protocol_subject.set_error_recovery_policy(policy)
decoy.verify(mock_protocol_engine.set_error_recovery_policy(policy))
2 changes: 1 addition & 1 deletion robot-server/robot_server/runs/error_recovery_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def _policy(

if command_type_matches and error_type_matches:
if rule.ifMatch == ReactionIfMatch.IGNORE_AND_CONTINUE:
raise NotImplementedError # No protocol engine support for this yet. It's in EXEC-302.
return ErrorRecoveryType.IGNORE_AND_CONTINUE
elif rule.ifMatch == ReactionIfMatch.FAIL_RUN:
return ErrorRecoveryType.FAIL_RUN
elif rule.ifMatch == ReactionIfMatch.WAIT_FOR_RECOVERY:
Expand Down
14 changes: 13 additions & 1 deletion robot-server/robot_server/runs/error_recovery_models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Request and response models for dealing with error recovery policies."""
from enum import Enum
from typing import List

from pydantic import BaseModel, Field


Expand Down Expand Up @@ -57,7 +59,7 @@ class MatchCriteria(BaseModel):


class ErrorRecoveryRule(BaseModel):
"""Request/Response model for new error recovery rule creation."""
"""Model for new error recovery rule."""

matchCriteria: MatchCriteria = Field(
...,
Expand All @@ -67,3 +69,13 @@ class ErrorRecoveryRule(BaseModel):
...,
description="The specific recovery setting that will be in use if the type parameters match.",
)


class ErrorRecoveryPolicies(BaseModel):
"""Request/Response model for new error recovery policy rules creation."""

policyRules: List[ErrorRecoveryRule] = Field(
...,
description="A list or error recovery rules to apply for a run's recovery management."
"The rules are evaluated first-to-last. The first exact match will dectate recovery management.",
)
42 changes: 42 additions & 0 deletions robot-server/robot_server/runs/router/base_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
get_run_auto_deleter,
get_quick_transfer_run_auto_deleter,
)
from ..error_recovery_models import ErrorRecoveryPolicies

from robot_server.deck_configuration.fastapi_dependencies import (
get_deck_configuration_store,
Expand Down Expand Up @@ -362,3 +363,44 @@ async def update_run(
content=SimpleBody.construct(data=run_data),
status_code=status.HTTP_200_OK,
)


@PydanticResponse.wrap_route(
base_router.put,
path="/runs/{runId}/errorRecoveryPolicies",
summary="Set run policies",
description=dedent(
"""
Update how to handle different kinds of command failures.
The following rules will persist during the run.
"""
),
status_code=status.HTTP_201_CREATED,
responses={
status.HTTP_200_OK: {"model": SimpleEmptyBody},
status.HTTP_409_CONFLICT: {"model": ErrorBody[RunStopped]},
},
)
async def set_run_policies(
runId: str,
request_body: RequestModel[ErrorRecoveryPolicies],
run_data_manager: RunDataManager = Depends(get_run_data_manager),
) -> PydanticResponse[SimpleEmptyBody]:
"""Create run polices.

Arguments:
runId: Run ID pulled from URL.
request_body: Request body with run policies data.
run_data_manager: Current and historical run data management.
"""
policies = request_body.data.policyRules
if policies:
try:
run_data_manager.set_policies(run_id=runId, policies=policies)
except RunNotCurrentError as e:
raise RunStopped(detail=str(e)).as_error(status.HTTP_409_CONFLICT) from e

return await PydanticResponse.create(
content=SimpleEmptyBody.construct(),
status_code=status.HTTP_200_OK,
)
13 changes: 13 additions & 0 deletions robot-server/robot_server/runs/run_data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from robot_server.protocols.protocol_store import ProtocolResource
from robot_server.service.task_runner import TaskRunner
from robot_server.service.notifications import RunsPublisher
from . import error_recovery_mapping
from .error_recovery_models import ErrorRecoveryRule

from .run_orchestrator_store import RunOrchestratorStore
from .run_store import RunResource, RunStore, BadRunResource, BadStateSummary
Expand Down Expand Up @@ -433,6 +435,17 @@ def get_all_commands_as_preserialized_list(self, run_id: str) -> List[str]:
)
return self._run_store.get_all_commands_as_preserialized_list(run_id)

def set_policies(self, run_id: str, policies: List[ErrorRecoveryRule]) -> None:
"""Create run policy rules for error recovery."""
if run_id != self._run_orchestrator_store.current_run_id:
raise RunNotCurrentError(
f"Cannot update {run_id} because it is not the current run."
)
policy = error_recovery_mapping.create_error_recovery_policy_from_rules(
policies
)
self._run_orchestrator_store.set_error_recovery_policy(policy=policy)

def _get_state_summary(self, run_id: str) -> Union[StateSummary, BadStateSummary]:
if run_id == self._run_orchestrator_store.current_run_id:
return self._run_orchestrator_store.get_state_summary()
Expand Down
5 changes: 5 additions & 0 deletions robot-server/robot_server/runs/run_orchestrator_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
)
from opentrons_shared_data.labware.types import LabwareUri

from opentrons.protocol_engine.error_recovery_policy import ErrorRecoveryPolicy

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -359,6 +360,10 @@ def add_labware_definition(self, definition: LabwareDefinition) -> LabwareUri:
"""Add a new labware definition to state."""
return self.run_orchestrator.add_labware_definition(definition)

def set_error_recovery_policy(self, policy: ErrorRecoveryPolicy) -> None:
"""Create run policy rules for error recovery."""
self.run_orchestrator.set_error_recovery_policy(policy)

async def add_command_and_wait_for_interval(
self,
request: CommandCreate,
Expand Down
40 changes: 40 additions & 0 deletions robot-server/tests/runs/router/test_base_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from opentrons.protocol_reader import ProtocolSource, JsonProtocolConfig

from robot_server.errors.error_responses import ApiError
from robot_server.runs.error_recovery_models import ErrorRecoveryPolicies
from robot_server.service.json_api import (
RequestModel,
SimpleBody,
Expand Down Expand Up @@ -37,6 +38,7 @@
get_runs,
remove_run,
update_run,
set_run_policies,
)

from robot_server.deck_configuration.store import DeckConfigurationStore
Expand Down Expand Up @@ -569,3 +571,41 @@ async def test_update_to_current_missing(

assert exc_info.value.status_code == 404
assert exc_info.value.content["errors"][0]["id"] == "RunNotFound"


async def test_create_policies(
decoy: Decoy, mock_run_data_manager: RunDataManager
) -> None:
"""It should call RunDataManager create run policies."""
policies = decoy.mock(cls=ErrorRecoveryPolicies)
await set_run_policies(
runId="rud-id",
request_body=RequestModel(data=policies),
run_data_manager=mock_run_data_manager,
)
decoy.verify(
mock_run_data_manager.set_policies(
run_id="rud-id", policies=policies.policyRules
)
)


async def test_create_policies_raises_not_active_run(
decoy: Decoy, mock_run_data_manager: RunDataManager
) -> None:
"""It should raise that the run is not current."""
policies = decoy.mock(cls=ErrorRecoveryPolicies)
decoy.when(
mock_run_data_manager.set_policies(
run_id="rud-id", policies=policies.policyRules
)
).then_raise(RunNotCurrentError())
with pytest.raises(ApiError) as exc_info:
await set_run_policies(
runId="rud-id",
request_body=RequestModel(data=policies),
run_data_manager=mock_run_data_manager,
)

assert exc_info.value.status_code == 409
assert exc_info.value.content["errors"][0]["id"] == "RunStopped"
4 changes: 3 additions & 1 deletion robot-server/tests/runs/test_error_recovery_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ def test_create_error_recovery_policy_with_rules(
robot_type="OT-3 Standard",
deck_type=DeckType.OT3_STANDARD,
)
with pytest.raises(NotImplementedError):
assert (
policy(exampleConfig, mock_command, mock_error_data)
== ErrorRecoveryType.IGNORE_AND_CONTINUE
)


def test_create_error_recovery_policy_undefined_error(
Expand Down
69 changes: 55 additions & 14 deletions robot-server/tests/runs/test_run_data_manager.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
"""Tests for RunDataManager."""
from datetime import datetime
from typing import Optional, List

import pytest
from datetime import datetime
from decoy import Decoy, matchers

from opentrons.types import DeckSlotName
from opentrons.protocol_runner import RunResult
from opentrons.protocol_engine import (
EngineStatus,
StateSummary,
Expand All @@ -20,31 +17,34 @@
LoadedModule,
LabwareOffset,
)
from opentrons.protocol_engine import Liquid
from opentrons.protocol_engine.error_recovery_policy import ErrorRecoveryPolicy
from opentrons.protocol_runner import RunResult
from opentrons.types import DeckSlotName

from opentrons_shared_data.errors.exceptions import InvalidStoredData
from opentrons_shared_data.labware.labware_definition import LabwareDefinition
from robot_server.protocols.protocol_store import ProtocolResource
from robot_server.runs.run_orchestrator_store import (
RunOrchestratorStore,
RunConflictError,
)
from robot_server.runs import error_recovery_mapping
from robot_server.runs.error_recovery_models import ErrorRecoveryRule
from robot_server.runs.run_data_manager import (
RunDataManager,
RunNotCurrentError,
PreSerializedCommandsNotAvailableError,
)
from robot_server.runs.run_models import Run, BadRun, RunNotFoundError, RunDataError
from robot_server.runs.run_orchestrator_store import (
RunOrchestratorStore,
RunConflictError,
)
from robot_server.runs.run_store import (
RunStore,
RunResource,
CommandNotFoundError,
BadStateSummary,
)
from robot_server.service.task_runner import TaskRunner
from robot_server.service.notifications import RunsPublisher

from opentrons.protocol_engine import Liquid

from opentrons_shared_data.labware.labware_definition import LabwareDefinition
from opentrons_shared_data.errors.exceptions import InvalidStoredData
from robot_server.service.task_runner import TaskRunner


def mock_notify_publishers() -> None:
Expand Down Expand Up @@ -1001,3 +1001,44 @@ async def test_get_current_run_labware_definition(
LabwareDefinition.construct(namespace="test_1"), # type: ignore[call-arg]
LabwareDefinition.construct(namespace="test_2"), # type: ignore[call-arg]
]


async def test_create_policies_raises_run_not_current(
decoy: Decoy,
mock_run_orchestrator_store: RunOrchestratorStore,
subject: RunDataManager,
) -> None:
"""Should raise run not current."""
decoy.when(mock_run_orchestrator_store.current_run_id).then_return(
"not-current-run-id"
)
with pytest.raises(RunNotCurrentError):
subject.set_policies(
run_id="run-id", policies=decoy.mock(cls=List[ErrorRecoveryRule])
)


async def test_create_policies_translates_and_calls_orchestrator(
decoy: Decoy,
monkeypatch: pytest.MonkeyPatch,
mock_run_orchestrator_store: RunOrchestratorStore,
subject: RunDataManager,
) -> None:
"""Should translate rules into policy and call orchestrator."""
monkeypatch.setattr(
error_recovery_mapping,
"create_error_recovery_policy_from_rules",
decoy.mock(
func=decoy.mock(
func=error_recovery_mapping.create_error_recovery_policy_from_rules
)
),
)
input_rules = decoy.mock(cls=List[ErrorRecoveryRule])
expected_output = decoy.mock(cls=ErrorRecoveryPolicy)
decoy.when(
error_recovery_mapping.create_error_recovery_policy_from_rules(input_rules)
).then_return(expected_output)
decoy.when(mock_run_orchestrator_store.current_run_id).then_return("run-id")
subject.set_policies(run_id="run-id", policies=input_rules)
decoy.verify(mock_run_orchestrator_store.set_error_recovery_policy(expected_output))
Loading