From e3bd5c4a359c730f18c7a173b946ce8f38415da3 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 31 Oct 2024 11:53:00 +0000 Subject: [PATCH] Bug 1916412 - fix Android x86 recvmmsg seccomp failure r=glandium,supply-chain-reviewers,sunil https://bugzilla.mozilla.org/show_bug.cgi?id=1910360 replaced NSPR with quinn-udp for HTTP3 QUIC UDP IO on Firefox Nightly. Calls to `recvmmsg` were prohibited by seccomp on Android x86. See https://bugzilla.mozilla.org/show_bug.cgi?id=1910594, upstream quinn-udp tracking issue: https://github.com/quinn-rs/quinn/issues/1947 and upstream quinn-udp fix: https://github.com/quinn-rs/quinn/pull/1966 Now that the upstream fix is merged and released, this commit upgrades neqo_glue to use quinn-udp `v0.5.6`. In addition, given the fix, quinn-udp can now be used on Android x86 Firefox Nightly. Thus this commit also removes the conditional around the `network.http.http3.use_nspr_for_io` pref. Differential Revision: https://phabricator.services.mozilla.com/D220890 --- Cargo.lock | 5 +- modules/libpref/init/StaticPrefList.yaml | 6 - supply-chain/audits.toml | 5 + .../rust/quinn-udp/.cargo-checksum.json | 2 +- third_party/rust/quinn-udp/Cargo.toml | 45 ++- .../rust/quinn-udp/benches/throughput.rs | 143 ++++--- third_party/rust/quinn-udp/build.rs | 26 ++ third_party/rust/quinn-udp/src/cmsg/mod.rs | 4 +- third_party/rust/quinn-udp/src/cmsg/unix.rs | 23 ++ third_party/rust/quinn-udp/src/fallback.rs | 46 ++- third_party/rust/quinn-udp/src/lib.rs | 32 +- third_party/rust/quinn-udp/src/unix.rs | 369 ++++++++++-------- third_party/rust/quinn-udp/src/windows.rs | 224 ++++++----- third_party/rust/quinn-udp/tests/tests.rs | 39 +- 14 files changed, 598 insertions(+), 371 deletions(-) create mode 100644 third_party/rust/quinn-udp/build.rs diff --git a/Cargo.lock b/Cargo.lock index 6958f1f7877db..4362e09bbfbcf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5018,10 +5018,11 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" 
[[package]] name = "quinn-udp" -version = "0.5.4" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bffec3605b73c6f1754535084a85229fa8a30f86014e6c81aeec4abb68b0285" +checksum = "e346e016eacfff12233c243718197ca12f148c84e1e84268a896699b41c71780" dependencies = [ + "cfg_aliases 0.2.1", "libc", "log", "once_cell", diff --git a/modules/libpref/init/StaticPrefList.yaml b/modules/libpref/init/StaticPrefList.yaml index ee4769c9bf715..74a4ad5640030 100644 --- a/modules/libpref/init/StaticPrefList.yaml +++ b/modules/libpref/init/StaticPrefList.yaml @@ -13994,13 +13994,7 @@ # Use NSPR for HTTP3 UDP IO - name: network.http.http3.use_nspr_for_io type: RelaxedAtomicBool -# Always use NSPR on Android x86 until -# https://bugzilla.mozilla.org/show_bug.cgi?id=1916412 is fixed. -#if defined(ANDROID) && !defined(HAVE_64BIT_BUILD) - value: true -#else value: @IS_NOT_NIGHTLY_BUILD@ -#endif mirror: always # Send and receive IP ECN marks. Noop if network.http.http3.use_nspr_for_io is diff --git a/supply-chain/audits.toml b/supply-chain/audits.toml index 25f7d3b0aa5fa..c5f25c3667c23 100644 --- a/supply-chain/audits.toml +++ b/supply-chain/audits.toml @@ -3801,6 +3801,11 @@ criteria = "safe-to-deploy" version = "0.5.4" notes = "This is a small crate, providing safe wrappers around various low-level networking specific operating system features. Given that the Rust standard library does not provide safe wrappers for these low-level features, safe wrappers need to be build in the crate itself, i.e. `quinn-udp`, thus requiring `unsafe` code." 
+[[audits.quinn-udp]] +who = "Max Inden " +criteria = "safe-to-deploy" +delta = "0.5.4 -> 0.5.6" + [[audits.quote]] who = "Nika Layzell " criteria = "safe-to-deploy" diff --git a/third_party/rust/quinn-udp/.cargo-checksum.json b/third_party/rust/quinn-udp/.cargo-checksum.json index 2066d5825d955..1588f8ac195c3 100644 --- a/third_party/rust/quinn-udp/.cargo-checksum.json +++ b/third_party/rust/quinn-udp/.cargo-checksum.json @@ -1 +1 @@ -{"files":{"Cargo.toml":"bf505df0c4f9254fa37950bad863cb838a8a7d2be4c8d3f28bdd679f945ef8cf","LICENSE-APACHE":"c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4","LICENSE-MIT":"4b2d0aca6789fa39e03d6738e869ea0988cceba210ca34ebb59c15c463e93a04","benches/throughput.rs":"8f00856e6c6f1dd8c7dd6c8a22b36c6f42dfa4d709edf3348de75d32e22c71fb","src/cmsg/mod.rs":"23d898d72c5aabda93d987526fdd78231bb5907bce2b6b2d292a56bdfd977f86","src/cmsg/unix.rs":"1a4089e5e61536a3c370c6b1bc891036ec2d0e2e78105fbb5b8227705e905d34","src/cmsg/windows.rs":"6fb936ec4a283efc5796872e777441e3039c40589073865644a8ef7936af4f4b","src/fallback.rs":"7fe9666b0bf508d1b5ec0b3690bb7add94c8f213cb51a263c9959e22a5094ad0","src/lib.rs":"72be7f797a3a11e452e7764fadadebc43ae7f9c14ba7fa80aedbbee71aa889c7","src/unix.rs":"fbc9a6ab281cc66500e6afa8b9ebdee73ca281ca14732e8076d9b1f10f431de7","src/windows.rs":"e741a7bdd86d7fcb856db855f9308af01e69387c00e6a726d322f1f4d3046b74","tests/tests.rs":"51bcf6d3f1a3fcf7d481ae966eb679f88341886ff4277f5747df3340ed709d09"},"package":"8bffec3605b73c6f1754535084a85229fa8a30f86014e6c81aeec4abb68b0285"} \ No newline at end of file 
+{"files":{"Cargo.toml":"865febc6bb7b0a6f4d0758779480f829f96fcd6a614b64db05b5ea53e902fd5c","LICENSE-APACHE":"c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4","LICENSE-MIT":"4b2d0aca6789fa39e03d6738e869ea0988cceba210ca34ebb59c15c463e93a04","benches/throughput.rs":"095137508f85b68174978ff968cade74587751484402ca09269ffc2631d97f34","build.rs":"8e81067cac9fbe675619c3314d5aa06d99cf54c332812a837a227eeab41c92e1","src/cmsg/mod.rs":"63d6ea7126341fededdaef14260a7eed715ad3f507d4da586dbab814f581a54d","src/cmsg/unix.rs":"7917bce2f3c8e844eca2e4cfea82669b2a31cf311321dc42532626db4ee42de8","src/cmsg/windows.rs":"6fb936ec4a283efc5796872e777441e3039c40589073865644a8ef7936af4f4b","src/fallback.rs":"6378c177db7ba0eb88115b63f1ec9e17b05f53b1daae2c1e215520f103145585","src/lib.rs":"9672bd2003d779c95d11a85d05a5dac5d421a9d5dcd9f1475de94aca93f23f73","src/unix.rs":"b8e595499055115d15bfb95259c0c585934adf55f61e365bcc9fc47ab8fa9cdd","src/windows.rs":"ab1928d18bed62162a0f2c96158d808d7a2962045ab47c9efa0ecf60e2a2c060","tests/tests.rs":"3ab6c02756098d3933542baff06fa1f2ad6bba11852466f6843b8a42a9cc97c0"},"package":"e346e016eacfff12233c243718197ca12f148c84e1e84268a896699b41c71780"} \ No newline at end of file diff --git a/third_party/rust/quinn-udp/Cargo.toml b/third_party/rust/quinn-udp/Cargo.toml index 94d1347e2462f..3531c4c528872 100644 --- a/third_party/rust/quinn-udp/Cargo.toml +++ b/third_party/rust/quinn-udp/Cargo.toml @@ -11,10 +11,16 @@ [package] edition = "2021" -rust-version = "1.66" +rust-version = "1.70.0" name = "quinn-udp" -version = "0.5.4" +version = "0.5.6" +build = "build.rs" +autobins = false +autoexamples = false +autotests = false +autobenches = false description = "UDP sockets with ECN information for the QUIC transport protocol" +readme = false keywords = ["quic"] categories = [ "network-programming", @@ -26,8 +32,22 @@ repository = "https://github.com/quinn-rs/quinn" [package.metadata.docs.rs] all-features = true +[lib] +name = "quinn_udp" +path = "src/lib.rs" 
+bench = false + +[[test]] +name = "tests" +path = "tests/tests.rs" + +[[bench]] +name = "throughput" +path = "benches/throughput.rs" +harness = false + [dependencies.libc] -version = "0.2.113" +version = "0.2.158" [dependencies.log] version = "0.4" @@ -44,6 +64,20 @@ default-features = false [dev-dependencies.criterion] version = "0.5" +features = ["async_tokio"] +default-features = false + +[dev-dependencies.tokio] +version = "1.28.1" +features = [ + "sync", + "rt", + "rt-multi-thread", + "net", +] + +[build-dependencies.cfg_aliases] +version = "0.2" [features] default = [ @@ -51,15 +85,14 @@ default = [ "log", ] direct-log = ["dep:log"] +fast-apple-datapath = [] log = ["tracing/log"] -[target."cfg(any(target_os = \"linux\", target_os = \"windows\"))"] - [target."cfg(windows)".dependencies.once_cell] version = "1.19" [target."cfg(windows)".dependencies.windows-sys] -version = "0.52" +version = ">=0.52, <=0.59" features = [ "Win32_Foundation", "Win32_System_IO", diff --git a/third_party/rust/quinn-udp/benches/throughput.rs b/third_party/rust/quinn-udp/benches/throughput.rs index 39ea6a2917f8d..446ccc12d3dab 100644 --- a/third_party/rust/quinn-udp/benches/throughput.rs +++ b/third_party/rust/quinn-udp/benches/throughput.rs @@ -1,42 +1,63 @@ +use std::{ + cmp::min, + io::{ErrorKind, IoSliceMut}, + net::{Ipv4Addr, Ipv6Addr, UdpSocket}, +}; + use criterion::{criterion_group, criterion_main, Criterion}; -use quinn_udp::{RecvMeta, Transmit, UdpSocketState}; -use std::cmp::min; -use std::{io::IoSliceMut, net::UdpSocket, slice}; +use tokio::{io::Interest, runtime::Runtime}; + +use quinn_udp::{RecvMeta, Transmit, UdpSocketState, BATCH_SIZE}; pub fn criterion_benchmark(c: &mut Criterion) { const TOTAL_BYTES: usize = 10 * 1024 * 1024; - // Maximum GSO buffer size is 64k. 
- const MAX_BUFFER_SIZE: usize = u16::MAX as usize; const SEGMENT_SIZE: usize = 1280; - let send = UdpSocket::bind("[::1]:0") - .or_else(|_| UdpSocket::bind("127.0.0.1:0")) - .unwrap(); - let recv = UdpSocket::bind("[::1]:0") - .or_else(|_| UdpSocket::bind("127.0.0.1:0")) - .unwrap(); - let max_segments = min( - UdpSocketState::new((&send).into()) - .unwrap() - .max_gso_segments(), - MAX_BUFFER_SIZE / SEGMENT_SIZE, - ); - let dst_addr = recv.local_addr().unwrap(); - let send_state = UdpSocketState::new((&send).into()).unwrap(); - let recv_state = UdpSocketState::new((&recv).into()).unwrap(); - // Reverse non-blocking flag set by `UdpSocketState` to make the test non-racy - recv.set_nonblocking(false).unwrap(); - - let mut receive_buffer = vec![0; MAX_BUFFER_SIZE]; - let mut meta = RecvMeta::default(); - - for gso_enabled in [false, true] { - let mut group = c.benchmark_group(format!("gso_{}", gso_enabled)); - group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64)); + let rt = Runtime::new().unwrap(); + let _guard = rt.enter(); + + let (send_state, send_socket) = new_socket(); + let (recv_state, recv_socket) = new_socket(); + let dst_addr = recv_socket.local_addr().unwrap(); + + let mut permutations = vec![]; + for gso_enabled in [ + false, + #[cfg(any(target_os = "linux", target_os = "windows", apple))] + true, + ] { + for gro_enabled in [false, true] { + #[cfg(target_os = "windows")] + if gso_enabled && !gro_enabled { + // Windows requires receive buffer to fit entire datagram on GRO + // enabled socket. + // + // OS error: "A message sent on a datagram socket was larger + // than the internal message buffer or some other network limit, + // or the buffer used to receive a datagram into was smaller + // than the datagram itself." 
+ continue; + } + + for recvmmsg_enabled in [false, true] { + permutations.push((gso_enabled, gro_enabled, recvmmsg_enabled)); + } + } + } - let segments = if gso_enabled { max_segments } else { 1 }; - let msg = vec![0xAB; SEGMENT_SIZE * segments]; + for (gso_enabled, gro_enabled, recvmmsg_enabled) in permutations { + let mut group = c.benchmark_group(format!( + "gso_{}_gro_{}_recvmmsg_{}", + gso_enabled, gro_enabled, recvmmsg_enabled + )); + group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64)); + let gso_segments = if gso_enabled { + send_state.max_gso_segments() + } else { + 1 + }; + let msg = vec![0xAB; min(MAX_DATAGRAM_SIZE, SEGMENT_SIZE * gso_segments)]; let transmit = Transmit { destination: dst_addr, ecn: None, @@ -44,32 +65,64 @@ pub fn criterion_benchmark(c: &mut Criterion) { segment_size: gso_enabled.then_some(SEGMENT_SIZE), src_ip: None, }; + let gro_segments = if gro_enabled { + recv_state.gro_segments() + } else { + 1 + }; + let batch_size = if recvmmsg_enabled { BATCH_SIZE } else { 1 }; group.bench_function("throughput", |b| { - b.iter(|| { + b.to_async(&rt).iter(|| async { + let mut receive_buffers = vec![vec![0; SEGMENT_SIZE * gro_segments]; batch_size]; + let mut receive_slices = receive_buffers + .iter_mut() + .map(|buf| IoSliceMut::new(buf)) + .collect::>(); + let mut meta = vec![RecvMeta::default(); batch_size]; + let mut sent: usize = 0; + let mut received: usize = 0; while sent < TOTAL_BYTES { - send_state.send((&send).into(), &transmit).unwrap(); + send_socket.writable().await.unwrap(); + send_socket + .try_io(Interest::WRITABLE, || { + send_state.send((&send_socket).into(), &transmit) + }) + .unwrap(); sent += transmit.contents.len(); - let mut received_segments = 0; - while received_segments < segments { - let n = recv_state - .recv( - (&recv).into(), - &mut [IoSliceMut::new(&mut receive_buffer)], - slice::from_mut(&mut meta), - ) - .unwrap(); - assert_eq!(n, 1); - received_segments += meta.len / meta.stride; + while received 
< sent { + recv_socket.readable().await.unwrap(); + let n = match recv_socket.try_io(Interest::READABLE, || { + recv_state.recv((&recv_socket).into(), &mut receive_slices, &mut meta) + }) { + Ok(n) => n, + // recv.readable() can lead to false positives. Try again. + Err(e) if e.kind() == ErrorKind::WouldBlock => continue, + e => e.unwrap(), + }; + received += meta.iter().map(|m| m.len).take(n).sum::(); } - assert_eq!(received_segments, segments); } }) }); } } +fn new_socket() -> (UdpSocketState, tokio::net::UdpSocket) { + let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)) + .or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) + .unwrap(); + + ( + UdpSocketState::new((&socket).into()).unwrap(), + tokio::net::UdpSocket::from_std(socket).unwrap(), + ) +} + criterion_group!(benches, criterion_benchmark); criterion_main!(benches); + +const MAX_IP_UDP_HEADER_SIZE: usize = 48; +const MAX_DATAGRAM_SIZE: usize = u16::MAX as usize - MAX_IP_UDP_HEADER_SIZE; diff --git a/third_party/rust/quinn-udp/build.rs b/third_party/rust/quinn-udp/build.rs new file mode 100644 index 0000000000000..ef89ff70fbc30 --- /dev/null +++ b/third_party/rust/quinn-udp/build.rs @@ -0,0 +1,26 @@ +use cfg_aliases::cfg_aliases; + +fn main() { + // Setup cfg aliases + cfg_aliases! 
{ + // Platforms + apple: { + any( + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "visionos" + ) + }, + bsd: { + any( + target_os = "freebsd", + target_os = "openbsd", + target_os = "netbsd" + ) + }, + // Convenience aliases + apple_fast: { all(apple, feature = "fast-apple-datapath") }, + apple_slow: { all(apple, not(feature = "fast-apple-datapath")) }, + } +} diff --git a/third_party/rust/quinn-udp/src/cmsg/mod.rs b/third_party/rust/quinn-udp/src/cmsg/mod.rs index 531c3d25b11e2..4a1c90e2228d8 100644 --- a/third_party/rust/quinn-udp/src/cmsg/mod.rs +++ b/third_party/rust/quinn-udp/src/cmsg/mod.rs @@ -42,7 +42,7 @@ impl<'a, M: MsgHdr> Encoder<'a, M> { /// # Panics /// - If insufficient buffer space remains. /// - If `T` has stricter alignment requirements than `M::ControlMessage` - pub(crate) fn push(&mut self, level: c_int, ty: c_int, value: T) { + pub(crate) fn push(&mut self, level: c_int, ty: c_int, value: T) { assert!(mem::align_of::() <= mem::align_of::()); let space = M::ControlMessage::cmsg_space(mem::size_of_val(&value)); assert!( @@ -72,7 +72,7 @@ impl<'a, M: MsgHdr> Encoder<'a, M> { // Statically guarantees that the encoding operation is "finished" before the control buffer is read // by `sendmsg` like API. 
-impl<'a, M: MsgHdr> Drop for Encoder<'a, M> { +impl Drop for Encoder<'_, M> { fn drop(&mut self) { self.hdr.set_control_len(self.len as _); } diff --git a/third_party/rust/quinn-udp/src/cmsg/unix.rs b/third_party/rust/quinn-udp/src/cmsg/unix.rs index 112bd5ebefc14..93ac76ba80165 100644 --- a/third_party/rust/quinn-udp/src/cmsg/unix.rs +++ b/third_party/rust/quinn-udp/src/cmsg/unix.rs @@ -32,6 +32,29 @@ impl MsgHdr for libc::msghdr { } } +#[cfg(apple_fast)] +impl MsgHdr for crate::imp::msghdr_x { + type ControlMessage = libc::cmsghdr; + + fn cmsg_first_hdr(&self) -> *mut Self::ControlMessage { + let selfp = self as *const _ as *mut libc::msghdr; + unsafe { libc::CMSG_FIRSTHDR(selfp) } + } + + fn cmsg_nxt_hdr(&self, cmsg: &Self::ControlMessage) -> *mut Self::ControlMessage { + let selfp = self as *const _ as *mut libc::msghdr; + unsafe { libc::CMSG_NXTHDR(selfp, cmsg) } + } + + fn set_control_len(&mut self, len: usize) { + self.msg_controllen = len as _; + } + + fn control_len(&self) -> usize { + self.msg_controllen as _ + } +} + /// Helpers for [`libc::cmsghdr`] impl CMsgHdr for libc::cmsghdr { fn cmsg_len(length: usize) -> usize { diff --git a/third_party/rust/quinn-udp/src/fallback.rs b/third_party/rust/quinn-udp/src/fallback.rs index 7f305acfeac31..fa81eb1b1253b 100644 --- a/third_party/rust/quinn-udp/src/fallback.rs +++ b/third_party/rust/quinn-udp/src/fallback.rs @@ -24,25 +24,32 @@ impl UdpSocketState { }) } + /// Sends a [`Transmit`] on the given socket. + /// + /// This function will only ever return errors of kind [`io::ErrorKind::WouldBlock`]. + /// All other errors will be logged and converted to `Ok`. + /// + /// UDP transmission errors are considered non-fatal because higher-level protocols must + /// employ retransmits and timeouts anyway in order to deal with UDP's unreliable nature. + /// Thus, logging is most likely the only thing you can do with these errors. 
+ /// + /// If you would like to handle these errors yourself, use [`UdpSocketState::try_send`] + /// instead. pub fn send(&self, socket: UdpSockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> { - let Err(e) = socket.0.send_to( - transmit.contents, - &socket2::SockAddr::from(transmit.destination), - ) else { - return Ok(()); - }; - if e.kind() == io::ErrorKind::WouldBlock { - return Err(e); + match send(socket, transmit) { + Ok(()) => Ok(()), + Err(e) if e.kind() == io::ErrorKind::WouldBlock => Err(e), + Err(e) => { + log_sendmsg_error(&self.last_send_error, e, transmit); + + Ok(()) + } } + } - // Other errors are ignored, since they will usually be handled - // by higher level retransmits and timeouts. - // - PermissionDenied errors have been observed due to iptable rules. - // Those are not fatal errors, since the - // configuration can be dynamically changed. - // - Destination unreachable errors have been observed for other - log_sendmsg_error(&self.last_send_error, e, transmit); - Ok(()) + /// Sends a [`Transmit`] on the given socket without any additional error handling. 
+ pub fn try_send(&self, socket: UdpSockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> { + send(socket, transmit) } pub fn recv( @@ -85,4 +92,11 @@ impl UdpSocketState { } } +fn send(socket: UdpSockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> { + socket.0.send_to( + transmit.contents, + &socket2::SockAddr::from(transmit.destination), + ) +} + pub(crate) const BATCH_SIZE: usize = 1; diff --git a/third_party/rust/quinn-udp/src/lib.rs b/third_party/rust/quinn-udp/src/lib.rs index 0337b9c052c84..606dcc4a79d95 100644 --- a/third_party/rust/quinn-udp/src/lib.rs +++ b/third_party/rust/quinn-udp/src/lib.rs @@ -37,11 +37,6 @@ use std::{ time::{Duration, Instant}, }; -#[cfg(all(feature = "direct-log", not(feature = "tracing")))] -use log::warn; -#[cfg(feature = "tracing")] -use tracing::warn; - #[cfg(any(unix, windows))] mod cmsg; @@ -58,6 +53,29 @@ mod imp; #[path = "fallback.rs"] mod imp; +#[allow(unused_imports, unused_macros)] +mod log { + #[cfg(all(feature = "direct-log", not(feature = "tracing")))] + pub(crate) use log::{debug, error, info, trace, warn}; + + #[cfg(feature = "tracing")] + pub(crate) use tracing::{debug, error, info, trace, warn}; + + #[cfg(not(any(feature = "direct-log", feature = "tracing")))] + mod no_op { + macro_rules! trace ( ($($tt:tt)*) => {{}} ); + macro_rules! debug ( ($($tt:tt)*) => {{}} ); + macro_rules! info ( ($($tt:tt)*) => {{}} ); + macro_rules! log_warn ( ($($tt:tt)*) => {{}} ); + macro_rules! 
error ( ($($tt:tt)*) => {{}} ); + + pub(crate) use {debug, error, info, log_warn as warn, trace}; + } + + #[cfg(not(any(feature = "direct-log", feature = "tracing")))] + pub(crate) use no_op::*; +} + pub use imp::UdpSocketState; /// Number of UDP packets to send/receive at a time @@ -139,8 +157,8 @@ fn log_sendmsg_error( let last_send_error = &mut *last_send_error.lock().expect("poisend lock"); if now.saturating_duration_since(*last_send_error) > IO_ERROR_LOG_INTERVAL { *last_send_error = now; - warn!( - "sendmsg error: {:?}, Transmit: {{ destination: {:?}, src_ip: {:?}, enc: {:?}, len: {:?}, segment_size: {:?} }}", + log::warn!( + "sendmsg error: {:?}, Transmit: {{ destination: {:?}, src_ip: {:?}, ecn: {:?}, len: {:?}, segment_size: {:?} }}", err, transmit.destination, transmit.src_ip, transmit.ecn, transmit.contents.len(), transmit.segment_size); } } diff --git a/third_party/rust/quinn-udp/src/unix.rs b/third_party/rust/quinn-udp/src/unix.rs index 8ce5f29f2b98f..acfa89effce70 100644 --- a/third_party/rust/quinn-udp/src/unix.rs +++ b/third_party/rust/quinn-udp/src/unix.rs @@ -1,4 +1,4 @@ -#[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "openbsd")))] +#[cfg(not(any(apple, target_os = "openbsd", target_os = "solaris")))] use std::ptr; use std::{ io::{self, IoSliceMut}, @@ -12,16 +12,45 @@ use std::{ time::Instant, }; -#[cfg(all(feature = "direct-log", not(feature = "tracing")))] -use log::{debug, error}; use socket2::SockRef; -#[cfg(feature = "tracing")] -use tracing::{debug, error}; use super::{ - cmsg, log_sendmsg_error, EcnCodepoint, RecvMeta, Transmit, UdpSockRef, IO_ERROR_LOG_INTERVAL, + cmsg, log::debug, log_sendmsg_error, EcnCodepoint, RecvMeta, Transmit, UdpSockRef, + IO_ERROR_LOG_INTERVAL, }; +// Adapted from https://github.com/apple-oss-distributions/xnu/blob/8d741a5de7ff4191bf97d57b9f54c2f6d4a15585/bsd/sys/socket_private.h +#[cfg(apple_fast)] +#[repr(C)] +#[allow(non_camel_case_types)] +pub(crate) struct msghdr_x { + pub msg_name: *mut 
libc::c_void, + pub msg_namelen: libc::socklen_t, + pub msg_iov: *mut libc::iovec, + pub msg_iovlen: libc::c_int, + pub msg_control: *mut libc::c_void, + pub msg_controllen: libc::socklen_t, + pub msg_flags: libc::c_int, + pub msg_datalen: usize, +} + +#[cfg(apple_fast)] +extern "C" { + fn recvmsg_x( + s: libc::c_int, + msgp: *const msghdr_x, + cnt: libc::c_uint, + flags: libc::c_int, + ) -> isize; + + fn sendmsg_x( + s: libc::c_int, + msgp: *const msghdr_x, + cnt: libc::c_uint, + flags: libc::c_int, + ) -> isize; +} + // Defined in netinet6/in6.h on OpenBSD, this is not yet exported by the libc crate // directly. See https://github.com/rust-lang/libc/issues/3704 for when we might be able to // rely on this from the libc crate. @@ -59,12 +88,10 @@ impl UdpSocketState { let io = sock.0; let mut cmsg_platform_space = 0; if cfg!(target_os = "linux") - || cfg!(target_os = "freebsd") - || cfg!(target_os = "openbsd") - || cfg!(target_os = "netbsd") - || cfg!(target_os = "macos") - || cfg!(target_os = "ios") + || cfg!(bsd) + || cfg!(apple) || cfg!(target_os = "android") + || cfg!(target_os = "solaris") { cmsg_platform_space += unsafe { libc::CMSG_SPACE(mem::size_of::() as _) as usize }; @@ -87,13 +114,12 @@ impl UdpSocketState { // mac and ios do not support IP_RECVTOS on dual-stack sockets :( // older macos versions also don't have the flag and will error out if we don't ignore it - #[cfg(not(any(target_os = "openbsd", target_os = "netbsd")))] + #[cfg(not(any(target_os = "openbsd", target_os = "netbsd", target_os = "solaris")))] if is_ipv4 || !io.only_v6()? 
{ - #[allow(unused_variables)] - if let Err(err) = set_socket_option(&*io, libc::IPPROTO_IP, libc::IP_RECVTOS, OPTION_ON) + if let Err(_err) = + set_socket_option(&*io, libc::IPPROTO_IP, libc::IP_RECVTOS, OPTION_ON) { - #[cfg(any(feature = "tracing", feature = "direct-log"))] - debug!("Ignoring error setting IP_RECVTOS on socket: {err:?}"); + debug!("Ignoring error setting IP_RECVTOS on socket: {_err:?}"); } } @@ -125,7 +151,7 @@ impl UdpSocketState { )?; } } - #[cfg(any(target_os = "freebsd", target_os = "macos", target_os = "ios"))] + #[cfg(any(target_os = "freebsd", apple))] { if is_ipv4 { // Set `may_fragment` to `true` if this option is not supported on the platform. @@ -137,15 +163,9 @@ impl UdpSocketState { )?; } } - #[cfg(any( - target_os = "freebsd", - target_os = "openbsd", - target_os = "netbsd", - target_os = "macos", - target_os = "ios" - ))] + #[cfg(any(bsd, apple, target_os = "solaris"))] // IP_RECVDSTADDR == IP_SENDSRCADDR on FreeBSD - // macOS uses only IP_RECVDSTADDR, no IP_SENDSRCADDR on macOS + // macOS uses only IP_RECVDSTADDR, no IP_SENDSRCADDR on macOS (the same on Solaris) // macOS also supports IP_PKTINFO { if is_ipv4 { @@ -175,7 +195,31 @@ impl UdpSocketState { }) } + /// Sends a [`Transmit`] on the given socket. + /// + /// This function will only ever return errors of kind [`io::ErrorKind::WouldBlock`]. + /// All other errors will be logged and converted to `Ok`. + /// + /// UDP transmission errors are considered non-fatal because higher-level protocols must + /// employ retransmits and timeouts anyway in order to deal with UDP's unreliable nature. + /// Thus, logging is most likely the only thing you can do with these errors. + /// + /// If you would like to handle these errors yourself, use [`UdpSocketState::try_send`] + /// instead. 
pub fn send(&self, socket: UdpSockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> { + match send(self, socket.0, transmit) { + Ok(()) => Ok(()), + Err(e) if e.kind() == io::ErrorKind::WouldBlock => Err(e), + Err(e) => { + log_sendmsg_error(&self.last_send_error, e, transmit); + + Ok(()) + } + } + } + + /// Sends a [`Transmit`] on the given socket without any additional error handling. + pub fn try_send(&self, socket: UdpSockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> { send(self, socket.0, transmit) } @@ -221,23 +265,13 @@ impl UdpSocketState { } /// Sets the flag indicating we got EINVAL error from `sendmsg` syscall. - #[cfg(not(any( - target_os = "macos", - target_os = "ios", - target_os = "openbsd", - target_os = "netbsd" - )))] + #[cfg(not(any(apple, target_os = "openbsd", target_os = "netbsd")))] fn set_sendmsg_einval(&self) { self.sendmsg_einval.store(true, Ordering::Relaxed) } } -#[cfg(not(any( - target_os = "macos", - target_os = "ios", - target_os = "openbsd", - target_os = "netbsd" -)))] +#[cfg(not(any(apple, target_os = "openbsd", target_os = "netbsd")))] fn send( #[allow(unused_variables)] // only used on Linux state: &UdpSocketState, @@ -289,8 +323,7 @@ fn send( // Prevent new transmits from being scheduled using GSO. Existing GSO transmits // may already be in the pipeline, so we need to tolerate additional failures. if state.max_gso_segments() > 1 { - #[cfg(any(feature = "tracing", feature = "direct-log"))] - error!("got transmit error, halting segmentation offload"); + crate::log::error!("got transmit error, halting segmentation offload"); state .max_gso_segments .store(1, std::sync::atomic::Ordering::Relaxed); @@ -303,16 +336,10 @@ fn send( state.set_sendmsg_einval(); } - // Other errors are ignored, since they will usually be handled - // by higher level retransmits and timeouts. - // - PermissionDenied errors have been observed due to iptable rules. 
- // Those are not fatal errors, since the - // configuration can be dynamically changed. - // - Destination unreachable errors have been observed for other // - EMSGSIZE is expected for MTU probes. Future work might be able to avoid // these by automatically clamping the MTUD upper bound to the interface MTU. if e.raw_os_error() != Some(libc::EMSGSIZE) { - log_sendmsg_error(&state.last_send_error, e, transmit); + return Err(e); } return Ok(()); @@ -323,12 +350,67 @@ fn send( } } -#[cfg(any( - target_os = "macos", - target_os = "ios", - target_os = "openbsd", - target_os = "netbsd" -))] +#[cfg(apple_fast)] +fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> { + let mut hdrs = unsafe { mem::zeroed::<[msghdr_x; BATCH_SIZE]>() }; + let mut iovs = unsafe { mem::zeroed::<[libc::iovec; BATCH_SIZE]>() }; + let mut ctrls = [cmsg::Aligned([0u8; CMSG_LEN]); BATCH_SIZE]; + let addr = socket2::SockAddr::from(transmit.destination); + let segment_size = transmit.segment_size.unwrap_or(transmit.contents.len()); + let mut cnt = 0; + debug_assert!(transmit.contents.len().div_ceil(segment_size) <= BATCH_SIZE); + for (i, chunk) in transmit + .contents + .chunks(segment_size) + .enumerate() + .take(BATCH_SIZE) + { + prepare_msg( + &Transmit { + destination: transmit.destination, + ecn: transmit.ecn, + contents: chunk, + segment_size: Some(chunk.len()), + src_ip: transmit.src_ip, + }, + &addr, + &mut hdrs[i], + &mut iovs[i], + &mut ctrls[i], + true, + state.sendmsg_einval(), + ); + hdrs[i].msg_datalen = chunk.len(); + cnt += 1; + } + let n = unsafe { sendmsg_x(io.as_raw_fd(), hdrs.as_ptr(), cnt as u32, 0) }; + if n >= 0 { + return Ok(()); + } + let e = io::Error::last_os_error(); + match e.kind() { + io::ErrorKind::Interrupted => { + // Retry the transmission + } + io::ErrorKind::WouldBlock => return Err(e), + _ => { + // Other errors are ignored, since they will usually be handled + // by higher level retransmits and timeouts. 
+ // - PermissionDenied errors have been observed due to iptable rules. + // Those are not fatal errors, since the + // configuration can be dynamically changed. + // - Destination unreachable errors have been observed for other + // - EMSGSIZE is expected for MTU probes. Future work might be able to avoid + // these by automatically clamping the MTUD upper bound to the interface MTU. + if e.raw_os_error() != Some(libc::EMSGSIZE) { + log_sendmsg_error(&state.last_send_error, e, transmit); + } + } + } + Ok(()) +} + +#[cfg(any(target_os = "openbsd", target_os = "netbsd", apple_slow))] fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> { let mut hdr: libc::msghdr = unsafe { mem::zeroed() }; let mut iov: libc::iovec = unsafe { mem::zeroed() }; @@ -340,10 +422,7 @@ fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io: &mut hdr, &mut iov, &mut ctrl, - cfg!(target_os = "macos") - || cfg!(target_os = "ios") - || cfg!(target_os = "openbsd") - || cfg!(target_os = "netbsd"), + cfg!(apple) || cfg!(target_os = "openbsd") || cfg!(target_os = "netbsd"), state.sendmsg_einval(), ); let n = unsafe { libc::sendmsg(io.as_raw_fd(), &hdr, 0) }; @@ -355,16 +434,10 @@ fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io: } io::ErrorKind::WouldBlock => return Err(e), _ => { - // Other errors are ignored, since they will usually be handled - // by higher level retransmits and timeouts. - // - PermissionDenied errors have been observed due to iptable rules. - // Those are not fatal errors, since the - // configuration can be dynamically changed. - // - Destination unreachable errors have been observed for other // - EMSGSIZE is expected for MTU probes. Future work might be able to avoid // these by automatically clamping the MTUD upper bound to the interface MTU. 
if e.raw_os_error() != Some(libc::EMSGSIZE) { - log_sendmsg_error(&state.last_send_error, e, transmit); + return Err(e); } } } @@ -372,7 +445,7 @@ fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io: Ok(()) } -#[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "openbsd")))] +#[cfg(not(any(apple, target_os = "openbsd", target_os = "solaris")))] fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result { let mut names = [MaybeUninit::::uninit(); BATCH_SIZE]; let mut ctrls = [cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit()); BATCH_SIZE]; @@ -388,10 +461,12 @@ fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> } let msg_count = loop { let n = unsafe { - recvmmsg_with_fallback( + libc::recvmmsg( io.as_raw_fd(), hdrs.as_mut_ptr(), bufs.len().min(BATCH_SIZE) as _, + 0, + ptr::null_mut::(), ) }; if n == -1 { @@ -409,7 +484,35 @@ fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> Ok(msg_count as usize) } -#[cfg(any(target_os = "macos", target_os = "ios", target_os = "openbsd"))] +#[cfg(apple_fast)] +fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result { + let mut names = [MaybeUninit::::uninit(); BATCH_SIZE]; + let mut ctrls = [cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit()); BATCH_SIZE]; + let mut hdrs = unsafe { mem::zeroed::<[msghdr_x; BATCH_SIZE]>() }; + let max_msg_count = bufs.len().min(BATCH_SIZE); + for i in 0..max_msg_count { + prepare_recv(&mut bufs[i], &mut names[i], &mut ctrls[i], &mut hdrs[i]); + } + let msg_count = loop { + let n = unsafe { recvmsg_x(io.as_raw_fd(), hdrs.as_mut_ptr(), max_msg_count as _, 0) }; + match n { + -1 => { + let e = io::Error::last_os_error(); + if e.kind() == io::ErrorKind::Interrupted { + continue; + } + return Err(e); + } + n => break n, + } + }; + for i in 0..(msg_count as usize) { + meta[i] = decode_recv(&names[i], &hdrs[i], hdrs[i].msg_datalen as 
usize); + } + Ok(msg_count as usize) +} + +#[cfg(any(target_os = "openbsd", target_os = "netbsd", apple_slow))] fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result { let mut name = MaybeUninit::::uninit(); let mut ctrl = cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit()); @@ -433,82 +536,13 @@ fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> Ok(1) } -/// Implementation of `recvmmsg` with a fallback -/// to `recvmsg` if syscall is not available. -/// -/// It uses [`libc::syscall`] instead of [`libc::recvmmsg`] -/// to avoid linking error on systems where libc does not contain `recvmmsg`. -#[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "openbsd")))] -unsafe fn recvmmsg_with_fallback( - sockfd: libc::c_int, - msgvec: *mut libc::mmsghdr, - vlen: libc::c_uint, -) -> libc::c_int { - let flags = 0; - let timeout = ptr::null_mut::(); - - #[cfg(not(any(target_os = "freebsd", target_os = "netbsd")))] - { - let ret = - libc::syscall(libc::SYS_recvmmsg, sockfd, msgvec, vlen, flags, timeout) as libc::c_int; - if ret != -1 { - return ret; - } - } - - // libc on FreeBSD and NetBSD implement `recvmmsg` as a high-level abstraction over - // `recvmsg`, thus `SYS_recvmmsg` constant and direct system call do not exist - #[cfg(any(target_os = "freebsd", target_os = "netbsd"))] - { - #[cfg(target_os = "freebsd")] - let vlen = vlen as usize; - let ret = libc::recvmmsg(sockfd, msgvec, vlen, flags, timeout) as libc::c_int; - if ret != -1 { - return ret; - } - } - - let e = io::Error::last_os_error(); - match e.raw_os_error() { - Some(libc::ENOSYS) => { - // Fallback to `recvmsg`. - recvmmsg_fallback(sockfd, msgvec, vlen) - } - _ => -1, - } -} - -/// Fallback implementation of `recvmmsg` using `recvmsg` -/// for systems which do not support `recvmmsg` -/// such as Linux <2.6.33. 
-#[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "openbsd")))] -unsafe fn recvmmsg_fallback( - sockfd: libc::c_int, - msgvec: *mut libc::mmsghdr, - vlen: libc::c_uint, -) -> libc::c_int { - let flags = 0; - if vlen == 0 { - return 0; - } - - let n = libc::recvmsg(sockfd, &mut (*msgvec).msg_hdr, flags); - if n == -1 { - -1 - } else { - // type of `msg_len` field differs on Linux and FreeBSD, - // it is up to the compiler to infer and cast `n` to correct type - (*msgvec).msg_len = n as _; - 1 - } -} - const CMSG_LEN: usize = 88; fn prepare_msg( transmit: &Transmit<'_>, dst_addr: &socket2::SockAddr, - hdr: &mut libc::msghdr, + #[cfg(not(apple_fast))] hdr: &mut libc::msghdr, + #[cfg(apple_fast)] hdr: &mut msghdr_x, iov: &mut libc::iovec, ctrl: &mut cmsg::Aligned<[u8; CMSG_LEN]>, #[allow(unused_variables)] // only used on FreeBSD & macOS @@ -566,13 +600,7 @@ fn prepare_msg( }; encoder.push(libc::IPPROTO_IP, libc::IP_PKTINFO, pktinfo); } - #[cfg(any( - target_os = "freebsd", - target_os = "openbsd", - target_os = "netbsd", - target_os = "macos", - target_os = "ios", - ))] + #[cfg(any(bsd, apple, target_os = "solaris"))] { if encode_src_ip { let addr = libc::in_addr { @@ -597,6 +625,7 @@ fn prepare_msg( encoder.finish(); } +#[cfg(not(apple_fast))] fn prepare_recv( buf: &mut IoSliceMut, name: &mut MaybeUninit, @@ -612,9 +641,27 @@ fn prepare_recv( hdr.msg_flags = 0; } +#[cfg(apple_fast)] +fn prepare_recv( + buf: &mut IoSliceMut, + name: &mut MaybeUninit, + ctrl: &mut cmsg::Aligned>, + hdr: &mut msghdr_x, +) { + hdr.msg_name = name.as_mut_ptr() as _; + hdr.msg_namelen = mem::size_of::() as _; + hdr.msg_iov = buf as *mut IoSliceMut as *mut libc::iovec; + hdr.msg_iovlen = 1; + hdr.msg_control = ctrl.0.as_mut_ptr() as _; + hdr.msg_controllen = CMSG_LEN as _; + hdr.msg_flags = 0; + hdr.msg_datalen = buf.len(); +} + fn decode_recv( name: &MaybeUninit, - hdr: &libc::msghdr, + #[cfg(not(apple_fast))] hdr: &libc::msghdr, + #[cfg(apple_fast)] hdr: &msghdr_x, len: 
usize, ) -> RecvMeta { let name = unsafe { name.assume_init() }; @@ -630,7 +677,7 @@ fn decode_recv( ecn_bits = cmsg::decode::(cmsg); }, // FreeBSD uses IP_RECVTOS here, and we can be liberal because cmsgs are opt-in. - #[cfg(not(any(target_os = "openbsd", target_os = "netbsd")))] + #[cfg(not(any(target_os = "openbsd", target_os = "netbsd", target_os = "solaris")))] (libc::IPPROTO_IP, libc::IP_RECVTOS) => unsafe { ecn_bits = cmsg::decode::(cmsg); }, @@ -638,7 +685,7 @@ fn decode_recv( // Temporary hack around broken macos ABI. Remove once upstream fixes it. // https://bugreport.apple.com/web/?problemID=48761855 #[allow(clippy::unnecessary_cast)] // cmsg.cmsg_len defined as size_t - if (cfg!(target_os = "macos") || cfg!(target_os = "ios")) + if cfg!(apple) && cmsg.cmsg_len as usize == libc::CMSG_LEN(mem::size_of::() as _) as usize { ecn_bits = cmsg::decode::(cmsg); @@ -653,13 +700,7 @@ fn decode_recv( pktinfo.ipi_addr.s_addr.to_ne_bytes(), ))); } - #[cfg(any( - target_os = "freebsd", - target_os = "openbsd", - target_os = "netbsd", - target_os = "macos", - target_os = "ios", - ))] + #[cfg(any(bsd, apple))] (libc::IPPROTO_IP, libc::IP_RECVDSTADDR) => { let in_addr = unsafe { cmsg::decode::(cmsg) }; dst_ip = Some(IpAddr::V4(Ipv4Addr::from(in_addr.s_addr.to_ne_bytes()))); @@ -709,11 +750,11 @@ fn decode_recv( } } -#[cfg(not(any(target_os = "macos", target_os = "ios")))] +#[cfg(not(apple_slow))] // Chosen somewhat arbitrarily; might benefit from additional tuning. 
pub(crate) const BATCH_SIZE: usize = 32; -#[cfg(any(target_os = "macos", target_os = "ios"))] +#[cfg(apple_slow)] pub(crate) const BATCH_SIZE: usize = 1; #[cfg(target_os = "linux")] @@ -726,7 +767,7 @@ mod gso { const GSO_SIZE: libc::c_int = 1500; let socket = match std::net::UdpSocket::bind("[::]:0") - .or_else(|_| std::net::UdpSocket::bind("127.0.0.1:0")) + .or_else(|_| std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) { Ok(socket) => socket, Err(_) => return 1, @@ -745,16 +786,22 @@ mod gso { } } +// On Apple platforms using the `sendmsg_x` call, UDP datagram segmentation is not +// offloaded to the NIC or even the kernel, but instead done here in user space in +// [`send`]) and then passed to the OS as individual `iovec`s (up to `BATCH_SIZE`). #[cfg(not(target_os = "linux"))] mod gso { use super::*; pub(super) fn max_gso_segments() -> usize { - 1 + BATCH_SIZE } - pub(super) fn set_segment_size(_encoder: &mut cmsg::Encoder, _segment_size: u16) { - panic!("Setting a segment size is not supported on current platform"); + pub(super) fn set_segment_size( + #[cfg(not(apple_fast))] _encoder: &mut cmsg::Encoder, + #[cfg(apple_fast)] _encoder: &mut cmsg::Encoder, + _segment_size: u16, + ) { } } @@ -764,7 +811,7 @@ mod gro { pub(crate) fn gro_segments() -> usize { let socket = match std::net::UdpSocket::bind("[::]:0") - .or_else(|_| std::net::UdpSocket::bind("127.0.0.1:0")) + .or_else(|_| std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) { Ok(socket) => socket, Err(_) => return 1, diff --git a/third_party/rust/quinn-udp/src/windows.rs b/third_party/rust/quinn-udp/src/windows.rs index 475a716579910..886f455117ceb 100644 --- a/third_party/rust/quinn-udp/src/windows.rs +++ b/third_party/rust/quinn-udp/src/windows.rs @@ -9,15 +9,12 @@ use std::{ }; use libc::{c_int, c_uint}; -#[cfg(all(feature = "direct-log", not(feature = "tracing")))] -use log::{debug, error}; use once_cell::sync::Lazy; -#[cfg(feature = "tracing")] -use tracing::{debug, error}; use 
windows_sys::Win32::Networking::WinSock; use crate::{ cmsg::{self, CMsgHdr}, + log::{debug, error}, log_sendmsg_error, EcnCodepoint, RecvMeta, Transmit, UdpSockRef, IO_ERROR_LOG_INTERVAL, }; @@ -64,7 +61,6 @@ impl UdpSocketState { // We don't support old versions of Windows that do not enable access to `WSARecvMsg()` if WSARECVMSG_PTR.is_none() { - #[cfg(any(feature = "tracing", feature = "direct-log"))] error!("network stack does not support WSARecvMsg function"); return Err(io::Error::from(io::ErrorKind::Unsupported)); @@ -127,109 +123,32 @@ impl UdpSocketState { }) } + /// Sends a [`Transmit`] on the given socket. + /// + /// This function will only ever return errors of kind [`io::ErrorKind::WouldBlock`]. + /// All other errors will be logged and converted to `Ok`. + /// + /// UDP transmission errors are considered non-fatal because higher-level protocols must + /// employ retransmits and timeouts anyway in order to deal with UDP's unreliable nature. + /// Thus, logging is most likely the only thing you can do with these errors. + /// + /// If you would like to handle these errors yourself, use [`UdpSocketState::try_send`] + /// instead. 
pub fn send(&self, socket: UdpSockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> { - // we cannot use [`socket2::sendmsg()`] and [`socket2::MsgHdr`] as we do not have access - // to the inner field which holds the WSAMSG - let mut ctrl_buf = cmsg::Aligned([0; CMSG_LEN]); - let daddr = socket2::SockAddr::from(transmit.destination); - - let mut data = WinSock::WSABUF { - buf: transmit.contents.as_ptr() as *mut _, - len: transmit.contents.len() as _, - }; - - let ctrl = WinSock::WSABUF { - buf: ctrl_buf.0.as_mut_ptr(), - len: ctrl_buf.0.len() as _, - }; - - let mut wsa_msg = WinSock::WSAMSG { - name: daddr.as_ptr() as *mut _, - namelen: daddr.len(), - lpBuffers: &mut data, - Control: ctrl, - dwBufferCount: 1, - dwFlags: 0, - }; + match send(socket, transmit) { + Ok(()) => Ok(()), + Err(e) if e.kind() == io::ErrorKind::WouldBlock => Err(e), + Err(e) => { + log_sendmsg_error(&self.last_send_error, e, transmit); - // Add control messages (ECN and PKTINFO) - let mut encoder = unsafe { cmsg::Encoder::new(&mut wsa_msg) }; - - if let Some(ip) = transmit.src_ip { - let ip = std::net::SocketAddr::new(ip, 0); - let ip = socket2::SockAddr::from(ip); - match ip.family() { - WinSock::AF_INET => { - let src_ip = unsafe { ptr::read(ip.as_ptr() as *const WinSock::SOCKADDR_IN) }; - let pktinfo = WinSock::IN_PKTINFO { - ipi_addr: src_ip.sin_addr, - ipi_ifindex: 0, - }; - encoder.push(WinSock::IPPROTO_IP, WinSock::IP_PKTINFO, pktinfo); - } - WinSock::AF_INET6 => { - let src_ip = unsafe { ptr::read(ip.as_ptr() as *const WinSock::SOCKADDR_IN6) }; - let pktinfo = WinSock::IN6_PKTINFO { - ipi6_addr: src_ip.sin6_addr, - ipi6_ifindex: unsafe { src_ip.Anonymous.sin6_scope_id }, - }; - encoder.push(WinSock::IPPROTO_IPV6, WinSock::IPV6_PKTINFO, pktinfo); - } - _ => { - return Err(io::Error::from(io::ErrorKind::InvalidInput)); - } + Ok(()) } } + } - // ECN is a C integer https://learn.microsoft.com/en-us/windows/win32/winsock/winsock-ecn - let ecn = transmit.ecn.map_or(0, |x| x as c_int); - 
// True for IPv4 or IPv4-Mapped IPv6 - let is_ipv4 = transmit.destination.is_ipv4() - || matches!(transmit.destination.ip(), IpAddr::V6(addr) if addr.to_ipv4_mapped().is_some()); - if is_ipv4 { - encoder.push(WinSock::IPPROTO_IP, WinSock::IP_ECN, ecn); - } else { - encoder.push(WinSock::IPPROTO_IPV6, WinSock::IPV6_ECN, ecn); - } - - // Segment size is a u32 https://learn.microsoft.com/en-us/windows/win32/api/ws2tcpip/nf-ws2tcpip-wsasetudpsendmessagesize - if let Some(segment_size) = transmit.segment_size { - encoder.push( - WinSock::IPPROTO_UDP, - WinSock::UDP_SEND_MSG_SIZE, - segment_size as u32, - ); - } - - encoder.finish(); - - let mut len = 0; - let rc = unsafe { - WinSock::WSASendMsg( - socket.0.as_raw_socket() as usize, - &wsa_msg, - 0, - &mut len, - ptr::null_mut(), - None, - ) - }; - - if rc != 0 { - let e = io::Error::last_os_error(); - if e.kind() == io::ErrorKind::WouldBlock { - return Err(e); - } - - // Other errors are ignored, since they will usually be handled - // by higher level retransmits and timeouts. - // - PermissionDenied errors have been observed due to iptable rules. - // Those are not fatal errors, since the - // configuration can be dynamically changed. - // - Destination unreachable errors have been observed for other - log_sendmsg_error(&self.last_send_error, e, transmit); - } - Ok(()) + /// Sends a [`Transmit`] on the given socket without any additional error handling. 
+ pub fn try_send(&self, socket: UdpSockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> { + send(socket, transmit) } pub fn recv( @@ -361,6 +280,100 @@ impl UdpSocketState { } } +fn send(socket: UdpSockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> { + // we cannot use [`socket2::sendmsg()`] and [`socket2::MsgHdr`] as we do not have access + // to the inner field which holds the WSAMSG + let mut ctrl_buf = cmsg::Aligned([0; CMSG_LEN]); + let daddr = socket2::SockAddr::from(transmit.destination); + + let mut data = WinSock::WSABUF { + buf: transmit.contents.as_ptr() as *mut _, + len: transmit.contents.len() as _, + }; + + let ctrl = WinSock::WSABUF { + buf: ctrl_buf.0.as_mut_ptr(), + len: ctrl_buf.0.len() as _, + }; + + let mut wsa_msg = WinSock::WSAMSG { + name: daddr.as_ptr() as *mut _, + namelen: daddr.len(), + lpBuffers: &mut data, + Control: ctrl, + dwBufferCount: 1, + dwFlags: 0, + }; + + // Add control messages (ECN and PKTINFO) + let mut encoder = unsafe { cmsg::Encoder::new(&mut wsa_msg) }; + + if let Some(ip) = transmit.src_ip { + let ip = std::net::SocketAddr::new(ip, 0); + let ip = socket2::SockAddr::from(ip); + match ip.family() { + WinSock::AF_INET => { + let src_ip = unsafe { ptr::read(ip.as_ptr() as *const WinSock::SOCKADDR_IN) }; + let pktinfo = WinSock::IN_PKTINFO { + ipi_addr: src_ip.sin_addr, + ipi_ifindex: 0, + }; + encoder.push(WinSock::IPPROTO_IP, WinSock::IP_PKTINFO, pktinfo); + } + WinSock::AF_INET6 => { + let src_ip = unsafe { ptr::read(ip.as_ptr() as *const WinSock::SOCKADDR_IN6) }; + let pktinfo = WinSock::IN6_PKTINFO { + ipi6_addr: src_ip.sin6_addr, + ipi6_ifindex: unsafe { src_ip.Anonymous.sin6_scope_id }, + }; + encoder.push(WinSock::IPPROTO_IPV6, WinSock::IPV6_PKTINFO, pktinfo); + } + _ => { + return Err(io::Error::from(io::ErrorKind::InvalidInput)); + } + } + } + + // ECN is a C integer https://learn.microsoft.com/en-us/windows/win32/winsock/winsock-ecn + let ecn = transmit.ecn.map_or(0, |x| x as c_int); + // True for IPv4 
or IPv4-Mapped IPv6 + let is_ipv4 = transmit.destination.is_ipv4() + || matches!(transmit.destination.ip(), IpAddr::V6(addr) if addr.to_ipv4_mapped().is_some()); + if is_ipv4 { + encoder.push(WinSock::IPPROTO_IP, WinSock::IP_ECN, ecn); + } else { + encoder.push(WinSock::IPPROTO_IPV6, WinSock::IPV6_ECN, ecn); + } + + // Segment size is a u32 https://learn.microsoft.com/en-us/windows/win32/api/ws2tcpip/nf-ws2tcpip-wsasetudpsendmessagesize + if let Some(segment_size) = transmit.segment_size { + encoder.push( + WinSock::IPPROTO_UDP, + WinSock::UDP_SEND_MSG_SIZE, + segment_size as u32, + ); + } + + encoder.finish(); + + let mut len = 0; + let rc = unsafe { + WinSock::WSASendMsg( + socket.0.as_raw_socket() as usize, + &wsa_msg, + 0, + &mut len, + ptr::null_mut(), + None, + ) + }; + + match rc { + 0 => Ok(()), + _ => Err(io::Error::last_os_error()), + } +} + fn set_socket_option( socket: &impl AsRawSocket, level: i32, @@ -392,7 +405,6 @@ const OPTION_ON: u32 = 1; static WSARECVMSG_PTR: Lazy = Lazy::new(|| { let s = unsafe { WinSock::socket(WinSock::AF_INET as _, WinSock::SOCK_DGRAM as _, 0) }; if s == WinSock::INVALID_SOCKET { - #[cfg(any(feature = "tracing", feature = "direct-log"))] debug!( "ignoring WSARecvMsg function pointer due to socket creation error: {}", io::Error::last_os_error() @@ -422,13 +434,11 @@ static WSARECVMSG_PTR: Lazy = Lazy::new(|| { }; if rc == -1 { - #[cfg(any(feature = "tracing", feature = "direct-log"))] debug!( "ignoring WSARecvMsg function pointer due to ioctl error: {}", io::Error::last_os_error() ); } else if len as usize != mem::size_of::() { - #[cfg(any(feature = "tracing", feature = "direct-log"))] debug!("ignoring WSARecvMsg function pointer due to pointer size mismatch"); wsa_recvmsg_ptr = None; } @@ -442,7 +452,7 @@ static WSARECVMSG_PTR: Lazy = Lazy::new(|| { static MAX_GSO_SEGMENTS: Lazy = Lazy::new(|| { let socket = match std::net::UdpSocket::bind("[::]:0") - .or_else(|_| std::net::UdpSocket::bind("127.0.0.1:0")) + .or_else(|_| 
std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) { Ok(socket) => socket, Err(_) => return 1, diff --git a/third_party/rust/quinn-udp/tests/tests.rs b/third_party/rust/quinn-udp/tests/tests.rs index 4c06c488feec3..a0dd32fda2b04 100644 --- a/third_party/rust/quinn-udp/tests/tests.rs +++ b/third_party/rust/quinn-udp/tests/tests.rs @@ -1,4 +1,4 @@ -#[cfg(not(any(target_os = "openbsd", target_os = "netbsd")))] +#[cfg(not(any(target_os = "openbsd", target_os = "netbsd", target_os = "solaris")))] use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6}; use std::{ io::IoSliceMut, @@ -11,11 +11,11 @@ use socket2::Socket; #[test] fn basic() { - let send = UdpSocket::bind("[::1]:0") - .or_else(|_| UdpSocket::bind("127.0.0.1:0")) + let send = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)) + .or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) .unwrap(); - let recv = UdpSocket::bind("[::1]:0") - .or_else(|_| UdpSocket::bind("127.0.0.1:0")) + let recv = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)) + .or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) .unwrap(); let dst_addr = recv.local_addr().unwrap(); test_send_recv( @@ -33,8 +33,8 @@ fn basic() { #[test] fn ecn_v6() { - let send = Socket::from(UdpSocket::bind("[::1]:0").unwrap()); - let recv = Socket::from(UdpSocket::bind("[::1]:0").unwrap()); + let send = Socket::from(UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).unwrap()); + let recv = Socket::from(UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).unwrap()); for codepoint in [EcnCodepoint::Ect0, EcnCodepoint::Ect1] { test_send_recv( &send, @@ -51,10 +51,10 @@ fn ecn_v6() { } #[test] -#[cfg(not(any(target_os = "openbsd", target_os = "netbsd")))] +#[cfg(not(any(target_os = "openbsd", target_os = "netbsd", target_os = "solaris")))] fn ecn_v4() { - let send = Socket::from(UdpSocket::bind("127.0.0.1:0").unwrap()); - let recv = Socket::from(UdpSocket::bind("127.0.0.1:0").unwrap()); + let send = Socket::from(UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap()); + let recv = 
Socket::from(UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap()); for codepoint in [EcnCodepoint::Ect0, EcnCodepoint::Ect1] { test_send_recv( &send, @@ -71,7 +71,7 @@ fn ecn_v4() { } #[test] -#[cfg(not(any(target_os = "openbsd", target_os = "netbsd")))] +#[cfg(not(any(target_os = "openbsd", target_os = "netbsd", target_os = "solaris")))] fn ecn_v6_dualstack() { let recv = socket2::Socket::new( socket2::Domain::IPV6, @@ -93,7 +93,10 @@ fn ecn_v6_dualstack() { 0, )); let recv_v4 = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, recv_v6.port())); - for (src, dst) in [("[::1]:0", recv_v6), ("127.0.0.1:0", recv_v4)] { + for (src, dst) in [ + (SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 0), recv_v6), + (SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0), recv_v4), + ] { dbg!(src, dst); let send = UdpSocket::bind(src).unwrap(); let send = Socket::from(send); @@ -114,7 +117,7 @@ fn ecn_v6_dualstack() { } #[test] -#[cfg(not(any(target_os = "openbsd", target_os = "netbsd")))] +#[cfg(not(any(target_os = "openbsd", target_os = "netbsd", target_os = "solaris")))] fn ecn_v4_mapped_v6() { let send = socket2::Socket::new( socket2::Domain::IPV6, @@ -128,7 +131,7 @@ fn ecn_v4_mapped_v6() { )) .unwrap(); - let recv = UdpSocket::bind("127.0.0.1:0").unwrap(); + let recv = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap(); let recv = Socket::from(recv); let recv_v4_mapped_v6 = SocketAddr::V6(SocketAddrV6::new( Ipv4Addr::LOCALHOST.to_ipv6_mapped(), @@ -155,11 +158,11 @@ fn ecn_v4_mapped_v6() { #[test] #[cfg_attr(not(any(target_os = "linux", target_os = "windows")), ignore)] fn gso() { - let send = UdpSocket::bind("[::1]:0") - .or_else(|_| UdpSocket::bind("127.0.0.1:0")) + let send = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)) + .or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) .unwrap(); - let recv = UdpSocket::bind("[::1]:0") - .or_else(|_| UdpSocket::bind("127.0.0.1:0")) + let recv = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)) + .or_else(|_| 
UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) .unwrap(); let max_segments = UdpSocketState::new((&send).into()) .unwrap()