diff --git a/app/src/organisms/RunTimeControl/hooks.ts b/app/src/organisms/RunTimeControl/hooks.ts index 4fb777f8ec6..7c63c55212c 100644 --- a/app/src/organisms/RunTimeControl/hooks.ts +++ b/app/src/organisms/RunTimeControl/hooks.ts @@ -77,11 +77,9 @@ export function useRunStatus( refetchInterval: DEFAULT_STATUS_REFETCH_INTERVAL, enabled: lastRunStatus.current == null || - !([ - RUN_STATUS_STOP_REQUESTED, - RUN_STATUS_FAILED, - RUN_STATUS_SUCCEEDED, - ] as RunStatus[]).includes(lastRunStatus.current), + !([RUN_STATUS_FAILED, RUN_STATUS_SUCCEEDED] as RunStatus[]).includes( + lastRunStatus.current + ), onSuccess: data => (lastRunStatus.current = data?.data?.status ?? null), ...options, }) diff --git a/robot-server/robot_server/runs/run_data_manager.py b/robot-server/robot_server/runs/run_data_manager.py index a23487d33bd..05abf3a3d14 100644 --- a/robot-server/robot_server/runs/run_data_manager.py +++ b/robot-server/robot_server/runs/run_data_manager.py @@ -133,7 +133,9 @@ async def create( protocol_id=protocol.protocol_id if protocol is not None else None, ) await self._runs_publisher.begin_polling_engine_store( - get_current_command=self.get_current_command, run_id=run_id + get_current_command=self.get_current_command, + get_state_summary=self._get_state_summary, + run_id=run_id, ) return _build_run( diff --git a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py index 3d6acb10ab6..1010b9a2fc0 100644 --- a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py +++ b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py @@ -2,9 +2,7 @@ import asyncio from typing import Union, Callable, Optional -from opentrons.protocol_engine import ( - CurrentCommand, -) +from opentrons.protocol_engine import CurrentCommand, StateSummary, EngineStatus from server_utils.fastapi_utils.app_state import ( AppState, @@ -23,11 +21,13 @@ def __init__(self, client: NotificationClient) -> None: self._client = client self._run_data_manager_polling = asyncio.Event() self._previous_current_command: Union[CurrentCommand, None] = None + self._previous_state_summary_status: Union[EngineStatus, None] = None # TODO(jh, 2023-02-02): Instead of polling, emit current_commands directly from PE. async def begin_polling_engine_store( self, get_current_command: Callable[[str], Optional[CurrentCommand]], + get_state_summary: Callable[[str], Optional[StateSummary]], run_id: str, ) -> None: """Continuously poll the engine store for the current_command. @@ -38,7 +38,9 @@ async def begin_polling_engine_store( """ asyncio.create_task( self._poll_engine_store( - get_current_command=get_current_command, run_id=run_id + get_current_command=get_current_command, + run_id=run_id, + get_state_summary=get_state_summary, ) ) @@ -59,6 +61,7 @@ def publish_runs(self, run_id: str) -> None: async def _poll_engine_store( self, get_current_command: Callable[[str], Optional[CurrentCommand]], + get_state_summary: Callable[[str], Optional[StateSummary]], run_id: str, ) -> None: """Asynchronously publish new current commands. @@ -69,12 +72,23 @@ async def _poll_engine_store( """ while not self._run_data_manager_polling.is_set(): current_command = get_current_command(run_id) + current_state_summary = get_state_summary(run_id) + current_state_summary_status = ( + current_state_summary.status if current_state_summary else None + ) if ( current_command is not None and self._previous_current_command != current_command ): await self._publish_current_command() self._previous_current_command = current_command + + if ( + current_state_summary_status is not None + and self._previous_state_summary_status != current_state_summary_status + ): + await self._publish_runs_async(run_id=run_id) + self._previous_state_summary_status = current_state_summary_status await asyncio.sleep(1) async def _publish_current_command( @@ -83,6 +97,15 @@ async def _publish_current_command( """Publishes the equivalent of GET /runs/:runId/commands?cursor=null&pageLength=1.""" await self._client.publish_async(topic=Topics.RUNS_CURRENT_COMMAND.value) + async def _publish_runs_async(self, run_id: str) -> None: + """Asynchronously publishes the equivalent of GET /runs and GET /runs/:runId. + + Args: + run_id: ID of the current run. + """ + await self._client.publish_async(topic=Topics.RUNS.value) + await self._client.publish_async(topic=f"{Topics.RUNS.value}/{run_id}") + _runs_publisher_accessor: AppStateAccessor[RunsPublisher] = AppStateAccessor[ RunsPublisher