diff --git a/quinn-proto/src/connection/assembler.rs b/quinn-proto/src/connection/assembler.rs index c9a49fceb..255ef53a4 100644 --- a/quinn-proto/src/connection/assembler.rs +++ b/quinn-proto/src/connection/assembler.rs @@ -200,10 +200,6 @@ impl Assembler { } } - pub(super) fn set_bytes_read(&mut self, new: u64) { - self.bytes_read = new; - } - /// Number of bytes consumed by the application pub(super) fn bytes_read(&self) -> u64 { self.bytes_read diff --git a/quinn-proto/src/connection/streams/recv.rs b/quinn-proto/src/connection/streams/recv.rs index a8b9fbb0e..509148039 100644 --- a/quinn-proto/src/connection/streams/recv.rs +++ b/quinn-proto/src/connection/streams/recv.rs @@ -64,10 +64,10 @@ impl Recv { } self.end = self.end.max(end); + // Don't bother storing data or releasing stream-level flow control credit if the stream's + // already stopped if !self.stopped { self.assembler.insert(frame.offset, frame.data, payload_len); - } else { - self.assembler.set_bytes_read(end); } Ok((new_bytes, frame.fin && self.stopped)) @@ -386,3 +386,105 @@ impl Default for RecvState { Self::Recv { size: None } } } + +#[cfg(test)] +mod tests { + use bytes::Bytes; + + use crate::{Dir, Side}; + + use super::*; + + #[test] + fn reordered_frames_while_stopped() { + const INITIAL_BYTES: u64 = 3; + const INITIAL_OFFSET: u64 = 3; + const RECV_WINDOW: u64 = 8; + let mut s = Recv::new(RECV_WINDOW); + let mut data_recvd = 0; + // Receive bytes 3..6 + let (new_bytes, is_closed) = s + .ingest( + frame::Stream { + id: StreamId::new(Side::Client, Dir::Uni, 0), + offset: INITIAL_OFFSET, + fin: false, + data: Bytes::from_static(&[0; INITIAL_BYTES as usize]), + }, + 123, + data_recvd, + data_recvd + 1024, + ) + .unwrap(); + data_recvd += new_bytes; + assert_eq!(new_bytes, INITIAL_OFFSET + INITIAL_BYTES); + assert!(!is_closed); + + let (credits, transmit) = s.stop().unwrap(); + assert!(transmit.should_transmit()); + assert_eq!( + credits, + INITIAL_OFFSET + INITIAL_BYTES, + "full connection flow control credit is issued by stop" + ); + + let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW); + assert!(!transmit.should_transmit()); + assert_eq!( + max_stream_data, RECV_WINDOW, + "stream flow control credit isn't issued by stop" + ); + + // Receive byte 7 + let (new_bytes, is_closed) = s + .ingest( + frame::Stream { + id: StreamId::new(Side::Client, Dir::Uni, 0), + offset: RECV_WINDOW - 1, + fin: false, + data: Bytes::from_static(&[0; 1]), + }, + 123, + data_recvd, + data_recvd + 1024, + ) + .unwrap(); + data_recvd += new_bytes; + assert_eq!(new_bytes, RECV_WINDOW - (INITIAL_OFFSET + INITIAL_BYTES)); + assert!(!is_closed); + + let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW); + assert!(!transmit.should_transmit()); + assert_eq!( + max_stream_data, RECV_WINDOW, + "stream flow control credit isn't issued after stop" + ); + + // Receive bytes 0..3 + let (new_bytes, is_closed) = s + .ingest( + frame::Stream { + id: StreamId::new(Side::Client, Dir::Uni, 0), + offset: 0, + fin: false, + data: Bytes::from_static(&[0; INITIAL_OFFSET as usize]), + }, + 123, + data_recvd, + data_recvd + 1024, + ) + .unwrap(); + assert_eq!( + new_bytes, 0, + "reordered frames don't issue connection-level flow control for stopped streams" + ); + assert!(!is_closed); + + let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW); + assert!(!transmit.should_transmit()); + assert_eq!( + max_stream_data, RECV_WINDOW, + "stream flow control credit isn't issued after stop" + ); + } +} diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index 76ce2c178..c71a7e728 100644 --- a/quinn-proto/src/connection/streams/state.rs +++ b/quinn-proto/src/connection/streams/state.rs @@ -298,7 +298,7 @@ impl StreamsState { } self.on_stream_frame(!stopped, id); - // Update flow control + // Update connection-level flow control Ok(if bytes_read != final_offset.into_inner() { // bytes_read is always <= end, so this won't underflow. self.data_recvd = self