Skip to content

Commit

Permalink
Don't buffer endpoint-generated datagrams
Browse files Browse the repository at this point in the history
Dropping these has low cost because they're not associated with any
connection.
  • Loading branch information
Ralith committed Apr 26, 2024
1 parent 9860617 commit d075733
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 109 deletions.
140 changes: 39 additions & 101 deletions quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{
};

use crate::{
runtime::{default_runtime, AsyncUdpSocket, Runtime, UdpPoller},
runtime::{default_runtime, AsyncUdpSocket, Runtime},
udp_transmit,
};
use bytes::{Bytes, BytesMut};
Expand All @@ -29,8 +29,7 @@ use udp::{RecvMeta, BATCH_SIZE};

use crate::{
connection::Connecting, incoming::Incoming, work_limiter::WorkLimiter, ConnectionEvent,
EndpointConfig, VarInt, IO_LOOP_BOUND, MAX_INCOMING_CONNECTIONS,
MAX_TRANSMIT_QUEUE_CONTENTS_LEN, RECV_TIME_BOUND, SEND_TIME_BOUND,
EndpointConfig, VarInt, IO_LOOP_BOUND, MAX_INCOMING_CONNECTIONS, RECV_TIME_BOUND,
};

/// A QUIC endpoint.
Expand Down Expand Up @@ -224,7 +223,6 @@ impl Endpoint {
let addr = socket.local_addr()?;
let mut inner = self.inner.state.lock().unwrap();
inner.prev_socket = Some(mem::replace(&mut inner.socket, socket));
inner.io_poller = inner.socket.clone().create_io_poller();
inner.ipv6 = addr.is_ipv6();

// Update connection socket references
Expand Down Expand Up @@ -330,7 +328,6 @@ impl Future for EndpointDriver {
let mut keep_going = false;
keep_going |= endpoint.drive_recv(cx, now)?;
keep_going |= endpoint.handle_events(cx, &self.0.shared);
keep_going |= endpoint.drive_send(cx)?;

if !endpoint.recv_state.incoming.is_empty() {
self.0.shared.incoming.notify_waiters();
Expand Down Expand Up @@ -392,7 +389,7 @@ impl EndpointInner {
}
Err(error) => {
if let Some(transmit) = error.response {
state.transmit_state.respond(transmit, response_buffer);
respond(transmit, &mut response_buffer, &*state.socket);
}
Err(error.cause)
}
Expand All @@ -403,14 +400,14 @@ impl EndpointInner {
let mut state = self.state.lock().unwrap();
let mut response_buffer = BytesMut::new();
let transmit = state.inner.refuse(incoming, &mut response_buffer);
state.transmit_state.respond(transmit, response_buffer);
respond(transmit, &mut response_buffer, &*state.socket);
}

pub(crate) fn retry(&self, incoming: proto::Incoming) -> Result<(), proto::RetryError> {
let mut state = self.state.lock().unwrap();
let mut response_buffer = BytesMut::new();
let transmit = state.inner.retry(incoming, &mut response_buffer)?;
state.transmit_state.respond(transmit, response_buffer);
respond(transmit, &mut response_buffer, &*state.socket);
Ok(())
}
}
Expand All @@ -421,17 +418,14 @@ pub(crate) struct State {
/// During an active migration, `prev_socket` (the abandoned socket) keeps
/// receiving traffic until the first packet arrives on the new socket.
prev_socket: Option<Arc<dyn AsyncUdpSocket>>,
io_poller: Pin<Box<dyn UdpPoller>>,
inner: proto::Endpoint,
transmit_state: TransmitState,
recv_state: RecvState,
driver: Option<Waker>,
ipv6: bool,
events: mpsc::UnboundedReceiver<(ConnectionHandle, EndpointEvent)>,
/// Number of live handles that can be used to initiate or handle I/O; excludes the driver
ref_count: usize,
driver_lost: bool,
send_limiter: WorkLimiter,
runtime: Arc<dyn Runtime>,
}

Expand All @@ -446,24 +440,16 @@ impl State {
self.recv_state.recv_limiter.start_cycle();
if let Some(socket) = &self.prev_socket {
// We don't care about the `PollProgress` from old sockets.
let poll_res = self.recv_state.poll_socket(
cx,
&mut self.inner,
&mut self.transmit_state,
&**socket,
now,
);
let poll_res = self
.recv_state
.poll_socket(cx, &mut self.inner, &**socket, now);
if poll_res.is_err() {
self.prev_socket = None;
}
};
let poll_res = self.recv_state.poll_socket(
cx,
&mut self.inner,
&mut self.transmit_state,
&*self.socket,
now,
);
let poll_res = self
.recv_state
.poll_socket(cx, &mut self.inner, &*self.socket, now);
self.recv_state.recv_limiter.finish_cycle();
let poll_res = poll_res?;
if poll_res.received_connection_packet {
Expand All @@ -474,42 +460,6 @@ impl State {
Ok(poll_res.keep_going)
}

fn drive_send(&mut self, cx: &mut Context) -> Result<bool, io::Error> {
self.send_limiter.start_cycle();

let result = loop {
if self.transmit_state.outgoing.is_empty() {
break Ok(false);
}

if !self.send_limiter.allow_work() {
break Ok(true);
}

if self.io_poller.as_mut().poll_writable(cx)?.is_pending() {
break Ok(false);
}

match self.socket.try_send(self.transmit_state.transmits()) {
Ok(n) => {
self.transmit_state.dequeue(n);
// We count transmits instead of `try_send` calls since the cost
// of a `sendmmsg` still linearly increases with number of packets.
self.send_limiter.record_work(n);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
break Err(e);
}
}
};

self.send_limiter.finish_cycle();
result
}

fn handle_events(&mut self, cx: &mut Context, shared: &Shared) -> bool {
for _ in 0..IO_LOOP_BOUND {
let (ch, event) = match self.events.poll_recv(cx) {
Expand Down Expand Up @@ -543,40 +493,32 @@ impl State {
}
}

/// Outgoing queue of datagrams together with a running total of their
/// payload sizes.
#[derive(Debug, Default)]
struct TransmitState {
/// Datagrams waiting to be handed to the socket, oldest first
outgoing: VecDeque<udp::Transmit>,
/// The aggregated contents length of the packets in the transmit queue
contents_len: usize,
}

impl TransmitState {
fn respond(&mut self, transmit: proto::Transmit, mut response_buffer: BytesMut) {
// Limiting the memory usage for items queued in the outgoing queue from endpoint
// generated packets. Otherwise, we may see a build-up of the queue under test with
// flood of initial packets against the endpoint. The sender with the sender-limiter
// may not keep up the pace of these packets queued into the queue.
if self.contents_len >= MAX_TRANSMIT_QUEUE_CONTENTS_LEN {
return;
}

let contents_len = transmit.size;
self.outgoing.push_back(udp_transmit(
transmit,
response_buffer.split_to(contents_len).freeze(),
));
self.contents_len = self.contents_len.saturating_add(contents_len);
}

fn dequeue(&mut self, sent: usize) {
self.contents_len = self
.contents_len
.saturating_sub(self.outgoing.drain(..sent).map(|t| t.contents.len()).sum());
}

fn transmits(&self) -> &[udp::Transmit] {
self.outgoing.as_slices().0
}
/// Makes a single best-effort attempt to send an endpoint-generated datagram.
///
/// The first `transmit.size` bytes are split off `response_buffer` and passed
/// to `socket.try_send` as one datagram. The result of that call is
/// discarded, so a datagram that cannot be sent right away is dropped.
fn respond(
    transmit: proto::Transmit,
    response_buffer: &mut BytesMut,
    socket: &dyn AsyncUdpSocket,
) {
    // Send if the kernel has buffer space; otherwise drop.
    //
    // An endpoint-generated packet is always an immediate, stateless reply to
    // a peer we hold no connection with. It is one of:
    //
    // - a version negotiation response (unknown version)
    // - a `CLOSE` (malformed or unwanted connection attempt)
    // - a stateless reset (unrecognized connection)
    // - a `Retry` (connection attempt while `use_retry` is set)
    //
    // A well-behaved peer retries a few times in all of these cases, and each
    // retry is guaranteed to get the same response from us. If every attempt
    // is dropped, the worst outcome is that the peer's new connection attempt
    // times out. That is acceptable when load is so heavy that this code never
    // finds room to transmit. It is morally equivalent to the packet being
    // lost to congestion further along the link, where recovery likewise
    // depends on the peer retrying.
    let len = transmit.size;
    let payload = response_buffer.split_to(len).freeze();
    let datagram = udp_transmit(transmit, payload);
    let _ = socket.try_send(&[datagram]);
}

#[inline]
Expand Down Expand Up @@ -687,17 +629,14 @@ impl EndpointRef {
idle: Notify::new(),
},
state: Mutex::new(State {
io_poller: socket.clone().create_io_poller(),
socket,
prev_socket: None,
inner,
transmit_state: TransmitState::default(),
ipv6,
events,
driver: None,
ref_count: 0,
driver_lost: false,
send_limiter: WorkLimiter::new(SEND_TIME_BOUND),
recv_state,
runtime,
}),
Expand Down Expand Up @@ -772,7 +711,6 @@ impl RecvState {
&mut self,
cx: &mut Context,
endpoint: &mut proto::Endpoint,
transmit_state: &mut TransmitState,
socket: &dyn AsyncUdpSocket,
now: Instant,
) -> Result<PollProgress, io::Error> {
Expand Down Expand Up @@ -814,7 +752,7 @@ impl RecvState {
} else {
let transmit =
endpoint.refuse(incoming, &mut response_buffer);
transmit_state.respond(transmit, response_buffer);
respond(transmit, &mut response_buffer, socket);
}
}
Some(DatagramEvent::ConnectionEvent(handle, event)) => {
Expand All @@ -828,7 +766,7 @@ impl RecvState {
.send(ConnectionEvent::Proto(event));
}
Some(DatagramEvent::Response(transmit)) => {
transmit_state.respond(transmit, response_buffer);
respond(transmit, &mut response_buffer, socket);
}
None => {}
}
Expand Down
8 changes: 0 additions & 8 deletions quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,6 @@ const IO_LOOP_BOUND: usize = 160;
/// batch of size 32 was observed to take 30us on some systems.
const RECV_TIME_BOUND: Duration = Duration::from_micros(50);

/// The maximum amount of time that should be spent in `sendmsg()` calls per endpoint iteration
const SEND_TIME_BOUND: Duration = Duration::from_micros(50);

/// The maximum aggregate content length of packets in the outgoing transmit queue. Transmit packets
/// generated by the endpoint (retry or initial close) can be dropped when this limit is exceeded.
/// Chosen to represent 100 MB of data.
const MAX_TRANSMIT_QUEUE_CONTENTS_LEN: usize = 100_000_000;

/// The maximum number of `IncomingConnection`s we allow to be enqueued at a time before we start
/// rejecting new `IncomingConnection`s automatically. Assuming each `IncomingConnection` accounts
/// for little over 1200 bytes of memory maximum, this should limit an endpoint's incoming
Expand Down

0 comments on commit d075733

Please sign in to comment.