Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

quinn_udp: use async_io instead of tokio #1183

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[workspace]
members = ["quinn", "quinn-proto", "interop", "bench", "perf", "fuzz"]
default-members = ["quinn", "quinn-proto", "interop", "bench", "perf"]
members = ["quinn", "quinn-proto", "quinn-udp", "interop", "bench", "perf", "fuzz"]
default-members = ["quinn", "quinn-proto", "quinn-udp", "interop", "bench", "perf"]

[profile.bench]
debug = true
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ This library is at [draft 32][current-draft].

- **quinn:** High-level async API based on tokio, see for usage. This will be used by most developers. (Basic benchmarks are included.)
- **quinn-proto:** Deterministic state machine of the protocol which performs [**no** I/O][sans-io] internally and is suitable for use with custom event loops (and potentially a C or C++ API).
- **quinn-udp:** UDP sockets with ECN information tuned for the protocol.
- **quinn-h3:** Contains an implementation of HTTP-3 and QPACK. It is split internally in a deterministic state machine and a tokio-based high-level async API.
- **bench:** Benchmarks without any framework.
- **interop:** Tooling that helps to run interoperability tests.
Expand Down
2 changes: 1 addition & 1 deletion quinn-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "quinn-proto"
version = "0.7.0"
authors = ["Benjamin Saunders <[email protected]>", "Dirkjan Ochtman <[email protected]>"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/djc/quinn"
repository = "https://github.com/quinn-rs/quinn"
description = "State machine for the QUIC transport protocol"
keywords = ["quic"]
categories = [ "network-programming", "asynchronous" ]
Expand Down
34 changes: 34 additions & 0 deletions quinn-udp/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[package]
name = "quinn-udp"
version = "0.7.0"
authors = ["Benjamin Saunders <[email protected]>", "Dirkjan Ochtman <[email protected]>", "David Craven <[email protected]>"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/quinn-rs/quinn"
description = "UDP sockets with ECN information for the QUIC transport protocol"
keywords = ["quic"]
categories = [ "network-programming", "asynchronous" ]
workspace = ".."
edition = "2018"

[package.metadata.docs.rs]
all-features = true

[badges]
maintenance = { status = "experimental" }

[dependencies]
async-io = "1.3.1"
futures-lite = "1.11.3"
libc = "0.2.69"
proto = { package = "quinn-proto", path = "../quinn-proto", version = "0.7" }
socket2 = "0.4"
tracing = "0.1.10"

[target.'cfg(unix)'.dependencies]
lazy_static = "1"

[dev-dependencies]
anyhow = "1.0.40"
async-global-executor = "2.0.2"
env_logger = "0.8.3"
log = "0.4.14"
66 changes: 66 additions & 0 deletions quinn-udp/examples/simple.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use anyhow::Result;
use std::io::IoSliceMut;
use std::net::Ipv4Addr;
use std::time::Instant;
use proto::{EcnCodepoint, Transmit};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not very happy about this. I guess it would be better to duplicate the definitions and convert it in quinn.

use quinn_udp::{RecvMeta, UdpSocket, BATCH_SIZE};

fn main() -> Result<()> {
env_logger::init();
let socket1 = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0).into())?;
let socket2 = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0).into())?;
let addr2 = socket2.local_addr()?;

let mut transmits = Vec::with_capacity(BATCH_SIZE);
for i in 0..BATCH_SIZE {
let contents = (i as u64).to_be_bytes().to_vec();
transmits.push(Transmit {
destination: addr2,
ecn: Some(EcnCodepoint::Ce),
segment_size: Some(1200),
contents,
src_ip: Some(Ipv4Addr::LOCALHOST.into()),
});
}

let task1 = async_global_executor::spawn(async move {
log::debug!("before send");
socket1.send(&transmits).await.unwrap();
log::debug!("after send");
});

let task2 = async_global_executor::spawn(async move {
let mut storage = [[0u8; 1200]; BATCH_SIZE];
let mut buffers = Vec::with_capacity(BATCH_SIZE);
let mut rest = &mut storage[..];
for _ in 0..BATCH_SIZE {
let (b, r) = rest.split_at_mut(1);
rest = r;
buffers.push(IoSliceMut::new(&mut b[0]));
}

let mut meta = [RecvMeta::default(); BATCH_SIZE];
let n = socket2.recv(&mut buffers, &mut meta).await.unwrap();
for i in 0..n {
log::debug!(
"received {} {:?} {:?}",
i,
&buffers[i][..meta[i].len],
&meta[i]
);
}
});

async_global_executor::block_on(async move {
let start = Instant::now();
task1.await;
task2.await;
println!(
"sent {} packets in {}ms",
BATCH_SIZE,
start.elapsed().as_millis()
);
});

