Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allocate Incoming response buffers as needed #1811

Merged
merged 1 commit into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 13 additions & 22 deletions quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,12 +359,9 @@ pub(crate) struct EndpointInner {
}

impl EndpointInner {
pub(crate) fn accept(
&self,
incoming: proto::Incoming,
mut response_buffer: BytesMut,
) -> Result<Connecting, ConnectionError> {
pub(crate) fn accept(&self, incoming: proto::Incoming) -> Result<Connecting, ConnectionError> {
let mut state = self.state.lock().unwrap();
let mut response_buffer = BytesMut::new();
match state
.inner
.accept(incoming, Instant::now(), &mut response_buffer)
Expand All @@ -383,25 +380,19 @@ impl EndpointInner {
}
}

pub(crate) fn refuse(&self, incoming: proto::Incoming, mut response_buffer: BytesMut) {
pub(crate) fn refuse(&self, incoming: proto::Incoming) {
let mut state = self.state.lock().unwrap();
let mut response_buffer = BytesMut::new();
let transmit = state.inner.refuse(incoming, &mut response_buffer);
state.transmit_state.respond(transmit, response_buffer);
}

pub(crate) fn retry(
&self,
incoming: proto::Incoming,
mut response_buffer: BytesMut,
) -> Result<(), (proto::RetryError, BytesMut)> {
pub(crate) fn retry(&self, incoming: proto::Incoming) -> Result<(), proto::RetryError> {
let mut state = self.state.lock().unwrap();
match state.inner.retry(incoming, &mut response_buffer) {
Ok(transmit) => {
state.transmit_state.respond(transmit, response_buffer);
Ok(())
}
Err(e) => Err((e, response_buffer)),
}
let mut response_buffer = BytesMut::new();
let transmit = state.inner.retry(incoming, &mut response_buffer)?;
state.transmit_state.respond(transmit, response_buffer);
Ok(())
}
}

Expand All @@ -410,7 +401,7 @@ pub(crate) struct State {
socket: Arc<dyn AsyncUdpSocket>,
inner: proto::Endpoint,
transmit_state: TransmitState,
incoming: VecDeque<(proto::Incoming, BytesMut)>,
incoming: VecDeque<proto::Incoming>,
driver: Option<Waker>,
ipv6: bool,
connections: ConnectionSet,
Expand Down Expand Up @@ -464,7 +455,7 @@ impl State {
) {
Some(DatagramEvent::NewConnection(incoming)) => {
if self.incoming.len() < MAX_INCOMING_CONNECTIONS {
self.incoming.push_back((incoming, response_buffer));
self.incoming.push_back(incoming);
} else {
let transmit =
self.inner.refuse(incoming, &mut response_buffer);
Expand Down Expand Up @@ -707,10 +698,10 @@ impl<'a> Future for Accept<'a> {
if endpoint.driver_lost {
return Poll::Ready(None);
}
if let Some((incoming, response_buffer)) = endpoint.incoming.pop_front() {
if let Some(incoming) = endpoint.incoming.pop_front() {
// Release the mutex lock on endpoint so cloning it doesn't deadlock
drop(endpoint);
let incoming = Incoming::new(incoming, this.endpoint.inner.clone(), response_buffer);
let incoming = Incoming::new(incoming, this.endpoint.inner.clone());
return Poll::Ready(Some(incoming));
}
if endpoint.connections.close.is_some() {
Expand Down
50 changes: 13 additions & 37 deletions quinn/src/incoming.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::{
fmt,
future::{Future, IntoFuture},
net::{IpAddr, SocketAddr},
pin::Pin,
task::{Context, Poll},
};

use bytes::BytesMut;
use proto::ConnectionError;
use thiserror::Error;

Expand All @@ -16,48 +14,37 @@ use crate::{
};

/// An incoming connection for which the server has not yet begun its part of the handshake
#[derive(Debug)]
pub struct Incoming(Option<State>);

impl Incoming {
pub(crate) fn new(
inner: proto::Incoming,
endpoint: EndpointRef,
response_buffer: BytesMut,
) -> Self {
Self(Some(State {
inner,
endpoint,
response_buffer,
}))
pub(crate) fn new(inner: proto::Incoming, endpoint: EndpointRef) -> Self {
Self(Some(State { inner, endpoint }))
}

/// Attempt to accept this incoming connection (an error may still occur)
pub fn accept(mut self) -> Result<Connecting, ConnectionError> {
let state = self.0.take().unwrap();
state.endpoint.accept(state.inner, state.response_buffer)
state.endpoint.accept(state.inner)
}

/// Reject this incoming connection attempt
pub fn refuse(mut self) {
let state = self.0.take().unwrap();
state.endpoint.refuse(state.inner, state.response_buffer);
state.endpoint.refuse(state.inner);
}

/// Respond with a retry packet, requiring the client to retry with address validation
///
/// Errors if `remote_address_validated()` is true.
pub fn retry(mut self) -> Result<(), RetryError> {
let state = self.0.take().unwrap();
state
.endpoint
.retry(state.inner, state.response_buffer)
.map_err(|(e, response_buffer)| {
RetryError(Self(Some(State {
inner: e.into_incoming(),
endpoint: state.endpoint,
response_buffer,
})))
})
state.endpoint.retry(state.inner).map_err(|e| {
RetryError(Self(Some(State {
inner: e.into_incoming(),
endpoint: state.endpoint,
})))
})
}

/// Ignore this incoming connection attempt, not sending any packet in response
Expand Down Expand Up @@ -89,26 +76,15 @@ impl Drop for Incoming {
fn drop(&mut self) {
// Implicit reject, similar to Connection's implicit close
if let Some(state) = self.0.take() {
state.endpoint.refuse(state.inner, state.response_buffer);
state.endpoint.refuse(state.inner);
}
}
}

impl fmt::Debug for Incoming {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let state = self.0.as_ref().unwrap();
f.debug_struct("Incoming")
.field("inner", &state.inner)
.field("endpoint", &state.endpoint)
// response_buffer is too big and not meaningful enough
.finish_non_exhaustive()
}
}

#[derive(Debug)]
struct State {
inner: proto::Incoming,
endpoint: EndpointRef,
response_buffer: BytesMut,
}

/// Error for attempting to retry an [`Incoming`] which already bears an address
Expand Down
Loading