feat(api): Do not enqueue json commands on protocol load #14759

Merged Apr 4, 2024 · 22 commits · Changes from 9 commits

Commits
07b01f5
added schema definition for fixtures
TamarZanzouri Sep 14, 2023
e44a81a
initial move commands list into task_queue
TamarZanzouri Mar 26, 2024
a073995
moved logic from task queue into runner for json protocols
TamarZanzouri Mar 27, 2024
ce43819
reverted task queue changes
TamarZanzouri Mar 27, 2024
004e498
json runner with Event set and wait. fixed v6 upload test
TamarZanzouri Mar 28, 2024
e2d4658
reverted task queue changes and use runner._run method instead
TamarZanzouri Mar 29, 2024
f87b1db
Delete shared-data/fixture/schemas/1.json
TamarZanzouri Mar 29, 2024
3d7c688
clean up
TamarZanzouri Mar 29, 2024
23c7d40
Merge branch 'edge' into EXEC-352-do-not-enqueue-json-protocol-commands
TamarZanzouri Mar 29, 2024
747974c
fixed failing test from merge
TamarZanzouri Apr 1, 2024
bd2afe5
fixed failing test
TamarZanzouri Apr 1, 2024
335c10d
started adding tests WIP
TamarZanzouri Apr 2, 2024
2e1c4f4
added captors and tested add_and_execute_command
TamarZanzouri Apr 2, 2024
cb83ccf
added test for break on stop
TamarZanzouri Apr 2, 2024
819d957
added test to not execute commands if run stopped
TamarZanzouri Apr 3, 2024
c5249d3
fixed logic with test
TamarZanzouri Apr 3, 2024
6c812d6
added tests for papi and json play and stop in the middle of a run
TamarZanzouri Apr 4, 2024
76aa3e5
fixed unit test to match logic in e2e test
TamarZanzouri Apr 4, 2024
d3b0cd3
changed tests to use delay and wait for a running state
TamarZanzouri Apr 4, 2024
25344bd
Update robot-server/tests/integration/http_api/runs/test_play_stop_pa…
TamarZanzouri Apr 4, 2024
02a6ead
removed retry on get run commands
TamarZanzouri Apr 4, 2024
123f313
removed retry
TamarZanzouri Apr 4, 2024
17 changes: 14 additions & 3 deletions api/src/opentrons/protocol_runner/protocol_runner.py
@@ -36,6 +36,7 @@
LegacyExecutor,
LegacyLoadInfo,
)
from ..protocol_engine.errors import ProtocolCommandFailedError
from ..protocol_engine.types import (
PostRunHardwareState,
DeckConfigurationType,
@@ -283,6 +284,7 @@ def __init__(
)

self._hardware_api.should_taskify_movement_execution(taskify=False)
self._queued_commands: List[pe_commands.CommandCreate] = []

async def load(self, protocol_source: ProtocolSource) -> None:
"""Load a JSONv6+ ProtocolSource into managed ProtocolEngine."""
@@ -324,17 +326,17 @@ async def load(self, protocol_source: ProtocolSource) -> None:
color=liquid.displayColor,
)
await _yield()

initial_home_command = pe_commands.HomeCreate(
params=pe_commands.HomeParams(axes=None)
)
# this command homes all axes, including pipette plunger and gripper jaw
self._protocol_engine.add_command(request=initial_home_command)

for command in commands:
self._protocol_engine.add_command(request=command)
await _yield()
self._queued_commands.append(command)

self._task_queue.set_run_func(func=self._protocol_engine.wait_until_complete)
self._task_queue.set_run_func(func=self._add_command_and_execute)

async def run( # noqa: D102
self,
@@ -355,6 +357,15 @@ async def run(  # noqa: D102
commands = self._protocol_engine.state_view.commands.get_all()
return RunResult(commands=commands, state_summary=run_data, parameters=[])

async def _add_command_and_execute(self) -> None:
for command in self._queued_commands:
result = await self._protocol_engine.add_and_execute_command(command)
if result.error:
raise ProtocolCommandFailedError(
original_error=result.error,
message=f"{result.error.errorType}: {result.error.detail}",
)

@SyntaxColoring (Contributor), Apr 1, 2024:
Ignoring queued commands for a moment, this doesn't look like it matches the existing implementation based on QueueWorker + get_next_to_execute(). This does not have:

- Prioritization of "intent": "setup" commands
- Special handling of RunStoppedError to break instead of raising an error
- Pausing on recoverable errors (added in #14646)
- Yielding to the event loop on each command

Do we need to handle those things, or are they already handled elsewhere in some way that I'm missing?

@TamarZanzouri (Author), Apr 1, 2024:
> Prioritization of "intent": "setup" commands
> But according to the tests, this is still working, somehow?

Setup commands are added to the queue in PE but are being executed after the initial home command instead of at the end of the run. Adding prioritization will happen in a follow-up PR.

> Special handling of RunStoppedError to break instead of raising an error
> Pausing on recoverable errors (added in #14646)
> You can see #14753 for how I'm doing this for Python protocols. If we're making JSON behave more like Python now, maybe this should work the same way.

Will add logic in this PR.

> Yielding to the event loop on each command

I do not think we need this because add_and_execute_command is async and add_command is sync, but I will test this to make sure we are not blocking anything.

@SyntaxColoring (Contributor), Apr 1, 2024:
> I do not think we need this because add_and_execute_command is async and add_command is sync, but I will test this to make sure we are not blocking anything.

We do need it, unfortunately, for the reasons described in this comment in the old implementation:

await self._command_executor.execute(command_id=command_id)
# Yield to the event loop in case we're executing a long sequence of commands
# that never yields internally. For example, a long sequence of comment commands.
await asyncio.sleep(0)

Unlike JavaScript, Python's await isn't guaranteed to yield to the event loop. It will only do so when it calls something that goes back to the event loop, like network I/O or an asyncio.sleep().
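The difference is easy to demonstrate outside Opentrons entirely. A minimal standalone sketch (`command` and `observer` are made-up names for illustration, not engine code):

```python
import asyncio

order = []

async def command():
    # Purely synchronous body: no await inside, so no yield point.
    order.append("command")

async def run_without_yield():
    for _ in range(3):
        # Awaiting a coroutine runs it inline; control never reaches
        # the event loop, so other tasks are starved until we finish.
        await command()

async def run_with_yield():
    for _ in range(3):
        await command()
        await asyncio.sleep(0)  # explicit yield point, as in the QueueWorker

async def observer():
    for _ in range(3):
        order.append("observer")
        await asyncio.sleep(0)

async def main():
    task = asyncio.create_task(observer())
    await run_without_yield()   # all three commands run before the observer
    await task
    order.append("|")
    task = asyncio.create_task(observer())
    await run_with_yield()      # now the two tasks interleave one-for-one
    await task

asyncio.run(main())
```

In the first phase `order` fills with three `"command"` entries before any `"observer"`; after the `"|"` separator the entries alternate, showing that only the explicit `asyncio.sleep(0)` lets the other task run.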


Other than that, that all makes sense, thanks!

@TamarZanzouri (Author):
I see. I tested this by adding a background task that prints and then time.sleep(3), and I was able to see the printing sequence while executing the commands. So is this test not sufficient? @SyntaxColoring @sfoster1

Member:
I don't think we need the await because, crucially, what we're talking about here is in a different async stack than the lines you linked to, @SyntaxColoring. This PR does not replace or even touch the execution queue worker.

The way the engine works, just so we agree, is that there are fundamentally at least three async stacks: the execution stack, which is controlled by the execution queue linked above, is owned by the engine, and awaits command implementations; the runner task-queue stack, which is what is changed here, is owned by the runner, and (now) dispatches commands via add_and_execute_command; and the stack that calls the runner's primary interface, which is, I think, owned by the server's run data manager.

The execution stack, as you mention, definitely needs an explicit yield in case it's running a bunch of commands that have no yield points. So it does: it's in what you linked above, which is still what's used and which this PR does not touch.

The server stack does not need an explicit yield as long as the runner uses the task queue, since in that case it awaits self._task_queue.join(), which awaits a future, which is a yield point.

The task-queue stack (which is what's changed here) does not need an explicit yield if it's using something that eventually uses wait_for, because wait_for awaits an asyncio.Event, which is a yield point.
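That layering can be sketched with a toy task queue. This is an illustrative stand-in only; the names mirror, but are not, the real Opentrons TaskQueue API:

```python
import asyncio
from typing import Awaitable, Callable, Optional

class MiniTaskQueue:
    """Toy sketch of a runner-style task queue (hypothetical API)."""

    def __init__(self) -> None:
        self._run_func: Optional[Callable[[], Awaitable[None]]] = None
        self._task: Optional["asyncio.Task[None]"] = None

    def set_run_func(self, func: Callable[[], Awaitable[None]]) -> None:
        self._run_func = func

    def start(self) -> None:
        assert self._run_func is not None
        self._task = asyncio.ensure_future(self._run_func())

    async def join(self) -> None:
        # Awaiting the wrapped task awaits a future, which is a genuine
        # yield point: the caller (the server stack) needs no explicit
        # asyncio.sleep(0) of its own.
        assert self._task is not None
        await self._task

async def _demo() -> str:
    queue = MiniTaskQueue()
    ran = []

    async def run_func() -> None:
        await asyncio.sleep(0)  # stand-in for dispatching protocol commands
        ran.append("ran")

    queue.set_run_func(run_func)
    queue.start()
    await queue.join()
    return ran[0]

result = asyncio.run(_demo())
```

The point of the sketch is only that `join()` suspends on a future, so the stack that awaits it yields to the event loop without any added sleep.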

@SyntaxColoring (Contributor), Apr 2, 2024:
Ah, that makes sense, thanks. I missed that this was still going through the QueueWorker under the hood.

Contributor:
Which means I was also wrong about this:

> Special handling of RunStoppedError to break instead of raising an error

Per the documentation of ProtocolEngine.add_and_execute_command(), if the run is stopped, it actually doesn't raise a RunStoppedError; it returns the command still queued.
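That documented behavior suggests the stop check can be a status test on the returned command rather than exception handling. A hedged sketch with stand-in types (FakeEngine and Command are made up for illustration; the real types live in opentrons.protocol_engine):

```python
import asyncio
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Command:
    status: str                  # "queued" | "succeeded" | "failed"
    error: Optional[str] = None

class FakeEngine:
    """Stand-in for ProtocolEngine; models only the documented stop
    behavior of add_and_execute_command()."""

    def __init__(self, stopped: bool) -> None:
        self._stopped = stopped

    async def add_and_execute_command(self, request: Command) -> Command:
        if self._stopped:
            # Documented behavior: a stopped run returns the command
            # still queued instead of raising RunStoppedError.
            return Command(status="queued")
        return Command(status="succeeded")

async def dispatch(engine: FakeEngine, requests: List[Command]) -> int:
    executed = 0
    for request in requests:
        result = await engine.add_and_execute_command(request)
        if result.status == "queued":
            break  # run was stopped; skip the remaining commands
        executed += 1
    return executed

stopped_count = asyncio.run(dispatch(FakeEngine(True), [Command("queued")] * 3))
running_count = asyncio.run(dispatch(FakeEngine(False), [Command("queued")] * 3))
```

With a stopped engine no commands count as executed; with a running engine all three do, which is the break-on-stop behavior the runner's dispatch loop needs.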

@TamarZanzouri (Author), Apr 2, 2024:
> Which means I was also wrong about this:

@SyntaxColoring I had a feeling, but I wanted to prove it through the test I am trying to add :-)

@TamarZanzouri (Author), Apr 4, 2024:
@SyntaxColoring I added e2e tests to prove that Python and JSON protocols now behave the same when stopping a protocol mid-run.


class LiveRunner(AbstractRunner):
"""Protocol runner implementation for live http protocols."""
19 changes: 1 addition & 18 deletions api/tests/opentrons/protocol_runner/test_protocol_runner.py
@@ -393,24 +393,7 @@ async def test_load_json_runner(
protocol_engine.add_command(
request=pe_commands.HomeCreate(params=pe_commands.HomeParams(axes=None))
),
protocol_engine.add_command(
request=pe_commands.WaitForResumeCreate(
params=pe_commands.WaitForResumeParams(message="hello")
)
),
protocol_engine.add_command(
request=pe_commands.WaitForResumeCreate(
params=pe_commands.WaitForResumeParams(message="goodbye")
)
),
protocol_engine.add_command(
request=pe_commands.LoadLiquidCreate(
params=pe_commands.LoadLiquidParams(
liquidId="water-id", labwareId="labware-id", volumeByWell={"A1": 30}
)
),
),
task_queue.set_run_func(func=protocol_engine.wait_until_complete),
task_queue.set_run_func(func=json_runner_subject._add_command_and_execute),
)


@@ -93,196 +93,18 @@ stages:
commandId: '{setup_command_id}'
key: '{setup_command_key}'
createdAt: '{setup_command_created_at}'
index: 14
index: 1
meta:
cursor: 0
totalLength: 15
totalLength: 2
data:
# Initial home
- id: !anystr
key: !anystr
commandType: home
createdAt: !anystr
status: queued
params: {}
- id: !anystr
key: !anystr
commandType: loadPipette
createdAt: !anystr
status: queued
params:
pipetteName: p10_single
mount: left
pipetteId: pipetteId
- id: !anystr
key: !anystr
commandType: loadModule
createdAt: !anystr
status: queued
params:
model: magneticModuleV1
location:
slotName: '3'
moduleId: magneticModuleId
- id: !anystr
key: !anystr
commandType: loadModule
createdAt: !anystr
status: queued
params:
model: temperatureModuleV2
location:
slotName: '1'
moduleId: temperatureModuleId
- id: !anystr
key: !anystr
commandType: loadLabware
createdAt: !anystr
status: queued
params:
location:
moduleId: temperatureModuleId
loadName: foo_8_plate_33ul
namespace: example
version: 1
labwareId: sourcePlateId
displayName: Source Plate
- id: !anystr
key: !anystr
commandType: loadLabware
createdAt: !anystr
status: queued
params:
location:
moduleId: magneticModuleId
loadName: foo_8_plate_33ul
namespace: example
version: 1
labwareId: destPlateId
displayName: Sample Collection Plate
- id: !anystr
key: !anystr
commandType: loadLabware
createdAt: !anystr
status: queued
params:
location:
slotName: '8'
loadName: opentrons_96_tiprack_10ul
namespace: opentrons
version: 1
labwareId: tipRackId
displayName: Opentrons 96 Tip Rack 10 µL
- id: !anystr
createdAt: !anystr
commandType: loadLiquid
key: !anystr
status: queued
params:
liquidId: 'waterId'
labwareId: 'sourcePlateId'
volumeByWell:
A1: 100
B1: 100
- id: !anystr
key: !anystr
commandType: pickUpTip
createdAt: !anystr
status: queued
params:
pipetteId: pipetteId
labwareId: tipRackId
wellName: B1
wellLocation:
origin: top
offset:
x: 0
'y': 0
z: 0
- id: !anystr
key: !anystr
commandType: aspirate
createdAt: !anystr
status: queued
params:
pipetteId: pipetteId
labwareId: sourcePlateId
wellName: A1
wellLocation:
origin: bottom
offset:
x: 0
'y': 0
z: 2
volume: 5
flowRate: 3
- id: !anystr
key: !anystr
commandType: dispense
createdAt: !anystr
status: queued
params:
pipetteId: pipetteId
labwareId: destPlateId
wellName: B1
wellLocation:
origin: bottom
offset:
x: 0
'y': 0
z: 1
volume: 4.5
flowRate: 2.5
- id: !anystr
key: !anystr
commandType: moveToWell
createdAt: !anystr
status: queued
params:
pipetteId: pipetteId
labwareId: destPlateId
wellName: B2
wellLocation:
origin: top
offset:
x: 0
'y': 0
z: 0
forceDirect: false
- id: !anystr
key: !anystr
commandType: moveToWell
createdAt: !anystr
status: queued
params:
pipetteId: pipetteId
labwareId: destPlateId
wellName: B2
wellLocation:
origin: bottom
offset:
x: 2
y: 3
z: 10
minimumZHeight: 35
forceDirect: true
speed: 12.3
- id: !anystr
key: !anystr
commandType: dropTip
createdAt: !anystr
status: queued
params:
pipetteId: pipetteId
labwareId: fixedTrash
wellName: A1
wellLocation:
origin: default
offset:
x: 0
y: 0
z: 0
alternateDropLocation: false
params: { }
- id: '{setup_command_id}'
key: '{setup_command_key}'
intent: setup
@@ -352,6 +174,15 @@
params: {}
startedAt: !anystr
completedAt: !anystr
- id: '{setup_command_id}'
key: '{setup_command_key}'
intent: setup
commandType: home
createdAt: '{setup_command_created_at}'
startedAt: '{setup_command_started_at}'
completedAt: '{setup_command_completed_at}'
status: succeeded
params: { }
- id: !anystr
key: !anystr
commandType: loadPipette
@@ -569,16 +400,6 @@
y: 0
z: 0
alternateDropLocation: false
- id: '{setup_command_id}'
key: '{setup_command_key}'
intent: setup
commandType: home
createdAt: '{setup_command_created_at}'
startedAt: '{setup_command_started_at}'
completedAt: '{setup_command_completed_at}'
status: succeeded
notes: []
params: {}

- name: Verify commands succeeded with pageLength and cursor
request:
@@ -610,12 +431,12 @@
notes: []
params:
location:
moduleId: magneticModuleId
moduleId: temperatureModuleId
loadName: foo_8_plate_33ul
namespace: example
version: 1
labwareId: destPlateId
displayName: Sample Collection Plate
labwareId: sourcePlateId
displayName: Source Plate
- id: !anystr
key: !anystr
commandType: loadLabware
@@ -626,9 +447,9 @@
notes: []
params:
location:
slotName: '8'
loadName: opentrons_96_tiprack_10ul
namespace: opentrons
moduleId: magneticModuleId
loadName: foo_8_plate_33ul
namespace: example
version: 1
labwareId: tipRackId
displayName: Opentrons 96 Tip Rack 10 µL
labwareId: destPlateId
displayName: Sample Collection Plate
@@ -86,12 +86,12 @@ stages:
meta:
runId: !anystr
commandId: !anystr
index: 4
index: 3
key: !anystr
createdAt: !anystr
meta:
cursor: 3
totalLength: 5
totalLength: 4
data:
- id: !anystr
key: !anystr
@@ -120,20 +120,4 @@
y: 0
z: 1
flowRate: 3.78
volume: 100
- id: !anystr
key: !anystr
commandType: pickUpTip
createdAt: !anystr
completedAt: !anystr
status: failed
params:
pipetteId: pipetteId
labwareId: tipRackId
wellName: A1
wellLocation:
origin: top
offset:
x: 0
y: 0
z: 0
volume: 100