From 1289d92f98ccd6b256ede3d8dbd63dbb049075d4 Mon Sep 17 00:00:00 2001 From: Benjamin Saunders Date: Wed, 20 Dec 2023 19:36:50 -0800 Subject: [PATCH] Replace BytesMut transmit buffers with Vec We no longer need to share ownership of this memory, so we should use a simpler type to reflect our simpler requirements. --- quinn-proto/src/connection/datagrams.rs | 4 ++-- quinn-proto/src/connection/mod.rs | 6 +++--- quinn-proto/src/connection/packet_builder.rs | 8 ++++---- quinn-proto/src/connection/streams/state.rs | 12 ++++++------ quinn-proto/src/endpoint.rs | 8 ++++---- quinn-proto/src/frame.rs | 4 ++-- quinn-proto/src/packet.rs | 6 +++--- quinn-proto/src/tests/mod.rs | 8 ++++---- quinn-proto/src/tests/util.rs | 16 +++++++++++----- quinn/src/connection.rs | 6 +++--- quinn/src/endpoint.rs | 2 +- 11 files changed, 43 insertions(+), 37 deletions(-) diff --git a/quinn-proto/src/connection/datagrams.rs b/quinn-proto/src/connection/datagrams.rs index abadbe9166..a9dadaf7d2 100644 --- a/quinn-proto/src/connection/datagrams.rs +++ b/quinn-proto/src/connection/datagrams.rs @@ -1,6 +1,6 @@ use std::collections::VecDeque; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use thiserror::Error; use tracing::{debug, trace}; @@ -127,7 +127,7 @@ impl DatagramState { Ok(was_empty) } - pub(super) fn write(&mut self, buf: &mut BytesMut, max_size: usize) -> bool { + pub(super) fn write(&mut self, buf: &mut Vec, max_size: usize) -> bool { let datagram = match self.outgoing.pop_front() { Some(x) => x, None => return false, diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 4798a12111..7caea178a2 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -459,7 +459,7 @@ impl Connection { &mut self, now: Instant, max_datagrams: usize, - buf: &mut BytesMut, + buf: &mut Vec, ) -> Option { assert!(max_datagrams != 0); let max_datagrams = match self.config.enable_segmentation_offload { @@ -2904,7 +2904,7 @@ impl Connection { &mut self, now: Instant, space_id: SpaceId, - buf: &mut BytesMut, + buf: &mut Vec, max_size: usize, pn: u64, ) -> SentFrames { @@ -3123,7 +3123,7 @@ impl Connection { receiving_ecn: bool, sent: &mut SentFrames, space: &mut PacketSpace, - buf: &mut BytesMut, + buf: &mut Vec, stats: &mut ConnectionStats, ) { debug_assert!(!space.pending_acks.ranges().is_empty()); diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index c53d09a2db..8e362b0157 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -1,6 +1,6 @@ use std::time::Instant; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use rand::Rng; use tracing::{trace, trace_span}; @@ -32,7 +32,7 @@ impl PacketBuilder { pub(super) fn new( now: Instant, space_id: SpaceId, - buffer: &mut BytesMut, + buffer: &mut Vec, buffer_capacity: usize, datagram_start: usize, ack_eliciting: bool, @@ -174,7 +174,7 @@ impl PacketBuilder { now: Instant, conn: &mut Connection, sent: Option, - buffer: &mut BytesMut, + buffer: &mut Vec, ) { let ack_eliciting = self.ack_eliciting; let exact_number = self.exact_number; @@ -217,7 +217,7 @@ impl PacketBuilder { } /// Encrypt packet, returning the length of the packet and whether padding was added - pub(super) fn finish(self, conn: &mut Connection, buffer: &mut BytesMut) -> (usize, bool) { + pub(super) fn finish(self, conn: &mut Connection, buffer: &mut Vec) -> (usize, bool) { let pad = buffer.len() < self.min_size; if pad { trace!("PADDING * {}", self.min_size - buffer.len()); diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index a701d31bda..2d16262c71 100644 --- a/quinn-proto/src/connection/streams/state.rs +++ b/quinn-proto/src/connection/streams/state.rs @@ -4,7 +4,7 @@ use std::{ mem, }; -use bytes::{BufMut, BytesMut}; +use bytes::BufMut; use rustc_hash::FxHashMap; use tracing::{debug, trace}; @@ -349,7 +349,7 @@ impl StreamsState { pub(in crate::connection) fn write_control_frames( &mut self, - buf: &mut BytesMut, + buf: &mut Vec, pending: &mut Retransmits, retransmits: &mut ThinRetransmits, stats: &mut FrameStats, @@ -475,7 +475,7 @@ impl StreamsState { pub(crate) fn write_stream_frames( &mut self, - buf: &mut BytesMut, + buf: &mut Vec, max_buf_size: usize, ) -> StreamMetaVec { let mut stream_frames = StreamMetaVec::new(); @@ -874,7 +874,7 @@ mod tests { connection::State as ConnState, connection::Streams, ReadableError, RecvStream, SendStream, TransportErrorCode, WriteError, }; - use bytes::{Bytes, BytesMut}; + use bytes::Bytes; fn make(side: Side) -> StreamsState { StreamsState::new( @@ -1266,7 +1266,7 @@ mod tests { high.set_priority(1).unwrap(); high.write(b"high").unwrap(); - let mut buf = BytesMut::with_capacity(40); + let mut buf = Vec::with_capacity(40); let meta = server.write_stream_frames(&mut buf, 40); assert_eq!(meta[0].id, id_high); assert_eq!(meta[1].id, id_mid); @@ -1325,7 +1325,7 @@ mod tests { }; high.set_priority(-1).unwrap(); - let mut buf = BytesMut::with_capacity(1000); + let mut buf = Vec::with_capacity(1000); let meta = server.write_stream_frames(&mut buf, 40); assert_eq!(meta.len(), 1); assert_eq!(meta[0].id, id_high); diff --git a/quinn-proto/src/endpoint.rs b/quinn-proto/src/endpoint.rs index 3d61881d15..05e4bf0680 100644 --- a/quinn-proto/src/endpoint.rs +++ b/quinn-proto/src/endpoint.rs @@ -127,7 +127,7 @@ impl Endpoint { local_ip: Option, ecn: Option, data: BytesMut, - buf: &mut BytesMut, + buf: &mut Vec, ) -> Option { let datagram_len = data.len(); let (first_decode, remaining) = match PartialDecode::new( @@ -267,7 +267,7 @@ impl Endpoint { inciting_dgram_len: usize, addresses: FourTuple, dst_cid: &ConnectionId, - buf: &mut BytesMut, + buf: &mut Vec, ) -> Option { /// Minimum amount of padding for the stateless reset to look like a short-header packet const MIN_PADDING_LEN: usize = 5; @@ -404,7 +404,7 @@ impl Endpoint { mut packet: Packet, rest: Option, crypto: &Keys, - buf: &mut BytesMut, + buf: &mut Vec, ) -> Option { let (src_cid, dst_cid, token, packet_number, version) = match packet.header { Header::Initial { @@ -633,7 +633,7 @@ impl Endpoint { crypto: &Keys, remote_id: &ConnectionId, reason: TransportError, - buf: &mut BytesMut, + buf: &mut Vec, ) -> Transmit { // We don't need to worry about CID collisions in initial closes because the peer // shouldn't respond, and if it does, and the CID collides, we'll just drop the diff --git a/quinn-proto/src/frame.rs b/quinn-proto/src/frame.rs index fb953b0e77..e75781ae89 100644 --- a/quinn-proto/src/frame.rs +++ b/quinn-proto/src/frame.rs @@ -4,7 +4,7 @@ use std::{ ops::{Range, RangeInclusive}, }; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use bytes::{Buf, BufMut, Bytes}; use tinyvec::TinyVec; use crate::{ @@ -888,7 +888,7 @@ impl FrameStruct for Datagram { } impl Datagram { - pub(crate) fn encode(&self, length: bool, out: &mut BytesMut) { + pub(crate) fn encode(&self, length: bool, out: &mut Vec) { out.write(Type(*DATAGRAM_TYS.start() | u64::from(length))); // 1 byte if length { // Safe to unwrap because we check length sanity before queueing datagrams diff --git a/quinn-proto/src/packet.rs b/quinn-proto/src/packet.rs index b70eb98f78..13f670a40f 100644 --- a/quinn-proto/src/packet.rs +++ b/quinn-proto/src/packet.rs @@ -271,7 +271,7 @@ pub(crate) enum Header { } impl Header { - pub(crate) fn encode(&self, w: &mut BytesMut) -> PartialEncode { + pub(crate) fn encode(&self, w: &mut Vec) -> PartialEncode { use self::Header::*; let start = w.len(); match *self { @@ -846,7 +846,7 @@ mod tests { let dcid = ConnectionId::new(&hex!("06b858ec6f80452b")); let client = initial_keys(Version::V1, &dcid, Side::Client); - let mut buf = BytesMut::new(); + let mut buf = Vec::new(); let header = Header::Initial { number: PacketNumber::U8(0), src_cid: ConnectionId::new(&[]), @@ -877,7 +877,7 @@ mod tests { let server = initial_keys(Version::V1, &dcid, Side::Server); let supported_versions = DEFAULT_SUPPORTED_VERSIONS.to_vec(); - let decode = PartialDecode::new(buf, 0, &supported_versions, false) + let decode = PartialDecode::new(buf.as_slice().into(), 0, &supported_versions, false) .unwrap() .0; let mut packet = decode.finish(Some(&*server.header.remote)).unwrap(); diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index c41f3592d0..19e5dfe92f 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -7,7 +7,7 @@ use std::{ }; use assert_matches::assert_matches; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use hex_literal::hex; use rand::RngCore; use ring::hmac; @@ -34,7 +34,7 @@ fn version_negotiate_server() { None, ); let now = Instant::now(); - let mut buf = BytesMut::with_capacity(server.config().get_max_udp_payload_size() as usize); + let mut buf = Vec::with_capacity(server.config().get_max_udp_payload_size() as usize); let event = server.handle( now, client_addr, @@ -76,7 +76,7 @@ fn version_negotiate_client() { .connect(Instant::now(), client_config(), server_addr, "localhost") .unwrap(); let now = Instant::now(); - let mut buf = BytesMut::with_capacity(client.config().get_max_udp_payload_size() as usize); + let mut buf = Vec::with_capacity(client.config().get_max_udp_payload_size() as usize); let opt_event = client.handle( now, server_addr, @@ -1941,7 +1941,7 @@ fn malformed_token_len() { true, None, ); - let mut buf = BytesMut::with_capacity(server.config().get_max_udp_payload_size() as usize); + let mut buf = Vec::with_capacity(server.config().get_max_udp_payload_size() as usize); server.handle( Instant::now(), client_addr, diff --git a/quinn-proto/src/tests/util.rs b/quinn-proto/src/tests/util.rs index cd7bb11239..07ce9337af 100644 --- a/quinn-proto/src/tests/util.rs +++ b/quinn-proto/src/tests/util.rs @@ -331,7 +331,7 @@ impl TestEndpoint { } } let buffer_size = self.endpoint.config().get_max_udp_payload_size() as usize; - let mut buf = BytesMut::with_capacity(buffer_size); + let mut buf = Vec::with_capacity(buffer_size); while self.inbound.front().map_or(false, |x| x.0 <= now) { let (recv_time, ecn, packet) = self.inbound.pop_front().unwrap(); @@ -354,8 +354,11 @@ impl TestEndpoint { } DatagramEvent::Response(transmit) => { let size = transmit.size; - self.outbound - .extend(split_transmit(transmit, buf.split_to(size).freeze())); + self.outbound.extend(split_transmit( + transmit, + Bytes::copy_from_slice(&buf[..size]), + )); + buf.clear(); } } } @@ -380,8 +383,11 @@ impl TestEndpoint { } while let Some(transmit) = conn.poll_transmit(now, MAX_DATAGRAMS, &mut buf) { let size = transmit.size; - self.outbound - .extend(split_transmit(transmit, buf.split_to(size).freeze())); + self.outbound.extend(split_transmit( + transmit, + Bytes::copy_from_slice(&buf[..size]), + )); + buf.clear(); } self.timeout = conn.poll_timeout(); } diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index cb96f68428..9b5ac049ce 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -10,7 +10,7 @@ use std::{ time::{Duration, Instant}, }; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use pin_project_lite::pin_project; use rustc_hash::FxHashMap; use thiserror::Error; @@ -795,7 +795,7 @@ impl ConnectionRef { io_poller: socket.clone().create_io_poller(), socket, runtime, - send_buffer: BytesMut::new(), + send_buffer: Vec::new(), buffered_transmit: None, }), shared: Shared::default(), @@ -876,7 +876,7 @@ pub(crate) struct State { socket: Arc, io_poller: Pin>, runtime: Arc, - send_buffer: BytesMut, + send_buffer: Vec, /// We buffer a transmit when the underlying I/O would block buffered_transmit: Option, } diff --git a/quinn/src/endpoint.rs b/quinn/src/endpoint.rs index 4a1a01574e..d6ffe52cca 100644 --- a/quinn/src/endpoint.rs +++ b/quinn/src/endpoint.rs @@ -407,7 +407,7 @@ impl State { }); let mut iovs = unsafe { iovs.assume_init() }; let buffer_size = self.inner.config().get_max_udp_payload_size() as usize; - let mut buffer = BytesMut::with_capacity(buffer_size); + let mut buffer = Vec::with_capacity(buffer_size); loop { match self.socket.poll_recv(cx, &mut iovs, &mut metas) { Poll::Ready(Ok(msgs)) => {