Skip to content

Commit

Permalink
Auto merge of #101482 - joboet:netbsd_parker, r=sanxiyn
Browse files Browse the repository at this point in the history
Optimize thread parking on NetBSD

As the futex syscall is not present in the latest stable release, NetBSD cannot use the efficient thread parker and locks Linux uses. Currently, it therefore relies on a pthread-based parker, consisting of a mutex and semaphore which protect a state variable. NetBSD however has more efficient syscalls available: [`_lwp_park`](https://man.netbsd.org/_lwp_park.2) and [`_lwp_unpark`](https://man.netbsd.org/_lwp_unpark.2). These already provide the exact semantics of `thread::park` and `Thread::unpark`, but work with thread ids. In `std`, this ID is here stored in an atomic state variable, which is also used to optimize cases were the parking token is already available at the time `thread::park` is called.

r? `@m-ou-se`
  • Loading branch information
bors committed Sep 11, 2022
2 parents 54c4de6 + f8cae86 commit 585b3df
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 12 deletions.
21 changes: 21 additions & 0 deletions std/src/sys/unix/thread_parker/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
//! Thread parking on systems without futex support.

#![cfg(not(any(
target_os = "linux",
target_os = "android",
all(target_os = "emscripten", target_feature = "atomics"),
target_os = "freebsd",
target_os = "openbsd",
target_os = "dragonfly",
target_os = "fuchsia",
)))]

cfg_if::cfg_if! {
if #[cfg(target_os = "netbsd")] {
mod netbsd;
pub use netbsd::Parker;
} else {
mod pthread;
pub use pthread::Parker;
}
}
113 changes: 113 additions & 0 deletions std/src/sys/unix/thread_parker/netbsd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use crate::ffi::{c_int, c_void};
use crate::pin::Pin;
use crate::ptr::{null, null_mut};
use crate::sync::atomic::{
AtomicU64,
Ordering::{Acquire, Relaxed, Release},
};
use crate::time::Duration;
use libc::{_lwp_self, clockid_t, lwpid_t, time_t, timespec, CLOCK_MONOTONIC};

extern "C" {
fn ___lwp_park60(
clock_id: clockid_t,
flags: c_int,
ts: *mut timespec,
unpark: lwpid_t,
hint: *const c_void,
unparkhint: *const c_void,
) -> c_int;
fn _lwp_unpark(lwp: lwpid_t, hint: *const c_void) -> c_int;
}

/// The thread is not parked and the token is not available.
///
/// Zero cannot be a valid LWP id, since it is used as empty value for the unpark
/// argument in _lwp_park.
const EMPTY: u64 = 0;
/// The token is available. Do not park anymore.
const NOTIFIED: u64 = u64::MAX;

pub struct Parker {
/// The parker state. Contains either one of the two state values above or the LWP
/// id of the parked thread.
state: AtomicU64,
}

impl Parker {
pub unsafe fn new(parker: *mut Parker) {
parker.write(Parker { state: AtomicU64::new(EMPTY) })
}

// Does not actually need `unsafe` or `Pin`, but the pthread implementation does.
pub unsafe fn park(self: Pin<&Self>) {
// If the token has already been made available, we can skip
// a bit of work, so check for it here.
if self.state.load(Acquire) != NOTIFIED {
let parked = _lwp_self() as u64;
let hint = self.state.as_mut_ptr().cast();
if self.state.compare_exchange(EMPTY, parked, Relaxed, Acquire).is_ok() {
// Loop to guard against spurious wakeups.
loop {
___lwp_park60(0, 0, null_mut(), 0, hint, null());
if self.state.load(Acquire) == NOTIFIED {
break;
}
}
}
}

// At this point, the change to NOTIFIED has always been observed with acquire
// ordering, so we can just use a relaxed store here (instead of a swap).
self.state.store(EMPTY, Relaxed);
}

// Does not actually need `unsafe` or `Pin`, but the pthread implementation does.
pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
if self.state.load(Acquire) != NOTIFIED {
let parked = _lwp_self() as u64;
let hint = self.state.as_mut_ptr().cast();
let mut timeout = timespec {
// Saturate so that the operation will definitely time out
// (even if it is after the heat death of the universe).
tv_sec: dur.as_secs().try_into().ok().unwrap_or(time_t::MAX),
tv_nsec: dur.subsec_nanos().into(),
};

if self.state.compare_exchange(EMPTY, parked, Relaxed, Acquire).is_ok() {
// Timeout needs to be mutable since it is modified on NetBSD 9.0 and
// above.
___lwp_park60(CLOCK_MONOTONIC, 0, &mut timeout, 0, hint, null());
// Use a swap to get acquire ordering even if the token was set after
// the timeout occurred.
self.state.swap(EMPTY, Acquire);
return;
}
}

self.state.store(EMPTY, Relaxed);
}

// Does not actually need `Pin`, but the pthread implementation does.
pub fn unpark(self: Pin<&Self>) {
let state = self.state.swap(NOTIFIED, Release);
if !matches!(state, EMPTY | NOTIFIED) {
let lwp = state as lwpid_t;
let hint = self.state.as_mut_ptr().cast();

// If the parking thread terminated and did not actually park, this will
// probably return an error, which is OK. In the worst case, another
// thread has received the same LWP id. It will then receive a spurious
// wakeup, but those are allowable per the API contract. The same reasoning
// applies if a timeout occurred before this call, but the state was not
// yet reset.

// SAFETY:
// The syscall has no invariants to hold. Only unsafe because it is an
// extern function.
unsafe {
_lwp_unpark(lwp, hint);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
//! Thread parking without `futex` using the `pthread` synchronization primitives.

#![cfg(not(any(
target_os = "linux",
target_os = "android",
all(target_os = "emscripten", target_feature = "atomics"),
target_os = "freebsd",
target_os = "openbsd",
target_os = "dragonfly",
target_os = "fuchsia",
)))]

use crate::cell::UnsafeCell;
use crate::marker::PhantomPinned;
use crate::pin::Pin;
Expand Down Expand Up @@ -59,8 +49,8 @@ unsafe fn wait_timeout(
target_os = "espidf"
))]
let (now, dur) = {
use super::time::SystemTime;
use crate::cmp::min;
use crate::sys::time::SystemTime;

// OSX implementation of `pthread_cond_timedwait` is buggy
// with super long durations. When duration is greater than
Expand All @@ -85,7 +75,7 @@ unsafe fn wait_timeout(
target_os = "espidf"
)))]
let (now, dur) = {
use super::time::Timespec;
use crate::sys::time::Timespec;

(Timespec::now(libc::CLOCK_MONOTONIC), dur)
};
Expand Down

0 comments on commit 585b3df

Please sign in to comment.