perf(api): Do not wait for the event loop to be free when reporting APIv2 commands #9238
Conversation
Codecov Report
@@ Coverage Diff @@
## edge #9238 +/- ##
==========================================
+ Coverage 75.42% 75.47% +0.05%
==========================================
Files 1824 1826 +2
Lines 49459 49592 +133
Branches 4729 4729
==========================================
+ Hits 37303 37431 +128
- Misses 11290 11295 +5
Partials 866 866
Flags with carried forward coverage won't be shown.
_T = TypeVar("_T")


class ThreadAsyncQueue(Generic[_T]):
Any better names for this?
- The Python standard library has "queues" (`queue.Queue` and `asyncio.Queue`), which lack a close method.
- Go and Trio add a close method and call them "channels". AnyIO does the same, but calls them "memory object streams".

I'm leaning towards something "channel"-based, like `ThreadAsyncChannel`.
I'm partial to "streams" myself, but channel sounds nice, too
Leaving it as `Queue` for now because, after Googling this, the terminology seems pretty messy and unstandardized, and "queue" wins the tie because it matches something people might already know from the Python standard library.
class ThreadAsyncQueue(Generic[_T]):
    """A queue to safely pass values of type `_T` between threads and async tasks.

    All methods are safe to call concurrently from any thread or task.
This is a stronger guarantee than we strictly require for this PR, since we only have one producer thread and one consumer async task.
Weakening the safety guarantees opens up some different implementation options. But after trying a few, they didn't seem that much easier, and they took extra care to explain and understand. So I went with this.
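To make the thread-safety claim concrete, here is a minimal sketch of the pattern the visible snippets suggest (a `deque` guarded by a single `threading.Condition`, plus a closed flag). It is not the PR's actual `ThreadAsyncQueue`; the class name, `close()`, and the error type are placeholders.

```python
# Not the PR's actual implementation -- a minimal sketch of the pattern the visible
# snippets suggest: one deque plus one threading.Condition guarding a "closed" flag.
import threading
from collections import deque
from typing import Deque, Generic, TypeVar

_T = TypeVar("_T")


class _SketchQueue(Generic[_T]):  # placeholder name, for illustration only
    def __init__(self) -> None:
        self._deque: Deque[_T] = deque()
        self._closed = False
        self._condition = threading.Condition()

    def put(self, value: _T) -> None:
        with self._condition:
            if self._closed:
                raise RuntimeError("Queue is closed.")
            self._deque.append(value)
            self._condition.notify()

    def close(self) -> None:
        with self._condition:
            self._closed = True
            self._condition.notify_all()  # wake every waiting get()

    def get(self) -> _T:
        with self._condition:
            while True:
                if self._deque:
                    return self._deque.popleft()
                if self._closed:
                    raise RuntimeError("Queue is closed.")
                # Wait for something to change, then check again.
                self._condition.wait()
```

Because every method takes the same condition's lock before touching the deque or the flag, any number of producer threads and consumer threads/tasks can call them concurrently.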
    Warning:
        A waiting `get_async()` won't be interrupted by an async cancellation.
        The proper way to interrupt a waiting `get_async()` is to close the queue,
        just like you have to do with `get()`.
I haven't looked into this too deeply. There might be a way to get async cancellations to work. But it didn't seem worthwhile, since:
- Good producer/consumer practice is to ensure the send side closes eventually, anyway. (In my opinion.)
- This same concern exists with the standard `queue.Queue`.
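As an illustration of that send-side discipline, here is a hedged producer sketch; `put()` and `done_putting()` are assumed method names rather than ones confirmed by the diff.

```python
# A hedged producer-side sketch: however the producer exits, close the queue,
# because closing is what unblocks a waiting get() or get_async() on the consumer
# side. put() and done_putting() are assumed method names, for illustration only.
def produce(queue: "ThreadAsyncQueue[int]") -> None:
    try:
        for i in range(10):
            queue.put(i)  # assumed put() method
    finally:
        queue.done_putting()  # assumed name for the "close the queue" method
```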
Force-pushed from 83d1cb8 to 4bb753b.
This is getting far-reaching. Is there a better approach?
Force-pushed from 4bb753b to 9e1f889.
This complication was based on a misunderstanding of how Python generators work.
# Capture the function that the plugin sets up as its labware load callback.
# Also, ensure that all subscribe calls return an actual unsubscribe callable
# (instead of Decoy's default `None`) so subject.teardown() works.
labware_handler_captor = matchers.Captor()
decoy.when(
    legacy_context.broker.subscribe(topic="command", handler=matchers.Anything())
).then_return(decoy.mock())
decoy.when(
    legacy_context.labware_load_broker.subscribe(callback=labware_handler_captor)
).then_return(decoy.mock())
decoy.when(
    legacy_context.instrument_load_broker.subscribe(callback=matchers.Anything())
).then_return(decoy.mock())
decoy.when(
    legacy_context.module_load_broker.subscribe(callback=matchers.Anything())
).then_return(decoy.mock())
I'm open to ideas for reducing this duplication across tests. My attempts just made the tests harder to follow.
We could also wipe a lot of this away if we merged `.labware_load_broker`, `.instrument_load_broker`, and `.module_load_broker` into a single `.equipment_broker`.
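For what it's worth, one hypothetical shape for the deduplication (not something this PR adds, and similar attempts reportedly hurt readability) is a helper that stubs every subscription and returns the captor:

```python
# A hypothetical helper, not part of this PR: stub every broker subscription to
# return an unsubscribe mock, and hand back the captor for the labware callback.
from decoy import Decoy, matchers


def stub_broker_subscriptions(decoy: Decoy, legacy_context) -> matchers.Captor:
    """Stub all subscribe calls and capture the labware load handler."""
    labware_handler_captor = matchers.Captor()
    decoy.when(
        legacy_context.broker.subscribe(topic="command", handler=matchers.Anything())
    ).then_return(decoy.mock())
    decoy.when(
        legacy_context.labware_load_broker.subscribe(callback=labware_handler_captor)
    ).then_return(decoy.mock())
    for broker in (
        legacy_context.instrument_load_broker,
        legacy_context.module_load_broker,
    ):
        decoy.when(broker.subscribe(callback=matchers.Anything())).then_return(
            decoy.mock()
        )
    return labware_handler_captor
```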
-    def stop(self) -> None:
+    async def stop(self) -> None:
         """Stop any configured plugins."""
         for p in self._plugins:
-            p.teardown()
+            await p.teardown()
I took the easy way out by changing `plugin.teardown()` and `plugin_starter.stop()` to be `async`. I haven't totally thought this through yet, but in another PR, I'd like to refactor this to make it easier to follow the lifetime of the `LegacyContextPlugin`.

Currently, cleaning up the `LegacyContextPlugin` works like this:

- The protocol thread exits, either gracefully or by exception.
- `ProtocolRunner` calls `ProtocolEngine.finish()`.
- `ProtocolEngine.finish()` calls `PluginStarter.stop()`.
- `PluginStarter.stop()` calls `.teardown()` on all plugins, which is currently just the `LegacyContextPlugin`.

But all `LegacyContextPlugin` really cares about is:

- That the protocol thread has exited, so it knows there are no more events coming in.
- That the `ProtocolEngine` is still open, so it knows it can still dispatch actions into the `ProtocolEngine` as it drains its `ThreadAsyncQueue`.

So in principle, we shouldn't need to involve `ProtocolEngine.finish()`. And I think involving `ProtocolEngine.finish()` makes things a little more fragile.
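A hedged sketch of what that decoupled cleanup could look like; `protocol_thread`, `plugin.finish_draining()`, and `engine.finish()` are stand-ins, not the real APIs:

```python
# A hedged sketch of the decoupled cleanup (not code from this PR). The names
# protocol_thread, plugin.finish_draining(), and engine.finish() are stand-ins.
import asyncio


async def run_and_clean_up(protocol_thread, plugin, engine) -> None:
    try:
        # 1. Wait for the protocol thread to exit, so no more events can arrive.
        await asyncio.get_running_loop().run_in_executor(None, protocol_thread.join)
        # 2. Let the plugin drain its ThreadAsyncQueue into the still-open engine.
        await plugin.finish_draining()
    finally:
        # 3. Only then close out the ProtocolEngine itself.
        await engine.finish()
```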
LGTM
        self._deque.append(value)
        self._condition.notify()

    def get(self) -> _T:
I don't think this method is used publicly; consider making it private.
You're correct, it's not used publicly, and I agree with the spirit of making it private. `get_until_closed()` is probably what clients should use instead.

I'm keeping it public for now despite this, because `get()` is part of the "normal" API for channels (as seen in various implementations), and conformity feels like a good thing when implementing multithreading primitives. I'm happy to delete it later if it feels noisy and we feel like deleting it wouldn't impede understanding of the class.
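For context, a hedged sketch of what a `get_until_closed()` consumer looks like; `queue` and `handle()` are stand-ins:

```python
# A hedged usage sketch of the consumer side with get_until_closed(); `queue` is
# assumed to be a ThreadAsyncQueue[int] created elsewhere, and handle() is a
# stand-in for whatever the consumer does with each value.
def consume(queue: "ThreadAsyncQueue[int]") -> None:
    # Yields each value as it arrives and returns once the queue is closed and
    # fully drained -- no sentinel values or manual loop-and-retry needed.
    for value in queue.get_until_closed():
        handle(value)
```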
        # Wait for something to change, then check again.
        self._condition.wait()

    def get_until_closed(self) -> Iterable[_T]:
Also not used publicly; consider making this a private method as well.
## Overview

This removes a potential source of deadlocks and race conditions. I originally thought this would help with EXEC-417. I'm not sure it actually will at this point, but maybe we should still do it.

## Background

When the robot executes a Python protocol, the `ProtocolEngine` and higher-level HTTP stuff live in the main thread, and the user's Python script runs in a worker thread. For older (`apiLevel`≤2.13) protocols, `LegacyContextPlugin` transports progress reports from the worker thread to the main thread.

When we initially built all of this, we had [severe performance problems](#9133) where the main thread would get bogged down serializing a big HTTP response or something, and then the backpressure through `LegacyContextPlugin` would stall the protocol thread, causing visible jankiness in robot motion. We did a bunch of stuff to try to fix this. One fix was to [insert an infinite non-blocking queue](#9238) between the two threads to remove the backpressure.

I strongly suspect that that fix is unnecessary today. As evidence: it only ever applied to older, `apiLevel`≤2.13, protocols. Newer (`apiLevel`≥2.14) ones have always run through a different pipeline, which still has backpressure. And the newer pipeline is apparently not perceived to have janky motion.

Removing the fix would remove concurrency, which would be a meaningful simplification. For instance, I've seen hints that [this part](https://github.com/Opentrons/opentrons/blob/45e14ca26359720740db744124b464bdcc84264c/api/src/opentrons/protocol_engine/execution/hardware_stopper.py#L70) of run cleanup is racy, because it depends on whether all the `pickUpTip`/`dropTip` commands got consumed from the queue by the time we reach there.

## Test Plan and Hands on Testing

* [x] On an OT-2, run some protocols that are movement-intensive (e.g. lots of `touch_tip()`s) and have `apiLevel` ≤ 2.13. Click around in the app and induce some network requests. There might be some jankiness, but make sure it's not severe. Really, I think we just need to make sure it's not any worse than `apiLevel` ≥ 2.14.
  * I'm noticing jank lasting ~0-1 seconds when you: navigate to the device details page (understandable); navigate through the device settings (weird); and navigate to the device list page (concerning). All of this applies equally to `apiLevel`≥2.14, so if we're concerned about this, retaining this non-blocking queue in `LegacyContextPlugin` is not the correct solution.

## Changelog

* Delete the non-blocking queue between the Python protocol thread and the main thread. Replace it with simple inter-thread function calls.

## Review requests

Is this a good idea?

## Risk assessment

Medium. There is a risk that this *is* covering up enduring jankiness problems, and removing it will bring the problems back. This may not be obvious to us in the office because we're more focused on Flexes, which don't run `apiLevel`≤2.13 protocols. But there's also risk to having the concurrency, so... 🤷
Overview
This PR addresses a piece of the robot jankiness problems described in issue #9133.
The `ProtocolEngine` command list and the HTTP API live in the main thread, and operate within the `asyncio` event loop. The APIv2 protocol meanwhile runs in its own thread, since APIv2 commands are all synchronous and blocking.

The protocol thread needs to report its ongoing progress to the `ProtocolEngine` in the main thread, for HTTP polls to be able to inspect it. With our current implementation, every time the protocol completes a command:

- The protocol thread schedules a `ProtocolEngine` update to happen ASAP in the main thread's event loop.
- The protocol thread then waits for that update to actually run before continuing.

The problem is that if the main thread's event loop is, for whatever reason, occupied for a long time, the protocol thread will be stuck waiting. We think this is part of what's causing visibly janky motion when the robot falls under heavy HTTP load.
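A hedged sketch of that blocking hand-off (not the actual code; `handle_command` and the exact scheduling call are assumptions):

```python
# A hedged sketch of the blocking hand-off: the protocol thread schedules a
# coroutine on the main thread's event loop, then waits for it to finish.
# handle_command is a stand-in for whatever applies the ProtocolEngine update.
import asyncio


def report_command_blocking(
    loop: asyncio.AbstractEventLoop,  # the main thread's running event loop
    handle_command,  # async callable living in the main thread
    command,  # the progress report the protocol just produced
) -> None:
    future = asyncio.run_coroutine_threadsafe(handle_command(command), loop)
    # This .result() is the source of the backpressure: if the event loop is
    # busy, the protocol thread stalls here until the update actually runs.
    future.result()
```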
To fix this, this PR introduces some elasticity between the protocol thread and the main thread. Now, every time the protocol completes a command:

- The protocol thread pushes its progress report into a queue and carries on immediately, without waiting.
- A task in the main thread drains the queue and schedules each `ProtocolEngine` update to happen ASAP in the main thread's event loop.

Changelog
- Add a new class, `ThreadAsyncQueue`, to act as a generic data channel between threads and async tasks.
- Have the protocol thread push its progress reports into the `ThreadAsyncQueue` (via the event handlers defined in `LegacyContextPlugin`).
- Have a task in the main thread drain the `ThreadAsyncQueue` and update the `ProtocolEngine` accordingly.
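For illustration only, here is a self-contained sketch of the same elastic hand-off built from plain `asyncio` primitives rather than the PR's `ThreadAsyncQueue`:

```python
# For illustration only: the protocol thread enqueues and returns immediately;
# a main-thread task applies updates whenever the event loop gets a chance.
import asyncio


def report_command_without_waiting(
    loop: asyncio.AbstractEventLoop,  # the main thread's running event loop
    queue: "asyncio.Queue[object]",  # unbounded, so put_nowait never blocks
    command: object,  # the progress report to hand off
) -> None:
    # call_soon_threadsafe returns as soon as the callback is scheduled.
    loop.call_soon_threadsafe(queue.put_nowait, command)


async def drain_queue(queue: "asyncio.Queue[object]", apply_update) -> None:
    # Runs as a task in the main thread, e.g. dispatching into the ProtocolEngine.
    while True:
        command = await queue.get()
        apply_update(command)
```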
Review requests
Run some protocols on a real robot or a dev server. Check to see that the run log updates normally. When the protocol completes successfully, check to see that the run log isn't truncated. When the protocol is cancelled, check to see that the cancellation shows up in the right place in the run log.
Risk assessment
Medium. This only barely touches the thread that's actually executing the protocol, so protocol execution shouldn't break. However, our run and `ProtocolEngine` lifetime is a bit complicated. If I got something about it wrong, we might see messed up run logs, or thread leakage, or task leakage.