-
Notifications
You must be signed in to change notification settings - Fork 68
Avoid blocking tokio::select
branches on a potent. full channel
#705
Conversation
We block on sending to a channel that may be full. This rather changes the behavior to poll the channel before blocking indefinitely on it. Percolates backpressure one level up from the output channels of: - header_waiter, - certificate_waiter,
tokio::select
branches on a potent. full channeltokio::select
branches on a potent. full channel
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is a very elegant solution to the tokio::select loop blocking potential issue.
futures::future::TryFutureExt::unwrap_or_else( | ||
futures::future::try_join( | ||
$fut, | ||
futures::TryFutureExt::map_err($sender.reserve(), |_e| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very elegant idea to use the reserve here. I am not fully fluent with macros: this will keep and return what is provided by the reservation (permit) until the send happens right?
.send(message.clone()) | ||
.await | ||
.expect("Failed to send message ot batch loader"); | ||
permit.send(message.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I confirmed the answer to my own question above is "yes". All is good!
}, | ||
) | ||
}; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets stick this macro in the utils crate, and re-use it in places in Sui where we risk blocking select loops.
@@ -23,10 +23,31 @@ macro_rules! ensure { | |||
}; | |||
} | |||
|
|||
#[macro_export] | |||
macro_rules! try_fut_and_permit { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to define twice?
@@ -83,12 +82,9 @@ impl Subscriber { | |||
loop { | |||
tokio::select! { | |||
// Receive the ordered sequence of consensus messages from a consensus node. | |||
Some(message) = self.rx_consensus.recv() => { | |||
(Some(message), permit) = try_fut_and_permit!(self.rx_consensus.recv().map(Ok), self.tx_batch_loader) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So @huitseeker should we expect on every iteration to try and acquire a permit for the requested channel? I am trying to wrap my head around of how the underlying join will work here on the select branch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIce!
This was closed by mistake, will re-open. |
Checkpointing some (incomplete) backpressure work on top of #691.
This PR is an incremental improvement.
Context
We often have branches of a
tokio::select
that fire, and then send a message on a channel as part of their immediate processing. This message is sent withsender.send(message).await
, which does not return immediately.The issue
This will yield to the executor, but not let the rest of the
select
resume.The solution
We introduce a macro that helps provide semantics by which a
select
branch fires if it has received an input for its condition (typically awaiting.next()
orreceiver.recv()
) and can acquire a permit to send a message for a downstream channel (typically asender.send(result)
). This gives the other branches in theselect
a chance to fire in case the downstream channel is full, as the macro-using branch will then not fire. The macro meshes better than an async function with the unusual borrowing behavior of theselect
macro.