diff --git a/CHANGELOG.md b/CHANGELOG.md index ac0e3f8c69..04862464ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,9 @@ This project adheres to [Semantic Versioning](https://semver.org/). ### Changed +- Reimplemented sendmmsg/recvmmsg to avoid allocations and with better API + (#[1744](https://github.com/nix-rust/nix/pull/1744)) + - Rewrote the aio module. The new module: * Does more type checking at compile time rather than runtime. * Gives the caller control over whether and when to `Box` an aio operation. diff --git a/src/sys/socket/mod.rs b/src/sys/socket/mod.rs index ecbf30ad44..f654e05a95 100644 --- a/src/sys/socket/mod.rs +++ b/src/sys/socket/mod.rs @@ -581,15 +581,20 @@ macro_rules! cmsg_space { } #[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub struct RecvMsg<'a, S> { +/// Contains outcome of sending or receiving a message +/// +/// Use [`cmsgs`][RecvMsg::cmsgs] to access all the control messages present, and +/// [`iovs`][RecvMsg::iovs`] to access underlying io slices. +pub struct RecvMsg<'a, 's, S> { pub bytes: usize, cmsghdr: Option<&'a cmsghdr>, pub address: Option, pub flags: MsgFlags, + iobufs: std::marker::PhantomData<& 's()>, mhdr: msghdr, } -impl<'a, S> RecvMsg<'a, S> { +impl<'a, S> RecvMsg<'a, '_, S> { /// Iterate over the valid control messages pointed to by this /// msghdr. pub fn cmsgs(&self) -> CmsgIterator { @@ -1466,24 +1471,6 @@ pub fn sendmsg(fd: RawFd, iov: &[IoSlice<'_>], cmsgs: &[ControlMessage], Errno::result(ret).map(|r| r as usize) } -#[cfg(any( - target_os = "linux", - target_os = "android", - target_os = "freebsd", - target_os = "netbsd", -))] -#[derive(Debug)] -pub struct SendMmsgData<'a, I, C, S> - where - I: AsRef<[IoSlice<'a>]>, - C: AsRef<[ControlMessage<'a>]>, - S: SockaddrLike + 'a -{ - pub iov: I, - pub cmsgs: C, - pub addr: Option, - pub _lt: std::marker::PhantomData<&'a I>, -} /// An extension of `sendmsg` that allows the caller to transmit multiple /// messages on a socket using a single system call. This has performance @@ -1508,51 +1495,66 @@ pub struct SendMmsgData<'a, I, C, S> target_os = "freebsd", target_os = "netbsd", ))] -pub fn sendmmsg<'a, I, C, S>( +pub fn sendmmsg<'a, XS, AS, C, I, S>( fd: RawFd, - data: impl std::iter::IntoIterator>, + data: &'a mut MultiHeaders, + slices: XS, + // one address per group of slices + addrs: AS, + // shared across all the messages + cmsgs: C, flags: MsgFlags -) -> Result> +) -> crate::Result> where + XS: IntoIterator, + AS: AsRef<[Option]>, I: AsRef<[IoSlice<'a>]> + 'a, C: AsRef<[ControlMessage<'a>]> + 'a, S: SockaddrLike + 'a { - let iter = data.into_iter(); - let size_hint = iter.size_hint(); - let reserve_items = size_hint.1.unwrap_or(size_hint.0); + let mut count = 0; - let mut output = Vec::::with_capacity(reserve_items); - let mut cmsgs_buffers = Vec::>::with_capacity(reserve_items); + for (i, ((slice, addr), mmsghdr)) in slices.into_iter().zip(addrs.as_ref()).zip(data.items.iter_mut() ).enumerate() { + let mut p = &mut mmsghdr.msg_hdr; + p.msg_iov = slice.as_ref().as_ptr() as *mut libc::iovec; + p.msg_iovlen = slice.as_ref().len() as _; - for d in iter { - let capacity: usize = d.cmsgs.as_ref().iter().map(|c| c.space()).sum(); - let mut cmsgs_buffer = vec![0u8; capacity]; + p.msg_namelen = addr.as_ref().map_or(0, S::len); + p.msg_name = addr.as_ref().map_or(ptr::null(), S::as_ptr) as _; - output.push(libc::mmsghdr { - msg_hdr: pack_mhdr_to_send( - &mut cmsgs_buffer, - &d.iov, - &d.cmsgs, - d.addr.as_ref() - ), - msg_len: 0, - }); - cmsgs_buffers.push(cmsgs_buffer); - }; + // Encode each cmsg. This must happen after initializing the header because + // CMSG_NEXT_HDR and friends read the msg_control and msg_controllen fields. + // CMSG_FIRSTHDR is always safe + let mut pmhdr: *mut cmsghdr = unsafe { CMSG_FIRSTHDR(p) }; + for cmsg in cmsgs.as_ref() { + assert_ne!(pmhdr, ptr::null_mut()); + // Safe because we know that pmhdr is valid, and we initialized it with + // sufficient space + unsafe { cmsg.encode_into(pmhdr) }; + // Safe because mhdr is valid + pmhdr = unsafe { CMSG_NXTHDR(p, pmhdr) }; + } - let ret = unsafe { libc::sendmmsg(fd, output.as_mut_ptr(), output.len() as _, flags.bits() as _) }; + count = i+1; + } - let sent_messages = Errno::result(ret)? as usize; - let mut sent_bytes = Vec::with_capacity(sent_messages); + let sent = Errno::result(unsafe { + libc::sendmmsg( + fd, + data.items.as_mut_ptr(), + count as _, + flags.bits() as _ + ) + })? as usize; - for item in &output { - sent_bytes.push(item.msg_len as usize); - } + Ok(MultiResults { + rmm: data, + current_index: 0, + received: sent + }) - Ok(sent_bytes) } @@ -1563,134 +1565,338 @@ pub fn sendmmsg<'a, I, C, S>( target_os = "netbsd", ))] #[derive(Debug)] -pub struct RecvMmsgData<'a, I> +/// Preallocated structures needed for [`recvmmsg`] and [`sendmmsg`] functions +pub struct MultiHeaders { + // preallocated boxed slice of mmsghdr + items: Box<[libc::mmsghdr]>, + addresses: Box<[mem::MaybeUninit]>, + // while we are not using it directly - this is used to store control messages + // and we retain pointers to them inside items array + #[allow(dead_code)] + cmsg_buffers: Option>, + msg_controllen: usize, +} + +#[cfg(any( + target_os = "linux", + target_os = "android", + target_os = "freebsd", + target_os = "netbsd", +))] +impl MultiHeaders { + /// Preallocate structure used by [`recvmmsg`] and [`sendmmsg`] takes number of headers to preallocate + /// + /// `cmsg_buffer` should be created with [`cmsg_space!`] if needed + pub fn preallocate(num_slices: usize, cmsg_buffer: Option>) -> Self where - I: AsRef<[IoSliceMut<'a>]> + 'a, -{ - pub iov: I, - pub cmsg_buffer: Option<&'a mut Vec>, + S: Copy + SockaddrLike, + { + // we will be storing pointers to addresses inside mhdr - convert it into boxed + // slice so it can'be changed later by pushing anything into self.addresses + let mut addresses = vec![std::mem::MaybeUninit::uninit(); num_slices].into_boxed_slice(); + + let msg_controllen = cmsg_buffer.as_ref().map_or(0, |v| v.capacity()); + + // we'll need a cmsg_buffer for each slice, we preallocate a vector and split + // it into "slices" parts + let cmsg_buffers = + cmsg_buffer.map(|v| vec![0u8; v.capacity() * num_slices].into_boxed_slice()); + + let items = addresses + .iter_mut() + .enumerate() + .map(|(ix, address)| { + let (ptr, cap) = match &cmsg_buffers { + Some(v) => ((&v[ix * msg_controllen] as *const u8), msg_controllen), + None => (std::ptr::null(), 0), + }; + let msg_hdr = unsafe { pack_mhdr_to_receive(std::ptr::null(), 0, ptr, cap, address.as_mut_ptr()) }; + libc::mmsghdr { + msg_hdr, + msg_len: 0, + } + }) + .collect::>(); + + Self { + items: items.into_boxed_slice(), + addresses, + cmsg_buffers, + msg_controllen, + } + } } -/// An extension of `recvmsg` that allows the caller to receive multiple -/// messages from a socket using a single system call. This has -/// performance benefits for some applications. +/// An extension of recvmsg that allows the caller to receive multiple messages from a socket using a single system call. /// -/// `iov` and `cmsg_buffer` should be constructed similarly to `recvmsg` +/// This has performance benefits for some applications. /// -/// Multiple allocations are performed +/// This method performs no allocations. /// -/// # Arguments +/// Returns an iterator producing [`RecvMsg`], one per received messages. Each `RecvMsg` can produce +/// iterators over [`IoSlice`] with [`iovs`][RecvMsg::iovs`] and +/// `ControlMessageOwned` with [`cmsgs`][RecvMsg::cmsgs]. /// -/// * `fd`: Socket file descriptor -/// * `data`: Struct that implements `IntoIterator` with `RecvMmsgData` items -/// * `flags`: Optional flags passed directly to the operating system. +/// # Bugs (in underlying implementation, at least in Linux) +/// The timeout argument does not work as intended. The timeout is checked only after the receipt +/// of each datagram, so that if up to `vlen`-1 datagrams are received before the timeout expires, +/// but then no further datagrams are received, the call will block forever. /// -/// # RecvMmsgData -/// -/// * `iov`: Scatter-gather list of buffers to receive the message -/// * `cmsg_buffer`: Space to receive ancillary data. Should be created by -/// [`cmsg_space!`](../../macro.cmsg_space.html) -/// -/// # Returns -/// A `Vec` with multiple `RecvMsg`, one per received message -/// -/// # References -/// - [`recvmsg`](fn.recvmsg.html) -/// - [`RecvMsg`](struct.RecvMsg.html) +/// If an error occurs after at least one message has been received, the call succeeds, and returns +/// the number of messages received. The error code is expected to be returned on a subsequent +/// call to recvmmsg(). In the current implementation, however, the error code can be +/// overwritten in the meantime by an unrelated network event on a socket, for example an +/// incoming ICMP packet. #[cfg(any( target_os = "linux", target_os = "android", target_os = "freebsd", target_os = "netbsd", ))] -#[allow(clippy::needless_collect)] // Complicated false positive -pub fn recvmmsg<'a, I, S>( +pub fn recvmmsg<'a, XS, S, I>( fd: RawFd, - data: impl std::iter::IntoIterator, - IntoIter=impl ExactSizeIterator + Iterator>>, + data: &'a mut MultiHeaders, + slices: XS, flags: MsgFlags, - timeout: Option -) -> Result>> - where - I: AsRef<[IoSliceMut<'a>]> + 'a, - S: Copy + SockaddrLike + 'a + mut timeout: Option, +) -> crate::Result> +where + XS: IntoIterator, + I: AsRef<[IoSliceMut<'a>]> + 'a, { - let iter = data.into_iter(); + let mut count = 0; + for (i, (slice, mmsghdr)) in slices.into_iter().zip(data.items.iter_mut()).enumerate() { + let mut p = &mut mmsghdr.msg_hdr; + p.msg_iov = slice.as_ref().as_ptr() as *mut libc::iovec; + p.msg_iovlen = slice.as_ref().len() as _; + count = i + 1; + } - let num_messages = iter.len(); + let timeout_ptr = timeout + .as_mut() + .map_or_else(std::ptr::null_mut, |t| t as *mut _ as *mut libc::timespec); + + let received = Errno::result(unsafe { + libc::recvmmsg( + fd, + data.items.as_mut_ptr(), + count as _, + flags.bits() as _, + timeout_ptr, + ) + })? as usize; - let mut output: Vec = Vec::with_capacity(num_messages); + Ok(MultiResults { + rmm: data, + current_index: 0, + received, + }) +} - // Addresses should be pre-allocated. pack_mhdr_to_receive will store them - // as raw pointers, so we may not move them. Turn the vec into a boxed - // slice so we won't inadvertently reallocate the vec. - let mut addresses = vec![mem::MaybeUninit::uninit(); num_messages] - .into_boxed_slice(); +#[cfg(any( + target_os = "linux", + target_os = "android", + target_os = "freebsd", + target_os = "netbsd", +))] +#[derive(Debug)] +/// Iterator over results of [`recvmmsg`]/[`sendmmsg`] +/// +/// +pub struct MultiResults<'a, S> { + // preallocated structures + rmm: &'a MultiHeaders, + current_index: usize, + received: usize, +} - let results: Vec<_> = iter.enumerate().map(|(i, d)| { - let (msg_controllen, mhdr) = unsafe { - pack_mhdr_to_receive( - d.iov.as_ref(), - &mut d.cmsg_buffer, - addresses[i].as_mut_ptr(), +#[cfg(any( + target_os = "linux", + target_os = "android", + target_os = "freebsd", + target_os = "netbsd", +))] +impl<'a, S> Iterator for MultiResults<'a, S> +where + S: Copy + SockaddrLike, +{ + type Item = RecvMsg<'a, 'a, S>; + + fn next(&mut self) -> Option { + if self.current_index >= self.received { + return None; + } + let mmsghdr = self.rmm.items[self.current_index]; + + // as long as we are not reading past the index writen by recvmmsg - address + // will be initialized + let address = unsafe { self.rmm.addresses[self.current_index].assume_init() }; + + self.current_index += 1; + Some(unsafe { + read_mhdr( + mmsghdr.msg_hdr, + mmsghdr.msg_len as isize, + self.rmm.msg_controllen, + address, ) + }) + } +} + +impl<'a, S> RecvMsg<'_, 'a, S> { + /// Iterate over the filled io slices pointed by this msghdr + pub fn iovs(&self) -> IoSliceIterator<'a> { + IoSliceIterator { + index: 0, + remaining: self.bytes, + slices: unsafe { + // safe for as long as mgdr is properly initialized and references are valid. + // for multi messages API we initialize it with an empty + // slice and replace with a concrete buffer + // for single message API we hold a lifetime reference to ioslices + std::slice::from_raw_parts(self.mhdr.msg_iov as *const _, self.mhdr.msg_iovlen as _) + }, + } + } +} + +#[derive(Debug)] +pub struct IoSliceIterator<'a> { + index: usize, + remaining: usize, + slices: &'a [IoSlice<'a>], +} + +impl<'a> Iterator for IoSliceIterator<'a> { + type Item = &'a [u8]; + + fn next(&mut self) -> Option { + if self.index >= self.slices.len() { + return None; + } + let slice = &self.slices[self.index][..self.remaining.min(self.slices[self.index].len())]; + self.remaining -= slice.len(); + self.index += 1; + if slice.is_empty() { + return None; + } + + Some(slice) + } +} + +// test contains both recvmmsg and timestaping which is linux only +// there are existing tests for recvmmsg only in tests/ +#[cfg(target_os = "linux")] +#[cfg(test)] +mod test { + use crate::sys::socket::{AddressFamily, ControlMessageOwned}; + use crate::*; + use std::str::FromStr; + + #[cfg_attr(qemu, ignore)] + #[test] + fn test_recvmm2() -> crate::Result<()> { + use crate::sys::socket::{ + sendmsg, setsockopt, socket, sockopt::Timestamping, MsgFlags, SockFlag, SockType, + SockaddrIn, TimestampingFlag, }; + use std::io::{IoSlice, IoSliceMut}; - output.push( - libc::mmsghdr { - msg_hdr: mhdr, - msg_len: 0, - } - ); + let sock_addr = SockaddrIn::from_str("127.0.0.1:6790").unwrap(); - (msg_controllen as usize, &mut d.cmsg_buffer) - }).collect(); + let ssock = socket( + AddressFamily::Inet, + SockType::Datagram, + SockFlag::empty(), + None, + )?; - let timeout = if let Some(mut t) = timeout { - t.as_mut() as *mut libc::timespec - } else { - ptr::null_mut() - }; + let rsock = socket( + AddressFamily::Inet, + SockType::Datagram, + SockFlag::SOCK_NONBLOCK, + None, + )?; - let ret = unsafe { libc::recvmmsg(fd, output.as_mut_ptr(), output.len() as _, flags.bits() as _, timeout) }; + crate::sys::socket::bind(rsock, &sock_addr)?; - let _ = Errno::result(ret)?; + setsockopt(rsock, Timestamping, &TimestampingFlag::all())?; - Ok(output - .into_iter() - .take(ret as usize) - .zip(addresses.iter().map(|addr| unsafe{addr.assume_init()})) - .zip(results.into_iter()) - .map(|((mmsghdr, address), (msg_controllen, cmsg_buffer))| { - unsafe { - read_mhdr( - mmsghdr.msg_hdr, - mmsghdr.msg_len as isize, - msg_controllen, - address, - cmsg_buffer - ) + let sbuf = (0..400).map(|i| i as u8).collect::>(); + + let mut recv_buf = vec![0; 1024]; + + let mut recv_iovs = Vec::new(); + let mut pkt_iovs = Vec::new(); + + for (ix, chunk) in recv_buf.chunks_mut(256).enumerate() { + pkt_iovs.push(IoSliceMut::new(chunk)); + if ix % 2 == 1 { + recv_iovs.push(pkt_iovs); + pkt_iovs = Vec::new(); + } + } + drop(pkt_iovs); + + let flags = MsgFlags::empty(); + let iov1 = [IoSlice::new(&sbuf)]; + + let cmsg = cmsg_space!(crate::sys::socket::Timestamps); + sendmsg(ssock, &iov1, &[], flags, Some(&sock_addr)).unwrap(); + + let mut data = super::MultiHeaders::<()>::preallocate(recv_iovs.len(), Some(cmsg)); + + let t = sys::time::TimeSpec::from_duration(std::time::Duration::from_secs(10)); + + let recv = super::recvmmsg(rsock, &mut data, recv_iovs.iter(), flags, Some(t))?; + + for rmsg in recv { + #[cfg(not(qemu))] + let mut saw_time = false; + let mut recvd = 0; + for cmsg in rmsg.cmsgs() { + if let ControlMessageOwned::ScmTimestampsns(timestamps) = cmsg { + let ts = timestamps.system; + + let sys_time = + crate::time::clock_gettime(crate::time::ClockId::CLOCK_REALTIME)?; + let diff = if ts > sys_time { + ts - sys_time + } else { + sys_time - ts + }; + assert!(std::time::Duration::from(diff).as_secs() < 60); + #[cfg(not(qemu))] + { + saw_time = true; + } + } } - }) - .collect()) -} -unsafe fn read_mhdr<'a, 'b, S>( + #[cfg(not(qemu))] + assert!(saw_time); + + for iov in rmsg.iovs() { + recvd += iov.len(); + } + assert_eq!(recvd, 400); + } + + Ok(()) + } +} +unsafe fn read_mhdr<'a, 'i, S>( mhdr: msghdr, r: isize, msg_controllen: usize, address: S, - cmsg_buffer: &'a mut Option<&'b mut Vec> -) -> RecvMsg<'b, S> +) -> RecvMsg<'a, 'i, S> where S: SockaddrLike { let cmsghdr = { if mhdr.msg_controllen > 0 { - // got control message(s) - cmsg_buffer - .as_mut() - .unwrap() - .set_len(mhdr.msg_controllen as usize); debug_assert!(!mhdr.msg_control.is_null()); debug_assert!(msg_controllen >= mhdr.msg_controllen as usize); CMSG_FIRSTHDR(&mhdr as *const msghdr) @@ -1705,38 +1911,43 @@ unsafe fn read_mhdr<'a, 'b, S>( address: Some(address), flags: MsgFlags::from_bits_truncate(mhdr.msg_flags), mhdr, + iobufs: std::marker::PhantomData, } } -unsafe fn pack_mhdr_to_receive<'outer, 'inner, I, S>( - iov: I, - cmsg_buffer: &mut Option<&mut Vec>, +/// Pack pointers to various structures into into msghdr +/// +/// # Safety +/// `iov_buffer` and `iov_buffer_len` must point to a slice +/// of `IoSliceMut` and number of available elements or be a null pointer and 0 +/// +/// `cmsg_buffer` and `cmsg_capacity` must point to a byte buffer used +/// to store control headers later or be a null pointer and 0 if control +/// headers are not used +/// +/// Buffers must remain valid for the whole lifetime of msghdr +unsafe fn pack_mhdr_to_receive( + iov_buffer: *const IoSliceMut, + iov_buffer_len: usize, + cmsg_buffer: *const u8, + cmsg_capacity: usize, address: *mut S, -) -> (usize, msghdr) +) -> msghdr where - I: AsRef<[IoSliceMut<'inner>]> + 'outer, - S: SockaddrLike + 'outer + S: SockaddrLike { - let (msg_control, msg_controllen) = cmsg_buffer.as_mut() - .map(|v| (v.as_mut_ptr(), v.capacity())) - .unwrap_or((ptr::null_mut(), 0)); - - let mhdr = { - // Musl's msghdr has private fields, so this is the only way to - // initialize it. - let mut mhdr = mem::MaybeUninit::::zeroed(); - let p = mhdr.as_mut_ptr(); - (*p).msg_name = (*address).as_mut_ptr() as *mut c_void; - (*p).msg_namelen = S::size(); - (*p).msg_iov = iov.as_ref().as_ptr() as *mut iovec; - (*p).msg_iovlen = iov.as_ref().len() as _; - (*p).msg_control = msg_control as *mut c_void; - (*p).msg_controllen = msg_controllen as _; - (*p).msg_flags = 0; - mhdr.assume_init() - }; - - (msg_controllen, mhdr) + // Musl's msghdr has private fields, so this is the only way to + // initialize it. + let mut mhdr = mem::MaybeUninit::::zeroed(); + let p = mhdr.as_mut_ptr(); + (*p).msg_name = (*address).as_mut_ptr() as *mut c_void; + (*p).msg_namelen = S::size(); + (*p).msg_iov = iov_buffer as *mut iovec; + (*p).msg_iovlen = iov_buffer_len as _; + (*p).msg_control = cmsg_buffer as *mut c_void; + (*p).msg_controllen = cmsg_capacity as _; + (*p).msg_flags = 0; + mhdr.assume_init() } fn pack_mhdr_to_send<'a, I, C, S>( @@ -1808,20 +2019,24 @@ fn pack_mhdr_to_send<'a, I, C, S>( /// [recvmsg(2)](https://pubs.opengroup.org/onlinepubs/9699919799/functions/recvmsg.html) pub fn recvmsg<'a, 'outer, 'inner, S>(fd: RawFd, iov: &'outer mut [IoSliceMut<'inner>], mut cmsg_buffer: Option<&'a mut Vec>, - flags: MsgFlags) -> Result> - where S: SockaddrLike + 'a + flags: MsgFlags) -> Result> + where S: SockaddrLike + 'a, + 'inner: 'outer { let mut address = mem::MaybeUninit::uninit(); - let (msg_controllen, mut mhdr) = unsafe { - pack_mhdr_to_receive::<_, S>(iov, &mut cmsg_buffer, address.as_mut_ptr()) + let (msg_control, msg_controllen) = cmsg_buffer.as_mut() + .map(|v| (v.as_mut_ptr(), v.capacity())) + .unwrap_or((ptr::null_mut(), 0)); + let mut mhdr = unsafe { + pack_mhdr_to_receive(iov.as_ref().as_ptr(), iov.len(), msg_control, msg_controllen, address.as_mut_ptr()) }; let ret = unsafe { libc::recvmsg(fd, &mut mhdr, flags.bits()) }; let r = Errno::result(ret)?; - Ok(unsafe { read_mhdr(mhdr, r, msg_controllen, address.assume_init(), &mut cmsg_buffer) }) + Ok(unsafe { read_mhdr(mhdr, r, msg_controllen, address.assume_init()) }) } } diff --git a/test/sys/test_socket.rs b/test/sys/test_socket.rs index b4ca279d67..cec2170bbd 100644 --- a/test/sys/test_socket.rs +++ b/test/sys/test_socket.rs @@ -501,31 +501,31 @@ mod recvfrom { rsock, ssock, move |s, m, flags| { - let iov = [IoSlice::new(m)]; - let mut msgs = vec![SendMmsgData { - iov: &iov, - cmsgs: &[], - addr: Some(sock_addr), - _lt: Default::default(), - }]; - let batch_size = 15; + let mut iovs = Vec::with_capacity(1 + batch_size); + let mut addrs = Vec::with_capacity(1 + batch_size); + let mut data = MultiHeaders::preallocate(1 + batch_size, None); + let iov = IoSlice::new(m); + // first chunk: + iovs.push([iov]); + addrs.push(Some(sock_addr)); for _ in 0..batch_size { - msgs.push(SendMmsgData { - iov: &iov, - cmsgs: &[], - addr: Some(sock_addr2), - _lt: Default::default(), - }); + iovs.push([iov]); + addrs.push(Some(sock_addr2)); } - sendmmsg(s, msgs.iter(), flags).map(move |sent_bytes| { - assert!(!sent_bytes.is_empty()); - for sent in &sent_bytes { - assert_eq!(*sent, m.len()); - } - sent_bytes.len() - }) + + let res = sendmmsg(s, &mut data, &iovs, addrs, &[], flags)?; + let mut sent_messages = 0; + let mut sent_bytes = 0; + for item in res { + sent_messages += 1; + sent_bytes += item.bytes; + } + // + assert_eq!(sent_messages, iovs.len()); + assert_eq!(sent_bytes, sent_messages * m.len()); + Ok(sent_messages) }, |_, _| {}, ); @@ -577,21 +577,18 @@ mod recvfrom { // Buffers to receive exactly `NUM_MESSAGES_SENT` messages let mut receive_buffers = [[0u8; 32]; NUM_MESSAGES_SENT]; - let iovs: Vec<_> = receive_buffers - .iter_mut() - .map(|buf| [IoSliceMut::new(&mut buf[..])]) - .collect(); - - for iov in &iovs { - msgs.push_back(RecvMmsgData { - iov, - cmsg_buffer: None, - }) - } + msgs.extend( + receive_buffers + .iter_mut() + .map(|buf| [IoSliceMut::new(&mut buf[..])]), + ); + let mut data = + MultiHeaders::::preallocate(msgs.len(), None); let res: Vec> = - recvmmsg(rsock, &mut msgs, MsgFlags::empty(), None) - .expect("recvmmsg"); + recvmmsg(rsock, &mut data, msgs.iter(), MsgFlags::empty(), None) + .expect("recvmmsg") + .collect(); assert_eq!(res.len(), DATA.len()); for RecvMsg { address, bytes, .. } in res.into_iter() { @@ -655,21 +652,26 @@ mod recvfrom { // will return when there are fewer than requested messages in the // kernel buffers when using `MSG_DONTWAIT`. let mut receive_buffers = [[0u8; 32]; NUM_MESSAGES_SENT + 2]; - let iovs: Vec<_> = receive_buffers - .iter_mut() - .map(|buf| [IoSliceMut::new(&mut buf[..])]) - .collect(); + msgs.extend( + receive_buffers + .iter_mut() + .map(|buf| [IoSliceMut::new(&mut buf[..])]), + ); - for iov in &iovs { - msgs.push_back(RecvMmsgData { - iov, - cmsg_buffer: None, - }) - } + let mut data = MultiHeaders::::preallocate( + NUM_MESSAGES_SENT + 2, + None, + ); - let res: Vec> = - recvmmsg(rsock, &mut msgs, MsgFlags::MSG_DONTWAIT, None) - .expect("recvmmsg"); + let res: Vec> = recvmmsg( + rsock, + &mut data, + msgs.iter(), + MsgFlags::MSG_DONTWAIT, + None, + ) + .expect("recvmmsg") + .collect(); assert_eq!(res.len(), NUM_MESSAGES_SENT); for RecvMsg { address, bytes, .. } in res.into_iter() { @@ -2205,14 +2207,13 @@ fn test_recvmmsg_timestampns() { assert_eq!(message.len(), l); // Receive the message let mut buffer = vec![0u8; message.len()]; - let mut cmsgspace = nix::cmsg_space!(TimeSpec); - let iov = [IoSliceMut::new(&mut buffer)]; - let mut data = vec![RecvMmsgData { - iov, - cmsg_buffer: Some(&mut cmsgspace), - }]; + let cmsgspace = nix::cmsg_space!(TimeSpec); + let iov = vec![[IoSliceMut::new(&mut buffer)]]; + let mut data = MultiHeaders::preallocate(1, Some(cmsgspace)); let r: Vec> = - recvmmsg(in_socket, &mut data, flags, None).unwrap(); + recvmmsg(in_socket, &mut data, iov.iter(), flags, None) + .unwrap() + .collect(); let rtime = match r[0].cmsgs().next() { Some(ControlMessageOwned::ScmTimestampns(rtime)) => rtime, Some(_) => panic!("Unexpected control message"),