Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(robot-server): retrieve state of previous runs #8676

Merged
merged 2 commits into from
Nov 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 37 additions & 20 deletions robot-server/robot_server/runs/engine_store.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
"""In-memory storage of ProtocolEngine instances."""
from typing import Optional, NamedTuple
from typing import Dict, NamedTuple, Optional

from opentrons.hardware_control import API as HardwareAPI
from opentrons.protocol_engine import ProtocolEngine, create_protocol_engine
from opentrons.protocol_engine import ProtocolEngine, StateView, create_protocol_engine
from opentrons.protocol_runner import ProtocolRunner


class EngineConflictError(RuntimeError):
"""An error raised if the runner already has an engine initialized."""

pass


class EngineMissingError(RuntimeError):
"""An error raised if the engine somehow hasn't been initialized.

If this error is raised, it's almost certainly due to a software bug.
"""

pass

class EngineConflictError(RuntimeError):
"""An error raised if an active engine is already initialized.

The store will not create a new engine unless the "current" runner/engine
pair is idle.
"""


class RunnerEnginePair(NamedTuple):
Expand All @@ -41,10 +41,11 @@ def __init__(self, hardware_api: HardwareAPI) -> None:
"""
self._hardware_api = hardware_api
self._runner_engine_pair: Optional[RunnerEnginePair] = None
self._engines_by_run_id: Dict[str, ProtocolEngine] = {}

@property
def engine(self) -> ProtocolEngine:
"""Get the persisted ProtocolEngine.
"""Get the "current" persisted ProtocolEngine.

Raises:
EngineMissingError: Engine has not yet been created and persisted.
Expand All @@ -56,7 +57,7 @@ def engine(self) -> ProtocolEngine:

@property
def runner(self) -> ProtocolRunner:
"""Get the persisted ProtocolRunner.
"""Get the "current" persisted ProtocolRunner.

Raises:
EngineMissingError: Runner has not yet been created and persisted.
Expand All @@ -66,28 +67,44 @@ def runner(self) -> ProtocolRunner:

return self._runner_engine_pair.runner

async def create(self) -> RunnerEnginePair:
"""Create and store a ProtocolRunner and ProtocolEngine.
async def create(self, run_id: str) -> StateView:
"""Create and store a ProtocolRunner and ProtocolEngine for a given Run.

Args:
run_id: The run resource the engine is assigned to.

Returns:
The created and stored ProtocolRunner / ProtocolEngine pair.

Raises:
EngineConflictError: a ProtocolEngine is already present.
EngineConflictError: The current runner/engine pair is not idle, so
a new set may not be created.
"""
# NOTE: this async. creation happens before the `self._engine`
# check intentionally to avoid a race condition where `self._engine` is
# set after the check but before the engine has finished getting created,
# at the expense of having to potentially throw away an engine instance
engine = await create_protocol_engine(hardware_api=self._hardware_api)
runner = ProtocolRunner(protocol_engine=engine, hardware_api=self._hardware_api)

if self._runner_engine_pair is not None:
raise EngineConflictError("Cannot load multiple runs simultaneously.")
if not self.engine.state_view.commands.get_is_stopped():
raise EngineConflictError("Current engine is not stopped.")
Comment on lines +87 to +88
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm reading this right, if a run stops due to an internal error or failed Protocol Engine command, the client will still need to issue a stop action on it before attempting to create a new run. Is that correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The runner always stops the engine at the conclusion of the protocol file run, even if that run bails out early due to error. So this shouldn't be a problem


self._runner_engine_pair = RunnerEnginePair(runner=runner, engine=engine)
self._engines_by_run_id[run_id] = engine

return engine.state_view

def get_state(self, run_id: str) -> StateView:
"""Get a run's ProtocolEngine state.

return self._runner_engine_pair
Args:
run_id: The run resource to retrieve engine state from.

Raises:
EngineMissingError: No engine found for the given run ID.
"""
try:
return self._engines_by_run_id[run_id].state_view
except KeyError:
raise EngineMissingError(f"No engine state found for run {run_id}")

def clear(self) -> None:
"""Remove the persisted ProtocolEngine, if present, no-op otherwise."""
Expand Down
37 changes: 23 additions & 14 deletions robot-server/robot_server/runs/router/actions_router.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
"""Router for /runs actions endpoints."""
from fastapi import APIRouter, Depends, status
from datetime import datetime
from typing import Union
from typing_extensions import Literal

from opentrons.protocol_engine.errors import ProtocolEngineStoppedError

from robot_server.errors import ErrorDetails, ErrorResponse
from robot_server.service.dependencies import get_current_time, get_unique_id
from robot_server.service.json_api import RequestModel, ResponseModel

