[mplex] Split the receive buffer per substream. #1784
Conversation
This split allows more efficient reading from the buffer for a particular substream and makes it possible to reset only the offending substream if it reaches its buffer limit with `MaxBufferBehaviour::ResetStream`. Previously this was implemented as `MaxBufferBehaviour::CloseAll` and resulted in the entire connection closing. The buffer split should be advantageous whenever not all substreams are read at the same pace and some temporarily fall behind in consuming inbound data frames.
Naturally, the semantics of the configuration option …
muxers/mplex/src/io.rs
Outdated
    // No task dedicated to the blocked stream woken, so schedule
    // this task again to have a chance at progress.
    cx.waker().clone().wake();
} else if blocked_id != &id {
Suggested change:
- } else if blocked_id != &id {
+ } else {
+     if blocked_id == &id {
+         debug_assert!(false, "...");
+     }
+ }
What do you think of ensuring that a read on a blocking stream never makes it this far?
Why do you think it can never happen that the current task is woken at line 387?
I do think it can happen that the current task is woken up in 387 and thus control flow reaches into the `else` block in 391, but only when the current task is not trying to read from a stream that is currently blocking the connection.
My suggestion is confusing. I hope I am not missing something. Let me retry:
Say the read buffer of stream S1 is full and thus is blocking the entire connection. Say that the current task is trying to read from S1. In this case the current task would return `Poll::Ready(Ok(Some(data)))` in 364 already, before the loop.
With the above in mind, one cannot reach the loop when trying to read from the stream (in the case above S1) that is currently blocking the connection. To enforce this, one could insert a `debug_assert` below 382, like so:
if let Some(blocked_id) = &self.blocking_stream {
+    if blocked_id == &id { debug_assert!(false, "..."); }
+
    // We have a blocked stream and cannot continue reading
    // new frames for any stream, until frames are taken from
    // the blocked stream's buffer. Try to wake a pending reader
    // of the blocked stream.
I don't think this is particularly important. Thus feel free to ignore this suggestion. What I would like to ensure though is that I am not misunderstanding something.
I see. Your comment made me aware though that, contrary to my own expectations, the check for the blocking stream is currently only in `poll_read_stream`. That means `poll_next_stream` is never blocked and can grow the substream buffers indefinitely, only limited by hitting the `max_substreams` limit. I corrected that here together with more commentary. I would appreciate if you could give that another critical look to see if it all makes sense now. I'll see if I can come up with some property tests w.r.t. the `MaxBufferBehaviour`.
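To make that invariant concrete, here is a small, self-contained toy model of the flow-control check both poll paths would perform before reading another inbound frame. It is not the actual `io.rs` code; the types and field names are made up for illustration:

```rust
// Toy model: before reading any new inbound frame, check whether some
// substream's receive buffer is full. If so, the connection should not
// read further frames until that buffer is drained.
use std::collections::{HashMap, VecDeque};

type StreamId = u32;

struct Multiplexed {
    max_buffer_len: usize,
    buffers: HashMap<StreamId, VecDeque<Vec<u8>>>,
}

impl Multiplexed {
    /// Returns the id of a substream whose buffer is full, if any,
    /// i.e. the stream currently blocking the connection.
    fn blocked_stream(&self) -> Option<StreamId> {
        self.buffers
            .iter()
            .find(|(_, buf)| buf.len() >= self.max_buffer_len)
            .map(|(id, _)| *id)
    }

    /// Would be consulted by both poll paths before reading a new
    /// frame from the underlying connection.
    fn can_read_next_frame(&self) -> bool {
        self.blocked_stream().is_none()
    }
}
```

In this sketch, reading a new frame is only allowed while no substream buffer is at its limit; the real implementation additionally wakes a pending reader of the blocked stream so progress can resume.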
Co-authored-by: Max Inden <[email protected]>
Since the limit is now per substream and the default `max_substreams` is `128`, this new limit retains the previous overall resource bounds for the buffers.
I went ahead and changed the default …
I would appreciate if you could give that another critical look to see if it all makes sense now.
I like how you extracted buffering into `fn buffer` and moved the handling of blocking when the max buffer size is reached into `fn poll_read_frame`. Adjustments look good to me.
I'll see if I can come up with some property tests w.r.t. the MaxBufferBehaviour.
That would be great. Curious what it might find.
muxers/mplex/src/io.rs
Outdated
/// Fails the entire multiplexed stream if too many pending `Reset`
/// frames accumulate when using [`MaxBufferBehaviour::ResetStream`].
fn buffer(&mut self, id: LocalStreamId, data: Bytes) -> io::Result<()> {
    if let Some(state) = self.substreams.get_mut(&id) {
Styling suggestion, feel free to ignore: given that the `else` block has a single line, wouldn't an early return make this more readable, reducing the amount of indentation?
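For illustration, a minimal self-contained sketch of the two styles with toy types (a `HashMap` of byte buffers standing in for the actual substream state); it is not the real `fn buffer`, just the shape of the refactoring:

```rust
use std::collections::HashMap;
use std::io;

struct Multiplexed {
    substreams: HashMap<u64, Vec<u8>>,
}

impl Multiplexed {
    // Nested style, roughly as in the diff above (simplified).
    fn buffer_nested(&mut self, id: u64, data: &[u8]) -> io::Result<()> {
        if let Some(buf) = self.substreams.get_mut(&id) {
            buf.extend_from_slice(data);
            Ok(())
        } else {
            // Unknown or dropped substream: silently discard the frame.
            Ok(())
        }
    }

    // Early-return style: the uncommon case exits first and the main
    // path continues with one level less indentation.
    fn buffer_early_return(&mut self, id: u64, data: &[u8]) -> io::Result<()> {
        let buf = match self.substreams.get_mut(&id) {
            Some(buf) => buf,
            // Unknown or dropped substream: silently discard the frame.
            None => return Ok(()),
        };
        buf.extend_from_slice(data);
        Ok(())
    }
}
```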
I incorporated that suggestion here together with some randomised testing for the `MaxBufferBehaviour`. It didn't reveal any new issues but increased confidence. There is a lot of potential for randomised (state machine) testing for this kind of code.
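As a rough idea of what such randomised testing can look like, here is a hedged, self-contained sketch using the `quickcheck` crate against a toy model of per-substream buffers. The enum and the model below are simplifications for illustration, not the actual mplex types or tests:

```rust
use quickcheck::quickcheck;
use std::collections::HashMap;

#[derive(Clone, Copy, Debug)]
enum MaxBufferBehaviour {
    /// Reset just the substream whose buffer is full.
    ResetStream,
    /// Stop buffering new frames until the substream's buffer is drained.
    Block,
}

struct Model {
    max_buffer_len: usize,
    behaviour: MaxBufferBehaviour,
    buffers: HashMap<u8, Vec<u8>>,
}

impl Model {
    fn buffer(&mut self, id: u8, byte: u8) {
        let buf = self.buffers.entry(id).or_default();
        if buf.len() >= self.max_buffer_len {
            match self.behaviour {
                // Drop the buffered frames of the offending substream only.
                MaxBufferBehaviour::ResetStream => buf.clear(),
                // Do not buffer the frame; a real implementation would stop
                // reading from the connection until the buffer is drained.
                MaxBufferBehaviour::Block => return,
            }
        }
        buf.push(byte);
    }
}

quickcheck! {
    // Property: no substream buffer ever exceeds the configured limit,
    // regardless of the behaviour and the sequence of inbound frames.
    fn buffers_stay_bounded(frames: Vec<(u8, u8)>, reset: bool) -> bool {
        let mut model = Model {
            max_buffer_len: 4,
            behaviour: if reset {
                MaxBufferBehaviour::ResetStream
            } else {
                MaxBufferBehaviour::Block
            },
            buffers: HashMap::new(),
        };
        for (id, byte) in frames {
            model.buffer(id, byte);
        }
        model.buffers.values().all(|b| b.len() <= model.max_buffer_len)
    }
}
```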
muxers/mplex/src/io.rs
Outdated
/// frames accumulate when using [`MaxBufferBehaviour::ResetStream`].
fn buffer(&mut self, id: LocalStreamId, data: Bytes) -> io::Result<()> {
    if let Some(state) = self.substreams.get_mut(&id) {
        if let Some(buf) = state.recv_buf_open() {
See above.
@mxinden Thanks for your reviews.
This PR picks up a discussion from #1769 (comment) about splitting the shared receive buffer per substream. This split allows more efficient reading from the buffer for a particular substream, makes it possible to efficiently drop all still-buffered frames for a particular stream when it is dropped, and permits resetting only the offending substream if it reaches its buffer limit with `MaxBufferBehaviour::ResetStream`. Previously this was implemented as `MaxBufferBehaviour::CloseAll` and resulted in the entire connection closing. The buffer split should generally be advantageous whenever not all substreams are read at the same pace and some temporarily fall behind in consuming their data.
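For readers who want to opt into the new behaviour, a hedged configuration sketch follows. The setter names (`set_max_num_streams`, `set_max_buffer_size`, `set_max_buffer_behaviour`) and the value `32` are assumptions for illustration; check the `libp2p-mplex` docs of your version for the exact API.

```rust
// Hedged sketch: configuring per-substream buffering limits. The setter
// names below are assumptions and may differ between crate versions.
use libp2p_mplex::{MaxBufferBehaviour, MplexConfig};

fn mplex_config() -> MplexConfig {
    let mut config = MplexConfig::new();
    // Upper bound on concurrently open substreams (128 is the default
    // mentioned above).
    config.set_max_num_streams(128);
    // Per-substream receive buffer limit (illustrative value), as
    // introduced by this PR's buffer split.
    config.set_max_buffer_size(32);
    // When a single substream exceeds its buffer limit, reset only that
    // substream rather than closing the whole connection.
    config.set_max_buffer_behaviour(MaxBufferBehaviour::ResetStream);
    config
}
```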