Ok(())
}
4 changes: 2 additions & 2 deletions quinn/src/platform/cmsg.rs → quinn-udp/src/cmsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ impl<'a> Drop for Encoder<'a> {
/// `cmsg` must refer to a cmsg containing a payload of type `T`
pub unsafe fn decode<T: Copy>(cmsg: &libc::cmsghdr) -> T {
assert!(mem::align_of::<T>() <= mem::align_of::<libc::cmsghdr>());
debug_assert_eq!(
cmsg.cmsg_len as usize,
debug_assert!(
cmsg.cmsg_len as usize <=
libc::CMSG_LEN(mem::size_of::<T>() as _) as usize
);
ptr::read(libc::CMSG_DATA(cmsg) as *const T)
Expand Down
52 changes: 52 additions & 0 deletions quinn-udp/src/fallback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use crate::RecvMeta;
use proto::Transmit;

use std::io::{IoSliceMut, Result};

pub fn init(_socket: &std::net::UdpSocket) -> Result<()> {
// We do nothing with the given socket.
Ok(())
}

pub fn send(socket: &std::net::UdpSocket, transmits: &[Transmit]) -> Result<usize> {
let mut sent = 0;
for transmit in transmits {
match socket.send_to(&transmit.contents, &transmit.destination) {
Ok(_) => {
sent += 1;
}
Err(_) if sent != 0 => {
// We need to report that some packets were sent in this case, so we rely on
// errors being either harmlessly transient (in the case of WouldBlock) or
// recurring on the next call.
return Ok(sent);
}
Err(e) => {
return Err(e);
}
}
}
Ok(sent)
}

pub fn recv(
socket: &std::net::UdpSocket,
buffers: &mut [IoSliceMut<'_>],
meta: &mut [RecvMeta],
) -> Result<usize> {
let (len, addr) = socket.recv_from(&mut buffers[0])?;
meta[0] = RecvMeta {
addr,
len,
ecn: None,
dst_ip: None,
};
Ok(1)
}

/// Returns the platforms UDP socket capabilities
pub fn max_gso_segments() -> Result<usize> {
Ok(1)
}

pub const BATCH_SIZE: usize = 1;
18 changes: 8 additions & 10 deletions quinn/src/platform/mod.rs → quinn-udp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,22 @@ use tracing::warn;

#[cfg(unix)]
mod cmsg;

mod socket;

#[cfg(unix)]
#[path = "unix.rs"]
mod imp;
mod platform;

// No ECN support
#[cfg(not(unix))]
#[path = "fallback.rs"]
mod imp;
mod platform;

pub use imp::UdpSocket;

/// Returns the platforms UDP socket capabilities
pub fn caps() -> UdpCapabilities {
imp::caps()
}
pub use socket::UdpSocket;

/// Number of UDP packets to send/receive at a time
pub const BATCH_SIZE: usize = imp::BATCH_SIZE;
/// Number of UDP packets to send/receive at a time when using sendmmsg/recvmmsg.
pub const BATCH_SIZE: usize = platform::BATCH_SIZE;

/// The capabilities a UDP socket suppports on a certain platform
#[derive(Debug, Clone, Copy)]
Expand Down
96 changes: 96 additions & 0 deletions quinn-udp/src/socket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use proto::{Transmit};
use crate::{RecvMeta, UdpCapabilities};
use async_io::Async;
use futures_lite::future::poll_fn;
use std::io::{IoSliceMut, Result};
use std::net::SocketAddr;
use std::task::{Context, Poll};

use crate::platform as platform;

/// Async-io-compatible UDP socket with some useful specializations.
///
/// Unlike a standard UDP socket, this allows ECN bits to be read
/// and written on some platforms.
#[derive(Debug)]
pub struct UdpSocket {
inner: Async<std::net::UdpSocket>,
}

impl UdpSocket {
/// Returns the platforms UDP socket capabilities
pub fn capabilities() -> Result<UdpCapabilities> {
Ok(UdpCapabilities {
max_gso_segments: platform::max_gso_segments()?,
})
}

pub fn from_std(socket: std::net::UdpSocket) -> Result<Self> {
platform::init(&socket)?;
Ok(Self {
inner: Async::new(socket)?,
})
}

pub fn bind(addr: SocketAddr) -> Result<Self> {
let socket = std::net::UdpSocket::bind(addr)?;
Self::from_std(socket)
}

pub fn local_addr(&self) -> Result<SocketAddr> {
self.inner.get_ref().local_addr()
}

pub fn ttl(&self) -> Result<u8> {
let ttl = self.inner.get_ref().ttl()?;
Ok(ttl as u8)
}

pub fn set_ttl(&self, ttl: u8) -> Result<()> {
self.inner.get_ref().set_ttl(ttl as u32)
}

pub fn poll_send(&self, cx: &mut Context, transmits: &[Transmit]) -> Poll<Result<usize>> {
match self.inner.poll_writable(cx) {
Poll::Ready(Ok(())) => {}
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
}
let socket = self.inner.get_ref();
match platform::send(socket, transmits) {
Ok(len) => Poll::Ready(Ok(len)),
Err(err) => Poll::Ready(Err(err)),
}
}

pub fn poll_recv(
&self,
cx: &mut Context,
buffers: &mut [IoSliceMut<'_>],
meta: &mut [RecvMeta],
) -> Poll<Result<usize>> {
match self.inner.poll_readable(cx) {
Poll::Ready(Ok(())) => {}
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
}
let socket = self.inner.get_ref();
Poll::Ready(platform::recv(socket, buffers, meta))
}

pub async fn send(&self, transmits: &[Transmit]) -> Result<usize> {
let mut i = 0;
while i < transmits.len() {
i += poll_fn(|cx| self.poll_send(cx, &transmits[i..])).await?;
}
Ok(i)
}

pub async fn recv(
&self,
buffers: &mut [IoSliceMut<'_>],
meta: &mut [RecvMeta],
) -> Result<usize> {
poll_fn(|cx| self.poll_recv(cx, buffers, meta)).await
}
}
Loading