Skip to content

Commit

Permalink
fix(robot-server): notify /runs when a non-current run is deleted
Browse files Browse the repository at this point in the history
When a non-current run is deleted, the server does not publish to the /runs topic a refetch flag.
Instead of conditionally publishing a notify flag when a run is deleted, do so always.
  • Loading branch information
mjhuff committed Apr 22, 2024
1 parent 8776ed9 commit 1d858e9
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 16 deletions.
3 changes: 2 additions & 1 deletion robot-server/robot_server/runs/run_data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ async def delete(self, run_id: str) -> None:
"""
if run_id == self._engine_store.current_run_id:
await self._engine_store.clear()
await self._runs_publisher.clean_up_current_run()

await self._runs_publisher.clean_up_run(run_id=run_id)

self._run_store.remove(run_id=run_id)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,33 +71,33 @@ async def initialize(
)
self._engine_state_slice = EngineStateSlice()

await self._publish_runs_advise_refetch_async()
await self._publish_runs_advise_refetch_async(run_id=run_id)

async def clean_up_current_run(self) -> None:
"""Publish final refetch and unsubscribe flags."""
await self._publish_runs_advise_refetch_async()
await self._publish_runs_advise_unsubscribe_async()
async def clean_up_run(self, run_id: str) -> None:
"""Publish final refetch and unsubscribe flags for the given run."""
await self._publish_runs_advise_refetch_async(run_id=run_id)
await self._publish_runs_advise_unsubscribe_async(run_id=run_id)

async def _publish_current_command(self) -> None:
"""Publishes the equivalent of GET /runs/:runId/commands?cursor=null&pageLength=1."""
await self._client.publish_advise_refetch_async(
topic=Topics.RUNS_CURRENT_COMMAND
)

async def _publish_runs_advise_refetch_async(self) -> None:
async def _publish_runs_advise_refetch_async(self, run_id: str) -> None:
"""Publish a refetch flag for relevant runs topics."""
await self._client.publish_advise_refetch_async(topic=Topics.RUNS)

if self._run_hooks is not None:
await self._client.publish_advise_refetch_async(topic=Topics.RUNS)
await self._client.publish_advise_refetch_async(
topic=f"{Topics.RUNS}/{self._run_hooks.run_id}"
topic=f"{Topics.RUNS}/{run_id}"
)

async def _publish_runs_advise_unsubscribe_async(self) -> None:
async def _publish_runs_advise_unsubscribe_async(self, run_id: str) -> None:
"""Publish an unsubscribe flag for relevant runs topics."""
if self._run_hooks is not None:
await self._client.publish_advise_unsubscribe_async(
topic=f"{Topics.RUNS}/{self._run_hooks.run_id}"
)
await self._client.publish_advise_unsubscribe_async(
topic=f"{Topics.RUNS}/{run_id}"
)

async def _handle_current_command_change(self) -> None:
"""Publish a refetch flag if the current command has changed."""
Expand All @@ -121,7 +121,9 @@ async def _handle_engine_status_change(self) -> None:
and self._engine_state_slice.state_summary_status
!= current_state_summary.status
):
await self._publish_runs_advise_refetch_async()
await self._publish_runs_advise_refetch_async(
run_id=self._run_hooks.run_id
)
self._engine_state_slice.state_summary_status = (
current_state_summary.status
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async def test_clean_up_current_run(
"""It should publish to appropriate topics at the end of a run."""
await runs_publisher.initialize("1234", AsyncMock(), AsyncMock())

await runs_publisher.clean_up_current_run()
await runs_publisher.clean_up_run(run_id="1234")

notification_client.publish_advise_refetch_async.assert_any_await(topic=Topics.RUNS)
notification_client.publish_advise_refetch_async.assert_any_await(
Expand Down

0 comments on commit 1d858e9

Please sign in to comment.