from ..run_store import RunStore, RunNotFoundError
from ..run_view import RunView
from ..action_models import RunAction, RunActionType, RunActionCreateData
from ..engine_store import EngineStore, EngineMissingError
from ..engine_store import EngineStore
from ..dependencies import get_run_store, get_engine_store
from .base_router import RunNotFound
from .base_router import RunNotFound, RunStopped

actions_router = APIRouter()

Expand All @@ -31,7 +34,9 @@ class RunActionNotAllowed(ErrorDetails):
status_code=status.HTTP_201_CREATED,
response_model=ResponseModel[RunAction],
responses={
status.HTTP_400_BAD_REQUEST: {"model": ErrorResponse[RunActionNotAllowed]},
status.HTTP_409_CONFLICT: {
"model": ErrorResponse[Union[RunActionNotAllowed, RunStopped]],
},
status.HTTP_404_NOT_FOUND: {"model": ErrorResponse[RunNotFound]},
},
)
Expand All @@ -57,27 +62,31 @@ async def create_run_action(
"""
try:
prev_run = run_store.get(run_id=runId)
except RunNotFoundError as e:
raise RunNotFound(detail=str(e)).as_error(status.HTTP_404_NOT_FOUND)

action, next_run = run_view.with_action(
run=prev_run,
action_id=action_id,
action_data=request_body.data,
created_at=created_at,
if not prev_run.is_current:
raise RunStopped(detail=f"Run {runId} is not the current run").as_error(
status.HTTP_409_CONFLICT
)

# TODO(mc, 2021-07-06): add a dependency to verify that a given
# action is allowed
action, next_run = run_view.with_action(
run=prev_run,
action_id=action_id,
action_data=request_body.data,
created_at=created_at,
)

try:
if action.actionType == RunActionType.PLAY:
engine_store.runner.play()
elif action.actionType == RunActionType.PAUSE:
engine_store.runner.pause()
if action.actionType == RunActionType.STOP:
await engine_store.runner.stop()

except RunNotFoundError as e:
raise RunNotFound(detail=str(e)).as_error(status.HTTP_404_NOT_FOUND)
except EngineMissingError as e:
raise RunActionNotAllowed(detail=str(e)).as_error(status.HTTP_400_BAD_REQUEST)
except ProtocolEngineStoppedError as e:
raise RunActionNotAllowed(detail=str(e)).as_error(status.HTTP_409_CONFLICT)

run_store.upsert(run=next_run)

Expand Down
49 changes: 31 additions & 18 deletions robot-server/robot_server/runs/router/base_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ class RunNotFound(ErrorDetails):
title: str = "Run Not Found"


# TODO(mc, 2021-05-28): evaluate multi-run logic
class RunAlreadyActive(ErrorDetails):
"""An error if one tries to create a new run while one is already active."""

Expand All @@ -59,6 +58,13 @@ class RunNotIdle(ErrorDetails):
)


class RunStopped(ErrorDetails):
"""An error if one tries to modify a stopped run."""

id: Literal["RunStopped"] = "RunStopped"
title: str = "Run Stopped"


@base_router.post(
path="/runs",
summary="Create a run",
Expand Down Expand Up @@ -96,15 +102,17 @@ async def create_run(
"""
create_data = request_body.data if request_body is not None else None
run = run_view.as_resource(
run_id=run_id, created_at=created_at, create_data=create_data
run_id=run_id,
created_at=created_at,
create_data=create_data,
)
protocol_id = None

if isinstance(create_data, ProtocolRunCreateData):
protocol_id = create_data.createParams.protocolId

try:
await engine_store.create()
engine_state = await engine_store.create(run_id=run_id)

if protocol_id is not None:
protocol_resource = protocol_store.get(protocol_id=protocol_id)
Expand All @@ -124,10 +132,10 @@ async def create_run(

data = run_view.as_response(
run=run,
commands=engine_store.engine.state_view.commands.get_all(),
pipettes=engine_store.engine.state_view.pipettes.get_all(),
labware=engine_store.engine.state_view.labware.get_all(),
engine_status=engine_store.engine.state_view.commands.get_status(),
commands=engine_state.commands.get_all(),
pipettes=engine_state.pipettes.get_all(),
labware=engine_state.labware.get_all(),
engine_status=engine_state.commands.get_status(),
)

return ResponseModel(data=data)
Expand Down Expand Up @@ -155,13 +163,14 @@ async def get_runs(
data = []

for run in run_store.get_all():
# TODO(mc, 2021-06-23): add multi-engine support
run_id = run.run_id
engine_state = engine_store.get_state(run_id)
run_data = run_view.as_response(
run=run,
commands=engine_store.engine.state_view.commands.get_all(),
pipettes=engine_store.engine.state_view.pipettes.get_all(),
labware=engine_store.engine.state_view.labware.get_all(),
engine_status=engine_store.engine.state_view.commands.get_status(),
commands=engine_state.commands.get_all(),
pipettes=engine_state.pipettes.get_all(),
labware=engine_state.labware.get_all(),
engine_status=engine_state.commands.get_status(),
)

data.append(run_data)
Expand Down Expand Up @@ -198,12 +207,14 @@ async def get_run(
except RunNotFoundError as e:
raise RunNotFound(detail=str(e)).as_error(status.HTTP_404_NOT_FOUND)

engine_state = engine_store.get_state(run.run_id)

data = run_view.as_response(
run=run,
commands=engine_store.engine.state_view.commands.get_all(),
pipettes=engine_store.engine.state_view.pipettes.get_all(),
labware=engine_store.engine.state_view.labware.get_all(),
engine_status=engine_store.engine.state_view.commands.get_status(),
commands=engine_state.commands.get_all(),
pipettes=engine_state.pipettes.get_all(),
labware=engine_state.labware.get_all(),
engine_status=engine_state.commands.get_status(),
)

return ResponseModel(data=data)
Expand All @@ -230,10 +241,12 @@ async def remove_run_by_id(
engine_store: ProtocolEngine storage and control.
"""
try:
if not engine_store.engine.state_view.commands.get_is_stopped():
raise RunNotIdle().as_error(status.HTTP_409_CONFLICT)
engine_state = engine_store.get_state(runId)
except EngineMissingError:
pass
else:
if not engine_state.commands.get_is_stopped():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See prior comment about whether .get_is_stopped() is the right thing to check here.

raise RunNotIdle().as_error(status.HTTP_409_CONFLICT)

try:
engine_store.clear()
Expand Down
11 changes: 9 additions & 2 deletions robot-server/robot_server/runs/router/commands_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from ..run_models import Run, RunCommandSummary
from ..engine_store import EngineStore
from ..dependencies import get_engine_store
from .base_router import RunNotFound, get_run
from .base_router import RunNotFound, RunStopped, get_run

commands_router = APIRouter()

Expand All @@ -38,6 +38,7 @@ class CommandNotFound(ErrorDetails):
status_code=status.HTTP_200_OK,
response_model=ResponseModel[pe_commands.Command],
responses={
status.HTTP_400_BAD_REQUEST: {"model": ErrorResponse[RunStopped]},
status.HTTP_404_NOT_FOUND: {"model": ErrorResponse[RunNotFound]},
},
)
Expand All @@ -57,6 +58,11 @@ async def post_run_command(
`GET /runs/{runId}`. Present to ensure 404 if run
not found.
"""
if not run.data.current:
raise RunStopped(detail=f"Run {run.data.id} is not the current run").as_error(
status.HTTP_400_BAD_REQUEST
)

command = engine_store.engine.add_command(request_body.data)
return ResponseModel[pe_commands.Command](data=command)

Expand Down Expand Up @@ -120,8 +126,9 @@ async def get_run_command(
`GET /run/{runId}`. Present to ensure 404 if run
not found.
"""
engine_state = engine_store.get_state(run.data.id)
try:
command = engine_store.engine.state_view.commands.get(commandId)
command = engine_state.commands.get(commandId)
except pe_errors.CommandDoesNotExistError as e:
raise CommandNotFound(detail=str(e)).as_error(status.HTTP_404_NOT_FOUND)

Expand Down
7 changes: 7 additions & 0 deletions robot-server/robot_server/runs/run_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ class _AbstractRun(ResourceModel):
runType: RunType = Field(..., description="Specific run type.")
createdAt: datetime = Field(..., description="When the run was created")
status: RunStatus = Field(..., description="Execution status of the run")
current: bool = Field(
...,
description=(
"Whether this run is currently controlling the robot."
" There can be, at most, one current run."
),
)
actions: List[RunAction] = Field(
...,
description="Client-initiated run control actions.",
Expand Down
8 changes: 7 additions & 1 deletion robot-server/robot_server/runs/run_store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Runs' in-memory store."""
from dataclasses import dataclass
from dataclasses import dataclass, replace
from datetime import datetime
from typing import Dict, List

Expand All @@ -19,6 +19,7 @@ class RunResource:
create_data: RunCreateData
created_at: datetime
actions: List[RunAction]
is_current: bool


class RunNotFoundError(ValueError):
Expand Down Expand Up @@ -46,6 +47,11 @@ def upsert(self, run: RunResource) -> RunResource:
Returns:
The resource that was added to the store.
"""
if run.is_current is True:
for target_id, target in self._runs_by_id.items():
if target.is_current and target_id != run.run_id:
self._runs_by_id[target_id] = replace(target, is_current=False)

self._runs_by_id[run.run_id] = run

return run
Expand Down
Loading