From e65caa3d97caaf4b9671b048d812a9ae633e2c7b Mon Sep 17 00:00:00 2001 From: Sanniti Pimpley Date: Wed, 1 May 2024 16:48:09 -0400 Subject: [PATCH] feat(robot-server, app): add a new endpoint for fast-fetching all run commands (#15031) # Overview **Robot server changes:** Adds a new endpoint- `GET /runs/:run_id/commandsAsPreSerializedList` to the run commands router. This endpoint returns a list of pre-serialized commands (that are valid json objects) of a finished run. This endpoint is a much faster alternative to the `GET /runs/:run_id/commands` endpoint when fetching all commands of a completed run. Also adds notification publishing when pre-serialized commands become available for a run. **App changes** closes RQA-2645 and RQA-2647 # Risk assessment Back-end: New endpoint so impact on existing code is close to none. App: Medium. Fixes issues in existing behavior of handling historical runs. --------- Co-authored-by: ncdiehl11 Co-authored-by: ncdiehl11 --- .eslintrc.js | 1 + .../getCommandsAsPreSerializedList.ts | 22 +++++++ api-client/src/runs/commands/types.ts | 6 ++ api-client/src/runs/index.ts | 1 + .../organisms/CommandText/LoadCommandText.tsx | 4 +- app/src/organisms/RunPreview/index.tsx | 28 +++++--- app/src/redux/shell/types.ts | 1 + app/src/resources/runs/index.ts | 1 + ...useNotifyAllCommandsAsPreSerializedList.ts | 35 ++++++++++ react-api-client/src/runs/index.ts | 1 + .../runs/useAllCommandsAsPreSerializedList.ts | 52 +++++++++++++++ .../runs/router/actions_router.py | 3 + .../runs/router/commands_router.py | 66 ++++++++++++++++++- .../robot_server/runs/run_controller.py | 9 ++- .../robot_server/runs/run_data_manager.py | 26 +++++++- robot-server/robot_server/runs/run_store.py | 14 +++- .../publishers/runs_publisher.py | 20 +++++- .../service/notifications/topics.py | 1 + .../http_api/runs/test_persistence.py | 9 +++ ...t_run_queued_protocol_commands.tavern.yaml | 16 +++++ .../tests/integration/robot_client.py | 8 +++ .../tests/runs/test_run_controller.py | 11 ++++ .../tests/runs/test_run_data_manager.py | 45 ++++++++++++- robot-server/tests/runs/test_run_store.py | 28 ++++++++ .../publishers/test_runs_publisher.py | 42 +++++++++++- 25 files changed, 432 insertions(+), 18 deletions(-) create mode 100644 api-client/src/runs/commands/getCommandsAsPreSerializedList.ts create mode 100644 app/src/resources/runs/useNotifyAllCommandsAsPreSerializedList.ts create mode 100644 react-api-client/src/runs/useAllCommandsAsPreSerializedList.ts diff --git a/.eslintrc.js b/.eslintrc.js index 7c71dc01c22..6e70df2ff27 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -52,6 +52,7 @@ module.exports = { 'useLastRunCommandKey', 'useCurrentMaintenanceRun', 'useDeckConfigurationQuery', + 'useAllCommandsAsPreSerializedList', ], message: 'The HTTP hook is deprecated. Utilize the equivalent notification wrapper (useNotifyX) instead.', diff --git a/api-client/src/runs/commands/getCommandsAsPreSerializedList.ts b/api-client/src/runs/commands/getCommandsAsPreSerializedList.ts new file mode 100644 index 00000000000..420f984b280 --- /dev/null +++ b/api-client/src/runs/commands/getCommandsAsPreSerializedList.ts @@ -0,0 +1,22 @@ +import { GET, request } from '../../request' + +import type { ResponsePromise } from '../../request' +import type { HostConfig } from '../../types' +import type { + CommandsAsPreSerializedListData, + GetCommandsParams, +} from './types' + +export function getCommandsAsPreSerializedList( + config: HostConfig, + runId: string, + params: GetCommandsParams +): ResponsePromise { + return request( + GET, + `/runs/${runId}/commandsAsPreSerializedList`, + null, + config, + params + ) +} diff --git a/api-client/src/runs/commands/types.ts b/api-client/src/runs/commands/types.ts index acea40e1880..d0b443b297a 100644 --- a/api-client/src/runs/commands/types.ts +++ b/api-client/src/runs/commands/types.ts @@ -34,6 +34,12 @@ export interface CommandsData { links: CommandsLinks } +export interface CommandsAsPreSerializedListData { + data: string[] + meta: GetCommandsParams & { totalLength: number } + links: CommandsLinks +} + export interface CreateCommandParams { waitUntilComplete?: boolean timeout?: number diff --git a/api-client/src/runs/index.ts b/api-client/src/runs/index.ts index 1d62755d4c5..01653713c81 100644 --- a/api-client/src/runs/index.ts +++ b/api-client/src/runs/index.ts @@ -7,6 +7,7 @@ export { createCommand } from './commands/createCommand' export { createLiveCommand } from './commands/createLiveCommand' export { getCommand } from './commands/getCommand' export { getCommands } from './commands/getCommands' +export { getCommandsAsPreSerializedList } from './commands/getCommandsAsPreSerializedList' export { createRunAction } from './createRunAction' export * from './createLabwareOffset' export * from './createLabwareDefinition' diff --git a/app/src/organisms/CommandText/LoadCommandText.tsx b/app/src/organisms/CommandText/LoadCommandText.tsx index 62ce7cf1fd5..8dd2f8e64d1 100644 --- a/app/src/organisms/CommandText/LoadCommandText.tsx +++ b/app/src/organisms/CommandText/LoadCommandText.tsx @@ -131,7 +131,9 @@ export const LoadCommandText = ({ return null } } else { - const labware = command.result?.definition.metadata.displayName + const labware = + command.result?.definition.metadata.displayName ?? + command.params.displayName return command.params.location === 'offDeck' ? t('load_labware_info_protocol_setup_off_deck', { labware }) : t('load_labware_info_protocol_setup_no_module', { diff --git a/app/src/organisms/RunPreview/index.tsx b/app/src/organisms/RunPreview/index.tsx index a7e4aa2591b..0bb3b0b2ca2 100644 --- a/app/src/organisms/RunPreview/index.tsx +++ b/app/src/organisms/RunPreview/index.tsx @@ -4,7 +4,6 @@ import { useTranslation } from 'react-i18next' import { ViewportList, ViewportListRef } from 'react-viewport-list' import { RUN_STATUSES_TERMINAL } from '@opentrons/api-client' -import { useAllCommandsQuery } from '@opentrons/react-api-client' import { ALIGN_CENTER, BORDERS, @@ -21,7 +20,10 @@ import { } from '@opentrons/components' import { useMostRecentCompletedAnalysis } from '../LabwarePositionCheck/useMostRecentCompletedAnalysis' -import { useNotifyLastRunCommandKey } from '../../resources/runs' +import { + useNotifyAllCommandsAsPreSerializedList, + useNotifyLastRunCommandKey, +} from '../../resources/runs' import { CommandText } from '../CommandText' import { Divider } from '../../atoms/structure' import { NAV_BAR_WIDTH } from '../../App/constants' @@ -33,6 +35,8 @@ import type { RobotType } from '@opentrons/shared-data' const COLOR_FADE_MS = 500 const LIVE_RUN_COMMANDS_POLL_MS = 3000 +// arbitrary large number of commands +const MAX_COMMANDS = 100000 interface RunPreviewProps { runId: string @@ -52,11 +56,17 @@ export const RunPreviewComponent = ( ? (RUN_STATUSES_TERMINAL as RunStatus[]).includes(runStatus) : false // we only ever want one request done for terminal runs because this is a heavy request - const commandsFromQuery = useAllCommandsQuery(runId, null, { - staleTime: Infinity, - cacheTime: Infinity, - enabled: isRunTerminal, - }).data?.data + const commandsFromQuery = useNotifyAllCommandsAsPreSerializedList( + runId, + { cursor: 0, pageLength: MAX_COMMANDS }, + { + staleTime: Infinity, + cacheTime: Infinity, + enabled: isRunTerminal, + } + ).data?.data + const nullCheckedCommandsFromQuery = + commandsFromQuery == null ? robotSideAnalysis?.commands : commandsFromQuery const viewPortRef = React.useRef(null) const currentRunCommandKey = useNotifyLastRunCommandKey(runId, { refetchInterval: LIVE_RUN_COMMANDS_POLL_MS, @@ -67,7 +77,9 @@ export const RunPreviewComponent = ( ] = React.useState(true) if (robotSideAnalysis == null) return null const commands = - (isRunTerminal ? commandsFromQuery : robotSideAnalysis.commands) ?? [] + (isRunTerminal + ? nullCheckedCommandsFromQuery + : robotSideAnalysis.commands) ?? [] const currentRunCommandIndex = commands.findIndex( c => c.key === currentRunCommandKey ) diff --git a/app/src/redux/shell/types.ts b/app/src/redux/shell/types.ts index 8ea5e8be7db..74e8d17b75b 100644 --- a/app/src/redux/shell/types.ts +++ b/app/src/redux/shell/types.ts @@ -142,6 +142,7 @@ export type NotifyTopic = | 'robot-server/runs' | `robot-server/runs/${string}` | 'robot-server/deck_configuration' + | `robot-server/runs/pre_serialized_commands/${string}` export interface NotifySubscribeAction { type: 'shell:NOTIFY_SUBSCRIBE' diff --git a/app/src/resources/runs/index.ts b/app/src/resources/runs/index.ts index be5fabb4970..7861880e226 100644 --- a/app/src/resources/runs/index.ts +++ b/app/src/resources/runs/index.ts @@ -3,3 +3,4 @@ export * from './utils' export * from './useNotifyAllRunsQuery' export * from './useNotifyRunQuery' export * from './useNotifyLastRunCommandKey' +export * from './useNotifyAllCommandsAsPreSerializedList' diff --git a/app/src/resources/runs/useNotifyAllCommandsAsPreSerializedList.ts b/app/src/resources/runs/useNotifyAllCommandsAsPreSerializedList.ts new file mode 100644 index 00000000000..1410c23cef0 --- /dev/null +++ b/app/src/resources/runs/useNotifyAllCommandsAsPreSerializedList.ts @@ -0,0 +1,35 @@ +import * as React from 'react' + +import { useAllCommandsAsPreSerializedList } from '@opentrons/react-api-client' + +import { useNotifyService } from '../useNotifyService' + +import type { UseQueryResult } from 'react-query' +import type { AxiosError } from 'axios' +import type { CommandsData, GetCommandsParams } from '@opentrons/api-client' +import type { + QueryOptionsWithPolling, + HTTPRefetchFrequency, +} from '../useNotifyService' + +export function useNotifyAllCommandsAsPreSerializedList( + runId: string | null, + params?: GetCommandsParams | null, + options: QueryOptionsWithPolling = {} +): UseQueryResult { + const [refetch, setRefetch] = React.useState(null) + + useNotifyService({ + topic: `robot-server/runs/pre_serialized_commands/${runId}`, + setRefetch, + options, + }) + + const httpResponse = useAllCommandsAsPreSerializedList(runId, params, { + ...options, + enabled: options?.enabled !== false && refetch != null, + onSettled: refetch === 'once' ? () => setRefetch(null) : () => null, + }) + + return httpResponse +} diff --git a/react-api-client/src/runs/index.ts b/react-api-client/src/runs/index.ts index a77e01b2b42..5790abb860b 100644 --- a/react-api-client/src/runs/index.ts +++ b/react-api-client/src/runs/index.ts @@ -10,6 +10,7 @@ export { usePauseRunMutation } from './usePauseRunMutation' export { useStopRunMutation } from './useStopRunMutation' export { useRunActionMutations } from './useRunActionMutations' export { useAllCommandsQuery } from './useAllCommandsQuery' +export { useAllCommandsAsPreSerializedList } from './useAllCommandsAsPreSerializedList' export { useCommandQuery } from './useCommandQuery' export * from './useCreateLabwareOffsetMutation' export * from './useCreateLabwareDefinitionMutation' diff --git a/react-api-client/src/runs/useAllCommandsAsPreSerializedList.ts b/react-api-client/src/runs/useAllCommandsAsPreSerializedList.ts new file mode 100644 index 00000000000..3d30d13c579 --- /dev/null +++ b/react-api-client/src/runs/useAllCommandsAsPreSerializedList.ts @@ -0,0 +1,52 @@ +import { UseQueryResult, useQuery } from 'react-query' +import { getCommandsAsPreSerializedList } from '@opentrons/api-client' +import { useHost } from '../api' +import type { UseQueryOptions } from 'react-query' +import type { + GetCommandsParams, + HostConfig, + CommandsData, + RunCommandSummary, +} from '@opentrons/api-client' + +const DEFAULT_PAGE_LENGTH = 30 +export const DEFAULT_PARAMS: GetCommandsParams = { + cursor: null, + pageLength: DEFAULT_PAGE_LENGTH, +} + +export function useAllCommandsAsPreSerializedList( + runId: string | null, + params?: GetCommandsParams | null, + options: UseQueryOptions = {} +): UseQueryResult { + const host = useHost() + const nullCheckedParams = params ?? DEFAULT_PARAMS + + const allOptions: UseQueryOptions = { + ...options, + enabled: host !== null && runId != null && options.enabled !== false, + } + const { cursor, pageLength } = nullCheckedParams + const query = useQuery( + [host, 'runs', runId, 'getCommandsAsPreSerializedList', cursor, pageLength], + () => { + return getCommandsAsPreSerializedList( + host as HostConfig, + runId as string, + nullCheckedParams + ).then(response => { + const responseData = response.data + return { + ...responseData, + data: responseData.data.map( + command => JSON.parse(command) as RunCommandSummary + ), + } + }) + }, + allOptions + ) + + return query +} diff --git a/robot-server/robot_server/runs/router/actions_router.py b/robot-server/robot_server/runs/router/actions_router.py index b662d59f554..25aae8cfd19 100644 --- a/robot-server/robot_server/runs/router/actions_router.py +++ b/robot-server/robot_server/runs/router/actions_router.py @@ -28,6 +28,7 @@ MaintenanceEngineStore, ) from robot_server.maintenance_runs.dependencies import get_maintenance_engine_store +from robot_server.service.notifications import get_runs_publisher, RunsPublisher log = logging.getLogger(__name__) actions_router = APIRouter() @@ -45,6 +46,7 @@ async def get_run_controller( task_runner: TaskRunner = Depends(get_task_runner), engine_store: EngineStore = Depends(get_engine_store), run_store: RunStore = Depends(get_run_store), + runs_publisher: RunsPublisher = Depends(get_runs_publisher), ) -> RunController: """Get a RunController for the current run. @@ -67,6 +69,7 @@ async def get_run_controller( task_runner=task_runner, engine_store=engine_store, run_store=run_store, + runs_publisher=runs_publisher, ) diff --git a/robot-server/robot_server/runs/router/commands_router.py b/robot-server/robot_server/runs/router/commands_router.py index 47a64c5d800..b220ae33c04 100644 --- a/robot-server/robot_server/runs/router/commands_router.py +++ b/robot-server/robot_server/runs/router/commands_router.py @@ -6,6 +6,7 @@ from anyio import move_on_after from fastapi import APIRouter, Depends, Query, status + from pydantic import BaseModel, Field from opentrons.protocol_engine import ( @@ -21,11 +22,12 @@ MultiBody, MultiBodyMeta, PydanticResponse, + SimpleMultiBody, ) from robot_server.robot.control.dependencies import require_estop_in_good_state from ..run_models import RunCommandSummary -from ..run_data_manager import RunDataManager +from ..run_data_manager import RunDataManager, PreSerializedCommandsNotAvailableError from ..engine_store import EngineStore from ..run_store import RunStore, CommandNotFoundError from ..run_models import RunNotFoundError @@ -70,6 +72,18 @@ class CommandNotAllowed(ErrorDetails): title: str = "Command Not Allowed" +class PreSerializedCommandsNotAvailable(ErrorDetails): + """An error if one tries to fetch pre-serialized commands before they are written to the database.""" + + id: Literal[ + "PreSerializedCommandsNotAvailable" + ] = "PreSerializedCommandsNotAvailable" + title: str = "Pre-Serialized commands not available." + detail: str = ( + "Pre-serialized commands are only available once a run has finished running." + ) + + class CommandLinkMeta(BaseModel): """Metadata about a command resource referenced in `links`.""" @@ -351,6 +365,56 @@ async def get_run_commands( ) +# TODO (spp, 2024-05-01): explore alternatives to returning commands as list of strings. +# Options: 1. JSON Lines +# 2. Simple de-serialized commands list w/o pydantic model conversion +@PydanticResponse.wrap_route( + commands_router.get, + path="/runs/{runId}/commandsAsPreSerializedList", + summary="Get all commands of a completed run as a list of pre-serialized commands", + description=( + "Get all commands of a completed run as a list of pre-serialized commands." + "**Warning:** This endpoint is experimental. We may change or remove it without warning." + "\n\n" + "The commands list will only be available after a run has completed" + " (whether successful, failed or stopped) and its data has been committed to the database." + " If a request is received before the run is completed, it will return a 503 Unavailable error." + " This is a faster alternative to fetching the full commands list using" + " `GET /runs/{runId}/commands`. For large protocols (10k+ commands), the above" + " endpoint can take minutes to respond, whereas this one should only take a few seconds." + ), + responses={ + status.HTTP_404_NOT_FOUND: {"model": ErrorBody[RunNotFound]}, + status.HTTP_503_SERVICE_UNAVAILABLE: { + "model": ErrorBody[PreSerializedCommandsNotAvailable] + }, + }, +) +async def get_run_commands_as_pre_serialized_list( + runId: str, + run_data_manager: RunDataManager = Depends(get_run_data_manager), +) -> PydanticResponse[SimpleMultiBody[str]]: + """Get all commands of a completed run as a list of pre-serialized (string encoded) commands. + + Arguments: + runId: Requested run ID, from the URL + run_data_manager: Run data retrieval interface. + """ + try: + commands = run_data_manager.get_all_commands_as_preserialized_list(runId) + except RunNotFoundError as e: + raise RunNotFound.from_exc(e).as_error(status.HTTP_404_NOT_FOUND) from e + except PreSerializedCommandsNotAvailableError as e: + raise PreSerializedCommandsNotAvailable.from_exc(e).as_error( + status.HTTP_503_SERVICE_UNAVAILABLE + ) from e + return await PydanticResponse.create( + content=SimpleMultiBody.construct( + data=commands, meta=MultiBodyMeta(cursor=0, totalLength=len(commands)) + ) + ) + + @PydanticResponse.wrap_route( commands_router.get, path="/runs/{runId}/commands/{commandId}", diff --git a/robot-server/robot_server/runs/run_controller.py b/robot-server/robot_server/runs/run_controller.py index 923c9cfa64e..e7e55080aed 100644 --- a/robot-server/robot_server/runs/run_controller.py +++ b/robot-server/robot_server/runs/run_controller.py @@ -13,6 +13,8 @@ from opentrons.protocol_engine.types import DeckConfigurationType +from robot_server.service.notifications import RunsPublisher + log = logging.getLogger(__name__) @@ -21,7 +23,7 @@ class RunActionNotAllowedError(RoboticsInteractionError): class RunController: - """An interface to manage the side-effects of requested run actions.""" + """An interface to manage the side effects of requested run actions.""" def __init__( self, @@ -29,11 +31,13 @@ def __init__( task_runner: TaskRunner, engine_store: EngineStore, run_store: RunStore, + runs_publisher: RunsPublisher, ) -> None: self._run_id = run_id self._task_runner = task_runner self._engine_store = engine_store self._run_store = run_store + self._runs_publisher = runs_publisher def create_action( self, @@ -108,3 +112,6 @@ async def _run_protocol_and_insert_result( commands=result.commands, run_time_parameters=result.parameters, ) + await self._runs_publisher.publish_pre_serialized_commands_notification( + self._run_id + ) diff --git a/robot-server/robot_server/runs/run_data_manager.py b/robot-server/robot_server/runs/run_data_manager.py index 8548104911b..311cfb93b40 100644 --- a/robot-server/robot_server/runs/run_data_manager.py +++ b/robot-server/robot_server/runs/run_data_manager.py @@ -112,6 +112,10 @@ class RunNotCurrentError(ValueError): """Error raised when a requested run is not the current run.""" +class PreSerializedCommandsNotAvailableError(LookupError): + """Error raised when a run's commands are not available as pre-serialized list of commands.""" + + class RunDataManager: """Collaborator to manage current and historical run data. @@ -290,10 +294,16 @@ async def delete(self, run_id: str) -> None: self._run_store.remove(run_id=run_id) async def update(self, run_id: str, current: Optional[bool]) -> Union[Run, BadRun]: - """Get and potentially archive a run. + """Get and potentially archive the current run. Args: run_id: The run to get and maybe archive. + current: Whether to mark the run as current or not. + If `current` set to False, then the run is 'un-current'ed by + stopping the run, saving the final run data to the run store, + and clearing the engine and runner. + If 'current' is True or not specified, we simply fetch the run's + data from memory and database. Returns: The updated run. @@ -320,6 +330,9 @@ async def update(self, run_id: str, current: Optional[bool]) -> Union[Run, BadRu commands=commands, run_time_parameters=parameters, ) + await self._runs_publisher.publish_pre_serialized_commands_notification( + run_id + ) else: state_summary = self._engine_store.engine.state_view.get_summary() parameters = self._engine_store.runner.run_time_parameters @@ -387,6 +400,17 @@ def get_command(self, run_id: str, command_id: str) -> Command: return self._run_store.get_command(run_id=run_id, command_id=command_id) + def get_all_commands_as_preserialized_list(self, run_id: str) -> List[str]: + """Get all commands of a run in a serialized json list.""" + if ( + run_id == self._engine_store.current_run_id + and not self._engine_store.engine.state_view.commands.get_is_terminal() + ): + raise PreSerializedCommandsNotAvailableError( + "Pre-serialized commands are only available after a run has ended." + ) + return self._run_store.get_all_commands_as_preserialized_list(run_id) + def _get_state_summary(self, run_id: str) -> Union[StateSummary, BadStateSummary]: if run_id == self._engine_store.current_run_id: return self._engine_store.engine.state_view.get_summary() diff --git a/robot-server/robot_server/runs/run_store.py b/robot-server/robot_server/runs/run_store.py index b86ec8e19ea..6cf86d14af1 100644 --- a/robot-server/robot_server/runs/run_store.py +++ b/robot-server/robot_server/runs/run_store.py @@ -428,7 +428,6 @@ def get_commands_slice( ) .order_by(run_command_table.c.index_in_run) ) - slice_result = transaction.execute(select_slice).all() sliced_commands: List[Command] = [ @@ -442,6 +441,19 @@ def get_commands_slice( commands=sliced_commands, ) + def get_all_commands_as_preserialized_list(self, run_id: str) -> List[str]: + """Get all commands of the run as a list of strings of json command objects.""" + with self._sql_engine.begin() as transaction: + if not self._run_exists(run_id, transaction): + raise RunNotFoundError(run_id=run_id) + select_commands = ( + sqlalchemy.select(run_command_table.c.command) + .where(run_command_table.c.run_id == run_id) + .order_by(run_command_table.c.index_in_run) + ) + commands_result = transaction.scalars(select_commands).all() + return commands_result + @lru_cache(maxsize=_CACHE_ENTRIES) def get_command(self, run_id: str, command_id: str) -> Command: """Get run command by id. diff --git a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py index fef23c8a875..08b14899d0d 100644 --- a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py +++ b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py @@ -95,9 +95,23 @@ async def _publish_runs_advise_refetch_async(self, run_id: str) -> None: async def _publish_runs_advise_unsubscribe_async(self, run_id: str) -> None: """Publish an unsubscribe flag for relevant runs topics.""" - await self._client.publish_advise_unsubscribe_async( - topic=f"{Topics.RUNS}/{run_id}" - ) + if self._run_hooks is not None: + await self._client.publish_advise_unsubscribe_async( + topic=f"{Topics.RUNS}/{run_id}" + ) + await self._client.publish_advise_unsubscribe_async( + topic=Topics.RUNS_CURRENT_COMMAND + ) + await self._client.publish_advise_unsubscribe_async( + topic=f"{Topics.RUNS_PRE_SERIALIZED_COMMANDS}/{run_id}" + ) + + async def publish_pre_serialized_commands_notification(self, run_id: str) -> None: + """Publishes notification for GET /runs/:runId/commandsAsPreSerializedList.""" + if self._run_hooks is not None: + await self._client.publish_advise_refetch_async( + topic=f"{Topics.RUNS_PRE_SERIALIZED_COMMANDS}/{run_id}" + ) async def _handle_current_command_change(self) -> None: """Publish a refetch flag if the current command has changed.""" diff --git a/robot-server/robot_server/service/notifications/topics.py b/robot-server/robot_server/service/notifications/topics.py index 26d53cc3516..f8a6ecaf701 100644 --- a/robot-server/robot_server/service/notifications/topics.py +++ b/robot-server/robot_server/service/notifications/topics.py @@ -15,3 +15,4 @@ class Topics(str, Enum): RUNS_CURRENT_COMMAND = f"{_TOPIC_BASE}/runs/current_command" RUNS = f"{_TOPIC_BASE}/runs" DECK_CONFIGURATION = f"{_TOPIC_BASE}/deck_configuration" + RUNS_PRE_SERIALIZED_COMMANDS = f"{_TOPIC_BASE}/runs/pre_serialized_commands" diff --git a/robot-server/tests/integration/http_api/runs/test_persistence.py b/robot-server/tests/integration/http_api/runs/test_persistence.py index 45b55202fda..943f644e8d3 100644 --- a/robot-server/tests/integration/http_api/runs/test_persistence.py +++ b/robot-server/tests/integration/http_api/runs/test_persistence.py @@ -1,3 +1,4 @@ +import json from copy import deepcopy from datetime import datetime from typing import Any, AsyncGenerator, Dict, NamedTuple, cast @@ -250,6 +251,9 @@ async def test_run_commands_persist(client_and_server: ClientServerFixture) -> N get_persisted_command_response = await client.get_run_command( run_id=run_id, command_id=command_id ) + get_preserialized_commands_response = await client.get_preserialized_commands( + run_id=run_id + ) # ensure the persisted commands still match the original ones assert get_all_persisted_commands_response.json()["data"] == [ @@ -259,6 +263,11 @@ async def test_run_commands_persist(client_and_server: ClientServerFixture) -> N ] assert get_persisted_command_response.json()["data"] == expected_command + json_converted_command = json.loads( + get_preserialized_commands_response.json()["data"][0] + ) + assert json_converted_command == expected_command + async def test_runs_completed_started_at_persist_via_actions_router( client_and_server: ClientServerFixture, diff --git a/robot-server/tests/integration/http_api/runs/test_run_queued_protocol_commands.tavern.yaml b/robot-server/tests/integration/http_api/runs/test_run_queued_protocol_commands.tavern.yaml index 0d4a0010281..9d188402deb 100644 --- a/robot-server/tests/integration/http_api/runs/test_run_queued_protocol_commands.tavern.yaml +++ b/robot-server/tests/integration/http_api/runs/test_run_queued_protocol_commands.tavern.yaml @@ -198,3 +198,19 @@ stages: createdAt: !re_search "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+\\+\\d{2}:\\d{2}$" startedAt: !re_search "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+\\+\\d{2}:\\d{2}$" completedAt: !re_search "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+\\+\\d{2}:\\d{2}$" + + - name: Get all the commands in the run as a pre-serialized list + request: + url: '{ot2_server_base_url}/runs/{run_id}/commandsAsPreSerializedList' + method: GET + response: + status_code: 200 + json: + data: + - !anystr + - !anystr + - !anystr + - !anystr + meta: + cursor: 0 + totalLength: 4 \ No newline at end of file diff --git a/robot-server/tests/integration/robot_client.py b/robot-server/tests/integration/robot_client.py index c4511f8d315..9af11d50cdb 100644 --- a/robot-server/tests/integration/robot_client.py +++ b/robot-server/tests/integration/robot_client.py @@ -220,6 +220,14 @@ async def get_run_command(self, run_id: str, command_id: str) -> Response: response.raise_for_status() return response + async def get_preserialized_commands(self, run_id: str) -> Response: + """GET /runs/:run_id/commandsAsPreSerializedList.""" + response = await self.httpx_client.get( + url=f"{self.base_url}/runs/{run_id}/commandsAsPreSerializedList", + ) + response.raise_for_status() + return response + async def post_labware_offset( self, run_id: str, diff --git a/robot-server/tests/runs/test_run_controller.py b/robot-server/tests/runs/test_run_controller.py index a844cdcc6d5..71fc92f8466 100644 --- a/robot-server/tests/runs/test_run_controller.py +++ b/robot-server/tests/runs/test_run_controller.py @@ -14,6 +14,7 @@ from opentrons.protocol_engine.types import RunTimeParameter, BooleanParameter from opentrons.protocol_runner import RunResult, JsonRunner, PythonAndLegacyRunner +from robot_server.service.notifications import RunsPublisher from robot_server.service.task_runner import TaskRunner from robot_server.runs.action_models import RunAction, RunActionType from robot_server.runs.engine_store import EngineStore @@ -41,6 +42,12 @@ def mock_task_runner(decoy: Decoy) -> TaskRunner: return decoy.mock(cls=TaskRunner) +@pytest.fixture() +def mock_runs_publisher(decoy: Decoy) -> RunsPublisher: + """Get a mock RunsPublisher.""" + return decoy.mock(cls=RunsPublisher) + + @pytest.fixture def run_id() -> str: """A run identifier value.""" @@ -90,6 +97,7 @@ def subject( mock_engine_store: EngineStore, mock_run_store: RunStore, mock_task_runner: TaskRunner, + mock_runs_publisher: RunsPublisher, ) -> RunController: """Get a RunController test subject.""" return RunController( @@ -97,6 +105,7 @@ def subject( engine_store=mock_engine_store, run_store=mock_run_store, task_runner=mock_task_runner, + runs_publisher=mock_runs_publisher, ) @@ -135,6 +144,7 @@ async def test_create_play_action_to_start( mock_engine_store: EngineStore, mock_run_store: RunStore, mock_task_runner: TaskRunner, + mock_runs_publisher: RunsPublisher, engine_state_summary: StateSummary, run_time_parameters: List[RunTimeParameter], protocol_commands: List[pe_commands.Command], @@ -181,6 +191,7 @@ async def test_create_play_action_to_start( commands=protocol_commands, run_time_parameters=run_time_parameters, ), + await mock_runs_publisher.publish_pre_serialized_commands_notification(run_id), times=1, ) diff --git a/robot-server/tests/runs/test_run_data_manager.py b/robot-server/tests/runs/test_run_data_manager.py index 547ec0a7b74..12ced28fdb0 100644 --- a/robot-server/tests/runs/test_run_data_manager.py +++ b/robot-server/tests/runs/test_run_data_manager.py @@ -23,7 +23,11 @@ from robot_server.protocols.protocol_store import ProtocolResource from robot_server.runs.engine_store import EngineStore, EngineConflictError -from robot_server.runs.run_data_manager import RunDataManager, RunNotCurrentError +from robot_server.runs.run_data_manager import ( + RunDataManager, + RunNotCurrentError, + PreSerializedCommandsNotAvailableError, +) from robot_server.runs.run_models import Run, BadRun, RunNotFoundError, RunDataError from robot_server.runs.run_store import ( RunStore, @@ -583,6 +587,7 @@ async def test_update_current( run_command: commands.Command, mock_engine_store: EngineStore, mock_run_store: RunStore, + mock_runs_publisher: RunsPublisher, subject: RunDataManager, ) -> None: """It should persist the current run and clear the engine on current=false.""" @@ -607,6 +612,10 @@ async def test_update_current( result = await subject.update(run_id=run_id, current=False) + decoy.verify( + await mock_runs_publisher.publish_pre_serialized_commands_notification(run_id), + times=1, + ) assert result == Run( current=False, id=run_resource.run_id, @@ -633,6 +642,7 @@ async def test_update_current_noop( run_command: commands.Command, mock_engine_store: EngineStore, mock_run_store: RunStore, + mock_runs_publisher: RunsPublisher, subject: RunDataManager, current: Optional[bool], ) -> None: @@ -657,6 +667,7 @@ async def test_update_current_noop( commands=matchers.Anything(), run_time_parameters=matchers.Anything(), ), + await mock_runs_publisher.publish_pre_serialized_commands_notification(run_id), times=0, ) @@ -932,6 +943,38 @@ def test_get_command_from_db_command_not_found( subject.get_command("run-id", "command-id") +def test_get_all_commands_as_preserialized_list( + decoy: Decoy, + subject: RunDataManager, + mock_run_store: RunStore, + mock_engine_store: EngineStore, +) -> None: + """It should return the pre-serialized commands list.""" + decoy.when(mock_engine_store.current_run_id).then_return(None) + decoy.when( + mock_run_store.get_all_commands_as_preserialized_list("run-id") + ).then_return(['{"id": command-1}', '{"id": command-2}']) + assert subject.get_all_commands_as_preserialized_list("run-id") == [ + '{"id": command-1}', + '{"id": command-2}', + ] + + +def test_get_all_commands_as_preserialized_list_errors_for_active_runs( + decoy: Decoy, + subject: RunDataManager, + mock_run_store: RunStore, + mock_engine_store: EngineStore, +) -> None: + """It should raise an error when fetching pre-serialized commands list while run is active.""" + decoy.when(mock_engine_store.current_run_id).then_return("current-run-id") + decoy.when( + mock_engine_store.engine.state_view.commands.get_is_terminal() + ).then_return(False) + with pytest.raises(PreSerializedCommandsNotAvailableError): + subject.get_all_commands_as_preserialized_list("current-run-id") + + async def test_get_current_run_labware_definition( decoy: Decoy, mock_engine_store: EngineStore, diff --git a/robot-server/tests/runs/test_run_store.py b/robot-server/tests/runs/test_run_store.py index c6108cf5407..ee7697107f6 100644 --- a/robot-server/tests/runs/test_run_store.py +++ b/robot-server/tests/runs/test_run_store.py @@ -734,3 +734,31 @@ def test_get_commands_slice_run_not_found(subject: RunStore) -> None: ) with pytest.raises(RunNotFoundError): subject.get_commands_slice(run_id="not-run-id", cursor=1, length=3) + + +def test_get_all_commands_as_preserialized_list( + subject: RunStore, + protocol_commands: List[pe_commands.Command], + state_summary: StateSummary, +) -> None: + """It should get all commands stored in DB as a pre-serialized list.""" + subject.insert( + run_id="run-id", + protocol_id=None, + created_at=datetime(year=2021, month=1, day=1, tzinfo=timezone.utc), + ) + subject.update_run_state( + run_id="run-id", + summary=state_summary, + commands=protocol_commands, + run_time_parameters=[], + ) + result = subject.get_all_commands_as_preserialized_list(run_id="run-id") + assert result == [ + '{"id": "pause-1", "createdAt": "2021-01-01T00:00:00", "commandType": "waitForResume",' + ' "key": "command-key", "status": "succeeded", "params": {"message": "hello world"}, "result": {}}', + '{"id": "pause-2", "createdAt": "2022-02-02T00:00:00", "commandType": "waitForResume",' + ' "key": "command-key", "status": "succeeded", "params": {"message": "hey world"}, "result": {}}', + '{"id": "pause-3", "createdAt": "2023-03-03T00:00:00", "commandType": "waitForResume",' + ' "key": "command-key", "status": "succeeded", "params": {"message": "sup world"}, "result": {}}', + ] diff --git a/robot-server/tests/service/notifications/publishers/test_runs_publisher.py b/robot-server/tests/service/notifications/publishers/test_runs_publisher.py index a889664cbee..f8fdaf0cf9f 100644 --- a/robot-server/tests/service/notifications/publishers/test_runs_publisher.py +++ b/robot-server/tests/service/notifications/publishers/test_runs_publisher.py @@ -4,7 +4,7 @@ from unittest.mock import MagicMock, AsyncMock from robot_server.service.notifications import RunsPublisher, Topics -from opentrons.protocol_engine import CurrentCommand, EngineStatus +from opentrons.protocol_engine import CurrentCommand, EngineStatus, StateSummary def mock_curent_command(command_id: str) -> CurrentCommand: @@ -17,6 +17,19 @@ def mock_curent_command(command_id: str) -> CurrentCommand: ) +def mock_state_summary(run_id: str) -> StateSummary: + return StateSummary.construct( + status=EngineStatus.FAILED, + errors=[], + labware=[], + pipettes=[], + modules=[], + labwareOffsets=[], + startedAt=None, + completedAt=datetime(year=2021, month=1, day=1), + ) + + @pytest.fixture def notification_client() -> AsyncMock: """Mocked notification client.""" @@ -80,6 +93,12 @@ async def test_clean_up_current_run( notification_client.publish_advise_unsubscribe_async.assert_any_await( topic=f"{Topics.RUNS}/1234" ) + notification_client.publish_advise_unsubscribe_async.assert_any_await( + topic=Topics.RUNS_CURRENT_COMMAND + ) + notification_client.publish_advise_unsubscribe_async.assert_any_await( + topic=f"{Topics.RUNS_PRE_SERIALIZED_COMMANDS}/1234" + ) @pytest.mark.asyncio @@ -143,3 +162,24 @@ async def test_handle_engine_status_change( notification_client.publish_advise_refetch_async.assert_any_await( topic=f"{Topics.RUNS}/1234" ) + + +async def test_publish_pre_serialized_commannds_notif( + runs_publisher: RunsPublisher, notification_client: AsyncMock +) -> None: + """It should send out a notification for pre serialized commands.""" + await runs_publisher.initialize( + "1234", lambda _: mock_curent_command("command1"), AsyncMock() + ) + + assert runs_publisher._run_hooks + assert runs_publisher._engine_state_slice + assert notification_client.publish_advise_refetch_async.call_count == 2 + + await runs_publisher.publish_pre_serialized_commands_notification(run_id="1234") + + assert notification_client.publish_advise_refetch_async.call_count == 3 + + notification_client.publish_advise_refetch_async.assert_any_await( + topic=f"{Topics.RUNS_PRE_SERIALIZED_COMMANDS}/1234" + )