From 3236196d017e232824c86f0bec70a5174aeb14d8 Mon Sep 17 00:00:00 2001
From: Joel
Date: Sat, 28 May 2022 13:19:37 +0200
Subject: [PATCH] Use a proper lock implementation (#214)

* use a proper but reentrant safe lock impl

* impl downgrade
---
 Cargo.toml   |   1 +
 src/lock.rs  | 306 ++++++++++++++++++++++++++++++++++++++++++++-------
 src/table.rs |  81 ++++++++++++++
 3 files changed, 347 insertions(+), 41 deletions(-)
 create mode 100644 src/table.rs

diff --git a/Cargo.toml b/Cargo.toml
index 7a89c2a3..c935ac69 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,6 +18,7 @@ raw-api = []
 
 [dependencies]
 lock_api = "0.4.7"
+parking_lot_core = "0.9.3"
 hashbrown = { version = "0.12.0", default-features = false }
 serde = { version = "1.0.136", optional = true, features = ["derive"] }
 cfg-if = "1.0.0"
diff --git a/src/lock.rs b/src/lock.rs
index 68dbba65..f2c98c76 100644
--- a/src/lock.rs
+++ b/src/lock.rs
@@ -1,76 +1,300 @@
-use lock_api::GuardSend;
-use std::hint;
-use std::mem;
-use std::sync::atomic::{AtomicUsize, Ordering};
-
-const USIZE_BITS: usize = mem::size_of::<usize>() * 8;
-const EXCLUSIVE_BIT: usize = 1 << (USIZE_BITS - 1);
+use core::sync::atomic::{AtomicUsize, Ordering};
+use parking_lot_core::{ParkToken, SpinWait, UnparkToken};
 
 pub type RwLock<T> = lock_api::RwLock<RawRwLock, T>;
 pub type RwLockReadGuard<'a, T> = lock_api::RwLockReadGuard<'a, RawRwLock, T>;
 pub type RwLockWriteGuard<'a, T> = lock_api::RwLockWriteGuard<'a, RawRwLock, T>;
 
+const READERS_PARKED: usize = 0b0001;
+const WRITERS_PARKED: usize = 0b0010;
+const ONE_READER: usize = 0b0100;
+const ONE_WRITER: usize = !(READERS_PARKED | WRITERS_PARKED);
+
 pub struct RawRwLock {
-    data: AtomicUsize,
+    state: AtomicUsize,
 }
 
 unsafe impl lock_api::RawRwLock for RawRwLock {
-    type GuardMarker = GuardSend;
-
     #[allow(clippy::declare_interior_mutable_const)]
-    const INIT: Self = RawRwLock {
-        data: AtomicUsize::new(0),
+    const INIT: Self = Self {
+        state: AtomicUsize::new(0),
     };
 
-    fn lock_shared(&self) {
-        while !self.try_lock_shared() {
-            hint::spin_loop();
+    type GuardMarker = lock_api::GuardNoSend;
+
+    #[inline]
+    fn try_lock_exclusive(&self) -> bool {
+        self.state
+            .compare_exchange(0, ONE_WRITER, Ordering::Acquire, Ordering::Relaxed)
+            .is_ok()
+    }
+
+    #[inline]
+    fn lock_exclusive(&self) {
+        if self
+            .state
+            .compare_exchange_weak(0, ONE_WRITER, Ordering::Acquire, Ordering::Relaxed)
+            .is_err()
+        {
+            self.lock_exclusive_slow();
         }
     }
 
-    fn try_lock_shared(&self) -> bool {
-        let x = self.data.load(Ordering::Acquire);
-        if x & EXCLUSIVE_BIT != 0 {
-            return false;
+    #[inline]
+    unsafe fn unlock_exclusive(&self) {
+        if self
+            .state
+            .compare_exchange(ONE_WRITER, 0, Ordering::Release, Ordering::Relaxed)
+            .is_err()
+        {
+            self.unlock_exclusive_slow();
         }
+    }
 
-        let y = x + 1;
-        self.data
-            .compare_exchange(x, y, Ordering::Release, Ordering::Relaxed)
-            .is_ok()
+    #[inline]
+    fn try_lock_shared(&self) -> bool {
+        self.try_lock_shared_fast() || self.try_lock_shared_slow()
     }
 
+    #[inline]
+    fn lock_shared(&self) {
+        if !self.try_lock_shared_fast() {
+            self.lock_shared_slow();
+        }
+    }
+
+    #[inline]
     unsafe fn unlock_shared(&self) {
-        self.data.fetch_sub(1, Ordering::Release);
+        let state = self.state.fetch_sub(ONE_READER, Ordering::Release);
+
+        if state == (ONE_READER | WRITERS_PARKED) {
+            self.unlock_shared_slow();
+        }
     }
+}
 
-    fn lock_exclusive(&self) {
-        while !self.try_lock_exclusive() {
-            hint::spin_loop();
+unsafe impl lock_api::RawRwLockDowngrade for RawRwLock {
+    #[inline]
+    unsafe fn downgrade(&self) {
+        let state = self
+            .state
+            .fetch_and(ONE_READER | WRITERS_PARKED, Ordering::Release);
+
+        if state & READERS_PARKED != 0 {
+            parking_lot_core::unpark_all((self as *const _ as usize) + 1, UnparkToken(0));
         }
     }
+}
 
-    fn try_lock_exclusive(&self) -> bool {
-        self.data
-            .compare_exchange(0, EXCLUSIVE_BIT, Ordering::Release, Ordering::Relaxed)
-            .is_ok()
+impl RawRwLock {
+    #[cold]
+    fn lock_exclusive_slow(&self) {
+        let mut acquire_with = 0;
+        loop {
+            let mut spin = SpinWait::new();
+            let mut state = self.state.load(Ordering::Relaxed);
+
+            loop {
+                while state & ONE_WRITER == 0 {
+                    match self.state.compare_exchange_weak(
+                        state,
+                        state | ONE_WRITER | acquire_with,
+                        Ordering::Acquire,
+                        Ordering::Relaxed,
+                    ) {
+                        Ok(_) => return,
+                        Err(e) => state = e,
+                    }
+                }
+
+                if state & WRITERS_PARKED == 0 {
+                    if spin.spin() {
+                        state = self.state.load(Ordering::Relaxed);
+                        continue;
+                    }
+
+                    if let Err(e) = self.state.compare_exchange_weak(
+                        state,
+                        state | WRITERS_PARKED,
+                        Ordering::Relaxed,
+                        Ordering::Relaxed,
+                    ) {
+                        state = e;
+                        continue;
+                    }
+                }
+
+                let _ = unsafe {
+                    parking_lot_core::park(
+                        self as *const _ as usize,
+                        || {
+                            let state = self.state.load(Ordering::Relaxed);
+                            (state & ONE_WRITER != 0) && (state & WRITERS_PARKED != 0)
+                        },
+                        || {},
+                        |_, _| {},
+                        ParkToken(0),
+                        None,
+                    )
+                };
+
+                acquire_with = WRITERS_PARKED;
+                break;
+            }
+        }
     }
 
-    unsafe fn unlock_exclusive(&self) {
-        self.data.store(0, Ordering::Release)
+    #[cold]
+    fn unlock_exclusive_slow(&self) {
+        let state = self.state.load(Ordering::Relaxed);
+        assert_eq!(state & ONE_WRITER, ONE_WRITER);
+
+        let mut parked = state & (READERS_PARKED | WRITERS_PARKED);
+        assert_ne!(parked, 0);
+
+        if parked != (READERS_PARKED | WRITERS_PARKED) {
+            if let Err(new_state) =
+                self.state
+                    .compare_exchange(state, 0, Ordering::Release, Ordering::Relaxed)
+            {
+                assert_eq!(new_state, ONE_WRITER | READERS_PARKED | WRITERS_PARKED);
+                parked = READERS_PARKED | WRITERS_PARKED;
+            }
+        }
+
+        if parked == (READERS_PARKED | WRITERS_PARKED) {
+            self.state.store(WRITERS_PARKED, Ordering::Release);
+            parked = READERS_PARKED;
+        }
+
+        if parked == READERS_PARKED {
+            return unsafe {
+                parking_lot_core::unpark_all((self as *const _ as usize) + 1, UnparkToken(0));
+            };
+        }
+
+        assert_eq!(parked, WRITERS_PARKED);
+        unsafe {
+            parking_lot_core::unpark_one(self as *const _ as usize, |_| UnparkToken(0));
+        }
     }
 
-    fn is_locked(&self) -> bool {
-        self.data.load(Ordering::Acquire) != 0
+    #[inline(always)]
+    fn try_lock_shared_fast(&self) -> bool {
+        let state = self.state.load(Ordering::Relaxed);
+
+        if let Some(new_state) = state.checked_add(ONE_READER) {
+            if new_state & ONE_WRITER != ONE_WRITER {
+                return self
+                    .state
+                    .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
+                    .is_ok();
+            }
+        }
+
+        false
     }
 
-    fn is_locked_exclusive(&self) -> bool {
-        self.data.load(Ordering::Acquire) & EXCLUSIVE_BIT != 0
+    #[cold]
+    fn try_lock_shared_slow(&self) -> bool {
+        let mut state = self.state.load(Ordering::Relaxed);
+
+        while let Some(new_state) = state.checked_add(ONE_READER) {
+            if new_state & ONE_WRITER == ONE_WRITER {
+                break;
+            }
+
+            match self.state.compare_exchange_weak(
+                state,
+                new_state,
+                Ordering::Acquire,
+                Ordering::Relaxed,
+            ) {
+                Ok(_) => return true,
+                Err(e) => state = e,
+            }
+        }
+
+        false
     }
-}
 
-unsafe impl lock_api::RawRwLockDowngrade for RawRwLock {
-    unsafe fn downgrade(&self) {
-        self.data.store(1, Ordering::SeqCst);
+    #[cold]
+    fn lock_shared_slow(&self) {
+        loop {
+            let mut spin = SpinWait::new();
+            let mut state = self.state.load(Ordering::Relaxed);
+
+            loop {
+                let mut backoff = SpinWait::new();
+                while let Some(new_state) = state.checked_add(ONE_READER) {
+                    assert_ne!(
+                        new_state & ONE_WRITER,
+                        ONE_WRITER,
+                        "reader count overflowed",
+                    );
+
+                    if self
+                        .state
+                        .compare_exchange_weak(
+                            state,
+                            new_state,
+                            Ordering::Acquire,
+                            Ordering::Relaxed,
+                        )
+                        .is_ok()
+                    {
+                        return;
+                    }
+
+                    backoff.spin_no_yield();
+                    state = self.state.load(Ordering::Relaxed);
+                }
+
+                if state & READERS_PARKED == 0 {
+                    if spin.spin() {
+                        state = self.state.load(Ordering::Relaxed);
+                        continue;
+                    }
+
+                    if let Err(e) = self.state.compare_exchange_weak(
+                        state,
+                        state | READERS_PARKED,
+                        Ordering::Relaxed,
+                        Ordering::Relaxed,
+                    ) {
+                        state = e;
+                        continue;
+                    }
+                }
+
+                let _ = unsafe {
+                    parking_lot_core::park(
+                        (self as *const _ as usize) + 1,
+                        || {
+                            let state = self.state.load(Ordering::Relaxed);
+                            (state & ONE_WRITER == ONE_WRITER) && (state & READERS_PARKED != 0)
+                        },
+                        || {},
+                        |_, _| {},
+                        ParkToken(0),
+                        None,
+                    )
+                };
+
+                break;
+            }
+        }
+    }
+
+    #[cold]
+    fn unlock_shared_slow(&self) {
+        if self
+            .state
+            .compare_exchange(WRITERS_PARKED, 0, Ordering::Relaxed, Ordering::Relaxed)
+            .is_ok()
+        {
+            unsafe {
+                parking_lot_core::unpark_one(self as *const _ as usize, |_| UnparkToken(0));
+            }
+        }
     }
 }
diff --git a/src/table.rs b/src/table.rs
new file mode 100644
index 00000000..03acd76e
--- /dev/null
+++ b/src/table.rs
@@ -0,0 +1,81 @@
+use super::u128::AtomicU128;
+use std::borrow::Borrow;
+use std::cell::UnsafeCell;
+use std::hash::{BuildHasher, Hash, Hasher};
+use std::mem::MaybeUninit;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{Arc, Mutex};
+
+const TOMBSTONE_BIT: u64 = 1 << 63;
+const ALLOCATED_BIT: u64 = 1 << 62;
+const POINTER_MASK: u64 = 0x3FFFFFFFFFFFFFFF;
+
+fn hash<S, K>(hasher: &S, key: &K) -> u64
+where
+    S: BuildHasher,
+    K: Hash,
+{
+    let mut hasher = hasher.build_hasher();
+    key.hash(&mut hasher);
+    hasher.finish()
+}
+
+struct Slot<K, V> {
+    data: AtomicU64,
+    pair: UnsafeCell<MaybeUninit<(K, V)>>,
+}
+
+pub struct Table<K, V, S> {
+    hash: Arc<S>,
+    slots: Box<[Slot<K, V>]>,
+    mask: usize,
+}
+
+impl<K, V, S> Table<K, V, S>
+where
+    K: Eq + Hash,
+    S: BuildHasher,
+{
+    pub fn new(hasher: Arc<S>, capacity: usize) -> Self {
+        debug_assert!(capacity.is_power_of_two());
+        let slots = (0..capacity)
+            .map(|_| Slot {
+                data: AtomicU64::new(0),
+                pair: UnsafeCell::new(MaybeUninit::uninit()),
+            })
+            .collect::<Vec<_>>();
+
+        Table {
+            hash: hasher,
+            slots: slots.into_boxed_slice(),
+            mask: capacity - 1,
+        }
+    }
+
+    pub fn get<Q>(&self, key: &Q) -> Option<*mut (K, V)>
+    where
+        K: Borrow<Q>,
+        Q: Eq + Hash,
+    {
+        let hash = hash(&*self.hash, key);
+        let mut idx = hash as usize & self.mask;
+        let mut i = 0;
+
+        loop {
+            let slot = &self.slots[idx];
+            let data = slot.data.load(Ordering::Relaxed);
+            let ptr = (data & POINTER_MASK) as *mut (K, V);
+            if !ptr.is_null() {
+                let stored = unsafe { (*ptr).0.borrow() };
+                if stored == key {
+                    return Some(ptr);
+                }
+            } else if data & TOMBSTONE_BIT != TOMBSTONE_BIT || i > self.mask {
+                return None;
+            }
+
+            idx = (idx + 1) & self.mask;
+            i += 1;
+        }
+    }
+}
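
Usage note (not part of the patch): a minimal sketch of how the new RawRwLock is driven through the lock_api wrappers that src/lock.rs aliases, including the downgrade path this change implements. The module path crate::lock, the demo function, and the Vec<i32> payload are illustrative assumptions; RwLock::new, read, write, and RwLockWriteGuard::downgrade are standard lock_api 0.4 APIs, where downgrade is available because RawRwLock implements RawRwLockDowngrade.

// Illustrative sketch only; exercises the lock via lock_api wrappers.
use crate::lock::{RwLock, RwLockReadGuard};

fn demo() {
    let lock = RwLock::new(vec![1, 2, 3]);

    // Writers go through lock_exclusive()/unlock_exclusive().
    let mut guard = lock.write();
    guard.push(4);

    // downgrade() calls RawRwLockDowngrade::downgrade(): the write guard
    // becomes a read guard without releasing the lock, and any parked
    // readers are woken via unpark_all on the reader queue address.
    let read: RwLockReadGuard<'_, Vec<i32>> = lock_api::RwLockWriteGuard::downgrade(guard);
    assert_eq!(read.len(), 4);
    drop(read);

    // Uncontended readers take the try_lock_shared_fast() path.
    let r1 = lock.read();
    let r2 = lock.read();
    assert_eq!(r1.len(), r2.len());
}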