diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f7855d380..e546a4e83a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ This project adheres to [Semantic Versioning](https://semver.org/). ## [Unreleased] - ReleaseDate ### Added + +- Added futex interface. + ([#1907](https://github.com/nix-rust/nix/pull/1907)) - Add `PF_ROUTE` to `SockType` on macOS, iOS, all of the BSDs, Fuchsia, Haiku, Illumos. ([#1867](https://github.com/nix-rust/nix/pull/1867)) - Added `nix::ucontext` module on `aarch64-unknown-linux-gnu`. diff --git a/src/sys/futex.rs b/src/sys/futex.rs new file mode 100644 index 0000000000..cec047a586 --- /dev/null +++ b/src/sys/futex.rs @@ -0,0 +1,314 @@ +use crate::{Errno, Result}; +use libc::{syscall, SYS_futex}; +use std::convert::TryFrom; +use std::os::unix::io::{FromRawFd, OwnedFd}; +use std::time::Duration; + +fn timespec(duration: Duration) -> libc::timespec { + let tv_sec = duration.as_secs().try_into().unwrap(); + let tv_nsec = duration.subsec_nanos().try_into().unwrap(); + libc::timespec { tv_sec, tv_nsec } +} + +fn unwrap_or_null(option: Option<&T>) -> *const T { + match option { + Some(t) => t, + None => std::ptr::null(), + } +} + +/// Fast user-space locking. +/// +/// By default we presume the futex is not process-private, that is, it is used across processes. If +/// you know it is process-private you can set `PRIVATE` to `true` which allows some additional +/// optimizations. +/// ``` +/// # use nix::{sys::futex::Futex,errno::Errno}; +/// # use std::time::{Instant, Duration}; +/// const TIMEOUT: Duration = Duration::from_millis(500); +/// const DELTA: Duration = Duration::from_millis(50); +/// let futex: Futex = Futex::new(0); +/// +/// // If the value of the futex is 0, wait for wake. Since the value is 0 and no wake occurs, +/// // we expect the timeout will pass. +/// { +/// let instant = Instant::now(); +/// assert_eq!(futex.wait(0, Some(TIMEOUT)),Err(Errno::ETIMEDOUT)); +/// assert!(instant.elapsed() > TIMEOUT); +/// } +/// // If the value of the futex is 1, wait for wake. Since the value is 0, not 1, this will +/// // return immediately. +/// { +/// let instant = Instant::now(); +/// assert_eq!(futex.wait(1, Some(TIMEOUT)),Err(Errno::EAGAIN)); +/// assert!(instant.elapsed() < DELTA); +/// } +/// +/// { +/// let futex = std::sync::Arc::new(futex); +/// let futex_clone = futex.clone(); +/// let instant = Instant::now(); +/// std::thread::spawn(move || { +/// std::thread::sleep(TIMEOUT); // Sleep for `timeout` +/// assert_eq!(futex_clone.wake(1),Ok(1)); // Wake 1 waiting thread +/// }); +/// assert_eq!(futex.wait(0, Some(2 * TIMEOUT)),Ok(())); // Wait on wake +/// assert!(instant.elapsed() > TIMEOUT && instant.elapsed() < TIMEOUT + DELTA); // Assert waited ~`timeout` +/// } +/// ``` +#[derive(Debug, Clone, Copy, Default)] +pub struct Futex(u32); +impl Futex { + const MASK: i32 = if PRIVATE { libc::FUTEX_PRIVATE_FLAG } else { 0 }; + /// Create new futex with word value of `val`. + pub fn new(val: u32) -> Self { + Self(val) + } + /// If the value of the futex: + /// - `== val`, the thread sleeps waiting for a [`Futex::wake`] call, in this case this thread + /// is considered a waiter on this futex. + /// - `!= val`, then `Err` with [`Errno::EAGAIN`] is immediately returned. + /// + /// If the timeout is: + /// - `Some(_)` it specifies a timeout for the wait. + /// - `None` it will block indefinitely. + /// + /// Wraps [`libc::FUTEX_WAIT`]. + pub fn wait(&self, val: u32, timeout: Option) -> Result<()> { + let timespec = timeout.map(timespec); + let timespec_ptr = unwrap_or_null(timespec.as_ref()); + + let res = unsafe { + syscall( + SYS_futex, + &self.0, + Self::MASK | libc::FUTEX_WAIT, + val, + timespec_ptr, + ) + }; + Errno::result(res).map(drop) + } + /// Wakes at most `val` waiters. + /// + /// - `val == 1` wakes a single waiter. + /// - `val == u32::MAX` wakes all waiters. + /// + /// No guarantee is provided about which waiters are awoken. A waiter with a higher scheduling + /// priority is not guaranteed to be awoken in preference to a waiter with a lower priority. + /// + /// Wraps [`libc::FUTEX_WAKE`]. + pub fn wake(&self, val: u32) -> Result { + let res = unsafe { + syscall(SYS_futex, &self.0, Self::MASK | libc::FUTEX_WAKE, val) + }; + Errno::result(res).map(|x| u32::try_from(x).unwrap()) + } + /// Creates a file descriptor associated with the futex. + /// + /// When [`Futex::wake`] is performed on the futex this file indicates being readable with + /// `select`, `poll` and `epoll`. + /// + /// The file descriptor can be used to obtain asynchronous notifications: if val is nonzero, + /// then, when another process or thread executes a FUTEX_WAKE, the caller will receive the + /// signal number that was passed in val. + /// + /// **Because it was inherently racy, this is unsupported from Linux 2.6.26 onward.** + /// + /// Wraps [`libc::FUTEX_FD`]. + pub fn fd(&self, val: u32) -> Result { + let res = unsafe { + syscall(SYS_futex, &self.0, Self::MASK | libc::FUTEX_WAKE, val) + }; + + // On a 32 bit arch `x` will be an `i32` and will trigger this lint. + #[allow(clippy::useless_conversion)] + Errno::result(res) + .map(|x| unsafe { OwnedFd::from_raw_fd(i32::try_from(x).unwrap()) }) + } + /// [`Futex::cmp_requeue`] without the check being made using `val3`. + /// + /// Wraps [`libc::FUTEX_REQUEUE`]. + pub fn requeue(&self, val: u32, val2: u32, uaddr2: &Self) -> Result { + let res = unsafe { + syscall( + SYS_futex, + &self.0, + Self::MASK | libc::FUTEX_CMP_REQUEUE, + val, + val2, + &uaddr2.0, + ) + }; + Errno::result(res).map(|x| u32::try_from(x).unwrap()) + } + /// Wakes `val` waiters, moving remaining (up to `val2`) waiters to `uaddr2`. + /// + /// If the value of this futex `== val3` returns `Err` with [`Errno::EAGAIN`]. + /// + /// Typical values to specify for `val` are `0` or `1` (Specifying `u32::MAX` makes the + /// [`Futex::cmp_requeue`] equivalent to [`Futex::wake`]). + /// + /// Typical values to specify for `val2` are `1` or `u32::MAX` (Specifying `0` makes + /// [`Futex::cmp_requeue`] equivalent to [`Futex::wait`]). + /// + /// Wraps [`libc::FUTEX_CMP_REQUEUE`]. + pub fn cmp_requeue( + &self, + val: u32, + val2: u32, + uaddr2: &Self, + val3: u32, + ) -> Result { + let res = unsafe { + syscall( + SYS_futex, + &self.0, + Self::MASK | libc::FUTEX_CMP_REQUEUE, + val, + val2, + &uaddr2.0, + val3, + ) + }; + Errno::result(res).map(|x| u32::try_from(x).unwrap()) + } + /// Wraps [`libc::FUTEX_WAKE_OP`]. + pub fn wake_op( + &self, + val: u32, + val2: u32, + uaddr2: &Self, + val3: u32, + ) -> Result { + let res = unsafe { + syscall( + SYS_futex, + &self.0, + Self::MASK | libc::FUTEX_WAKE_OP, + val, + val2, + &uaddr2.0, + val3, + ) + }; + Errno::result(res).map(|x| u32::try_from(x).unwrap()) + } + /// Wraps [`libc::FUTEX_WAIT_BITSET`]. + pub fn wait_bitset( + &self, + val: u32, + timeout: Option, + val3: u32, + ) -> Result<()> { + let timespec = timeout.map(timespec); + let timespec_ptr = unwrap_or_null(timespec.as_ref()); + + let res = unsafe { + syscall( + SYS_futex, + &self.0, + Self::MASK | libc::FUTEX_WAIT_BITSET, + val, + timespec_ptr, + val3, + ) + }; + Errno::result(res).map(drop) + } + /// Wraps [`libc::FUTEX_WAKE_BITSET`]. + pub fn wake_bitset(&self, val: u32, val3: u32) -> Result { + let res = unsafe { + syscall(SYS_futex, &self.0, libc::FUTEX_WAKE_BITSET, val, val3) + }; + Errno::result(res).map(|x| u32::try_from(x).unwrap()) + } + /// Wraps [`libc::FUTEX_LOCK_PI`]. + pub fn lock_pi(&self, timeout: Option) -> Result<()> { + let timespec = timeout.map(timespec); + let timespec_ptr = unwrap_or_null(timespec.as_ref()); + + let res = unsafe { + syscall( + SYS_futex, + &self.0, + Self::MASK | libc::FUTEX_LOCK_PI, + timespec_ptr, + ) + }; + Errno::result(res).map(drop) + } + /// Wraps [`libc::FUTEX_LOCK_PI2`]. + #[cfg(target_os = "linux")] + pub fn lock_pi2(&self, timeout: Option) -> Result<()> { + let timespec = timeout.map(timespec); + let timespec_ptr = unwrap_or_null(timespec.as_ref()); + + let res = unsafe { + syscall( + SYS_futex, + &self.0, + Self::MASK | libc::FUTEX_LOCK_PI2, + timespec_ptr, + ) + }; + Errno::result(res).map(drop) + } + /// Wraps [`libc::FUTEX_TRYLOCK_PI`]. + pub fn trylock_pi(&self) -> Result<()> { + let res = unsafe { + syscall(SYS_futex, &self.0, Self::MASK | libc::FUTEX_TRYLOCK_PI) + }; + Errno::result(res).map(drop) + } + /// `libc::FUTEX_UNLOCK_PI` + pub fn unlock_pi(&self) -> Result<()> { + let res = unsafe { + syscall(SYS_futex, &self.0, Self::MASK | libc::FUTEX_UNLOCK_PI) + }; + Errno::result(res).map(drop) + } + /// Wraps [`libc::FUTEX_CMP_REQUEUE_PI`]. + pub fn cmp_requeue_pi( + &self, + val: u32, + val2: u32, + uaddr2: &Self, + val3: u32, + ) -> Result { + let res = unsafe { + syscall( + SYS_futex, + &self.0, + Self::MASK | libc::FUTEX_CMP_REQUEUE_PI, + val, + val2, + &uaddr2.0, + val3, + ) + }; + Errno::result(res).map(|x| u32::try_from(x).unwrap()) + } + /// Wraps [`libc::FUTEX_WAIT_REQUEUE_PI`]. + pub fn wait_requeue_pi( + &self, + val: u32, + timeout: Option, + uaddr2: &Self, + ) -> Result<()> { + let timespec = timeout.map(timespec); + let timespec_ptr = unwrap_or_null(timespec.as_ref()); + + let res = unsafe { + syscall( + SYS_futex, + &self.0, + Self::MASK | libc::FUTEX_WAIT_REQUEUE_PI, + val, + timespec_ptr, + &uaddr2.0, + ) + }; + Errno::result(res).map(drop) + } +} diff --git a/src/sys/mod.rs b/src/sys/mod.rs index 2065059de8..0f9eb8d7cc 100644 --- a/src/sys/mod.rs +++ b/src/sys/mod.rs @@ -226,3 +226,7 @@ feature! { #![feature = "time"] pub mod timer; } + +/// Fast user-space locking. +#[cfg(any(target_os = "android", target_os = "linux"))] +pub mod futex;