Skip to content

Commit

Permalink
Replace BytesMut transmit buffers with Vec
Browse files Browse the repository at this point in the history
We no longer need to share ownership of this memory, so we should use
a simpler type to reflect our simpler requirements.
  • Loading branch information
Ralith committed Dec 21, 2023
1 parent c0d1af7 commit ac8a1ad
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 37 deletions.
4 changes: 2 additions & 2 deletions quinn-proto/src/connection/datagrams.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::VecDeque;

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use thiserror::Error;
use tracing::{debug, trace};

Expand Down Expand Up @@ -127,7 +127,7 @@ impl DatagramState {
Ok(was_empty)
}

pub(super) fn write(&mut self, buf: &mut BytesMut, max_size: usize) -> bool {
pub(super) fn write(&mut self, buf: &mut Vec<u8>, max_size: usize) -> bool {
let datagram = match self.outgoing.pop_front() {
Some(x) => x,
None => return false,
Expand Down
6 changes: 3 additions & 3 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ impl Connection {
&mut self,
now: Instant,
max_datagrams: usize,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
) -> Option<Transmit> {
assert!(max_datagrams != 0);
let max_datagrams = match self.config.enable_segmentation_offload {
Expand Down Expand Up @@ -2920,7 +2920,7 @@ impl Connection {
&mut self,
now: Instant,
space_id: SpaceId,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
max_size: usize,
pn: u64,
) -> SentFrames {
Expand Down Expand Up @@ -3139,7 +3139,7 @@ impl Connection {
receiving_ecn: bool,
sent: &mut SentFrames,
space: &mut PacketSpace,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
stats: &mut ConnectionStats,
) {
debug_assert!(!space.pending_acks.ranges().is_empty());
Expand Down
8 changes: 4 additions & 4 deletions quinn-proto/src/connection/packet_builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Instant;

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use rand::Rng;
use tracing::{trace, trace_span};

Expand Down Expand Up @@ -32,7 +32,7 @@ impl PacketBuilder {
pub(super) fn new(
now: Instant,
space_id: SpaceId,
buffer: &mut BytesMut,
buffer: &mut Vec<u8>,
buffer_capacity: usize,
datagram_start: usize,
ack_eliciting: bool,
Expand Down Expand Up @@ -174,7 +174,7 @@ impl PacketBuilder {
now: Instant,
conn: &mut Connection,
sent: Option<SentFrames>,
buffer: &mut BytesMut,
buffer: &mut Vec<u8>,
) {
let ack_eliciting = self.ack_eliciting;
let exact_number = self.exact_number;
Expand Down Expand Up @@ -217,7 +217,7 @@ impl PacketBuilder {
}

/// Encrypt packet, returning the length of the packet and whether padding was added
pub(super) fn finish(self, conn: &mut Connection, buffer: &mut BytesMut) -> (usize, bool) {
pub(super) fn finish(self, conn: &mut Connection, buffer: &mut Vec<u8>) -> (usize, bool) {
let pad = buffer.len() < self.min_size;
if pad {
trace!("PADDING * {}", self.min_size - buffer.len());
Expand Down
12 changes: 6 additions & 6 deletions quinn-proto/src/connection/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
mem,
};

use bytes::{BufMut, BytesMut};
use bytes::BufMut;
use rustc_hash::FxHashMap;
use tracing::{debug, trace};

Expand Down Expand Up @@ -365,7 +365,7 @@ impl StreamsState {

pub(in crate::connection) fn write_control_frames(
&mut self,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
pending: &mut Retransmits,
retransmits: &mut ThinRetransmits,
stats: &mut FrameStats,
Expand Down Expand Up @@ -491,7 +491,7 @@ impl StreamsState {

pub(crate) fn write_stream_frames(
&mut self,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
max_buf_size: usize,
) -> StreamMetaVec {
let mut stream_frames = StreamMetaVec::new();
Expand Down Expand Up @@ -925,7 +925,7 @@ mod tests {
connection::State as ConnState, connection::Streams, ReadableError, RecvStream, SendStream,
TransportErrorCode, WriteError,
};
use bytes::{Bytes, BytesMut};
use bytes::Bytes;

fn make(side: Side) -> StreamsState {
StreamsState::new(
Expand Down Expand Up @@ -1317,7 +1317,7 @@ mod tests {
high.set_priority(1).unwrap();
high.write(b"high").unwrap();

let mut buf = BytesMut::with_capacity(40);
let mut buf = Vec::with_capacity(40);
let meta = server.write_stream_frames(&mut buf, 40);
assert_eq!(meta[0].id, id_high);
assert_eq!(meta[1].id, id_mid);
Expand Down Expand Up @@ -1376,7 +1376,7 @@ mod tests {
};
high.set_priority(-1).unwrap();

let mut buf = BytesMut::with_capacity(1000);
let mut buf = Vec::with_capacity(1000);
let meta = server.write_stream_frames(&mut buf, 40);
assert_eq!(meta.len(), 1);
assert_eq!(meta[0].id, id_high);
Expand Down
8 changes: 4 additions & 4 deletions quinn-proto/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl Endpoint {
local_ip: Option<IpAddr>,
ecn: Option<EcnCodepoint>,
data: BytesMut,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
) -> Option<DatagramEvent> {
let datagram_len = data.len();
let (first_decode, remaining) = match PartialDecode::new(
Expand Down Expand Up @@ -267,7 +267,7 @@ impl Endpoint {
inciting_dgram_len: usize,
addresses: FourTuple,
dst_cid: &ConnectionId,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
) -> Option<Transmit> {
/// Minimum amount of padding for the stateless reset to look like a short-header packet
const MIN_PADDING_LEN: usize = 5;
Expand Down Expand Up @@ -404,7 +404,7 @@ impl Endpoint {
mut packet: Packet,
rest: Option<BytesMut>,
crypto: &Keys,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
) -> Option<DatagramEvent> {
let (src_cid, dst_cid, token, packet_number, version) = match packet.header {
Header::Initial {
Expand Down Expand Up @@ -633,7 +633,7 @@ impl Endpoint {
crypto: &Keys,
remote_id: &ConnectionId,
reason: TransportError,
buf: &mut BytesMut,
buf: &mut Vec<u8>,
) -> Transmit {
// We don't need to worry about CID collisions in initial closes because the peer
// shouldn't respond, and if it does, and the CID collides, we'll just drop the
Expand Down
4 changes: 2 additions & 2 deletions quinn-proto/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
ops::{Range, RangeInclusive},
};

use bytes::{Buf, BufMut, Bytes, BytesMut};
use bytes::{Buf, BufMut, Bytes};
use tinyvec::TinyVec;

use crate::{
Expand Down Expand Up @@ -888,7 +888,7 @@ impl FrameStruct for Datagram {
}

impl Datagram {
pub(crate) fn encode(&self, length: bool, out: &mut BytesMut) {
pub(crate) fn encode(&self, length: bool, out: &mut Vec<u8>) {
out.write(Type(*DATAGRAM_TYS.start() | u64::from(length))); // 1 byte
if length {
// Safe to unwrap because we check length sanity before queueing datagrams
Expand Down
6 changes: 3 additions & 3 deletions quinn-proto/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ pub(crate) enum Header {
}

impl Header {
pub(crate) fn encode(&self, w: &mut BytesMut) -> PartialEncode {
pub(crate) fn encode(&self, w: &mut Vec<u8>) -> PartialEncode {
use self::Header::*;
let start = w.len();
match *self {
Expand Down Expand Up @@ -846,7 +846,7 @@ mod tests {

let dcid = ConnectionId::new(&hex!("06b858ec6f80452b"));
let client = initial_keys(Version::V1, &dcid, Side::Client);
let mut buf = BytesMut::new();
let mut buf = Vec::new();
let header = Header::Initial {
number: PacketNumber::U8(0),
src_cid: ConnectionId::new(&[]),
Expand Down Expand Up @@ -877,7 +877,7 @@ mod tests {

let server = initial_keys(Version::V1, &dcid, Side::Server);
let supported_versions = DEFAULT_SUPPORTED_VERSIONS.to_vec();
let decode = PartialDecode::new(buf, 0, &supported_versions, false)
let decode = PartialDecode::new(buf.as_slice().into(), 0, &supported_versions, false)
.unwrap()
.0;
let mut packet = decode.finish(Some(&*server.header.remote)).unwrap();
Expand Down
8 changes: 4 additions & 4 deletions quinn-proto/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
};

use assert_matches::assert_matches;
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use hex_literal::hex;
use rand::RngCore;
use ring::hmac;
Expand All @@ -34,7 +34,7 @@ fn version_negotiate_server() {
None,
);
let now = Instant::now();
let mut buf = BytesMut::with_capacity(server.config().get_max_udp_payload_size() as usize);
let mut buf = Vec::with_capacity(server.config().get_max_udp_payload_size() as usize);
let event = server.handle(
now,
client_addr,
Expand Down Expand Up @@ -76,7 +76,7 @@ fn version_negotiate_client() {
.connect(Instant::now(), client_config(), server_addr, "localhost")
.unwrap();
let now = Instant::now();
let mut buf = BytesMut::with_capacity(client.config().get_max_udp_payload_size() as usize);
let mut buf = Vec::with_capacity(client.config().get_max_udp_payload_size() as usize);
let opt_event = client.handle(
now,
server_addr,
Expand Down Expand Up @@ -1941,7 +1941,7 @@ fn malformed_token_len() {
true,
None,
);
let mut buf = BytesMut::with_capacity(server.config().get_max_udp_payload_size() as usize);
let mut buf = Vec::with_capacity(server.config().get_max_udp_payload_size() as usize);
server.handle(
Instant::now(),
client_addr,
Expand Down
16 changes: 11 additions & 5 deletions quinn-proto/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ impl TestEndpoint {
}
}
let buffer_size = self.endpoint.config().get_max_udp_payload_size() as usize;
let mut buf = BytesMut::with_capacity(buffer_size);
let mut buf = Vec::with_capacity(buffer_size);

while self.inbound.front().map_or(false, |x| x.0 <= now) {
let (recv_time, ecn, packet) = self.inbound.pop_front().unwrap();
Expand All @@ -354,8 +354,11 @@ impl TestEndpoint {
}
DatagramEvent::Response(transmit) => {
let size = transmit.size;
self.outbound
.extend(split_transmit(transmit, buf.split_to(size).freeze()));
self.outbound.extend(split_transmit(
transmit,
Bytes::copy_from_slice(&buf[..size]),
));
buf.clear();
}
}
}
Expand All @@ -380,8 +383,11 @@ impl TestEndpoint {
}
while let Some(transmit) = conn.poll_transmit(now, MAX_DATAGRAMS, &mut buf) {
let size = transmit.size;
self.outbound
.extend(split_transmit(transmit, buf.split_to(size).freeze()));
self.outbound.extend(split_transmit(
transmit,
Bytes::copy_from_slice(&buf[..size]),
));
buf.clear();
}
self.timeout = conn.poll_timeout();
}
Expand Down
6 changes: 3 additions & 3 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
time::{Duration, Instant},
};

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use pin_project_lite::pin_project;
use rustc_hash::FxHashMap;
use thiserror::Error;
Expand Down Expand Up @@ -795,7 +795,7 @@ impl ConnectionRef {
io_poller: socket.clone().create_io_poller(),
socket,
runtime,
send_buffer: BytesMut::new(),
send_buffer: Vec::new(),
buffered_transmit: None,
}),
shared: Shared::default(),
Expand Down Expand Up @@ -876,7 +876,7 @@ pub(crate) struct State {
socket: Arc<dyn AsyncUdpSocket>,
io_poller: Pin<Box<dyn UdpPoller>>,
runtime: Arc<dyn Runtime>,
send_buffer: BytesMut,
send_buffer: Vec<u8>,
/// We buffer a transmit when the underlying I/O would block
buffered_transmit: Option<proto::Transmit>,
}
Expand Down
2 changes: 1 addition & 1 deletion quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ impl State {
let mut data: BytesMut = buf[0..meta.len].into();
while !data.is_empty() {
let buf = data.split_to(meta.stride.min(data.len()));
let mut response_buffer = BytesMut::new();
let mut response_buffer = Vec::new();
match self.inner.handle(
now,
meta.addr,
Expand Down

0 comments on commit ac8a1ad

Please sign in to comment.