Skip to content

Commit

Permalink
Backport supress stateless packet to 0.10.x (#1598)
Browse files Browse the repository at this point in the history
  • Loading branch information
lijunwangs authored Jul 13, 2023
1 parent c700d2f commit 53083f4
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 3 deletions.
64 changes: 62 additions & 2 deletions quinn-proto/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,20 @@ pub struct Endpoint {
server_config: Option<Arc<ServerConfig>>,
/// Whether the underlying UDP socket promises not to fragment packets
allow_mtud: bool,
/// The contents length for packets in the transmits queue
transmit_queue_contents_len: usize,
/// The socket buffer aggregated contents length
/// `transmit_queue_contents_len` + `socket_buffer_fill` represents the total contents length
/// of outstanding outgoing packets.
socket_buffer_fill: usize,
}

/// The maximum size of content length of packets in the outgoing transmit queue. Transmit packets
/// generated from the endpoint (retry, initial close, stateless reset and version negotiation)
/// can be dropped when this limit is being execeeded.
/// Chose to represent 100 MB of data.
const MAX_TRANSMIT_QUEUE_CONTENTS_LEN: usize = 100_000_000;

impl Endpoint {
/// Create a new endpoint
///
Expand All @@ -88,13 +100,17 @@ impl Endpoint {
config,
server_config,
allow_mtud,
transmit_queue_contents_len: 0,
socket_buffer_fill: 0,
}
}

/// Get the next packet to transmit
#[must_use]
pub fn poll_transmit(&mut self) -> Option<Transmit> {
self.transmits.pop_front()
let t = self.transmits.pop_front();
self.decrement_transmit_queue_contents_len(t.as_ref().map_or(0, |t| t.contents.len()));
t
}

/// Replace the server configuration, affecting new incoming connections only
Expand Down Expand Up @@ -175,6 +191,9 @@ impl Endpoint {
debug!("dropping packet with unsupported version");
return None;
}
if self.stateless_packets_supressed() {
return None;
}
trace!("sending version negotiation");
// Negotiate versions
let mut buf = BytesMut::new();
Expand All @@ -193,6 +212,7 @@ impl Endpoint {
for &version in &self.config.supported_versions {
buf.write(version);
}
self.increment_transmit_queue_contents_len(buf.len());
self.transmits.push_back(Transmit {
destination: remote,
ecn: None,
Expand Down Expand Up @@ -323,6 +343,9 @@ impl Endpoint {
addresses: FourTuple,
dst_cid: &ConnectionId,
) {
if self.stateless_packets_supressed() {
return;
}
/// Minimum amount of padding for the stateless reset to look like a short-header packet
const MIN_PADDING_LEN: usize = 5;

Expand Down Expand Up @@ -355,7 +378,7 @@ impl Endpoint {
buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));

debug_assert!(buf.len() < inciting_dgram_len);

self.increment_transmit_queue_contents_len(buf.len());
self.transmits.push_back(Transmit {
destination: addresses.remote,
ecn: None,
Expand Down Expand Up @@ -447,6 +470,35 @@ impl Endpoint {
}
}

/// Limiting the memory usage for items queued in the outgoing queue from endpoint
/// generated packets. Otherwise, we may see a build-up of the queue under test with
/// flood of initial packets against the endpoint. The sender with the sender-limiter
/// may not keep up the pace of these packets queued into the queue.
fn stateless_packets_supressed(&self) -> bool {
self.transmit_queue_contents_len
.saturating_add(self.socket_buffer_fill)
>= MAX_TRANSMIT_QUEUE_CONTENTS_LEN
}

/// Increment the contents length in the transmit queue.
fn increment_transmit_queue_contents_len(&mut self, contents_len: usize) {
self.transmit_queue_contents_len = self
.transmit_queue_contents_len
.saturating_add(contents_len);
}

/// Decrement the contents length in the transmit queue.
fn decrement_transmit_queue_contents_len(&mut self, contents_len: usize) {
self.transmit_queue_contents_len = self
.transmit_queue_contents_len
.saturating_sub(contents_len);
}

/// Set the `socket_buffer_fill` to the input `len`
pub fn set_socket_buffer_fill(&mut self, len: usize) {
self.socket_buffer_fill = len;
}

fn handle_first_packet(
&mut self,
now: Instant,
Expand Down Expand Up @@ -521,6 +573,9 @@ impl Endpoint {

let (retry_src_cid, orig_dst_cid) = if server_config.use_retry {
if token.is_empty() {
if self.stateless_packets_supressed() {
return None;
}
// First Initial
let mut random_bytes = vec![0u8; RetryToken::RANDOM_BYTES_LEN];
self.rng.fill_bytes(&mut random_bytes);
Expand All @@ -544,6 +599,7 @@ impl Endpoint {
buf.extend_from_slice(&server_config.crypto.retry_tag(version, &dst_cid, &buf));
encode.finish(&mut buf, &*crypto.header.local, None);

self.increment_transmit_queue_contents_len(buf.len());
self.transmits.push_back(Transmit {
destination: addresses.remote,
ecn: None,
Expand Down Expand Up @@ -680,6 +736,9 @@ impl Endpoint {
local_id: &ConnectionId,
reason: TransportError,
) {
if self.stateless_packets_supressed() {
return;
}
let number = PacketNumber::U8(0);
let header = Header::Initial {
dst_cid: *remote_id,
Expand All @@ -700,6 +759,7 @@ impl Endpoint {
&*crypto.header.local,
Some((0, &*crypto.packet.local)),
);
self.increment_transmit_queue_contents_len(buf.len());
self.transmits.push_back(Transmit {
destination: addresses.remote,
ecn: None,
Expand Down
25 changes: 24 additions & 1 deletion quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,8 @@ pub(crate) struct State {
recv_buf: Box<[u8]>,
send_limiter: WorkLimiter,
runtime: Arc<dyn Runtime>,
/// The packet contents length in the outgoing queue.
outgoing_queue_contents_len: usize,
}

#[derive(Debug)]
Expand Down Expand Up @@ -486,7 +488,9 @@ impl State {
.poll_send(&self.udp_state, cx, self.outgoing.as_slices().0)
{
Poll::Ready(Ok(n)) => {
self.outgoing.drain(..n);
let contents_len: usize =
self.outgoing.drain(..n).map(|t| t.contents.len()).sum();
self.decrement_outgoing_contents_len(contents_len);
// We count transmits instead of `poll_send` calls since the cost
// of a `sendmmsg` still linearily increases with number of packets.
self.send_limiter.record_work(n);
Expand Down Expand Up @@ -540,6 +544,8 @@ impl State {
}

fn queue_transmit(&mut self, t: proto::Transmit) {
let contents_len = t.contents.len();
self.increment_outgoing_queue_contents_len(contents_len);
self.outgoing.push_back(udp::Transmit {
destination: t.destination,
ecn: t.ecn.map(udp_ecn),
Expand All @@ -548,6 +554,22 @@ impl State {
src_ip: t.src_ip,
});
}

fn increment_outgoing_queue_contents_len(&mut self, contents_len: usize) {
self.outgoing_queue_contents_len = self
.outgoing_queue_contents_len
.saturating_add(contents_len);
self.inner
.set_socket_buffer_fill(self.outgoing_queue_contents_len);
}

fn decrement_outgoing_contents_len(&mut self, contents_len: usize) {
self.outgoing_queue_contents_len = self
.outgoing_queue_contents_len
.saturating_sub(contents_len);
self.inner
.set_socket_buffer_fill(self.outgoing_queue_contents_len);
}
}

#[inline]
Expand Down Expand Up @@ -689,6 +711,7 @@ impl EndpointRef {
recv_limiter: WorkLimiter::new(RECV_TIME_BOUND),
send_limiter: WorkLimiter::new(SEND_TIME_BOUND),
runtime,
outgoing_queue_contents_len: 0,
}),
}))
}
Expand Down

0 comments on commit 53083f4

Please sign in to comment.