Skip to content

Commit

Permalink
fix(robot-server): fix infinite cancelling state during protocol run
Browse files Browse the repository at this point in the history
Certain EngineStatus states are internal to PE and not handled by the robot-server, including
stop-requested. This means that when the app requests a STOP, robot-server doesn't internally manage
the transition from stop-requested->stopped. Simply emitting refetch flags whenever the robot-server
updates the state is therefore insufficient. The current solution is to do what we do for
current_commands, which is to poll PE and emit to the app if there's an update. In the future, we
should bubble up this event to robot-server from PE.
  • Loading branch information
mjhuff committed Feb 5, 2024
1 parent 1bb656d commit 292c4be
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 5 deletions.
4 changes: 3 additions & 1 deletion robot-server/robot_server/runs/run_data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ async def create(
protocol_id=protocol.protocol_id if protocol is not None else None,
)
await self._runs_publisher.begin_polling_engine_store(
get_current_command=self.get_current_command, run_id=run_id
get_current_command=self.get_current_command,
get_state_summary=self._get_state_summary,
run_id=run_id,
)

return _build_run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
import asyncio
from typing import Union, Callable, Optional

from opentrons.protocol_engine import (
CurrentCommand,
)
from opentrons.protocol_engine import CurrentCommand, StateSummary, EngineStatus

from server_utils.fastapi_utils.app_state import (
AppState,
Expand All @@ -23,11 +21,13 @@ def __init__(self, client: NotificationClient) -> None:
self._client = client
self._run_data_manager_polling = asyncio.Event()
self._previous_current_command: Union[CurrentCommand, None] = None
self._previous_state_summary_status: Union[EngineStatus, None] = None

# TODO(jh, 2023-02-02): Instead of polling, emit current_commands directly from PE.
async def begin_polling_engine_store(
self,
get_current_command: Callable[[str], Optional[CurrentCommand]],
get_state_summary: Callable[[str], Optional[StateSummary]],
run_id: str,
) -> None:
"""Continuously poll the engine store for the current_command.
Expand All @@ -38,7 +38,9 @@ async def begin_polling_engine_store(
"""
asyncio.create_task(
self._poll_engine_store(
get_current_command=get_current_command, run_id=run_id
get_current_command=get_current_command,
run_id=run_id,
get_state_summary=get_state_summary,
)
)

Expand All @@ -59,6 +61,7 @@ def publish_runs(self, run_id: str) -> None:
async def _poll_engine_store(
self,
get_current_command: Callable[[str], Optional[CurrentCommand]],
get_state_summary: Callable[[str], Optional[StateSummary]],
run_id: str,
) -> None:
"""Asynchronously publish new current commands.
Expand All @@ -69,12 +72,23 @@ async def _poll_engine_store(
"""
while not self._run_data_manager_polling.is_set():
current_command = get_current_command(run_id)
current_state_summary = get_state_summary(run_id)
current_state_summary_status = (
current_state_summary.status if current_state_summary else None
)
if (
current_command is not None
and self._previous_current_command != current_command
):
await self._publish_current_command()
self._previous_current_command = current_command

if (
current_state_summary_status is not None
and self._previous_state_summary_status != current_state_summary_status
):
await self._publish_runs_async(run_id=run_id)
self._previous_state_summary_status = current_state_summary_status
await asyncio.sleep(1)

async def _publish_current_command(
Expand All @@ -83,6 +97,15 @@ async def _publish_current_command(
"""Publishes the equivalent of GET /runs/:runId/commands?cursor=null&pageLength=1."""
await self._client.publish_async(topic=Topics.RUNS_CURRENT_COMMAND.value)

async def _publish_runs_async(self, run_id: str) -> None:
"""Asynchronously publishes the equivalent of GET /runs and GET /runs/:runId.
Args:
run_id: ID of the current run.
"""
await self._client.publish_async(topic=Topics.RUNS.value)
await self._client.publish_async(topic=f"{Topics.RUNS.value}/{run_id}")


_runs_publisher_accessor: AppStateAccessor[RunsPublisher] = AppStateAccessor[
RunsPublisher
Expand Down

0 comments on commit 292c4be

Please sign in to comment.