-
Notifications
You must be signed in to change notification settings - Fork 68
feat: Avoid blocking tokio::select branches on a potent. full channel #724
Conversation
ff03272
to
e3ea5b4
Compare
991f17c
to
e08ff1f
Compare
ab5bdfb
to
90f7a37
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is excellent work @huitseeker 💯 !! Left a couple of nit comments
pub fn try_next(&mut self) -> impl Future<Output = Result<Option<U>, V>> + '_ { | ||
self.queue.try_next().inspect_ok(|val| { | ||
if val.is_some() { | ||
self.push_semaphore.add_permits(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we want to add a permit only in case of successful response?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because this is a TryStreamExt::try_next
, so the semantics of this returning an Option::None
are the same as StreamExt::next
, i.e. the stream is empty. In which case its available permits is already at capacity, and adding more permits would raise it above the capacity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In c535706 I've edited tests to reflect what happens with the capacity when you do try_next()
past the end of the stream. I checked those tests do not pass if you remove the corresponding if val.is_some()
guards.
We block on sending to a channel that may be full. This rather changes the behavior to poll the channel before blocking indefinitely on it. Percolates backpressure one level up from the output channels of: - header_waiter, - certificate_waiter,
93085a7
to
4eb7523
Compare
4eb7523
to
5beef43
Compare
…MystenLabs#724) * feat: don't block on sending on a full channel in primary We block on sending to a channel that may be full. This rather changes the behavior to poll the channel before blocking indefinitely on it. Percolates backpressure one level up from the output channels of: - header_waiter, - certificate_waiter, * feat: don't block on sending on a full channel in executor * feat: add a bounded queue for FuturesUnordered * feat: Bound pending elements in certificate waiter & header waiter * feat: Bound pending elements in subscriber * fix: edit tests to check `try_next()` relationship w/ capacity * feat: add & update metrics for waiting lists in {certificate, header}waiter * feat: add metric for waiting elements to subscriber
…#724) * feat: don't block on sending on a full channel in primary We block on sending to a channel that may be full. This rather changes the behavior to poll the channel before blocking indefinitely on it. Percolates backpressure one level up from the output channels of: - header_waiter, - certificate_waiter, * feat: don't block on sending on a full channel in executor * feat: add a bounded queue for FuturesUnordered * feat: Bound pending elements in certificate waiter & header waiter * feat: Bound pending elements in subscriber * fix: edit tests to check `try_next()` relationship w/ capacity * feat: add & update metrics for waiting lists in {certificate, header}waiter * feat: add metric for waiting elements to subscriber
…MystenLabs/narwhal#724) * feat: don't block on sending on a full channel in primary We block on sending to a channel that may be full. This rather changes the behavior to poll the channel before blocking indefinitely on it. Percolates backpressure one level up from the output channels of: - header_waiter, - certificate_waiter, * feat: don't block on sending on a full channel in executor * feat: add a bounded queue for FuturesUnordered * feat: Bound pending elements in certificate waiter & header waiter * feat: Bound pending elements in subscriber * fix: edit tests to check `try_next()` relationship w/ capacity * feat: add & update metrics for waiting lists in {certificate, header}waiter * feat: add metric for waiting elements to subscriber
This is a re-do of #705.
Checkpointing some (incomplete) backpressure work on top of #691.
This PR is an incremental improvement.
tokio::select
branches on a potent. full channel #705.tokio::select
,waiting : Futures{Un,O}rdered
would fill up indefinitely,BoundedFutures{Un,O}rdered
waiting buffers, gives them (arguably) sensible defaults, and uses preconditions to gate select branches.Todo