From 2eb9ede6b0ff30779abdbb5fc428fcb352cf0722 Mon Sep 17 00:00:00 2001
From: Stjepan Glavina
Date: Tue, 31 Jul 2018 11:07:59 +0200
Subject: [PATCH 1/4] Use optimistic reads in AtomicCell

---
 benches/atomic_cell.rs    | 159 ++++++++++++++++++++++++++++++++
 src/atomic/atomic_cell.rs | 189 ++++++++++++++++++++++++++------------
 2 files changed, 288 insertions(+), 60 deletions(-)
 create mode 100755 benches/atomic_cell.rs

diff --git a/benches/atomic_cell.rs b/benches/atomic_cell.rs
new file mode 100755
index 0000000..ec7fef0
--- /dev/null
+++ b/benches/atomic_cell.rs
@@ -0,0 +1,159 @@
+#![feature(test)]
+
+extern crate crossbeam_utils;
+extern crate test;
+
+use std::sync::Barrier;
+
+use crossbeam_utils::atomic::AtomicCell;
+use crossbeam_utils::thread;
+
+#[bench]
+fn load_u8(b: &mut test::Bencher) {
+    let a = AtomicCell::new(0u8);
+    let mut sum = 0;
+    b.iter(|| sum += a.load());
+    test::black_box(sum);
+}
+
+#[bench]
+fn store_u8(b: &mut test::Bencher) {
+    let a = AtomicCell::new(0u8);
+    b.iter(|| a.store(1));
+}
+
+#[bench]
+fn fetch_add_u8(b: &mut test::Bencher) {
+    let a = AtomicCell::new(0u8);
+    b.iter(|| a.fetch_add(1));
+}
+
+#[bench]
+fn compare_and_swap_u8(b: &mut test::Bencher) {
+    let a = AtomicCell::new(0u8);
+    let mut i = 0;
+    b.iter(|| {
+        a.compare_and_swap(i, i.wrapping_add(1));
+        i = i.wrapping_add(1);
+    });
+}
+
+#[bench]
+fn concurrent_load_u8(b: &mut test::Bencher) {
+    const THREADS: usize = 2;
+    const STEPS: usize = 1_000_000;
+
+    let start = Barrier::new(THREADS + 1);
+    let end = Barrier::new(THREADS + 1);
+    let exit = AtomicCell::new(false);
+
+    let a = AtomicCell::new(0u8);
+
+    thread::scope(|scope| {
+        for _ in 0..THREADS {
+            scope.spawn(|| {
+                loop {
+                    start.wait();
+
+                    let mut sum = 0;
+                    for _ in 0..STEPS {
+                        sum += a.load();
+                    }
+                    test::black_box(sum);
+
+                    end.wait();
+                    if exit.load() {
+                        break;
+                    }
+                }
+            });
+        }
+
+        start.wait();
+        end.wait();
+
+        b.iter(|| {
+            start.wait();
+            end.wait();
+        });
+
+        start.wait();
+        exit.store(true);
+        end.wait();
+    }).unwrap();
+}
+
+#[bench]
+fn load_usize(b: &mut test::Bencher) {
+    let a = AtomicCell::new(0usize);
+    let mut sum = 0;
+    b.iter(|| sum += a.load());
+    test::black_box(sum);
+}
+
+#[bench]
+fn store_usize(b: &mut test::Bencher) {
+    let a = AtomicCell::new(0usize);
+    b.iter(|| a.store(1));
+}
+
+#[bench]
+fn fetch_add_usize(b: &mut test::Bencher) {
+    let a = AtomicCell::new(0usize);
+    b.iter(|| a.fetch_add(1));
+}
+
+#[bench]
+fn compare_and_swap_usize(b: &mut test::Bencher) {
+    let a = AtomicCell::new(0usize);
+    let mut i = 0;
+    b.iter(|| {
+        a.compare_and_swap(i, i.wrapping_add(1));
+        i = i.wrapping_add(1);
+    });
+}
+
+#[bench]
+fn concurrent_load_usize(b: &mut test::Bencher) {
+    const THREADS: usize = 2;
+    const STEPS: usize = 1_000_000;
+
+    let start = Barrier::new(THREADS + 1);
+    let end = Barrier::new(THREADS + 1);
+    let exit = AtomicCell::new(false);
+
+    let a = AtomicCell::new(0usize);
+
+    thread::scope(|scope| {
+        for _ in 0..THREADS {
+            scope.spawn(|| {
+                loop {
+                    start.wait();
+
+                    let mut sum = 0;
+                    for _ in 0..STEPS {
+                        sum += a.load();
+                    }
+                    test::black_box(sum);
+
+                    end.wait();
+                    if exit.load() {
+                        break;
+                    }
+                }
+            });
+        }
+
+        start.wait();
+        end.wait();
+
+        b.iter(|| {
+            start.wait();
+            end.wait();
+        });
+
+        start.wait();
+        exit.store(true);
+        end.wait();
+    }).unwrap();
+}
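The new benchmark file above measures single-threaded latency of `load`, `store`, `fetch_add` and `compare_and_swap` for a lock-free case (`usize`) and for `u8`, which at this point has no stable native atomic and therefore goes through the fallback lock, plus `concurrent_load_*` variants in which two scoped threads hammer the same cell between barriers while `b.iter` paces the rounds. Because of `#![feature(test)]` these benchmarks need a nightly toolchain (for example `cargo +nightly bench`). For reference, the same `AtomicCell` calls in an ordinary stable program look roughly like this; the `main` wrapper and the asserted return values (assuming the read-modify-write methods return the previously stored value, as the fallback paths later in this series do) are illustrative rather than taken from the patch:

    extern crate crossbeam_utils;

    use crossbeam_utils::atomic::AtomicCell;

    fn main() {
        let a = AtomicCell::new(0u8);

        a.store(7);
        assert_eq!(a.load(), 7);

        // Read-modify-write operations return the previous value.
        assert_eq!(a.fetch_add(1), 7);
        assert_eq!(a.load(), 8);

        // Swaps to 9 only because the current value equals 8.
        assert_eq!(a.compare_and_swap(8, 9), 8);
        assert_eq!(a.load(), 9);
    }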
diff --git a/src/atomic/atomic_cell.rs b/src/atomic/atomic_cell.rs
index dcd2449..1ee45a8 100644
--- a/src/atomic/atomic_cell.rs
+++ b/src/atomic/atomic_cell.rs
@@ -3,7 +3,7 @@ use core::fmt;
 use core::mem;
 use core::ptr;
 use core::slice;
-use core::sync::atomic::{self, AtomicBool, Ordering};
+use core::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};

 /// A thread-safe mutable memory location.
 ///
@@ -252,7 +252,7 @@ macro_rules! impl_arithmetic {
                     let a = unsafe { &*(self.value.get() as *const atomic::AtomicUsize) };
                     a.fetch_add(val as usize, Ordering::SeqCst) as $t
                 } else {
-                    let _lock = lock(self.value.get() as usize);
+                    let _guard = lock(self.value.get() as usize).write();
                     let value = unsafe { &mut *(self.value.get()) };
                     let old = *value;
                     *value = value.wrapping_add(val);
@@ -280,7 +280,7 @@
                     let a = unsafe { &*(self.value.get() as *const atomic::AtomicUsize) };
                     a.fetch_sub(val as usize, Ordering::SeqCst) as $t
                 } else {
-                    let _lock = lock(self.value.get() as usize);
+                    let _guard = lock(self.value.get() as usize).write();
                     let value = unsafe { &mut *(self.value.get()) };
                     let old = *value;
                     *value = value.wrapping_sub(val);
@@ -306,7 +306,7 @@
                     let a = unsafe { &*(self.value.get() as *const atomic::AtomicUsize) };
                     a.fetch_and(val as usize, Ordering::SeqCst) as $t
                 } else {
-                    let _lock = lock(self.value.get() as usize);
+                    let _guard = lock(self.value.get() as usize).write();
                     let value = unsafe { &mut *(self.value.get()) };
                     let old = *value;
                     *value = *value & val;
@@ -332,7 +332,7 @@
                     let a = unsafe { &*(self.value.get() as *const atomic::AtomicUsize) };
                     a.fetch_or(val as usize, Ordering::SeqCst) as $t
                 } else {
-                    let _lock = lock(self.value.get() as usize);
+                    let _guard = lock(self.value.get() as usize).write();
                     let value = unsafe { &mut *(self.value.get()) };
                     let old = *value;
                     *value = *value | val;
@@ -358,7 +358,7 @@
                     let a = unsafe { &*(self.value.get() as *const atomic::AtomicUsize) };
                     a.fetch_xor(val as usize, Ordering::SeqCst) as $t
                 } else {
-                    let _lock = lock(self.value.get() as usize);
+                    let _guard = lock(self.value.get() as usize).write();
                     let value = unsafe { &mut *(self.value.get()) };
                     let old = *value;
                     *value = *value ^ val;
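These five hunks only touch the slow path of the arithmetic methods: when the value has the size of a machine word, the operation is still a real atomic RMW on an `AtomicUsize`, and only otherwise does the method now take a write guard on the striped stamped lock introduced in the next hunks. The gate is the `can_transmute` helper that appears as context further down, presumably instantiated as `can_transmute::<$t, AtomicUsize>()`. A self-contained sketch of that size/alignment check (the function name here is illustrative, not the crate's):

    use std::mem;
    use std::sync::atomic::AtomicUsize;

    // A type can be backed by a native `AtomicUsize` only if it has exactly the
    // atomic's size and at least its alignment, so a reference to it can be
    // reinterpreted as `&AtomicUsize`.
    fn fits_native_atomic<T>() -> bool {
        mem::size_of::<T>() == mem::size_of::<AtomicUsize>()
            && mem::align_of::<T>() >= mem::align_of::<AtomicUsize>()
    }

    fn main() {
        assert!(fits_native_atomic::<usize>());       // fast path: native fetch_add
        assert!(!fits_native_atomic::<u8>());         // too small: falls back to the lock
        assert!(!fits_native_atomic::<[u8; 3]>());    // wrong size: falls back to the lock
        assert!(!fits_native_atomic::<[usize; 2]>()); // too large: falls back to the lock
    }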
@@ -585,19 +585,97 @@ fn can_transmute<A, B>() -> bool {
     mem::size_of::<A>() == mem::size_of::<B>() && mem::align_of::<A>() >= mem::align_of::<B>()
 }

-/// Automatically releases a lock when dropped.
-struct LockGuard {
-    lock: &'static AtomicBool,
+/// A simple stamped lock.
+struct Lock {
+    /// The current state of the lock.
+    ///
+    /// All bits except the least significant one hold the current stamp. When locked, the state
+    /// equals 1 and doesn't contain a valid stamp.
+    state: AtomicUsize,
 }

-impl Drop for LockGuard {
+impl Lock {
+    /// If not locked, returns the current stamp.
+    ///
+    /// This method should be called before optimistic reads.
+    #[inline]
+    fn optimistic_read(&self) -> Option<usize> {
+        let state = self.state.load(Ordering::Acquire);
+        if state == 1 {
+            None
+        } else {
+            Some(state)
+        }
+    }
+
+    /// Returns `true` if the current stamp is equal to `stamp`.
+    ///
+    /// This method should be called after optimistic reads to check whether they are valid. The
+    /// argument `stamp` should correspond to the one returned by method `optimistic_read`.
+    #[inline]
+    fn validate_read(&self, stamp: usize) -> bool {
+        atomic::fence(Ordering::Acquire);
+        self.state.load(Ordering::SeqCst) == stamp
+    }
+
+    /// Grabs the lock for writing.
+    #[inline]
+    fn write(&'static self) -> WriteGuard {
+        let mut step = 0usize;
+
+        loop {
+            let previous = self.state.swap(1, Ordering::Acquire);
+
+            if previous != 1 {
+                break WriteGuard {
+                    lock: self,
+                    state: previous,
+                };
+            }
+
+            if step < 5 {
+                // Just try again.
+            } else if step < 10 {
+                atomic::spin_loop_hint();
+            } else {
+                #[cfg(not(feature = "use_std"))]
+                atomic::spin_loop_hint();
+
+                #[cfg(feature = "use_std")]
+                ::std::thread::yield_now();
+            }
+
+            step = step.wrapping_add(1);
+        }
+    }
+}
+
+/// A RAII guard that releases the lock and increments the stamp when dropped.
+struct WriteGuard {
+    /// The parent lock.
+    lock: &'static Lock,
+
+    /// The stamp before locking.
+    state: usize,
+}
+
+impl WriteGuard {
+    /// Releases the lock without incrementing the stamp.
+    #[inline]
+    fn abort(self) {
+        self.lock.state.store(self.state, Ordering::Release);
+    }
+}
+
+impl Drop for WriteGuard {
     #[inline]
     fn drop(&mut self) {
-        self.lock.store(false, Ordering::Release);
+        // Release the lock and increment the stamp.
+        self.lock.state.store(self.state.wrapping_add(2), Ordering::Release);
     }
 }

-/// Acquires the lock for atomic data stored at the given address.
+/// Returns a reference to the global lock associated with the `AtomicCell` at address `addr`.
 ///
 /// This function is used to protect atomic data which doesn't fit into any of the primitive atomic
 /// types in `std::sync::atomic`. Operations on such atomics must therefore use a global lock.
@@ -606,53 +684,22 @@ impl Drop for LockGuard {
 /// picked based on the given address. Having many locks reduces contention and improves
 /// scalability.
 #[inline]
-fn lock(addr: usize) -> LockGuard {
+#[must_use]
+fn lock(addr: usize) -> &'static Lock {
     // The number of locks is prime.
-    const LEN: usize = 499;
-
-    const A: AtomicBool = AtomicBool::new(false);
-    static LOCKS: [AtomicBool; LEN] = [
-        A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A,
-        A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A,
-        A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A,
-        A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A,
-        A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A,
-        A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A,
-        A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A,
-        A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A,
-        A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A,
-        A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A,
-        A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A,
-        A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A,
-        A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A,
-        A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A,
-        A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A,
-        A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A,
-        A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A, A,
+    const LEN: usize = 97;
+
+    const L: Lock = Lock { state: AtomicUsize::new(0) };
+    static LOCKS: [Lock; LEN] = [
+        L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L,
+        L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L,
+        L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L,
+        L, L, L, L, L, L, L,
     ];

     // If the modulus is a constant number, the compiler will use crazy math to transform this into
     // a sequence of cheap arithmetic operations rather than using the slow modulo instruction.
-    let lock = &LOCKS[addr % LEN];
-
-    let mut step = 0usize;
-
-    while lock.compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
-        if step < 5 {
-            // Just try again.
-        } else if step < 10 {
-            atomic::spin_loop_hint();
-        } else {
-            #[cfg(not(feature = "use_std"))]
-            atomic::spin_loop_hint();
-
-            #[cfg(feature = "use_std")]
-            ::std::thread::yield_now();
-        }
-        step = step.wrapping_add(1);
-    }
-
-    LockGuard { lock }
+    &LOCKS[addr % LEN]
 }

 /// An atomic `()`.
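Taken together, `Lock`, `WriteGuard` and `lock(addr)` form a pool of 97 seqlock-style stamped locks indexed by the `AtomicCell`'s address; the pool size is prime, presumably to spread addresses more evenly, and a constant modulus compiles to cheap arithmetic, as the comment in the hunk notes. Readers snapshot an even stamp, read the data without locking, then validate that the stamp is unchanged; a writer parks the state at 1 while it holds the lock and bumps the stamp by 2 when the guard drops, which invalidates any overlapping optimistic read. Below is a self-contained model of that protocol using only std atomics, with the orderings as they stand after this first patch; `StampedLock`, `unlock` and `main` are illustrative names (the real type is private to this file) and `std::hint::spin_loop` stands in for `atomic::spin_loop_hint`:

    use std::sync::atomic::{fence, AtomicUsize, Ordering};

    // `state` holds an even stamp while unlocked and 1 while a writer holds the lock.
    struct StampedLock {
        state: AtomicUsize,
    }

    impl StampedLock {
        fn optimistic_read(&self) -> Option<usize> {
            let state = self.state.load(Ordering::Acquire);
            if state == 1 { None } else { Some(state) }
        }

        fn validate_read(&self, stamp: usize) -> bool {
            fence(Ordering::Acquire);
            self.state.load(Ordering::SeqCst) == stamp
        }

        fn write(&self) -> usize {
            loop {
                let previous = self.state.swap(1, Ordering::Acquire);
                if previous != 1 {
                    return previous; // caller passes this back to `unlock` or `abort`
                }
                std::hint::spin_loop();
            }
        }

        fn unlock(&self, previous: usize) {
            // What dropping `WriteGuard` does: release and bump the stamp.
            self.state.store(previous.wrapping_add(2), Ordering::Release);
        }

        fn abort(&self, previous: usize) {
            // What `WriteGuard::abort` does: release without bumping the stamp.
            self.state.store(previous, Ordering::Release);
        }
    }

    fn main() {
        let lock = StampedLock { state: AtomicUsize::new(0) };

        // A reader that overlaps no writer validates successfully.
        let stamp = lock.optimistic_read().unwrap();
        assert!(lock.validate_read(stamp));

        // A completed write bumps the stamp, so the old snapshot no longer validates.
        let prev = lock.write();
        lock.unlock(prev);
        assert!(!lock.validate_read(stamp));

        // An aborted write (used for reads that fall back to the lock) leaves the stamp alone.
        let stamp = lock.optimistic_read().unwrap();
        let prev = lock.write();
        lock.abort(prev);
        assert!(lock.validate_read(stamp));
    }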
@@ -737,8 +784,26 @@ where
             mem::transmute_copy(&a.load(Ordering::SeqCst))
         },
         {
-            let _lock = lock(src as usize);
-            ptr::read(src)
+            let lock = lock(src as usize);
+
+            // Try doing an optimistic read first.
+            if let Some(stamp) = lock.optimistic_read() {
+                // We need a volatile read here because other threads might concurrently modify the
+                // value.
+                let val = ptr::read_volatile(src);
+
+                if lock.validate_read(stamp) {
+                    return val;
+                }
+                mem::forget(val);
+            }
+
+            // Grab a regular write lock so that writers don't starve this load.
+            let guard = lock.write();
+            let val = ptr::read(src);
+            // The value hasn't been changed. Drop the guard without incrementing the stamp.
+            guard.abort();
+            val
         }
     }
 }
@@ -757,7 +822,7 @@ unsafe fn atomic_store<T>(dst: *mut T, val: T) {
             res
         },
         {
-            let _lock = lock(dst as usize);
+            let _guard = lock(dst as usize).write();
             ptr::write(dst, val)
         }
     }
@@ -777,7 +842,7 @@ unsafe fn atomic_swap<T>(dst: *mut T, val: T) -> T {
             res
         },
         {
-            let _lock = lock(dst as usize);
+            let _guard = lock(dst as usize).write();
             ptr::replace(dst, val)
         }
     }
@@ -810,11 +875,15 @@ where
             }
         },
         {
-            let _lock = lock(dst as usize);
+            let guard = lock(dst as usize).write();
+
             if byte_eq(&*dst, &current) {
                 Ok(ptr::replace(dst, new))
             } else {
-                Err(ptr::read(dst))
+                let val = ptr::read(dst);
+                // The value hasn't been changed. Drop the guard without incrementing the stamp.
+                guard.abort();
+                Err(val)
             }
         }
     }
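The payoff is in `atomic_load`: a read first takes a stamp, reads the value with a volatile read, and returns it if the stamp still validates; only when a writer interferes does it fall back to actually acquiring the lock, and even then it releases the guard through `abort` so that a pure read never bumps the stamp. Taking the full write lock on the fallback path also means a stream of writers cannot starve a reader indefinitely, per the comment above, and `atomic_compare_exchange_weak` uses the same `abort` trick on its failing branch. A sketch of that control flow against the `StampedLock` model from the earlier note (assumed to be in scope in the same module); `read_cell` and `main` are illustrative stand-ins for the real code, which works on a raw pointer inside `AtomicCell` and also has a native-atomic fast path not shown here:

    use std::cell::UnsafeCell;
    use std::ptr;

    fn read_cell<T: Copy>(lock: &StampedLock, cell: &UnsafeCell<T>) -> T {
        // Optimistic path: snapshot the stamp, read without locking, then validate.
        if let Some(stamp) = lock.optimistic_read() {
            let val = unsafe { ptr::read_volatile(cell.get()) };
            if lock.validate_read(stamp) {
                return val;
            }
            // A writer interfered; the snapshot may be torn, so discard it.
        }

        // Fallback: grab the write lock, read, and release it without bumping the
        // stamp, since nothing was modified.
        let prev = lock.write();
        let val = unsafe { ptr::read(cell.get()) };
        lock.abort(prev);
        val
    }

    fn main() {
        let lock = StampedLock { state: std::sync::atomic::AtomicUsize::new(0) };
        let cell = UnsafeCell::new(0u64);
        assert_eq!(read_cell(&lock, &cell), 0);
    }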
From 937c38786bc28c8dda7ea29ee8b6975d42cc714c Mon Sep 17 00:00:00 2001
From: Stjepan Glavina
Date: Tue, 31 Jul 2018 16:27:24 +0200
Subject: [PATCH 2/4] Always use spin_loop_hint

---
 src/atomic/atomic_cell.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/src/atomic/atomic_cell.rs b/src/atomic/atomic_cell.rs
index 1ee45a8..3060ceb 100644
--- a/src/atomic/atomic_cell.rs
+++ b/src/atomic/atomic_cell.rs
@@ -633,9 +633,7 @@ impl Lock {
                 };
             }

-            if step < 5 {
-                // Just try again.
-            } else if step < 10 {
+            if step < 10 {
                 atomic::spin_loop_hint();
             } else {
                 #[cfg(not(feature = "use_std"))]

From 8b8d406f98913ea80683a4ec617e44a8130fe5cd Mon Sep 17 00:00:00 2001
From: Stjepan Glavina
Date: Wed, 8 Aug 2018 15:36:30 +0200
Subject: [PATCH 3/4] Add a Release fence and relax the SeqCst load

---
 src/atomic/atomic_cell.rs | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/atomic/atomic_cell.rs b/src/atomic/atomic_cell.rs
index 3060ceb..111f479 100644
--- a/src/atomic/atomic_cell.rs
+++ b/src/atomic/atomic_cell.rs
@@ -615,7 +615,7 @@ impl Lock {
     #[inline]
     fn validate_read(&self, stamp: usize) -> bool {
         atomic::fence(Ordering::Acquire);
-        self.state.load(Ordering::SeqCst) == stamp
+        self.state.load(Ordering::Relaxed) == stamp
     }

     /// Grabs the lock for writing.
@@ -627,7 +627,9 @@
             let previous = self.state.swap(1, Ordering::Acquire);

             if previous != 1 {
-                break WriteGuard {
+                atomic::fence(Ordering::Release);
+
+                return WriteGuard {
                     lock: self,
                     state: previous,
                 };

From 85d9dce2d26908d641d55dc33e448b3616b55508 Mon Sep 17 00:00:00 2001
From: Stjepan Glavina
Date: Thu, 9 Aug 2018 19:12:46 +0200
Subject: [PATCH 4/4] Elaborate comments on volatile reads

---
 src/atomic/atomic_cell.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/atomic/atomic_cell.rs b/src/atomic/atomic_cell.rs
index 111f479..95ed349 100644
--- a/src/atomic/atomic_cell.rs
+++ b/src/atomic/atomic_cell.rs
@@ -789,7 +789,10 @@
             // Try doing an optimistic read first.
             if let Some(stamp) = lock.optimistic_read() {
                 // We need a volatile read here because other threads might concurrently modify the
-                // value.
+                // value. In theory, data races are *always* UB, even if we use volatile reads and
+                // discard the data when a data race is detected. The proper solution would be to
+                // do atomic reads and atomic writes, but we can't atomically read and write all
+                // kinds of data since `AtomicU8` is not available on stable Rust yet.
                 let val = ptr::read_volatile(src);

                 if lock.validate_read(stamp) {
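Patches 2 through 4 refine the same scheme. Patch 2 spins with `spin_loop_hint` from the first retry instead of doing nothing for the first five attempts. Patch 3 is about orderings: as I read it, the new `Release` fence in `write` is meant to keep the writer's subsequent stores to the protected value from being ordered before the `state = 1` swap, and the `Acquire` fence already present in `validate_read` keeps the reader's data reads from drifting past the validation load, so the validation load itself can be `Relaxed` instead of `SeqCst`. Patch 4 then spells out the caveat: mixing volatile data accesses with atomics is outside the letter of the memory model (a data race is formally UB), and the clean fix, per-byte atomics such as `AtomicU8`, is not yet available on stable Rust. As a reminder of what the fences themselves guarantee, here is the textbook pairing: a release fence before a relaxed store synchronizes with an acquire fence after a relaxed load that observes it. This is the general `std::sync::atomic` fence idiom, not the `AtomicCell` code itself, and the statics and names are illustrative:

    use std::sync::atomic::{fence, AtomicUsize, Ordering};
    use std::thread;

    static DATA: AtomicUsize = AtomicUsize::new(0);
    static STAMP: AtomicUsize = AtomicUsize::new(0);

    fn main() {
        let writer = thread::spawn(|| {
            DATA.store(42, Ordering::Relaxed); // the "protected" write
            fence(Ordering::Release);          // ordered before the stamp update below
            STAMP.store(2, Ordering::Relaxed);
        });

        let reader = thread::spawn(|| {
            while STAMP.load(Ordering::Relaxed) != 2 {
                std::hint::spin_loop();
            }
            fence(Ordering::Acquire); // pairs with the writer's release fence
            assert_eq!(DATA.load(Ordering::Relaxed), 42);
        });

        writer.join().unwrap();
        reader.join().unwrap();
    }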