Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send path validation responses to the correct remote #1746

Merged
merged 6 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 56 additions & 41 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ mod packet_crypto;
use packet_crypto::{PrevCrypto, ZeroRttCrypto};

mod paths;
use paths::PathData;
pub use paths::RttEstimator;
use paths::{PathData, PathResponses};

mod send_buffer;

Expand Down Expand Up @@ -197,7 +197,8 @@ pub struct Connection {
//
// Queued non-retransmittable 1-RTT data
//
path_response: Option<PathResponse>,
/// Responses to PATH_CHALLENGE frames
path_responses: PathResponses,
Ralith marked this conversation as resolved.
Show resolved Hide resolved
close: bool,

//
Expand Down Expand Up @@ -336,7 +337,7 @@ impl Connection {
#[cfg(not(test))]
packet_number_filter: PacketNumberFilter::new(&mut rng),

path_response: None,
path_responses: PathResponses::default(),
close: false,

ack_frequency: AckFrequencyState::new(get_max_ack_delay(
Expand Down Expand Up @@ -508,9 +509,7 @@ impl Connection {
builder.pad_to(MIN_INITIAL_SIZE);

builder.finish(self, buf);
self.stats.udp_tx.datagrams += 1;
self.stats.udp_tx.ios += 1;
self.stats.udp_tx.bytes += buf.len() as u64;
self.stats.udp_tx.on_sent(1, buf.len());
return Some(Transmit {
destination,
size: buf.len(),
Expand Down Expand Up @@ -561,7 +560,7 @@ impl Connection {
let mut buf_capacity = 0;

let mut coalesce = true;
let mut builder: Option<PacketBuilder> = None;
let mut builder_storage: Option<PacketBuilder> = None;
Ralith marked this conversation as resolved.
Show resolved Hide resolved
let mut sent_frames = None;
let mut pad_datagram = false;
let mut congestion_blocked = false;
Expand Down Expand Up @@ -598,7 +597,7 @@ impl Connection {
// Can we append more data into the current buffer?
// It is not safe to assume that `buf.len()` is the end of the data,
// since the last packet might not have been finished.
let buf_end = if let Some(builder) = &builder {
let buf_end = if let Some(builder) = &builder_storage {
buf.len().max(builder.min_size) + builder.tag_len
} else {
buf.len()
Expand All @@ -619,9 +618,10 @@ impl Connection {
// for starting another datagram. If there is any anti-amplification
// budget left, we always allow a full MTU to be sent
// (see https://github.com/quinn-rs/quinn/issues/1082)
if self.path.anti_amplification_blocked(
self.path.current_mtu() as u64 * num_datagrams as u64 + 1,
) {
if self
.path
.anti_amplification_blocked(self.path.current_mtu() as u64 * num_datagrams + 1)
{
trace!("blocked by anti-amplification");
break;
}
Expand All @@ -630,7 +630,7 @@ impl Connection {
// Tail loss probes must not be blocked by congestion, or a deadlock could arise
if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
// Assume the current packet will get padded to fill the full MTU
let untracked_bytes = if let Some(builder) = &builder {
let untracked_bytes = if let Some(builder) = &builder_storage {
buf_capacity - builder.partial_encode.start
} else {
0
Expand Down Expand Up @@ -664,7 +664,7 @@ impl Connection {
}

// Finish current packet
if let Some(mut builder) = builder.take() {
if let Some(mut builder) = builder_storage.take() {
// Pad the packet to make it suitable for sending with GSO
// which will always send the maximum PDU.
builder.pad_to(self.path.current_mtu());
Expand Down Expand Up @@ -694,7 +694,7 @@ impl Connection {
// We can append/coalesce the next packet into the current
// datagram.
// Finish current packet without adding extra padding
if let Some(builder) = builder.take() {
if let Some(builder) = builder_storage.take() {
builder.finish_and_track(now, self, sent_frames.take(), buf);
}
}
Expand All @@ -718,19 +718,19 @@ impl Connection {
}

debug_assert!(
builder.is_none() && sent_frames.is_none(),
builder_storage.is_none() && sent_frames.is_none(),
"Previous packet must have been finished"
);

// This should really be `builder.insert()`, but `Option::insert`
// is not stable yet. Since we `debug_assert!(builder.is_none())` it
// doesn't make any functional difference.
let builder = builder.get_or_insert(PacketBuilder::new(
let builder = builder_storage.get_or_insert(PacketBuilder::new(
now,
space_id,
buf,
buf_capacity,
(num_datagrams - 1) * (self.path.current_mtu() as usize),
(num_datagrams as usize - 1) * (self.path.current_mtu() as usize),
ack_eliciting,
self,
self.version,
Expand Down Expand Up @@ -796,6 +796,38 @@ impl Connection {
break;
}

// Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that path
// validation can occur while the link is saturated.
if space_id == SpaceId::Data && num_datagrams == 1 {
djc marked this conversation as resolved.
Show resolved Hide resolved
if let Some((token, remote)) = self.path_responses.pop_off_path(&self.path.remote) {
// `unwrap` guaranteed to succeed because `builder_storage` was populated just
// above.
let mut builder = builder_storage.take().unwrap();
trace!("PATH_RESPONSE {:08x} (off-path)", token);
buf.write(frame::Type::PATH_RESPONSE);
buf.write(token);
self.stats.frame_tx.path_response += 1;
builder.pad_to(MIN_INITIAL_SIZE);
builder.finish_and_track(
now,
self,
Some(SentFrames {
non_retransmits: true,
..SentFrames::default()
}),
buf,
);
self.stats.udp_tx.on_sent(1, buf.len());
return Some(Transmit {
destination: remote,
size: buf.len(),
ecn: None,
segment_size: None,
src_ip: self.local_ip,
});
}
}

let sent = self.populate_packet(
now,
space_id,
Expand Down Expand Up @@ -831,7 +863,7 @@ impl Connection {
}

// Finish the last packet
if let Some(mut builder) = builder {
if let Some(mut builder) = builder_storage {
if pad_datagram {
builder.pad_to(MIN_INITIAL_SIZE);
}
Expand Down Expand Up @@ -900,9 +932,7 @@ impl Connection {
trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);

self.stats.udp_tx.datagrams += num_datagrams as u64;
self.stats.udp_tx.bytes += buf.len() as u64;
self.stats.udp_tx.ios += 1;
self.stats.udp_tx.on_sent(num_datagrams, buf.len());

Some(Transmit {
destination: self.path.remote,
Expand Down Expand Up @@ -2585,16 +2615,7 @@ impl Connection {
close = Some(reason);
}
Frame::PathChallenge(token) => {
if self
.path_response
.as_ref()
.map_or(true, |x| x.packet <= number)
{
self.path_response = Some(PathResponse {
packet: number,
token,
});
}
self.path_responses.push(number, token, remote);
if remote == self.path.remote {
// PATH_CHALLENGE on active path, possible off-path packet forwarding
// attack. Send a non-probing packet to recover the active path.
Expand Down Expand Up @@ -3014,12 +3035,12 @@ impl Connection {

// PATH_RESPONSE
if buf.len() + 9 < max_size && space_id == SpaceId::Data {
if let Some(response) = self.path_response.take() {
if let Some(token) = self.path_responses.pop_on_path(&self.path.remote) {
sent.non_retransmits = true;
sent.requires_padding = true;
trace!("PATH_RESPONSE {:08x}", response.token);
trace!("PATH_RESPONSE {:08x}", token);
buf.write(frame::Type::PATH_RESPONSE);
buf.write(response.token);
buf.write(token);
self.stats.frame_tx.path_response += 1;
}
}
Expand Down Expand Up @@ -3417,7 +3438,7 @@ impl Connection {
.prev_path
.as_ref()
.map_or(false, |x| x.challenge_pending)
|| self.path_response.is_some()
|| !self.path_responses.is_empty()
|| !self.datagrams.outgoing.is_empty()
}

Expand Down Expand Up @@ -3615,12 +3636,6 @@ pub enum Event {
DatagramReceived,
}

struct PathResponse {
/// The packet number the corresponding PATH_CHALLENGE was received in
packet: u64,
token: u64,
}

fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
if x > y {
x - y
Expand Down
69 changes: 69 additions & 0 deletions quinn-proto/src/connection/paths.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::{cmp, net::SocketAddr, time::Duration, time::Instant};

use tracing::trace;

use super::{mtud::MtuDiscovery, pacing::Pacer};
use crate::{config::MtuDiscoveryConfig, congestion, packet::SpaceId, TIMER_GRANULARITY};

Expand Down Expand Up @@ -164,3 +166,70 @@ impl RttEstimator {
}
}
}

#[derive(Default)]
pub(crate) struct PathResponses {
pending: Vec<PathResponse>,
}

impl PathResponses {
pub(crate) fn push(&mut self, packet: u64, token: u64, remote: SocketAddr) {
/// Arbitrary permissive limit to prevent abuse
const MAX_PATH_RESPONSES: usize = 16;
let response = PathResponse {
packet,
token,
remote,
};
let existing = self.pending.iter_mut().find(|x| x.remote == remote);
if let Some(existing) = existing {
// Update a queued response
if existing.packet <= packet {
*existing = response;
}
return;
}
if self.pending.len() < MAX_PATH_RESPONSES {
self.pending.push(response);
} else {
// We don't expect to ever hit this with well-behaved peers, so we don't bother dropping
// older challenges.
trace!("ignoring excessive PATH_CHALLENGE");
}
}

pub(crate) fn pop_off_path(&mut self, remote: &SocketAddr) -> Option<(u64, SocketAddr)> {
let response = *self.pending.last()?;
djc marked this conversation as resolved.
Show resolved Hide resolved
if response.remote == *remote {
// We don't bother searching further because we expect that the on-path response will
// get drained in the immediate future by a call to `pop_on_path`
return None;
}
self.pending.pop();
Some((response.token, response.remote))
}

pub(crate) fn pop_on_path(&mut self, remote: &SocketAddr) -> Option<u64> {
let response = *self.pending.last()?;
if response.remote != *remote {
// We don't bother searching further because we expect that the off-path response will
// get drained in the immediate future by a call to `pop_off_path`
return None;
}
self.pending.pop();
Some(response.token)
}

pub(crate) fn is_empty(&self) -> bool {
self.pending.is_empty()
}
}

#[derive(Copy, Clone)]
struct PathResponse {
/// The packet number the corresponding PATH_CHALLENGE was received in
packet: u64,
token: u64,
/// The address the corresponding PATH_CHALLENGE was received from
remote: SocketAddr,
}
8 changes: 8 additions & 0 deletions quinn-proto/src/connection/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ pub struct UdpStats {
pub ios: u64,
}

impl UdpStats {
pub(crate) fn on_sent(&mut self, datagrams: u64, bytes: usize) {
self.datagrams += datagrams;
self.bytes += bytes as u64;
self.ios += 1;
}
}

/// Number of frames transmitted of each frame type
#[derive(Default, Copy, Clone)]
#[non_exhaustive]
Expand Down
Loading