Skip to content

Commit

Permalink
Suppress stateless packets when endpoint transmit queue is large
Browse files Browse the repository at this point in the history
  • Loading branch information
lijunwangs authored Jul 6, 2023
1 parent 6d3958f commit e652b6d
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 4 deletions.
34 changes: 30 additions & 4 deletions quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use udp::{RecvMeta, UdpState, BATCH_SIZE};

use crate::{
connection::Connecting, work_limiter::WorkLimiter, ConnectionEvent, EndpointConfig,
EndpointEvent, VarInt, IO_LOOP_BOUND, RECV_TIME_BOUND, SEND_TIME_BOUND,
EndpointEvent, VarInt, IO_LOOP_BOUND, MAX_TRANSMIT_QUEUE_CONTENTS_LEN, RECV_TIME_BOUND,
SEND_TIME_BOUND,
};

/// A QUIC endpoint.
Expand Down Expand Up @@ -370,6 +371,8 @@ pub(crate) struct State {
recv_buf: Box<[u8]>,
send_limiter: WorkLimiter,
runtime: Arc<dyn Runtime>,
/// The aggregateed contents length of the packets in the transmit queue
transmit_queue_contents_len: usize,
}

#[derive(Debug)]
Expand Down Expand Up @@ -427,7 +430,19 @@ impl State {
.send(ConnectionEvent::Proto(event));
}
Some(DatagramEvent::Response(t)) => {
self.outgoing.push_back(udp_transmit(t));
// Limiting the memory usage for items queued in the outgoing queue from endpoint
// generated packets. Otherwise, we may see a build-up of the queue under test with
// flood of initial packets against the endpoint. The sender with the sender-limiter
// may not keep up the pace of these packets queued into the queue.
if self.transmit_queue_contents_len
< MAX_TRANSMIT_QUEUE_CONTENTS_LEN
{
let contents_len = t.contents.len();
self.outgoing.push_back(udp_transmit(t));
self.transmit_queue_contents_len = self
.transmit_queue_contents_len
.saturating_add(contents_len);
}
}
None => {}
}
Expand Down Expand Up @@ -473,7 +488,11 @@ impl State {
.poll_send(&self.udp_state, cx, self.outgoing.as_slices().0)
{
Poll::Ready(Ok(n)) => {
self.outgoing.drain(..n);
let contents_len: usize =
self.outgoing.drain(..n).map(|t| t.contents.len()).sum();
self.transmit_queue_contents_len = self
.transmit_queue_contents_len
.saturating_sub(contents_len);
// We count transmits instead of `poll_send` calls since the cost
// of a `sendmmsg` still linearly increases with number of packets.
self.send_limiter.record_work(n);
Expand Down Expand Up @@ -514,7 +533,13 @@ impl State {
.send(ConnectionEvent::Proto(event));
}
}
Transmit(t) => self.outgoing.push_back(udp_transmit(t)),
Transmit(t) => {
let contents_len = t.contents.len();
self.outgoing.push_back(udp_transmit(t));
self.transmit_queue_contents_len = self
.transmit_queue_contents_len
.saturating_add(contents_len);
}
},
Poll::Ready(None) => unreachable!("EndpointInner owns one sender"),
Poll::Pending => {
Expand Down Expand Up @@ -677,6 +702,7 @@ impl EndpointRef {
recv_limiter: WorkLimiter::new(RECV_TIME_BOUND),
send_limiter: WorkLimiter::new(SEND_TIME_BOUND),
runtime,
transmit_queue_contents_len: 0,
}),
}))
}
Expand Down
5 changes: 5 additions & 0 deletions quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,8 @@ const RECV_TIME_BOUND: Duration = Duration::from_micros(50);

/// The maximum amount of time that should be spent in `sendmsg()` calls per endpoint iteration
const SEND_TIME_BOUND: Duration = Duration::from_micros(50);

/// The maximum size of content length of packets in the outgoing transmit queue. Transmit packets
/// generated from the endpoint (retry or initial close) can be dropped when this limit is being execeeded.
/// Chose to represent 100 MB of data.
const MAX_TRANSMIT_QUEUE_CONTENTS_LEN: usize = 100_000_000;

0 comments on commit e652b6d

Please sign in to comment.