Skip to content

Commit

Permalink
Fix stopped recv stream flow control underflow under reordering
Browse files Browse the repository at this point in the history
If frames are received out of order (e.g. due to packet loss) on a
stopped stream, the local `end` might be less than the high water mark
`Recv::end`. The previous code would set `Assembler::bytes_read` to
that incorrect lower value, potentially after it had previously taken
a higher value.

If `MAX_STREAM_DATA` frames are then queued for that stream (e.g. due
to retransmits prompted by packet loss), we might attempt to transmit
a smaller flow control credit than we had previously. This would cause
an underflow in the subtraction used to judge whether an increase in
flow control credit is worth sending, and violates the spec besides.

Because additional data on a stopped stream isn't useful, there's no
benefit to updating stream-level flow control at all, so we might as
well remove that path entirely. Connection-level flow control is still
maintained at the StreamsState level based on change in the stream's
high-water mark or determination of the final offset.
  • Loading branch information
Ralith committed May 19, 2024
1 parent e37a8f2 commit b529d6d
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 7 deletions.
4 changes: 0 additions & 4 deletions quinn-proto/src/connection/assembler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,6 @@ impl Assembler {
}
}

pub(super) fn set_bytes_read(&mut self, new: u64) {
self.bytes_read = new;
}

/// Number of bytes consumed by the application
pub(super) fn bytes_read(&self) -> u64 {
self.bytes_read
Expand Down
106 changes: 104 additions & 2 deletions quinn-proto/src/connection/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ impl Recv {
}

self.end = self.end.max(end);
// Don't bother storing data or releasing stream-level flow control credit if the stream's
// already stopped
if !self.stopped {
self.assembler.insert(frame.offset, frame.data, payload_len);
} else {
self.assembler.set_bytes_read(end);
}

Ok((new_bytes, frame.fin && self.stopped))
Expand Down Expand Up @@ -386,3 +386,105 @@ impl Default for RecvState {
Self::Recv { size: None }
}
}

#[cfg(test)]
mod tests {
use bytes::Bytes;

use crate::{Dir, Side};

use super::*;

#[test]
fn reordered_frames_while_stopped() {
const INITIAL_BYTES: u64 = 3;
const INITIAL_OFFSET: u64 = 3;
const RECV_WINDOW: u64 = 8;
let mut s = Recv::new(RECV_WINDOW);
let mut data_recvd = 0;
// Receive bytes 3..6
let (new_bytes, is_closed) = s
.ingest(
frame::Stream {
id: StreamId::new(Side::Client, Dir::Uni, 0),
offset: INITIAL_OFFSET,
fin: false,
data: Bytes::from_static(&[0; INITIAL_BYTES as usize]),
},
123,
data_recvd,
data_recvd + 1024,
)
.unwrap();
data_recvd += new_bytes;
assert_eq!(new_bytes, INITIAL_OFFSET + INITIAL_BYTES);
assert!(!is_closed);

let (credits, transmit) = s.stop().unwrap();
assert!(transmit.should_transmit());
assert_eq!(
credits,
INITIAL_OFFSET + INITIAL_BYTES,
"full connection flow control credit is issued by stop"
);

let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW);
assert!(!transmit.should_transmit());
assert_eq!(
max_stream_data, RECV_WINDOW,
"stream flow control credit isn't issued by stop"
);

// Receive byte 7
let (new_bytes, is_closed) = s
.ingest(
frame::Stream {
id: StreamId::new(Side::Client, Dir::Uni, 0),
offset: RECV_WINDOW - 1,
fin: false,
data: Bytes::from_static(&[0; 1]),
},
123,
data_recvd,
data_recvd + 1024,
)
.unwrap();
data_recvd += new_bytes;
assert_eq!(new_bytes, RECV_WINDOW - (INITIAL_OFFSET + INITIAL_BYTES));
assert!(!is_closed);

let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW);
assert!(!transmit.should_transmit());
assert_eq!(
max_stream_data, RECV_WINDOW,
"stream flow control credit isn't issued after stop"
);

// Receive bytes 0..3
let (new_bytes, is_closed) = s
.ingest(
frame::Stream {
id: StreamId::new(Side::Client, Dir::Uni, 0),
offset: 0,
fin: false,
data: Bytes::from_static(&[0; INITIAL_OFFSET as usize]),
},
123,
data_recvd,
data_recvd + 1024,
)
.unwrap();
assert_eq!(
new_bytes, 0,
"reordered frames don't issue connection-level flow control for stopped streams"
);
assert!(!is_closed);

let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW);
assert!(!transmit.should_transmit());
assert_eq!(
max_stream_data, RECV_WINDOW,
"stream flow control credit isn't issued after stop"
);
}
}
2 changes: 1 addition & 1 deletion quinn-proto/src/connection/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl StreamsState {
}
self.on_stream_frame(!stopped, id);

// Update flow control
// Update connection-level flow control
Ok(if bytes_read != final_offset.into_inner() {
// bytes_read is always <= end, so this won't underflow.
self.data_recvd = self
Expand Down

0 comments on commit b529d6d

Please sign in to comment.