Skip to content

Commit

Permalink
Merge branch 'main' into sync
Browse files Browse the repository at this point in the history
  • Loading branch information
BigWingBeat authored Mar 17, 2024
2 parents b47c50c + ffac4a3 commit 7493941
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 77 deletions.
104 changes: 37 additions & 67 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,6 @@ pub struct Connection {
//
// Congestion Control
//
/// Summary statistics of packets that have been sent, but not yet acked or deemed lost
in_flight: InFlight,
/// Whether the most recently received packet had an ECN codepoint set
receiving_ecn: bool,
/// Number of packets authenticated
Expand Down Expand Up @@ -347,7 +345,6 @@ impl Connection {
pto_count: 0,

app_limited: false,
in_flight: InFlight::new(),
receiving_ecn: false,
total_authed_packets: 0,

Expand Down Expand Up @@ -631,7 +628,7 @@ impl Connection {
debug_assert!(untracked_bytes <= self.path.current_mtu() as u64);

let bytes_to_send = u64::from(self.path.current_mtu()) + untracked_bytes;
if self.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
space_idx += 1;
congestion_blocked = true;
// We continue instead of breaking here in order to avoid
Expand Down Expand Up @@ -1321,7 +1318,7 @@ impl Connection {

let mut ack_eliciting_acked = false;
for packet in newly_acked.elts() {
if let Some(info) = self.spaces[space].sent_packets.remove(&packet) {
if let Some(info) = self.spaces[space].take(packet) {
if let Some(acked) = info.largest_acked {
// Assume ACKs for all packets below the largest acknowledged in `packet` have
// been received. This can cause the peer to spuriously retransmit if some of
Expand All @@ -1343,13 +1340,13 @@ impl Connection {
// Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
self.ack_frequency.on_acked(packet);

self.on_packet_acked(now, space, info);
self.on_packet_acked(now, packet, info);
}
}

self.path.congestion.on_end_acks(
now,
self.in_flight.bytes,
self.path.in_flight.bytes,
self.app_limited,
self.spaces[space].largest_acked_packet,
);
Expand Down Expand Up @@ -1429,8 +1426,8 @@ impl Connection {

// Not timing-aware, so it's safe to call this for inferred acks, such as arise from
// high-latency handshakes
fn on_packet_acked(&mut self, now: Instant, space: SpaceId, info: SentPacket) {
self.remove_in_flight(space, &info);
fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
self.remove_in_flight(pn, &info);
if info.ack_eliciting && self.path.challenge.is_none() {
// Only pass ACKs to the congestion controller if we are not validating the current
// path, so as to ignore any ACKs from older paths still coming in.
Expand Down Expand Up @@ -1487,13 +1484,13 @@ impl Connection {
}
};
trace!(
in_flight = self.in_flight.bytes,
in_flight = self.path.in_flight.bytes,
count = self.pto_count,
?space,
"PTO fired"
);

let count = match self.in_flight.ack_eliciting {
let count = match self.path.in_flight.ack_eliciting {
// A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
// deadlock preventions
0 => {
Expand Down Expand Up @@ -1582,7 +1579,7 @@ impl Connection {

// OnPacketsLost
if let Some(largest_lost) = lost_packets.last().cloned() {
let old_bytes_in_flight = self.in_flight.bytes;
let old_bytes_in_flight = self.path.in_flight.bytes;
let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
self.lost_packets += lost_packets.len() as u64;
self.stats.path.lost_packets += lost_packets.len() as u64;
Expand All @@ -1593,22 +1590,22 @@ impl Connection {
size_of_lost_packets
);

for packet in &lost_packets {
let info = self.spaces[pn_space].sent_packets.remove(packet).unwrap(); // safe: lost_packets is populated just above
self.remove_in_flight(pn_space, &info);
for &packet in &lost_packets {
let info = self.spaces[pn_space].take(packet).unwrap(); // safe: lost_packets is populated just above
self.remove_in_flight(packet, &info);
for frame in info.stream_frames {
self.streams.retransmit(frame);
}
self.spaces[pn_space].pending |= info.retransmits;
self.path.mtud.on_non_probe_lost(*packet, info.size);
self.path.mtud.on_non_probe_lost(packet, info.size);
}

if self.path.mtud.black_hole_detected(now) {
self.stats.path.black_holes_detected += 1;
}

// Don't apply congestion penalty for lost ack-only packets
let lost_ack_eliciting = old_bytes_in_flight != self.in_flight.bytes;
let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;

if lost_ack_eliciting {
self.stats.path.congestion_events += 1;
Expand All @@ -1623,11 +1620,8 @@ impl Connection {

// Handle a lost MTU probe
if let Some(packet) = lost_mtu_probe {
let info = self.spaces[SpaceId::Data]
.sent_packets
.remove(&packet)
.unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and therefore must not have been removed yet
self.remove_in_flight(SpaceId::Data, &info);
let info = self.spaces[SpaceId::Data].take(packet).unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and therefore must not have been removed yet
self.remove_in_flight(packet, &info);
self.path.mtud.on_probe_lost();
self.stats.path.lost_plpmtud_probes += 1;
}
Expand All @@ -1643,7 +1637,7 @@ impl Connection {
let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
let mut duration = self.path.rtt.pto_base() * backoff;

if self.in_flight.ack_eliciting == 0 {
if self.path.in_flight.ack_eliciting == 0 {
debug_assert!(!self.peer_completed_address_validation());
let space = match self.highest_space {
SpaceId::Handshake => SpaceId::Handshake,
Expand Down Expand Up @@ -1705,7 +1699,7 @@ impl Connection {
return;
}

if self.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
// There is nothing to detect lost, so no timer is set. However, the client needs to arm
// the timer if the server might be blocked by the anti-amplification limit.
self.timers.stop(Timer::LossDetection);
Expand Down Expand Up @@ -2023,9 +2017,10 @@ impl Connection {
space.crypto = None;
space.time_of_last_ack_eliciting_packet = None;
space.loss_time = None;
space.in_flight = 0;
let sent_packets = mem::take(&mut space.sent_packets);
for (_, packet) in sent_packets.into_iter() {
self.remove_in_flight(space_id, &packet);
for (pn, packet) in sent_packets.into_iter() {
self.remove_in_flight(pn, &packet);
}
self.set_loss_detection_timer(now)
}
Expand Down Expand Up @@ -2301,8 +2296,8 @@ impl Connection {
self.rem_handshake_cid = rem_cid;

let space = &mut self.spaces[SpaceId::Initial];
if let Some(info) = space.sent_packets.remove(&0) {
self.on_packet_acked(now, SpaceId::Initial, info);
if let Some(info) = space.take(0) {
self.on_packet_acked(now, 0, info);
};

self.discard_space(now, SpaceId::Initial); // Make sure we clean up after any retransmitted Initials
Expand All @@ -2322,8 +2317,8 @@ impl Connection {

// Retransmit all 0-RTT data
let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
for (_, info) in zero_rtt {
self.remove_in_flight(SpaceId::Data, &info);
for (pn, info) in zero_rtt {
self.remove_in_flight(pn, &info);
self.spaces[SpaceId::Data].pending |= info.retransmits;
}
self.streams.retransmit_all_for_0rtt();
Expand Down Expand Up @@ -2385,8 +2380,8 @@ impl Connection {
// Discard 0-RTT packets
let sent_packets =
mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
for (_, packet) in sent_packets {
self.remove_in_flight(SpaceId::Data, &packet);
for (pn, packet) in sent_packets {
self.remove_in_flight(pn, &packet);
}
} else {
self.accepted_0rtt = true;
Expand Down Expand Up @@ -2946,6 +2941,7 @@ impl Connection {
let mut sent = SentFrames::default();
let space = &mut self.spaces[space_id];
let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
space.pending_acks.maybe_ack_non_eliciting();

// HANDSHAKE_DONE
if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
Expand Down Expand Up @@ -2974,7 +2970,6 @@ impl Connection {

// ACK
if space.pending_acks.can_send() {
debug_assert!(!space.pending_acks.ranges().is_empty());
Self::populate_acks(
now,
self.receiving_ecn,
Expand Down Expand Up @@ -3358,7 +3353,7 @@ impl Connection {
/// acknowledged or declared lost.
#[cfg(test)]
pub(crate) fn bytes_in_flight(&self) -> u64 {
self.in_flight.bytes
self.path.in_flight.bytes
}

/// Number of bytes worth of non-ack-only packets that may be sent
Expand All @@ -3367,7 +3362,7 @@ impl Connection {
self.path
.congestion
.window()
.saturating_sub(self.in_flight.bytes)
.saturating_sub(self.path.in_flight.bytes)
}

/// Whether no timers but keepalive, idle, rtt and pushnewcid are running
Expand Down Expand Up @@ -3440,10 +3435,13 @@ impl Connection {
}

/// Update counters to account for a packet becoming acknowledged, lost, or abandoned
fn remove_in_flight(&mut self, space: SpaceId, packet: &SentPacket) {
self.in_flight.bytes -= u64::from(packet.size);
self.in_flight.ack_eliciting -= u64::from(packet.ack_eliciting);
self.spaces[space].in_flight -= u64::from(packet.size);
fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
// Visit known paths from newest to oldest to find the one `pn` was sent on
for path in [&mut self.path].into_iter().chain(self.prev_path.as_mut()) {
if path.remove_in_flight(pn, packet) {
return;
}
}
}

/// Terminate the connection instantly, without sending a close packet
Expand Down Expand Up @@ -3585,34 +3583,6 @@ mod state {
}
}

struct InFlight {
/// Sum of the sizes of all sent packets considered "in flight" by congestion control
///
/// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
/// count towards this to ensure congestion control does not impede congestion feedback.
bytes: u64,
/// Number of packets in flight containing frames other than ACK and PADDING
///
/// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
/// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
/// also be nonzero.
ack_eliciting: u64,
}

impl InFlight {
fn new() -> Self {
Self {
bytes: 0,
ack_eliciting: 0,
}
}

fn insert(&mut self, packet: &SentPacket) {
self.bytes += u64::from(packet.size);
self.ack_eliciting += u64::from(packet.ack_eliciting);
}
}

/// Events of interest to the application
#[derive(Debug)]
pub enum Event {
Expand Down
4 changes: 2 additions & 2 deletions quinn-proto/src/connection/packet_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ impl PacketBuilder {
stream_frames: sent.stream_frames,
};

conn.in_flight.insert(&packet);
conn.spaces[space_id].sent(exact_number, packet);
conn.path
.sent(exact_number, packet, &mut conn.spaces[space_id]);
conn.stats.path.sent_packets += 1;
conn.reset_keep_alive(now);
if size != 0 {
Expand Down
Loading

0 comments on commit 7493941

Please sign in to comment.