-
Notifications
You must be signed in to change notification settings - Fork 179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(robot-server): Fix indefinite protocol cancel state #14428
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,9 +2,7 @@ | |
import asyncio | ||
from typing import Union, Callable, Optional | ||
|
||
from opentrons.protocol_engine import ( | ||
CurrentCommand, | ||
) | ||
from opentrons.protocol_engine import CurrentCommand, StateSummary, EngineStatus | ||
|
||
from server_utils.fastapi_utils.app_state import ( | ||
AppState, | ||
|
@@ -23,11 +21,13 @@ def __init__(self, client: NotificationClient) -> None: | |
self._client = client | ||
self._run_data_manager_polling = asyncio.Event() | ||
self._previous_current_command: Union[CurrentCommand, None] = None | ||
self._previous_state_summary_status: Union[EngineStatus, None] = None | ||
|
||
# TODO(jh, 2023-02-02): Instead of polling, emit current_commands directly from PE. | ||
async def begin_polling_engine_store( | ||
self, | ||
get_current_command: Callable[[str], Optional[CurrentCommand]], | ||
get_state_summary: Callable[[str], Optional[StateSummary]], | ||
run_id: str, | ||
) -> None: | ||
"""Continuously poll the engine store for the current_command. | ||
|
@@ -38,7 +38,9 @@ async def begin_polling_engine_store( | |
""" | ||
asyncio.create_task( | ||
self._poll_engine_store( | ||
get_current_command=get_current_command, run_id=run_id | ||
get_current_command=get_current_command, | ||
run_id=run_id, | ||
get_state_summary=get_state_summary, | ||
) | ||
) | ||
|
||
|
@@ -59,6 +61,7 @@ def publish_runs(self, run_id: str) -> None: | |
async def _poll_engine_store( | ||
self, | ||
get_current_command: Callable[[str], Optional[CurrentCommand]], | ||
get_state_summary: Callable[[str], Optional[StateSummary]], | ||
run_id: str, | ||
) -> None: | ||
"""Asynchronously publish new current commands. | ||
|
@@ -69,12 +72,23 @@ async def _poll_engine_store( | |
""" | ||
while not self._run_data_manager_polling.is_set(): | ||
current_command = get_current_command(run_id) | ||
current_state_summary = get_state_summary(run_id) | ||
current_state_summary_status = ( | ||
current_state_summary.status if current_state_summary else None | ||
) | ||
if ( | ||
current_command is not None | ||
and self._previous_current_command != current_command | ||
): | ||
await self._publish_current_command() | ||
self._previous_current_command = current_command | ||
|
||
if ( | ||
current_state_summary_status is not None | ||
and self._previous_state_summary_status != current_state_summary_status | ||
): | ||
await self._publish_runs_async(run_id=run_id) | ||
self._previous_state_summary_status = current_state_summary_status | ||
Comment on lines
+86
to
+91
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There will be certain status transitions (like IDLE -> RUNNING -> PAUSED) that will be handled by the robot server and hence (I'm guessing) will have a notification sent out. How is the publisher differentiating between such a transition vs one that requires polling. Because would it not send out duplicate notifications otherwise? Or does that not matter since it's just a tiny redundancy? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question (and sorry to have missed this before the merge) - it's a redundancy here. As you note, it's relatively tiny compared to what the app does now. The idea is that this polling is pretty temporary - we're planning on implementing the callback strategy we discussed yesterday as a more permanent solution to this problem, and we should revisit any redundancy and clean it up then. I think it's sufficient for now, though! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. Thanks for the clarification! |
||
await asyncio.sleep(1) | ||
|
||
async def _publish_current_command( | ||
|
@@ -83,6 +97,15 @@ async def _publish_current_command( | |
"""Publishes the equivalent of GET /runs/:runId/commands?cursor=null&pageLength=1.""" | ||
await self._client.publish_async(topic=Topics.RUNS_CURRENT_COMMAND.value) | ||
|
||
async def _publish_runs_async(self, run_id: str) -> None: | ||
"""Asynchronously publishes the equivalent of GET /runs and GET /runs/:runId. | ||
|
||
Args: | ||
run_id: ID of the current run. | ||
""" | ||
await self._client.publish_async(topic=Topics.RUNS.value) | ||
await self._client.publish_async(topic=f"{Topics.RUNS.value}/{run_id}") | ||
|
||
|
||
_runs_publisher_accessor: AppStateAccessor[RunsPublisher] = AppStateAccessor[ | ||
RunsPublisher | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still confused about how this worked lol