Skip to content

Commit

Permalink
refactor(robot-server): retrieve state of previous runs (#8676)
Browse files Browse the repository at this point in the history
Closes #8470
  • Loading branch information
mcous authored Nov 9, 2021
1 parent 23ff933 commit 80093b3
Show file tree
Hide file tree
Showing 13 changed files with 459 additions and 149 deletions.
57 changes: 37 additions & 20 deletions robot-server/robot_server/runs/engine_store.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
"""In-memory storage of ProtocolEngine instances."""
from typing import Optional, NamedTuple
from typing import Dict, NamedTuple, Optional

from opentrons.hardware_control import API as HardwareAPI
from opentrons.protocol_engine import ProtocolEngine, create_protocol_engine
from opentrons.protocol_engine import ProtocolEngine, StateView, create_protocol_engine
from opentrons.protocol_runner import ProtocolRunner


class EngineConflictError(RuntimeError):
"""An error raised if the runner already has an engine initialized."""

pass


class EngineMissingError(RuntimeError):
"""An error raised if the engine somehow hasn't been initialized.
If this error is raised, it's almost certainly due to a software bug.
"""

pass

class EngineConflictError(RuntimeError):
"""An error raised if an active engine is already initialized.
The store will not create a new engine unless the "current" runner/engine
pair is idle.
"""


class RunnerEnginePair(NamedTuple):
Expand All @@ -41,10 +41,11 @@ def __init__(self, hardware_api: HardwareAPI) -> None:
"""
self._hardware_api = hardware_api
self._runner_engine_pair: Optional[RunnerEnginePair] = None
self._engines_by_run_id: Dict[str, ProtocolEngine] = {}

@property
def engine(self) -> ProtocolEngine:
"""Get the persisted ProtocolEngine.
"""Get the "current" persisted ProtocolEngine.
Raises:
EngineMissingError: Engine has not yet been created and persisted.
Expand All @@ -56,7 +57,7 @@ def engine(self) -> ProtocolEngine:

@property
def runner(self) -> ProtocolRunner:
"""Get the persisted ProtocolRunner.
"""Get the "current" persisted ProtocolRunner.
Raises:
EngineMissingError: Runner has not yet been created and persisted.
Expand All @@ -66,28 +67,44 @@ def runner(self) -> ProtocolRunner:

return self._runner_engine_pair.runner

async def create(self) -> RunnerEnginePair:
"""Create and store a ProtocolRunner and ProtocolEngine.
async def create(self, run_id: str) -> StateView:
"""Create and store a ProtocolRunner and ProtocolEngine for a given Run.
Args:
run_id: The run resource the engine is assigned to.
Returns:
The created and stored ProtocolRunner / ProtocolEngine pair.
Raises:
EngineConflictError: a ProtocolEngine is already present.
EngineConflictError: The current runner/engine pair is not idle, so
a new set may not be created.
"""
# NOTE: this async. creation happens before the `self._engine`
# check intentionally to avoid a race condition where `self._engine` is
# set after the check but before the engine has finished getting created,
# at the expense of having to potentially throw away an engine instance
engine = await create_protocol_engine(hardware_api=self._hardware_api)
runner = ProtocolRunner(protocol_engine=engine, hardware_api=self._hardware_api)

if self._runner_engine_pair is not None:
raise EngineConflictError("Cannot load multiple runs simultaneously.")
if not self.engine.state_view.commands.get_is_stopped():
raise EngineConflictError("Current engine is not stopped.")

self._runner_engine_pair = RunnerEnginePair(runner=runner, engine=engine)
self._engines_by_run_id[run_id] = engine

return engine.state_view

def get_state(self, run_id: str) -> StateView:
"""Get a run's ProtocolEngine state.
return self._runner_engine_pair
Args:
run_id: The run resource to retrieve engine state from.
Raises:
EngineMissingError: No engine found for the given run ID.
"""
try:
return self._engines_by_run_id[run_id].state_view
except KeyError:
raise EngineMissingError(f"No engine state found for run {run_id}")

def clear(self) -> None:
"""Remove the persisted ProtocolEngine, if present, no-op otherwise."""
Expand Down
37 changes: 23 additions & 14 deletions robot-server/robot_server/runs/router/actions_router.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
"""Router for /runs actions endpoints."""
from fastapi import APIRouter, Depends, status
from datetime import datetime
from typing import Union
from typing_extensions import Literal

from opentrons.protocol_engine.errors import ProtocolEngineStoppedError

from robot_server.errors import ErrorDetails, ErrorResponse
from robot_server.service.dependencies import get_current_time, get_unique_id
from robot_server.service.json_api import RequestModel, ResponseModel

from ..run_store import RunStore, RunNotFoundError
from ..run_view import RunView
from ..action_models import RunAction, RunActionType, RunActionCreateData
from ..engine_store import EngineStore, EngineMissingError
from ..engine_store import EngineStore
from ..dependencies import get_run_store, get_engine_store
from .base_router import RunNotFound
from .base_router import RunNotFound, RunStopped

actions_router = APIRouter()

Expand All @@ -31,7 +34,9 @@ class RunActionNotAllowed(ErrorDetails):
status_code=status.HTTP_201_CREATED,
response_model=ResponseModel[RunAction],
responses={
status.HTTP_400_BAD_REQUEST: {"model": ErrorResponse[RunActionNotAllowed]},
status.HTTP_409_CONFLICT: {
"model": ErrorResponse[Union[RunActionNotAllowed, RunStopped]],
},
status.HTTP_404_NOT_FOUND: {"model": ErrorResponse[RunNotFound]},
},
)
Expand All @@ -57,27 +62,31 @@ async def create_run_action(
"""
try:
prev_run = run_store.get(run_id=runId)
except RunNotFoundError as e:
raise RunNotFound(detail=str(e)).as_error(status.HTTP_404_NOT_FOUND)

action, next_run = run_view.with_action(
run=prev_run,
action_id=action_id,
action_data=request_body.data,
created_at=created_at,
if not prev_run.is_current:
raise RunStopped(detail=f"Run {runId} is not the current run").as_error(
status.HTTP_409_CONFLICT
)

# TODO(mc, 2021-07-06): add a dependency to verify that a given
# action is allowed
action, next_run = run_view.with_action(
run=prev_run,
action_id=action_id,
action_data=request_body.data,
created_at=created_at,
)

try:
if action.actionType == RunActionType.PLAY:
engine_store.runner.play()
elif action.actionType == RunActionType.PAUSE:
engine_store.runner.pause()
if action.actionType == RunActionType.STOP:
await engine_store.runner.stop()

except RunNotFoundError as e:
raise RunNotFound(detail=str(e)).as_error(status.HTTP_404_NOT_FOUND)
except EngineMissingError as e:
raise RunActionNotAllowed(detail=str(e)).as_error(status.HTTP_400_BAD_REQUEST)
except ProtocolEngineStoppedError as e:
raise RunActionNotAllowed(detail=str(e)).as_error(status.HTTP_409_CONFLICT)

run_store.upsert(run=next_run)

Expand Down
49 changes: 31 additions & 18 deletions robot-server/robot_server/runs/router/base_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ class RunNotFound(ErrorDetails):
title: str = "Run Not Found"


# TODO(mc, 2021-05-28): evaluate multi-run logic
class RunAlreadyActive(ErrorDetails):
"""An error if one tries to create a new run while one is already active."""

Expand All @@ -59,6 +58,13 @@ class RunNotIdle(ErrorDetails):
)


class RunStopped(ErrorDetails):
"""An error if one tries to modify a stopped run."""

id: Literal["RunStopped"] = "RunStopped"
title: str = "Run Stopped"


@base_router.post(
path="/runs",
summary="Create a run",
Expand Down Expand Up @@ -96,15 +102,17 @@ async def create_run(
"""
create_data = request_body.data if request_body is not None else None
run = run_view.as_resource(
run_id=run_id, created_at=created_at, create_data=create_data
run_id=run_id,
created_at=created_at,
create_data=create_data,
)
protocol_id = None

if isinstance(create_data, ProtocolRunCreateData):
protocol_id = create_data.createParams.protocolId

try:
await engine_store.create()
engine_state = await engine_store.create(run_id=run_id)

if protocol_id is not None:
protocol_resource = protocol_store.get(protocol_id=protocol_id)
Expand All @@ -124,10 +132,10 @@ async def create_run(

data = run_view.as_response(
run=run,
commands=engine_store.engine.state_view.commands.get_all(),
pipettes=engine_store.engine.state_view.pipettes.get_all(),
labware=engine_store.engine.state_view.labware.get_all(),
engine_status=engine_store.engine.state_view.commands.get_status(),
commands=engine_state.commands.get_all(),
pipettes=engine_state.pipettes.get_all(),
labware=engine_state.labware.get_all(),
engine_status=engine_state.commands.get_status(),
)

return ResponseModel(data=data)
Expand Down Expand Up @@ -155,13 +163,14 @@ async def get_runs(
data = []

for run in run_store.get_all():
# TODO(mc, 2021-06-23): add multi-engine support
run_id = run.run_id
engine_state = engine_store.get_state(run_id)
run_data = run_view.as_response(
run=run,
commands=engine_store.engine.state_view.commands.get_all(),
pipettes=engine_store.engine.state_view.pipettes.get_all(),
labware=engine_store.engine.state_view.labware.get_all(),
engine_status=engine_store.engine.state_view.commands.get_status(),
commands=engine_state.commands.get_all(),
pipettes=engine_state.pipettes.get_all(),
labware=engine_state.labware.get_all(),
engine_status=engine_state.commands.get_status(),
)

data.append(run_data)
Expand Down Expand Up @@ -198,12 +207,14 @@ async def get_run(
except RunNotFoundError as e:
raise RunNotFound(detail=str(e)).as_error(status.HTTP_404_NOT_FOUND)

engine_state = engine_store.get_state(run.run_id)

data = run_view.as_response(
run=run,
commands=engine_store.engine.state_view.commands.get_all(),
pipettes=engine_store.engine.state_view.pipettes.get_all(),
labware=engine_store.engine.state_view.labware.get_all(),
engine_status=engine_store.engine.state_view.commands.get_status(),
commands=engine_state.commands.get_all(),
pipettes=engine_state.pipettes.get_all(),
labware=engine_state.labware.get_all(),
engine_status=engine_state.commands.get_status(),
)

return ResponseModel(data=data)
Expand All @@ -230,10 +241,12 @@ async def remove_run_by_id(
engine_store: ProtocolEngine storage and control.
"""
try:
if not engine_store.engine.state_view.commands.get_is_stopped():
raise RunNotIdle().as_error(status.HTTP_409_CONFLICT)
engine_state = engine_store.get_state(runId)
except EngineMissingError:
pass
else:
if not engine_state.commands.get_is_stopped():
raise RunNotIdle().as_error(status.HTTP_409_CONFLICT)

try:
engine_store.clear()
Expand Down
11 changes: 9 additions & 2 deletions robot-server/robot_server/runs/router/commands_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from ..run_models import Run, RunCommandSummary
from ..engine_store import EngineStore
from ..dependencies import get_engine_store
from .base_router import RunNotFound, get_run
from .base_router import RunNotFound, RunStopped, get_run

commands_router = APIRouter()

Expand All @@ -38,6 +38,7 @@ class CommandNotFound(ErrorDetails):
status_code=status.HTTP_200_OK,
response_model=ResponseModel[pe_commands.Command],
responses={
status.HTTP_400_BAD_REQUEST: {"model": ErrorResponse[RunStopped]},
status.HTTP_404_NOT_FOUND: {"model": ErrorResponse[RunNotFound]},
},
)
Expand All @@ -57,6 +58,11 @@ async def post_run_command(
`GET /runs/{runId}`. Present to ensure 404 if run
not found.
"""
if not run.data.current:
raise RunStopped(detail=f"Run {run.data.id} is not the current run").as_error(
status.HTTP_400_BAD_REQUEST
)

command = engine_store.engine.add_command(request_body.data)
return ResponseModel[pe_commands.Command](data=command)

Expand Down Expand Up @@ -120,8 +126,9 @@ async def get_run_command(
`GET /run/{runId}`. Present to ensure 404 if run
not found.
"""
engine_state = engine_store.get_state(run.data.id)
try:
command = engine_store.engine.state_view.commands.get(commandId)
command = engine_state.commands.get(commandId)
except pe_errors.CommandDoesNotExistError as e:
raise CommandNotFound(detail=str(e)).as_error(status.HTTP_404_NOT_FOUND)

Expand Down
7 changes: 7 additions & 0 deletions robot-server/robot_server/runs/run_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ class _AbstractRun(ResourceModel):
runType: RunType = Field(..., description="Specific run type.")
createdAt: datetime = Field(..., description="When the run was created")
status: RunStatus = Field(..., description="Execution status of the run")
current: bool = Field(
...,
description=(
"Whether this run is currently controlling the robot."
" There can be, at most, one current run."
),
)
actions: List[RunAction] = Field(
...,
description="Client-initiated run control actions.",
Expand Down
8 changes: 7 additions & 1 deletion robot-server/robot_server/runs/run_store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Runs' in-memory store."""
from dataclasses import dataclass
from dataclasses import dataclass, replace
from datetime import datetime
from typing import Dict, List

Expand All @@ -19,6 +19,7 @@ class RunResource:
create_data: RunCreateData
created_at: datetime
actions: List[RunAction]
is_current: bool


class RunNotFoundError(ValueError):
Expand Down Expand Up @@ -46,6 +47,11 @@ def upsert(self, run: RunResource) -> RunResource:
Returns:
The resource that was added to the store.
"""
if run.is_current is True:
for target_id, target in self._runs_by_id.items():
if target.is_current and target_id != run.run_id:
self._runs_by_id[target_id] = replace(target, is_current=False)

self._runs_by_id[run.run_id] = run

return run
Expand Down
Loading

0 comments on commit 80093b3

Please sign in to comment.