Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(udp): pre-allocate receive buffer #1707

Merged
merged 1 commit into from
Mar 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions neqo-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ fn to_headers(values: &[impl AsRef<str>]) -> Vec<Header> {

struct ClientRunner<'a> {
local_addr: SocketAddr,
socket: &'a udp::Socket,
socket: &'a mut udp::Socket,
client: Http3Client,
handler: Handler<'a>,
timeout: Option<Pin<Box<Sleep>>>,
Expand All @@ -773,7 +773,7 @@ struct ClientRunner<'a> {
impl<'a> ClientRunner<'a> {
fn new(
args: &'a mut Args,
socket: &'a udp::Socket,
socket: &'a mut udp::Socket,
local_addr: SocketAddr,
remote_addr: SocketAddr,
hostname: &str,
Expand Down Expand Up @@ -998,7 +998,7 @@ async fn main() -> Res<()> {
SocketAddr::V6(..) => SocketAddr::new(IpAddr::V6(Ipv6Addr::from([0; 16])), 0),
};

let socket = udp::Socket::bind(local_addr)?;
let mut socket = udp::Socket::bind(local_addr)?;
let real_local = socket.local_addr().unwrap();
println!(
"{} Client connecting: {:?} -> {:?}",
Expand All @@ -1022,7 +1022,7 @@ async fn main() -> Res<()> {
token = if args.use_old_http {
old::ClientRunner::new(
&args,
&socket,
&mut socket,
real_local,
remote_addr,
&hostname,
Expand All @@ -1034,7 +1034,7 @@ async fn main() -> Res<()> {
} else {
ClientRunner::new(
&mut args,
&socket,
&mut socket,
real_local,
remote_addr,
&hostname,
Expand Down Expand Up @@ -1249,7 +1249,7 @@ mod old {

pub struct ClientRunner<'a> {
local_addr: SocketAddr,
socket: &'a udp::Socket,
socket: &'a mut udp::Socket,
client: Connection,
handler: HandlerOld<'a>,
timeout: Option<Pin<Box<Sleep>>>,
Expand All @@ -1259,7 +1259,7 @@ mod old {
impl<'a> ClientRunner<'a> {
pub fn new(
args: &'a Args,
socket: &'a udp::Socket,
socket: &'a mut udp::Socket,
local_addr: SocketAddr,
remote_addr: SocketAddr,
origin: &str,
Expand Down
19 changes: 11 additions & 8 deletions neqo-common/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
pub struct Socket {
socket: tokio::net::UdpSocket,
state: UdpSocketState,
recv_buf: Vec<u8>,
}

impl Socket {
Expand All @@ -31,6 +32,7 @@
Ok(Self {
state: quinn_udp::UdpSocketState::new((&socket).into())?,
socket: tokio::net::UdpSocket::from_std(socket)?,
recv_buf: vec![0; u16::MAX as usize],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is too large. I get that we don't want to overflow, but unless we are receiving multiple datagrams, we only need 2k or so (a constant might be a good idea for that limit).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this too large? This is about ~43 packets, which is smaller than realistic WAN congestion windows. Are you worried about Fx memory usage of this is per-socket?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignore the comment. I was reading this patch on its own and missed the follow-up that includes multiple packets.

I do think that a constant is the right thing to use for this, rather than the (rather arbitrary) u16::MAX.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do think that a constant is the right thing to use for this, rather than the (rather arbitrary) u16::MAX.

👍 I prepared #1709.

I was reading this patch on its own and missed the follow-up that includes multiple packets.

For others to follow, this is #1708.

})
}

Expand Down Expand Up @@ -70,15 +72,13 @@
}

/// Receive a UDP datagram on the specified socket.
pub fn recv(&self, local_address: &SocketAddr) -> Result<Option<Datagram>, io::Error> {
let mut buf = [0; u16::MAX as usize];

pub fn recv(&mut self, local_address: &SocketAddr) -> Result<Option<Datagram>, io::Error> {
let mut meta = RecvMeta::default();

match self.socket.try_io(Interest::READABLE, || {
self.state.recv(
(&self.socket).into(),
&mut [IoSliceMut::new(&mut buf)],
&mut [IoSliceMut::new(&mut self.recv_buf)],
slice::from_mut(&mut meta),
)
}) {
Expand All @@ -101,16 +101,19 @@
return Ok(None);
}

if meta.len == buf.len() {
eprintln!("Might have received more than {} bytes", buf.len());
if meta.len == self.recv_buf.len() {
eprintln!(
"Might have received more than {} bytes",
self.recv_buf.len()
);

Check warning on line 108 in neqo-common/src/udp.rs

View check run for this annotation

Codecov / codecov/patch

neqo-common/src/udp.rs#L105-L108

Added lines #L105 - L108 were not covered by tests
}

Ok(Some(Datagram::new(
meta.addr,
*local_address,
meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(),
None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749
&buf[..meta.len],
&self.recv_buf[..meta.len],
)))
Comment on lines 111 to 117
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that each call to Datagram::new allocates the datagram's data in a new Vec.

impl Datagram {
pub fn new<V: Into<Vec<u8>>>(
src: SocketAddr,
dst: SocketAddr,
tos: IpTos,
ttl: Option<u8>,
d: V,
) -> Self {
Self {
src,
dst,
tos,
ttl,
d: d.into(),
}
}

Changing this is a larger undertaking I would like to tackle with #1693.

}
}
Expand All @@ -124,7 +127,7 @@
async fn datagram_tos() -> Result<(), io::Error> {
let sender = Socket::bind("127.0.0.1:0")?;
let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let receiver = Socket::bind(receiver_addr)?;
let mut receiver = Socket::bind(receiver_addr)?;

let datagram = Datagram::new(
sender.local_addr()?,
Expand Down