From 4f98a68a58a998a0fbe5a46e1f2256db466a8d36 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Sat, 6 Jan 2024 09:01:14 +0300 Subject: [PATCH 01/18] benches: add sync_broadcast benchmark Refs: #5465 --- benches/Cargo.toml | 5 +++ benches/sync_broadcast.rs | 82 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) create mode 100644 benches/sync_broadcast.rs diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 1eea2e04489..c581055cf65 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -26,6 +26,11 @@ name = "spawn" path = "spawn.rs" harness = false +[[bench]] +name = "sync_broadcast" +path = "sync_broadcast.rs" +harness = false + [[bench]] name = "sync_mpsc" path = "sync_mpsc.rs" diff --git a/benches/sync_broadcast.rs b/benches/sync_broadcast.rs new file mode 100644 index 00000000000..38a2141387b --- /dev/null +++ b/benches/sync_broadcast.rs @@ -0,0 +1,82 @@ +use rand::{Rng, RngCore, SeedableRng}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use tokio::sync::{broadcast, Notify}; + +use criterion::measurement::WallTime; +use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Criterion}; + +fn rt() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(6) + .build() + .unwrap() +} + +fn do_work(rng: &mut impl RngCore) -> u32 { + use std::fmt::Write; + let mut message = String::new(); + for i in 1..=10 { + let _ = write!(&mut message, " {i}={}", rng.gen::()); + } + message + .as_bytes() + .iter() + .map(|&c| c as u32) + .fold(0, u32::wrapping_add) +} + +fn contention_impl(g: &mut BenchmarkGroup) { + let rt = rt(); + + let (tx, _rx) = broadcast::channel::(1000); + let wg = Arc::new((AtomicUsize::new(0), Notify::new())); + + for n in 0..N_TASKS { + let wg = wg.clone(); + let mut rx = tx.subscribe(); + let mut rng = rand::rngs::StdRng::seed_from_u64(n as u64); + rt.spawn(async move { + while let Ok(_) = rx.recv().await { + let r = do_work(&mut rng); + let _ = black_box(r); + if wg.0.fetch_sub(1, Ordering::Relaxed) == 1 { + wg.1.notify_one(); + } + } + }); + } + + const N_ITERS: usize = 100; + + g.bench_function(N_TASKS.to_string(), |b| { + b.iter(|| { + rt.block_on({ + let wg = wg.clone(); + let tx = tx.clone(); + async move { + for i in 0..N_ITERS { + assert_eq!(wg.0.fetch_add(N_TASKS, Ordering::Relaxed), 0); + tx.send(i).unwrap(); + while wg.0.load(Ordering::Relaxed) > 0 { + wg.1.notified().await; + } + } + } + }) + }) + }); +} + +fn bench_contention(c: &mut Criterion) { + let mut group = c.benchmark_group("contention"); + contention_impl::<10>(&mut group); + contention_impl::<100>(&mut group); + contention_impl::<500>(&mut group); + contention_impl::<1000>(&mut group); + group.finish(); +} + +criterion_group!(contention, bench_contention); + +criterion_main!(contention); From cd43fc9227e5554fdb6c8573f79635b7cfb8d448 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Wed, 10 Jan 2024 08:39:14 +0300 Subject: [PATCH 02/18] sync: reduce contention in broadcast channel Implement atomic linked list that allows pushing waiters concurrently, which reduces contention. 
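The core of the change is that waiters are prepended to the wait list
with a CAS loop on an atomic head pointer instead of under the tail
mutex. As an illustrative sketch only (a simplified, non-intrusive
stand-in for the intrusive AtomicLinkedList added by this patch; the
names below are made up for the example), the push scheme looks like
this:

    use std::ptr;
    use std::sync::atomic::{AtomicPtr, Ordering::{AcqRel, Relaxed}};

    struct Node<T> {
        value: T,
        next: *mut Node<T>,
    }

    struct PushList<T> {
        head: AtomicPtr<Node<T>>,
    }

    impl<T> PushList<T> {
        fn new() -> Self {
            PushList { head: AtomicPtr::new(ptr::null_mut()) }
        }

        // Prepending only needs a shared reference: concurrent callers
        // race on the head pointer with compare_exchange_weak.
        fn push_front(&self, value: T) {
            let node = Box::into_raw(Box::new(Node { value, next: ptr::null_mut() }));
            let mut old_head = self.head.load(Relaxed);
            loop {
                // Link the new node to the current head before publishing it.
                unsafe { (*node).next = old_head };
                match self.head.compare_exchange_weak(old_head, node, AcqRel, Relaxed) {
                    Ok(_) => break,
                    Err(actual) => old_head = actual,
                }
            }
        }

        // Removal still requires exclusive access, mirroring the real list:
        // with `&mut self` there can be no concurrent pushes in flight.
        fn drain(&mut self) -> Vec<T> {
            let mut out = Vec::new();
            let mut cur = std::mem::replace(self.head.get_mut(), ptr::null_mut());
            while !cur.is_null() {
                let node = unsafe { Box::from_raw(cur) };
                cur = node.next;
                out.push(node.value);
            }
            out
        }
    }

The actual list in this patch is intrusive (nodes live inside the
waiters) and also maintains a tail pointer, but the push path follows
the same CAS scheme.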
Fixes: #5465 --- tokio/src/loom/std/mod.rs | 6 +- tokio/src/sync/broadcast.rs | 88 +++++++---- tokio/src/util/linked_list.rs | 276 ++++++++++++++++++++++++++++++---- 3 files changed, 306 insertions(+), 64 deletions(-) diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs index 0c611af162a..7d01c05a4e4 100644 --- a/tokio/src/loom/std/mod.rs +++ b/tokio/src/loom/std/mod.rs @@ -59,12 +59,14 @@ pub(crate) mod sync { #[cfg(all(feature = "parking_lot", not(miri)))] #[allow(unused_imports)] pub(crate) use crate::loom::std::parking_lot::{ - Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, WaitTimeoutResult, + Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard, WaitTimeoutResult, }; #[cfg(not(all(feature = "parking_lot", not(miri))))] #[allow(unused_imports)] - pub(crate) use std::sync::{Condvar, MutexGuard, RwLock, RwLockReadGuard, WaitTimeoutResult}; + pub(crate) use std::sync::{ + Condvar, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard, WaitTimeoutResult, + }; #[cfg(not(all(feature = "parking_lot", not(miri))))] pub(crate) use crate::loom::std::mutex::Mutex; diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 568a50bd59b..d7c6d09507d 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -118,8 +118,8 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicUsize; -use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard}; -use crate::util::linked_list::{self, GuardedLinkedList, LinkedList}; +use crate::loom::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use crate::util::linked_list::{self, AtomicLinkedList, GuardedLinkedList}; use crate::util::WakeList; use std::fmt; @@ -310,7 +310,7 @@ struct Shared { mask: usize, /// Tail of the queue. Includes the rx wait list. - tail: Mutex, + tail: RwLock, /// Number of outstanding Sender handles. num_tx: AtomicUsize, @@ -328,7 +328,7 @@ struct Tail { closed: bool, /// Receivers waiting for a value. - waiters: LinkedList::Target>, + waiters: AtomicLinkedList::Target>, } /// Slot in the buffer. 
@@ -521,11 +521,11 @@ impl Sender { let shared = Arc::new(Shared { buffer: buffer.into_boxed_slice(), mask: capacity - 1, - tail: Mutex::new(Tail { + tail: RwLock::new(Tail { pos: 0, rx_cnt: receiver_count, closed: false, - waiters: LinkedList::new(), + waiters: AtomicLinkedList::new(), }), num_tx: AtomicUsize::new(1), }); @@ -585,7 +585,7 @@ impl Sender { /// } /// ``` pub fn send(&self, value: T) -> Result> { - let mut tail = self.shared.tail.lock(); + let mut tail = self.shared.tail.write().unwrap(); if tail.rx_cnt == 0 { return Err(SendError(value)); @@ -688,7 +688,7 @@ impl Sender { /// } /// ``` pub fn len(&self) -> usize { - let tail = self.shared.tail.lock(); + let tail = self.shared.tail.read().unwrap(); let base_idx = (tail.pos & self.shared.mask as u64) as usize; let mut low = 0; @@ -735,7 +735,7 @@ impl Sender { /// } /// ``` pub fn is_empty(&self) -> bool { - let tail = self.shared.tail.lock(); + let tail = self.shared.tail.read().unwrap(); let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize; self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0 @@ -778,7 +778,7 @@ impl Sender { /// } /// ``` pub fn receiver_count(&self) -> usize { - let tail = self.shared.tail.lock(); + let tail = self.shared.tail.read().unwrap(); tail.rx_cnt } @@ -806,7 +806,7 @@ impl Sender { } fn close_channel(&self) { - let mut tail = self.shared.tail.lock(); + let mut tail = self.shared.tail.write().unwrap(); tail.closed = true; self.shared.notify_rx(tail); @@ -815,7 +815,7 @@ impl Sender { /// Create a new `Receiver` which reads starting from the tail. fn new_receiver(shared: Arc>) -> Receiver { - let mut tail = shared.tail.lock(); + let mut tail = shared.tail.write().unwrap(); assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers"); @@ -842,7 +842,7 @@ impl<'a, T> Drop for WaitersList<'a, T> { // If the list is not empty, we unlink all waiters from it. // We do not wake the waiters to avoid double panics. if !self.is_empty { - let _lock_guard = self.shared.tail.lock(); + let _lock_guard = self.shared.tail.write().unwrap(); while self.list.pop_back().is_some() {} } } @@ -850,12 +850,12 @@ impl<'a, T> Drop for WaitersList<'a, T> { impl<'a, T> WaitersList<'a, T> { fn new( - unguarded_list: LinkedList::Target>, + unguarded_list: AtomicLinkedList::Target>, guard: Pin<&'a Waiter>, shared: &'a Shared, ) -> Self { let guard_ptr = NonNull::from(guard.get_ref()); - let list = unguarded_list.into_guarded(guard_ptr); + let list = unguarded_list.into_list().into_guarded(guard_ptr); WaitersList { list, is_empty: false, @@ -877,7 +877,7 @@ impl<'a, T> WaitersList<'a, T> { } impl Shared { - fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) { + fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: RwLockWriteGuard<'a, Tail>) { // It is critical for `GuardedLinkedList` safety that the guard node is // pinned in memory and is not dropped until the guarded list is dropped. let guard = Waiter::new(); @@ -925,7 +925,7 @@ impl Shared { wakers.wake_all(); // Acquire the lock again. - tail = self.tail.lock(); + tail = self.tail.write().unwrap(); } // Release the lock before waking. 
@@ -987,7 +987,7 @@ impl Receiver { /// } /// ``` pub fn len(&self) -> usize { - let next_send_pos = self.shared.tail.lock().pos; + let next_send_pos = self.shared.tail.read().unwrap().pos; (next_send_pos - self.next) as usize } @@ -1065,7 +1065,7 @@ impl Receiver { let mut old_waker = None; - let mut tail = self.shared.tail.lock(); + let tail = self.shared.tail.read().unwrap(); // Acquire slot lock again slot = self.shared.buffer[idx].read().unwrap(); @@ -1086,7 +1086,16 @@ impl Receiver { // Store the waker if let Some((waiter, waker)) = waiter { - // Safety: called while locked. + // Safety: called while holding a read lock on tail. + // It suffices since we only update two waiter members: + // - `waiter.waker` - all other accesses of this member are + // write-lock protected, + // - `waiter.queued` - all other accesses of this member are + // either write-lock protected or read-lock protected with + // exclusive reference to the `Recv` that contains the waiter. + // Concurrent calls to `recv_ref` with the same waiter + // are impossible because it implies ownership of the `Recv` + // that contains it. unsafe { // Only queue if not already queued waiter.with_mut(|ptr| { @@ -1106,6 +1115,11 @@ impl Receiver { if !(*ptr).queued { (*ptr).queued = true; + // Safety: + // - `waiter` is not already queued, + // - calling `recv_ref` with a waiter implies ownership + // of it's `Recv`. As such, this waiter cannot be pushed + // concurrently by some other thread. tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr)); } }); @@ -1331,7 +1345,7 @@ impl Receiver { impl Drop for Receiver { fn drop(&mut self) { - let mut tail = self.shared.tail.lock(); + let mut tail = self.shared.tail.write().unwrap(); tail.rx_cnt -= 1; let until = tail.pos; @@ -1402,22 +1416,34 @@ where impl<'a, T> Drop for Recv<'a, T> { fn drop(&mut self) { - // Acquire the tail lock. This is required for safety before accessing + // Acquire a read lock on tail. This is required for safety before accessing // the waiter node. - let mut tail = self.receiver.shared.tail.lock(); + let tail = self.receiver.shared.tail.read().unwrap(); - // safety: tail lock is held + // Safety: we hold read lock on tail AND have exclusive reference to `Recv`. let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued }); if queued { - // Remove the node + // Optimistic check failed. To remove the waiter, we need a write lock. + drop(tail); + let mut tail = self.receiver.shared.tail.write().unwrap(); + + // Double check that the waiter is still enqueued, + // in case it was removed before we reacquired the lock. // - // safety: tail lock is held and the wait node is verified to be in - // the list. - unsafe { - self.waiter.with_mut(|ptr| { - tail.waiters.remove((&mut *ptr).into()); - }); + // Safety: tail write lock is held. + let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued }); + + if queued { + // Remove the node. + // + // Safety: tail write lock is held and the wait node is verified to be in + // the list. + unsafe { + self.waiter.with_mut(|ptr| { + tail.waiters.remove((&mut *ptr).into()); + }); + } } } } diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index 0ed2b616456..08ef4a28904 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -11,6 +11,10 @@ use core::fmt; use core::marker::{PhantomData, PhantomPinned}; use core::mem::ManuallyDrop; use core::ptr::{self, NonNull}; +use core::sync::atomic::{ + AtomicPtr, + Ordering::{AcqRel, Relaxed}, +}; /// An intrusive linked list. 
/// @@ -108,6 +112,52 @@ struct PointersInner { unsafe impl Send for Pointers {} unsafe impl Sync for Pointers {} +// ===== LinkedListBase ===== + +// Common methods between LinkedList and AtomicLinkedList. +trait LinkedListBase { + // NB: exclusive reference is important for AtomicLinkedList safety guarantees. + fn head(&mut self) -> Option>; + fn tail(&mut self) -> Option>; + + fn set_head(&mut self, node: Option>); + fn set_tail(&mut self, node: Option>); + + unsafe fn remove(&mut self, node: NonNull) -> Option { + if let Some(prev) = L::pointers(node).as_ref().get_prev() { + debug_assert_eq!(L::pointers(prev).as_ref().get_next(), Some(node)); + L::pointers(prev) + .as_mut() + .set_next(L::pointers(node).as_ref().get_next()); + } else { + if self.head() != Some(node) { + return None; + } + + self.set_head(L::pointers(node).as_ref().get_next()); + } + + if let Some(next) = L::pointers(node).as_ref().get_next() { + debug_assert_eq!(L::pointers(next).as_ref().get_prev(), Some(node)); + L::pointers(next) + .as_mut() + .set_prev(L::pointers(node).as_ref().get_prev()); + } else { + // This might be the last item in the list + if self.tail() != Some(node) { + return None; + } + + self.set_tail(L::pointers(node).as_ref().get_prev()); + } + + L::pointers(node).as_mut().set_next(None); + L::pointers(node).as_mut().set_prev(None); + + Some(L::from_raw(node)) + } +} + // ===== impl LinkedList ===== impl LinkedList { @@ -121,6 +171,24 @@ impl LinkedList { } } +impl LinkedListBase for LinkedList { + fn head(&mut self) -> Option::Target>> { + self.head + } + + fn tail(&mut self) -> Option::Target>> { + self.tail + } + + fn set_head(&mut self, node: Option::Target>>) { + self.head = node; + } + + fn set_tail(&mut self, node: Option::Target>>) { + self.tail = node; + } +} + impl LinkedList { /// Adds an element first in the list. pub(crate) fn push_front(&mut self, val: L::Handle) { @@ -185,37 +253,7 @@ impl LinkedList { /// the caller has an exclusive access to that list. This condition is /// used by the linked list in `sync::Notify`. pub(crate) unsafe fn remove(&mut self, node: NonNull) -> Option { - if let Some(prev) = L::pointers(node).as_ref().get_prev() { - debug_assert_eq!(L::pointers(prev).as_ref().get_next(), Some(node)); - L::pointers(prev) - .as_mut() - .set_next(L::pointers(node).as_ref().get_next()); - } else { - if self.head != Some(node) { - return None; - } - - self.head = L::pointers(node).as_ref().get_next(); - } - - if let Some(next) = L::pointers(node).as_ref().get_next() { - debug_assert_eq!(L::pointers(next).as_ref().get_prev(), Some(node)); - L::pointers(next) - .as_mut() - .set_prev(L::pointers(node).as_ref().get_prev()); - } else { - // This might be the last item in the list - if self.tail != Some(node) { - return None; - } - - self.tail = L::pointers(node).as_ref().get_prev(); - } - - L::pointers(node).as_mut().set_next(None); - L::pointers(node).as_mut().set_prev(None); - - Some(L::from_raw(node)) + LinkedListBase::remove(self, node) } } @@ -313,6 +351,141 @@ cfg_taskdump! { } } +// ===== impl AtomicLinkedList ===== + +feature! { + #![feature = "sync"] + + /// An atomic intrusive linked list. It allows pushing new nodes concurrently. + /// Removing nodes still requires an exclusive reference. + pub(crate) struct AtomicLinkedList { + /// Linked list head. + head: AtomicPtr, + + /// Linked list tail. + tail: UnsafeCell>>, + + /// Node type marker. 
+ _marker: PhantomData<*const L>, + } + + unsafe impl Send for AtomicLinkedList where L::Target: Send {} + unsafe impl Sync for AtomicLinkedList where L::Target: Sync {} + + impl Default for AtomicLinkedList { + fn default() -> Self { + Self::new() + } + } + + impl AtomicLinkedList { + /// Creates an empty atomic linked list. + pub(crate) const fn new() -> AtomicLinkedList { + AtomicLinkedList { + head: AtomicPtr::new(core::ptr::null_mut()), + tail: UnsafeCell::new(None), + _marker: PhantomData, + } + } + + /// Convert an atomic linked list into a non-atomic version. + pub(crate) fn into_list(mut self) -> LinkedList { + LinkedList { + head: NonNull::new(*self.head.get_mut()), + tail: *self.tail.get_mut(), + _marker: PhantomData, + } + } + } + + impl LinkedListBase for AtomicLinkedList { + fn head(&mut self) -> Option> { + NonNull::new(*self.head.get_mut()) + } + + fn tail(&mut self) -> Option> { + *self.tail.get_mut() + } + + fn set_head(&mut self, node: Option>) { + *self.head.get_mut() = match node { + Some(ptr) => ptr.as_ptr(), + None => core::ptr::null_mut(), + }; + } + + fn set_tail(&mut self, node: Option>) { + *self.tail.get_mut() = node; + } + } + + impl AtomicLinkedList { + /// Atomically adds an element first in the list. + /// This method can be called concurrently from multiple threads. + /// + /// # Safety + /// + /// The caller must ensure that: + /// - `val` is not pushed concurrently by muptiple threads, + /// - `val` is not already part of some list. + pub(crate) unsafe fn push_front(&self, val: L::Handle) { + // Note that removing nodes from the list still requires + // an exclusive reference, so we need not worry about + // concurrent node removals. + + // The value should not be dropped, it is being inserted into the list. + let val = ManuallyDrop::new(val); + let new_head = L::as_raw(&val); + + // Safety: due to the function contract, no concurrent `push_front` + // is called on this particular element, so we are safe to assume + // ownership. + L::pointers(new_head).as_mut().set_prev(None); + + let mut old_head = self.head.load(Relaxed); + loop { + // Safety: due to the function contract, no concurrent `push_front` + // is called on this particular element, and we have not + // inserted it into the list, so we can still assume ownership. + L::pointers(new_head).as_mut().set_next(NonNull::new(old_head)); + + let Err(actual_head) = self.head.compare_exchange_weak( + old_head, + new_head.as_ptr(), + AcqRel, + Relaxed, + ) else { + break; + }; + + old_head = actual_head; + } + + if old_head.is_null() { + // Safety: only the thread that successfully inserted the first + // element is granted the right to update tail. + *self.tail.get() = Some(new_head); + } else { + // Safety: + // 1. Only the thread that successfully inserted the new element + // is granted the right to update the previous head's `prev`, + // 2. Upon successfull insertion, we have synchronized with all the + // previous insertions (due to `AcqRel` memory ordering), so all + // the previous `set_prev` calls on `old_head` happen-before this call, + // 3. Due the `push_front` contract, we can assume that `old_head` + // is not pushed concurrently by another thread, as it is already + // in the list. Thus, no data race on `set_prev` can happen. + L::pointers(NonNull::new_unchecked(old_head)).as_mut().set_prev(Some(new_head)); + } + } + + /// See [LinkedList::remove]. 
+ pub(crate) unsafe fn remove(&mut self, node: NonNull) -> Option { + LinkedListBase::remove(self, node) + } + } +} + // ===== impl GuardedLinkedList ===== feature! { @@ -797,4 +970,45 @@ pub(crate) mod tests { } } } + + #[cfg(feature = "sync")] + #[test] + fn atomic_push_front() { + use std::sync::Arc; + + let atomic_list = Arc::new(AtomicLinkedList::<&Entry, <&Entry as Link>::Target>::new()); + + let _entries = [5, 7] + .into_iter() + .map(|x| { + std::thread::spawn({ + let atomic_list = atomic_list.clone(); + move || { + let list_entry = entry(x); + unsafe { + atomic_list.push_front(list_entry.as_ref()); + } + list_entry + } + }) + }) + .collect::>() + .into_iter() + .map(|handle| handle.join().unwrap()) + .collect::>(); + + let mut list = Arc::into_inner(atomic_list).unwrap().into_list(); + + assert!(!list.is_empty()); + + let first = list.pop_back().unwrap(); + assert!(first.val == 5 || first.val == 7); + + let second = list.pop_back().unwrap(); + assert!(second.val == 5 || second.val == 7); + assert_ne!(first.val, second.val); + + assert!(list.is_empty()); + assert!(list.pop_back().is_none()); + } } From a00a99832a3cb884333097f0ea1cdffa7adeac39 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Sun, 14 Jan 2024 16:18:53 +0300 Subject: [PATCH 03/18] Replace let-else with if-let for 1.63 compat --- tokio/src/util/linked_list.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index 08ef4a28904..eb174218ac9 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -449,16 +449,16 @@ feature! { // inserted it into the list, so we can still assume ownership. L::pointers(new_head).as_mut().set_next(NonNull::new(old_head)); - let Err(actual_head) = self.head.compare_exchange_weak( + if let Err(actual_head) = self.head.compare_exchange_weak( old_head, new_head.as_ptr(), AcqRel, Relaxed, - ) else { + ) { + old_head = actual_head; + } else { break; }; - - old_head = actual_head; } if old_head.is_null() { From be0d22030fbd62bd6e034c9b3912697047a2c7b0 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Sun, 14 Jan 2024 16:38:06 +0300 Subject: [PATCH 04/18] Fix build errors --- tokio/src/util/linked_list.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index eb174218ac9..5cf34c850a0 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -11,10 +11,6 @@ use core::fmt; use core::marker::{PhantomData, PhantomPinned}; use core::mem::ManuallyDrop; use core::ptr::{self, NonNull}; -use core::sync::atomic::{ - AtomicPtr, - Ordering::{AcqRel, Relaxed}, -}; /// An intrusive linked list. /// @@ -356,6 +352,11 @@ cfg_taskdump! { feature! { #![feature = "sync"] + use core::sync::atomic::{ + AtomicPtr, + Ordering::{AcqRel, Relaxed}, + }; + /// An atomic intrusive linked list. It allows pushing new nodes concurrently. /// Removing nodes still requires an exclusive reference. 
pub(crate) struct AtomicLinkedList { @@ -971,7 +972,7 @@ pub(crate) mod tests { } } - #[cfg(feature = "sync")] + #[cfg(all(feature = "sync", not(target_os = "wasi")))] // Wasi doesn't support threads #[test] fn atomic_push_front() { use std::sync::Arc; From 3a2b35b19ac95667ab15062305aa2a83368bb032 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Sun, 14 Jan 2024 19:02:19 +0300 Subject: [PATCH 05/18] Move AtomicLinkedList into separate file --- tokio/src/util/linked_list.rs | 192 ++------------------------- tokio/src/util/linked_list/atomic.rs | 189 ++++++++++++++++++++++++++ 2 files changed, 197 insertions(+), 184 deletions(-) create mode 100644 tokio/src/util/linked_list/atomic.rs diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index 5cf34c850a0..a742279339c 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -6,6 +6,11 @@ //! structure's APIs are `unsafe` as they require the caller to ensure the //! specified node is actually contained by the list. +#[cfg(feature = "sync")] +mod atomic; +#[cfg(feature = "sync")] +pub(crate) use self::atomic::AtomicLinkedList; + use core::cell::UnsafeCell; use core::fmt; use core::marker::{PhantomData, PhantomPinned}; @@ -347,146 +352,6 @@ cfg_taskdump! { } } -// ===== impl AtomicLinkedList ===== - -feature! { - #![feature = "sync"] - - use core::sync::atomic::{ - AtomicPtr, - Ordering::{AcqRel, Relaxed}, - }; - - /// An atomic intrusive linked list. It allows pushing new nodes concurrently. - /// Removing nodes still requires an exclusive reference. - pub(crate) struct AtomicLinkedList { - /// Linked list head. - head: AtomicPtr, - - /// Linked list tail. - tail: UnsafeCell>>, - - /// Node type marker. - _marker: PhantomData<*const L>, - } - - unsafe impl Send for AtomicLinkedList where L::Target: Send {} - unsafe impl Sync for AtomicLinkedList where L::Target: Sync {} - - impl Default for AtomicLinkedList { - fn default() -> Self { - Self::new() - } - } - - impl AtomicLinkedList { - /// Creates an empty atomic linked list. - pub(crate) const fn new() -> AtomicLinkedList { - AtomicLinkedList { - head: AtomicPtr::new(core::ptr::null_mut()), - tail: UnsafeCell::new(None), - _marker: PhantomData, - } - } - - /// Convert an atomic linked list into a non-atomic version. - pub(crate) fn into_list(mut self) -> LinkedList { - LinkedList { - head: NonNull::new(*self.head.get_mut()), - tail: *self.tail.get_mut(), - _marker: PhantomData, - } - } - } - - impl LinkedListBase for AtomicLinkedList { - fn head(&mut self) -> Option> { - NonNull::new(*self.head.get_mut()) - } - - fn tail(&mut self) -> Option> { - *self.tail.get_mut() - } - - fn set_head(&mut self, node: Option>) { - *self.head.get_mut() = match node { - Some(ptr) => ptr.as_ptr(), - None => core::ptr::null_mut(), - }; - } - - fn set_tail(&mut self, node: Option>) { - *self.tail.get_mut() = node; - } - } - - impl AtomicLinkedList { - /// Atomically adds an element first in the list. - /// This method can be called concurrently from multiple threads. - /// - /// # Safety - /// - /// The caller must ensure that: - /// - `val` is not pushed concurrently by muptiple threads, - /// - `val` is not already part of some list. - pub(crate) unsafe fn push_front(&self, val: L::Handle) { - // Note that removing nodes from the list still requires - // an exclusive reference, so we need not worry about - // concurrent node removals. - - // The value should not be dropped, it is being inserted into the list. 
- let val = ManuallyDrop::new(val); - let new_head = L::as_raw(&val); - - // Safety: due to the function contract, no concurrent `push_front` - // is called on this particular element, so we are safe to assume - // ownership. - L::pointers(new_head).as_mut().set_prev(None); - - let mut old_head = self.head.load(Relaxed); - loop { - // Safety: due to the function contract, no concurrent `push_front` - // is called on this particular element, and we have not - // inserted it into the list, so we can still assume ownership. - L::pointers(new_head).as_mut().set_next(NonNull::new(old_head)); - - if let Err(actual_head) = self.head.compare_exchange_weak( - old_head, - new_head.as_ptr(), - AcqRel, - Relaxed, - ) { - old_head = actual_head; - } else { - break; - }; - } - - if old_head.is_null() { - // Safety: only the thread that successfully inserted the first - // element is granted the right to update tail. - *self.tail.get() = Some(new_head); - } else { - // Safety: - // 1. Only the thread that successfully inserted the new element - // is granted the right to update the previous head's `prev`, - // 2. Upon successfull insertion, we have synchronized with all the - // previous insertions (due to `AcqRel` memory ordering), so all - // the previous `set_prev` calls on `old_head` happen-before this call, - // 3. Due the `push_front` contract, we can assume that `old_head` - // is not pushed concurrently by another thread, as it is already - // in the list. Thus, no data race on `set_prev` can happen. - L::pointers(NonNull::new_unchecked(old_head)).as_mut().set_prev(Some(new_head)); - } - } - - /// See [LinkedList::remove]. - pub(crate) unsafe fn remove(&mut self, node: NonNull) -> Option { - LinkedListBase::remove(self, node) - } - } -} - // ===== impl GuardedLinkedList ===== feature! 
{ @@ -648,9 +513,9 @@ pub(crate) mod tests { #[derive(Debug)] #[repr(C)] - struct Entry { + pub(crate) struct Entry { pointers: Pointers, - val: i32, + pub(crate) val: i32, } unsafe impl<'a> Link for &'a Entry { @@ -670,7 +535,7 @@ pub(crate) mod tests { } } - fn entry(val: i32) -> Pin> { + pub(crate) fn entry(val: i32) -> Pin> { Box::pin(Entry { pointers: Pointers::new(), val, @@ -971,45 +836,4 @@ pub(crate) mod tests { } } } - - #[cfg(all(feature = "sync", not(target_os = "wasi")))] // Wasi doesn't support threads - #[test] - fn atomic_push_front() { - use std::sync::Arc; - - let atomic_list = Arc::new(AtomicLinkedList::<&Entry, <&Entry as Link>::Target>::new()); - - let _entries = [5, 7] - .into_iter() - .map(|x| { - std::thread::spawn({ - let atomic_list = atomic_list.clone(); - move || { - let list_entry = entry(x); - unsafe { - atomic_list.push_front(list_entry.as_ref()); - } - list_entry - } - }) - }) - .collect::>() - .into_iter() - .map(|handle| handle.join().unwrap()) - .collect::>(); - - let mut list = Arc::into_inner(atomic_list).unwrap().into_list(); - - assert!(!list.is_empty()); - - let first = list.pop_back().unwrap(); - assert!(first.val == 5 || first.val == 7); - - let second = list.pop_back().unwrap(); - assert!(second.val == 5 || second.val == 7); - assert_ne!(first.val, second.val); - - assert!(list.is_empty()); - assert!(list.pop_back().is_none()); - } } diff --git a/tokio/src/util/linked_list/atomic.rs b/tokio/src/util/linked_list/atomic.rs new file mode 100644 index 00000000000..e495092c945 --- /dev/null +++ b/tokio/src/util/linked_list/atomic.rs @@ -0,0 +1,189 @@ +use super::{Link, LinkedList, LinkedListBase}; + +use core::cell::UnsafeCell; +use core::marker::PhantomData; +use core::mem::ManuallyDrop; +use core::ptr::NonNull; +use core::sync::atomic::{ + AtomicPtr, + Ordering::{AcqRel, Relaxed}, +}; + +/// An atomic intrusive linked list. It allows pushing new nodes concurrently. +/// Removing nodes still requires an exclusive reference. +pub(crate) struct AtomicLinkedList { + /// Linked list head. + head: AtomicPtr, + + /// Linked list tail. + tail: UnsafeCell>>, + + /// Node type marker. + _marker: PhantomData<*const L>, +} + +unsafe impl Send for AtomicLinkedList where L::Target: Send {} +unsafe impl Sync for AtomicLinkedList where L::Target: Sync {} + +impl Default for AtomicLinkedList { + fn default() -> Self { + Self::new() + } +} + +impl AtomicLinkedList { + /// Creates an empty atomic linked list. + pub(crate) const fn new() -> AtomicLinkedList { + AtomicLinkedList { + head: AtomicPtr::new(core::ptr::null_mut()), + tail: UnsafeCell::new(None), + _marker: PhantomData, + } + } + + /// Convert an atomic linked list into a non-atomic version. + pub(crate) fn into_list(mut self) -> LinkedList { + LinkedList { + head: NonNull::new(*self.head.get_mut()), + tail: *self.tail.get_mut(), + _marker: PhantomData, + } + } +} + +impl LinkedListBase for AtomicLinkedList { + fn head(&mut self) -> Option> { + NonNull::new(*self.head.get_mut()) + } + + fn tail(&mut self) -> Option> { + *self.tail.get_mut() + } + + fn set_head(&mut self, node: Option>) { + *self.head.get_mut() = match node { + Some(ptr) => ptr.as_ptr(), + None => core::ptr::null_mut(), + }; + } + + fn set_tail(&mut self, node: Option>) { + *self.tail.get_mut() = node; + } +} + +impl AtomicLinkedList { + /// Atomically adds an element first in the list. + /// This method can be called concurrently from multiple threads. 
+ /// + /// # Safety + /// + /// The caller must ensure that: + /// - `val` is not pushed concurrently by muptiple threads, + /// - `val` is not already part of some list. + pub(crate) unsafe fn push_front(&self, val: L::Handle) { + // Note that removing nodes from the list still requires + // an exclusive reference, so we need not worry about + // concurrent node removals. + + // The value should not be dropped, it is being inserted into the list. + let val = ManuallyDrop::new(val); + let new_head = L::as_raw(&val); + + // Safety: due to the function contract, no concurrent `push_front` + // is called on this particular element, so we are safe to assume + // ownership. + L::pointers(new_head).as_mut().set_prev(None); + + let mut old_head = self.head.load(Relaxed); + loop { + // Safety: due to the function contract, no concurrent `push_front` + // is called on this particular element, and we have not + // inserted it into the list, so we can still assume ownership. + L::pointers(new_head) + .as_mut() + .set_next(NonNull::new(old_head)); + + if let Err(actual_head) = + self.head + .compare_exchange_weak(old_head, new_head.as_ptr(), AcqRel, Relaxed) + { + old_head = actual_head; + } else { + break; + }; + } + + if old_head.is_null() { + // Safety: only the thread that successfully inserted the first + // element is granted the right to update tail. + *self.tail.get() = Some(new_head); + } else { + // Safety: + // 1. Only the thread that successfully inserted the new element + // is granted the right to update the previous head's `prev`, + // 2. Upon successfull insertion, we have synchronized with all the + // previous insertions (due to `AcqRel` memory ordering), so all + // the previous `set_prev` calls on `old_head` happen-before this call, + // 3. Due the `push_front` contract, we can assume that `old_head` + // is not pushed concurrently by another thread, as it is already + // in the list. Thus, no data race on `set_prev` can happen. + L::pointers(NonNull::new_unchecked(old_head)) + .as_mut() + .set_prev(Some(new_head)); + } + } + + /// See [LinkedList::remove]. 
+ pub(crate) unsafe fn remove(&mut self, node: NonNull) -> Option { + LinkedListBase::remove(self, node) + } +} + +#[cfg(any(test, fuzzing))] +#[cfg(not(loom))] +pub(crate) mod tests { + use super::super::tests::*; + use super::*; + + #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads + #[test] + fn atomic_push_front() { + use std::sync::Arc; + + let atomic_list = Arc::new(AtomicLinkedList::<&Entry, <&Entry as Link>::Target>::new()); + + let _entries = [5, 7] + .into_iter() + .map(|x| { + std::thread::spawn({ + let atomic_list = atomic_list.clone(); + move || { + let list_entry = entry(x); + unsafe { + atomic_list.push_front(list_entry.as_ref()); + } + list_entry + } + }) + }) + .collect::>() + .into_iter() + .map(|handle| handle.join().unwrap()) + .collect::>(); + + let mut list = Arc::into_inner(atomic_list).unwrap().into_list(); + + assert!(!list.is_empty()); + + let first = list.pop_back().unwrap(); + assert!(first.val == 5 || first.val == 7); + + let second = list.pop_back().unwrap(); + assert!(second.val == 5 || second.val == 7); + assert_ne!(first.val, second.val); + + assert!(list.is_empty()); + assert!(list.pop_back().is_none()); + } +} From d30117cdaf55dc449838a2092a0cdab9216dcaa3 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Sun, 14 Jan 2024 19:11:54 +0300 Subject: [PATCH 06/18] Fix wasi build errors --- tokio/src/util/linked_list/atomic.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/util/linked_list/atomic.rs b/tokio/src/util/linked_list/atomic.rs index e495092c945..4358c4fd861 100644 --- a/tokio/src/util/linked_list/atomic.rs +++ b/tokio/src/util/linked_list/atomic.rs @@ -142,15 +142,15 @@ impl AtomicLinkedList { #[cfg(any(test, fuzzing))] #[cfg(not(loom))] +#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads pub(crate) mod tests { use super::super::tests::*; use super::*; - #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads + use std::sync::Arc; + #[test] fn atomic_push_front() { - use std::sync::Arc; - let atomic_list = Arc::new(AtomicLinkedList::<&Entry, <&Entry as Link>::Target>::new()); let _entries = [5, 7] From 1ca92d06cf096144aaff905a83833776cbcadc5d Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Sun, 14 Jan 2024 19:17:12 +0300 Subject: [PATCH 07/18] Fix fuzzing build --- tokio/src/util/linked_list/atomic.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/util/linked_list/atomic.rs b/tokio/src/util/linked_list/atomic.rs index 4358c4fd861..3636ee7e104 100644 --- a/tokio/src/util/linked_list/atomic.rs +++ b/tokio/src/util/linked_list/atomic.rs @@ -140,7 +140,7 @@ impl AtomicLinkedList { } } -#[cfg(any(test, fuzzing))] +#[cfg(test)] #[cfg(not(loom))] #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads pub(crate) mod tests { From 74af0fe4b031eb28b7bfdf56e096266edf2e6ff8 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Tue, 16 Jan 2024 08:34:36 +0300 Subject: [PATCH 08/18] Make waiter.queued atomic, use read lock in notify_rx --- tokio/src/loom/std/parking_lot.rs | 6 ++ tokio/src/sync/broadcast.rs | 139 +++++++++++++++++++++--------- 2 files changed, 102 insertions(+), 43 deletions(-) diff --git a/tokio/src/loom/std/parking_lot.rs b/tokio/src/loom/std/parking_lot.rs index 9b9a81d35b0..af36c5915f4 100644 --- a/tokio/src/loom/std/parking_lot.rs +++ b/tokio/src/loom/std/parking_lot.rs @@ -105,6 +105,12 @@ impl RwLock { } } +impl<'a, T> RwLockWriteGuard<'a, T> { + pub(crate) fn downgrade(s: Self) -> RwLockReadGuard<'a, T> { + RwLockReadGuard(PhantomData, 
parking_lot::RwLockWriteGuard::downgrade(s.1)) + } +} + impl<'a, T: ?Sized> Deref for RwLockReadGuard<'a, T> { type Target = T; fn deref(&self) -> &T { diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index d7c6d09507d..ebb7d39a161 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -127,7 +127,8 @@ use std::future::Future; use std::marker::PhantomPinned; use std::pin::Pin; use std::ptr::NonNull; -use std::sync::atomic::Ordering::SeqCst; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering::{Relaxed, SeqCst}; use std::task::{Context, Poll, Waker}; use std::usize; @@ -354,7 +355,7 @@ struct Slot { /// An entry in the wait queue. struct Waiter { /// True if queued. - queued: bool, + queued: AtomicBool, /// Task waiting on the broadcast channel. waker: Option, @@ -369,7 +370,7 @@ struct Waiter { impl Waiter { fn new() -> Self { Self { - queued: false, + queued: AtomicBool::new(false), waker: None, pointers: linked_list::Pointers::new(), _p: PhantomPinned, @@ -865,7 +866,7 @@ impl<'a, T> WaitersList<'a, T> { /// Removes the last element from the guarded list. Modifying this list /// requires an exclusive access to the main list in `Notify`. - fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option> { + fn pop_back_locked(&mut self, _tail: &Tail) -> Option> { let result = self.list.pop_back(); if result.is_none() { // Save information about emptiness to avoid waiting for lock @@ -893,20 +894,42 @@ impl Shared { // guard node after this function returns / panics. let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self); + // From now on, read lock suffices: we own our own copy of waiters list, + // and we only need to guard against concurrent waiter removals. + // Except us, waiter removals are done by `Recv::drop` and it takes + // a write lock to do it. + let mut tail = if cfg!(feature = "parking_lot") { + RwLockWriteGuard::downgrade(tail) + } else { + // Std does not support the downgrade API. + drop(tail); + self.tail.read().unwrap() + }; + let mut wakers = WakeList::new(); 'outer: loop { while wakers.can_push() { - match list.pop_back_locked(&mut tail) { + match list.pop_back_locked(&tail) { Some(mut waiter) => { - // Safety: `tail` lock is still held. - let waiter = unsafe { waiter.as_mut() }; - - assert!(waiter.queued); - waiter.queued = false; - - if let Some(waker) = waiter.waker.take() { + // Safety: except us, `waiter.waker` is accessed only + // by `Shared::recv_ref`. As this waiter is already + // queued, `Shared::recf_ref` would take a write lock. + let waker = unsafe { (*waiter.as_mut()).waker.take() }; + if let Some(waker) = waker { wakers.push(waker); } + + // Mark the waiter as not queued. + // It is critical to do it **after** the waker was extracted, + // otherwise we might data race with `Shared::recv_ref`. + // + // Safety: + // - Read lock on tail is held, so `waiter` cannot + // be concurrently removed, + // - `waiter.queued` is atomic, so read lock suffices. + let queued = unsafe { &(*waiter.as_ptr()).queued }; + let prev_queued = queued.swap(false, Relaxed); + assert!(prev_queued); } None => { break 'outer; @@ -925,7 +948,7 @@ impl Shared { wakers.wake_all(); // Acquire the lock again. - tail = self.tail.write().unwrap(); + tail = self.tail.read().unwrap(); } // Release the lock before waking. @@ -1084,18 +1107,40 @@ impl Receiver { return Err(TryRecvError::Closed); } + // We will might want to upgrade to a write lock. 
+ let mut tail_read = Some(tail); + let mut tail_write = None; + // Store the waker if let Some((waiter, waker)) = waiter { - // Safety: called while holding a read lock on tail. - // It suffices since we only update two waiter members: - // - `waiter.waker` - all other accesses of this member are - // write-lock protected, - // - `waiter.queued` - all other accesses of this member are - // either write-lock protected or read-lock protected with - // exclusive reference to the `Recv` that contains the waiter. - // Concurrent calls to `recv_ref` with the same waiter - // are impossible because it implies ownership of the `Recv` - // that contains it. + let queued = waiter.with(|ptr| { + // Safety: waiter.queued is atomic. + unsafe { (*ptr).queued.load(Relaxed) } + }); + + // Release the slot lock before reacquiring tail locks + // to avoid a deadlock. + drop(slot); + + // If waiter is already queued, then a write lock on tail is required + // since other threads may try to mutate the waiter concurrently. + // If the waiter is not queued, we are the only owner now and + // read lock suffices. + let tail_ref: &Tail = if queued { + // TODO: this is sketchy, need do make sure that + // it is safe to drop all the locks here... + tail_read = None; + tail_write = Some(self.shared.tail.write().unwrap()); + tail_write.as_deref().unwrap() + } else { + tail_read.as_deref().unwrap() + }; + + // Safety: called while holding a lock on tail. + // If waiter is not queued, then we hold a read lock + // on tail and can safely mutate `waiter` since we + // are the only owner. + // If waiter is queued, then we hold a write lock on tail. unsafe { // Only queue if not already queued waiter.with_mut(|ptr| { @@ -1113,22 +1158,29 @@ impl Receiver { } } - if !(*ptr).queued { - (*ptr).queued = true; + // Technically, `queued` was fetched before we took + // a write. This is OK: if it was `false`, it cannot + // become `true`. If it was `true` and became `false` + // before we acquired the lock, we will just wake + // the waiter unnecessarily at some point in the future. + if !queued { + (*ptr).queued.store(true, Relaxed); // Safety: // - `waiter` is not already queued, // - calling `recv_ref` with a waiter implies ownership // of it's `Recv`. As such, this waiter cannot be pushed // concurrently by some other thread. - tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr)); + tail_ref + .waiters + .push_front(NonNull::new_unchecked(&mut *ptr)); } }); } } // Drop the old waker after releasing the locks. - drop(slot); - drop(tail); + drop(tail_read); + drop(tail_write); drop(old_waker); return Err(TryRecvError::Empty); @@ -1371,7 +1423,7 @@ impl<'a, T> Recv<'a, T> { Recv { receiver, waiter: UnsafeCell::new(Waiter { - queued: false, + queued: AtomicBool::new(false), waker: None, pointers: linked_list::Pointers::new(), _p: PhantomPinned, @@ -1416,23 +1468,24 @@ where impl<'a, T> Drop for Recv<'a, T> { fn drop(&mut self) { - // Acquire a read lock on tail. This is required for safety before accessing - // the waiter node. - let tail = self.receiver.shared.tail.read().unwrap(); - - // Safety: we hold read lock on tail AND have exclusive reference to `Recv`. - let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued }); - + // Safety: `waiter.queued` is atomic. 
+ let queued = self + .waiter + .with(|ptr| unsafe { (*ptr).queued.load(Relaxed) }); + + // If `queued` is false, it cannot become true again + // (concurrent calls to `Shared::recv_ref` with this + // waiter are impossible as they imply an exclusive + // reference to this `Recv`, which we now have). + // If `queued` is true, we need to take a write lock + // and check again. if queued { - // Optimistic check failed. To remove the waiter, we need a write lock. - drop(tail); let mut tail = self.receiver.shared.tail.write().unwrap(); - // Double check that the waiter is still enqueued, - // in case it was removed before we reacquired the lock. - // - // Safety: tail write lock is held. - let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued }); + // Safety: `waiter.queued` is atomic. + let queued = self + .waiter + .with(|ptr| unsafe { (*ptr).queued.load(Relaxed) }); if queued { // Remove the node. From 7a3a75ad44c251dcf9db8e8d3c763b02d1bfbf0b Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Tue, 16 Jan 2024 09:11:38 +0300 Subject: [PATCH 09/18] Fix loom build, clippy warnings --- tokio/src/sync/broadcast.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index ebb7d39a161..d906b1177b7 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -898,7 +898,7 @@ impl Shared { // and we only need to guard against concurrent waiter removals. // Except us, waiter removals are done by `Recv::drop` and it takes // a write lock to do it. - let mut tail = if cfg!(feature = "parking_lot") { + let mut tail = if cfg!(all(feature = "parking_lot", not(loom))) { RwLockWriteGuard::downgrade(tail) } else { // Std does not support the downgrade API. @@ -914,7 +914,7 @@ impl Shared { // Safety: except us, `waiter.waker` is accessed only // by `Shared::recv_ref`. As this waiter is already // queued, `Shared::recf_ref` would take a write lock. - let waker = unsafe { (*waiter.as_mut()).waker.take() }; + let waker = unsafe { waiter.as_mut().waker.take() }; if let Some(waker) = waker { wakers.push(waker); } From 685e5d5d91dfa0ad8b060b22c21989a430ec1f7a Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Tue, 16 Jan 2024 09:24:38 +0300 Subject: [PATCH 10/18] Fix build --- tokio/src/sync/broadcast.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index d906b1177b7..22390200a33 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -898,7 +898,11 @@ impl Shared { // and we only need to guard against concurrent waiter removals. // Except us, waiter removals are done by `Recv::drop` and it takes // a write lock to do it. - let mut tail = if cfg!(all(feature = "parking_lot", not(loom))) { + let mut tail = if cfg!(all( + not(all(test, loom)), + feature = "parking_lot", + not(miri) + )) { RwLockWriteGuard::downgrade(tail) } else { // Std does not support the downgrade API. From 9ced45b4236a5c577b2223f70ce68455da09e125 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Tue, 16 Jan 2024 09:29:45 +0300 Subject: [PATCH 11/18] Dont use downgrade method --- tokio/src/sync/broadcast.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 22390200a33..8a9b3b2fb02 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -898,6 +898,7 @@ impl Shared { // and we only need to guard against concurrent waiter removals. 
// Except us, waiter removals are done by `Recv::drop` and it takes // a write lock to do it. + /* let mut tail = if cfg!(all( not(all(test, loom)), feature = "parking_lot", @@ -909,6 +910,11 @@ impl Shared { drop(tail); self.tail.read().unwrap() }; + */ + + // Std does not support the downgrade API. + drop(tail); + let mut tail = self.tail.read().unwrap(); let mut wakers = WakeList::new(); 'outer: loop { From 96f713c3efe767bacb2ed2ca068e7deb2d75fce2 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Tue, 16 Jan 2024 11:03:20 +0300 Subject: [PATCH 12/18] Fix a bug with memory orders --- tokio/src/sync/broadcast.rs | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 8a9b3b2fb02..5203a4cd21e 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -128,7 +128,7 @@ use std::marker::PhantomPinned; use std::pin::Pin; use std::ptr::NonNull; use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering::{Relaxed, SeqCst}; +use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst}; use std::task::{Context, Poll, Waker}; use std::usize; @@ -865,7 +865,7 @@ impl<'a, T> WaitersList<'a, T> { } /// Removes the last element from the guarded list. Modifying this list - /// requires an exclusive access to the main list in `Notify`. + /// requires a read lock on the main list. fn pop_back_locked(&mut self, _tail: &Tail) -> Option> { let result = self.list.pop_back(); if result.is_none() { @@ -888,7 +888,7 @@ impl Shared { // underneath to allow every waiter to safely remove itself from it. // // * This list will be still guarded by the `waiters` lock. - // `NotifyWaitersList` wrapper makes sure we hold the lock to modify it. + // `NotifyWaitersList` wrapper makes sure we hold a read lock to modify it. // * This wrapper will empty the list on drop. It is critical for safety // that we will not leave any list entry with a pointer to the local // guard node after this function returns / panics. @@ -923,7 +923,7 @@ impl Shared { Some(mut waiter) => { // Safety: except us, `waiter.waker` is accessed only // by `Shared::recv_ref`. As this waiter is already - // queued, `Shared::recf_ref` would take a write lock. + // queued, `Shared::recv_ref` would take a write lock. let waker = unsafe { waiter.as_mut().waker.take() }; if let Some(waker) = waker { wakers.push(waker); @@ -932,13 +932,16 @@ impl Shared { // Mark the waiter as not queued. // It is critical to do it **after** the waker was extracted, // otherwise we might data race with `Shared::recv_ref`. + // Release memory order is required to extablish a happens-before + // relationship between us writing to `waiter.waker` and + // `Receiver::recv_ref` accessing it. // // Safety: // - Read lock on tail is held, so `waiter` cannot // be concurrently removed, // - `waiter.queued` is atomic, so read lock suffices. let queued = unsafe { &(*waiter.as_ptr()).queued }; - let prev_queued = queued.swap(false, Relaxed); + let prev_queued = queued.swap(false, Release); assert!(prev_queued); } None => { @@ -1125,7 +1128,8 @@ impl Receiver { if let Some((waiter, waker)) = waiter { let queued = waiter.with(|ptr| { // Safety: waiter.queued is atomic. - unsafe { (*ptr).queued.load(Relaxed) } + // Acquire is needed to synchronize with `Shared::notify_rx`. 
+ unsafe { (*ptr).queued.load(Acquire) } }); // Release the slot lock before reacquiring tail locks @@ -1168,13 +1172,11 @@ impl Receiver { } } - // Technically, `queued` was fetched before we took - // a write. This is OK: if it was `false`, it cannot - // become `true`. If it was `true` and became `false` - // before we acquired the lock, we will just wake - // the waiter unnecessarily at some point in the future. - if !queued { - (*ptr).queued.store(true, Relaxed); + // If the waiter is already queued, don't do anything. + // If not, enqueue it. + // Relaxed memory order suffices because, if `waiter` + // is shared, then we hold a write lock on tail. + if !(*ptr).queued.swap(true, Relaxed) { // Safety: // - `waiter` is not already queued, // - calling `recv_ref` with a waiter implies ownership @@ -1479,6 +1481,11 @@ where impl<'a, T> Drop for Recv<'a, T> { fn drop(&mut self) { // Safety: `waiter.queued` is atomic. + // Relaxed ordering is enough because, if `queued` is true, + // we will take a write lock on tail that provides the + // necessary synchronization. If `queued` is false, + // there is no way it can become true again and we + // simply don't do anything. let queued = self .waiter .with(|ptr| unsafe { (*ptr).queued.load(Relaxed) }); @@ -1493,6 +1500,7 @@ impl<'a, T> Drop for Recv<'a, T> { let mut tail = self.receiver.shared.tail.write().unwrap(); // Safety: `waiter.queued` is atomic. + // Relaxed order suffices because we hold a write lock on tail. let queued = self .waiter .with(|ptr| unsafe { (*ptr).queued.load(Relaxed) }); From 9d25ae2896fb335e56bcb9d5e23040a8de3a5cd6 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Tue, 16 Jan 2024 11:04:48 +0300 Subject: [PATCH 13/18] Fix comments --- tokio/src/sync/broadcast.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 5203a4cd21e..184b42ac435 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -888,7 +888,7 @@ impl Shared { // underneath to allow every waiter to safely remove itself from it. // // * This list will be still guarded by the `waiters` lock. - // `NotifyWaitersList` wrapper makes sure we hold a read lock to modify it. + // `WaitersList` wrapper makes sure we hold a read lock to modify it. // * This wrapper will empty the list on drop. It is critical for safety // that we will not leave any list entry with a pointer to the local // guard node after this function returns / panics. @@ -922,8 +922,8 @@ impl Shared { match list.pop_back_locked(&tail) { Some(mut waiter) => { // Safety: except us, `waiter.waker` is accessed only - // by `Shared::recv_ref`. As this waiter is already - // queued, `Shared::recv_ref` would take a write lock. + // by `Receiver::recv_ref`. As this waiter is already + // queued, `Receiver::recv_ref` would take a write lock. let waker = unsafe { waiter.as_mut().waker.take() }; if let Some(waker) = waker { wakers.push(waker); @@ -931,7 +931,7 @@ impl Shared { // Mark the waiter as not queued. // It is critical to do it **after** the waker was extracted, - // otherwise we might data race with `Shared::recv_ref`. + // otherwise we might data race with `Receiver::recv_ref`. // Release memory order is required to extablish a happens-before // relationship between us writing to `waiter.waker` and // `Receiver::recv_ref` accessing it. 
From 6161c554da69b4040b07a5ae71a4ecf7f92daf81 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Tue, 16 Jan 2024 17:46:56 +0300 Subject: [PATCH 14/18] Implement RwLock wrapper with downgrade method --- tokio/src/loom/std/mod.rs | 10 +++-- tokio/src/loom/std/parking_lot.rs | 16 +++++--- tokio/src/loom/std/rwlock.rs | 66 +++++++++++++++++++++++++++++++ tokio/src/sync/broadcast.rs | 24 ++--------- 4 files changed, 86 insertions(+), 30 deletions(-) create mode 100644 tokio/src/loom/std/rwlock.rs diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs index 7d01c05a4e4..7c68ea3ef3e 100644 --- a/tokio/src/loom/std/mod.rs +++ b/tokio/src/loom/std/mod.rs @@ -8,6 +8,7 @@ mod barrier; mod mutex; #[cfg(all(feature = "parking_lot", not(miri)))] mod parking_lot; +mod rwlock; mod unsafe_cell; pub(crate) mod cell { @@ -64,12 +65,13 @@ pub(crate) mod sync { #[cfg(not(all(feature = "parking_lot", not(miri))))] #[allow(unused_imports)] - pub(crate) use std::sync::{ - Condvar, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard, WaitTimeoutResult, - }; + pub(crate) use std::sync::{Condvar, MutexGuard, WaitTimeoutResult}; #[cfg(not(all(feature = "parking_lot", not(miri))))] - pub(crate) use crate::loom::std::mutex::Mutex; + pub(crate) use crate::loom::std::{ + mutex::Mutex, + rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard}, + }; pub(crate) mod atomic { pub(crate) use crate::loom::std::atomic_u16::AtomicU16; diff --git a/tokio/src/loom/std/parking_lot.rs b/tokio/src/loom/std/parking_lot.rs index af36c5915f4..41de799709f 100644 --- a/tokio/src/loom/std/parking_lot.rs +++ b/tokio/src/loom/std/parking_lot.rs @@ -105,12 +105,6 @@ impl RwLock { } } -impl<'a, T> RwLockWriteGuard<'a, T> { - pub(crate) fn downgrade(s: Self) -> RwLockReadGuard<'a, T> { - RwLockReadGuard(PhantomData, parking_lot::RwLockWriteGuard::downgrade(s.1)) - } -} - impl<'a, T: ?Sized> Deref for RwLockReadGuard<'a, T> { type Target = T; fn deref(&self) -> &T { @@ -118,6 +112,16 @@ impl<'a, T: ?Sized> Deref for RwLockReadGuard<'a, T> { } } +impl<'a, T> RwLockWriteGuard<'a, T> { + // The corresponding std method requires the rwlock. + pub(crate) fn downgrade(s: Self, _rwlock: &'a RwLock) -> LockResult> { + Ok(RwLockReadGuard( + PhantomData, + parking_lot::RwLockWriteGuard::downgrade(s.1), + )) + } +} + impl<'a, T: ?Sized> Deref for RwLockWriteGuard<'a, T> { type Target = T; fn deref(&self) -> &T { diff --git a/tokio/src/loom/std/rwlock.rs b/tokio/src/loom/std/rwlock.rs new file mode 100644 index 00000000000..900c55c15d1 --- /dev/null +++ b/tokio/src/loom/std/rwlock.rs @@ -0,0 +1,66 @@ +use std::{ + ops::{Deref, DerefMut}, + sync::{self, LockResult, PoisonError}, +}; + +/// Adapter for std::RwLock that adds `downgrade` method. 
+#[derive(Debug)] +pub(crate) struct RwLock(sync::RwLock); + +#[derive(Debug)] +pub(crate) struct RwLockReadGuard<'a, T>(sync::RwLockReadGuard<'a, T>); + +#[derive(Debug)] +pub(crate) struct RwLockWriteGuard<'a, T>(sync::RwLockWriteGuard<'a, T>); + +#[allow(dead_code)] +impl RwLock { + #[inline] + pub(crate) fn new(t: T) -> RwLock { + RwLock(sync::RwLock::new(t)) + } + + #[inline] + pub(crate) fn read(&self) -> LockResult> { + match self.0.read() { + Ok(inner) => Ok(RwLockReadGuard(inner)), + Err(err) => Err(PoisonError::new(RwLockReadGuard(err.into_inner()))), + } + } + + #[inline] + pub(crate) fn write(&self) -> LockResult> { + match self.0.write() { + Ok(inner) => Ok(RwLockWriteGuard(inner)), + Err(err) => Err(PoisonError::new(RwLockWriteGuard(err.into_inner()))), + } + } +} + +impl<'a, T> Deref for RwLockReadGuard<'a, T> { + type Target = T; + fn deref(&self) -> &T { + self.0.deref() + } +} + +impl<'a, T> RwLockWriteGuard<'a, T> { + pub(crate) fn downgrade(s: Self, rwlock: &'a RwLock) -> LockResult> { + // Std rwlock does not support downgrading. + drop(s); + rwlock.read() + } +} + +impl<'a, T> Deref for RwLockWriteGuard<'a, T> { + type Target = T; + fn deref(&self) -> &T { + self.0.deref() + } +} + +impl<'a, T> DerefMut for RwLockWriteGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + self.0.deref_mut() + } +} diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 184b42ac435..57a8b1f3335 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -898,23 +898,7 @@ impl Shared { // and we only need to guard against concurrent waiter removals. // Except us, waiter removals are done by `Recv::drop` and it takes // a write lock to do it. - /* - let mut tail = if cfg!(all( - not(all(test, loom)), - feature = "parking_lot", - not(miri) - )) { - RwLockWriteGuard::downgrade(tail) - } else { - // Std does not support the downgrade API. - drop(tail); - self.tail.read().unwrap() - }; - */ - - // Std does not support the downgrade API. - drop(tail); - let mut tail = self.tail.read().unwrap(); + let mut tail = RwLockWriteGuard::downgrade(tail, &self.tail).unwrap(); let mut wakers = WakeList::new(); 'outer: loop { @@ -932,15 +916,15 @@ impl Shared { // Mark the waiter as not queued. // It is critical to do it **after** the waker was extracted, // otherwise we might data race with `Receiver::recv_ref`. - // Release memory order is required to extablish a happens-before - // relationship between us writing to `waiter.waker` and - // `Receiver::recv_ref` accessing it. // // Safety: // - Read lock on tail is held, so `waiter` cannot // be concurrently removed, // - `waiter.queued` is atomic, so read lock suffices. let queued = unsafe { &(*waiter.as_ptr()).queued }; + // Release memory order is required to establish a happens-before + // relationship between us writing to `waiter.waker` and + // `Receiver::recv_ref` accessing it. 
let prev_queued = queued.swap(false, Release); assert!(prev_queued); } From bf9f690f258e0342b202a3d6dbf493aa33cb6b68 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Tue, 16 Jan 2024 18:01:17 +0300 Subject: [PATCH 15/18] Add RwLock to loom wrappers --- tokio/src/loom/mocked.rs | 69 +++++++++++++++++++++++++++++++++++- tokio/src/loom/std/rwlock.rs | 1 + 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/tokio/src/loom/mocked.rs b/tokio/src/loom/mocked.rs index d40e2c1f8ea..0eba13f002b 100644 --- a/tokio/src/loom/mocked.rs +++ b/tokio/src/loom/mocked.rs @@ -1,12 +1,22 @@ pub(crate) use loom::*; pub(crate) mod sync { + use std::ops::{Deref, DerefMut}; - pub(crate) use loom::sync::MutexGuard; + pub(crate) use loom::sync::{LockResult, MutexGuard, PoisonError}; #[derive(Debug)] pub(crate) struct Mutex(loom::sync::Mutex); + #[derive(Debug)] + pub(crate) struct RwLock(loom::sync::RwLock); + + #[derive(Debug)] + pub(crate) struct RwLockReadGuard<'a, T>(loom::sync::RwLockReadGuard<'a, T>); + + #[derive(Debug)] + pub(crate) struct RwLockWriteGuard<'a, T>(loom::sync::RwLockWriteGuard<'a, T>); + #[allow(dead_code)] impl Mutex { #[inline] @@ -25,6 +35,63 @@ pub(crate) mod sync { self.0.try_lock().ok() } } + + #[allow(dead_code)] + impl RwLock { + #[inline] + pub(crate) fn new(t: T) -> RwLock { + RwLock(loom::sync::RwLock::new(t)) + } + + #[inline] + pub(crate) fn read(&self) -> LockResult> { + match self.0.read() { + Ok(inner) => Ok(RwLockReadGuard(inner)), + Err(err) => Err(PoisonError::new(RwLockReadGuard(err.into_inner()))), + } + } + + #[inline] + pub(crate) fn write(&self) -> LockResult> { + match self.0.write() { + Ok(inner) => Ok(RwLockWriteGuard(inner)), + Err(err) => Err(PoisonError::new(RwLockWriteGuard(err.into_inner()))), + } + } + } + + impl<'a, T> Deref for RwLockReadGuard<'a, T> { + type Target = T; + fn deref(&self) -> &T { + self.0.deref() + } + } + + #[allow(dead_code)] + impl<'a, T> RwLockWriteGuard<'a, T> { + pub(crate) fn downgrade( + s: Self, + rwlock: &'a RwLock, + ) -> LockResult> { + // Std rwlock does not support downgrading. + drop(s); + rwlock.read() + } + } + + impl<'a, T> Deref for RwLockWriteGuard<'a, T> { + type Target = T; + fn deref(&self) -> &T { + self.0.deref() + } + } + + impl<'a, T> DerefMut for RwLockWriteGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + self.0.deref_mut() + } + } + pub(crate) use loom::sync::*; pub(crate) mod atomic { diff --git a/tokio/src/loom/std/rwlock.rs b/tokio/src/loom/std/rwlock.rs index 900c55c15d1..f14f8ecea2b 100644 --- a/tokio/src/loom/std/rwlock.rs +++ b/tokio/src/loom/std/rwlock.rs @@ -44,6 +44,7 @@ impl<'a, T> Deref for RwLockReadGuard<'a, T> { } } +#[allow(dead_code)] impl<'a, T> RwLockWriteGuard<'a, T> { pub(crate) fn downgrade(s: Self, rwlock: &'a RwLock) -> LockResult> { // Std rwlock does not support downgrading. 
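The fallback above emulates `downgrade` by dropping the write guard and re-acquiring a read guard, so, unlike `parking_lot::RwLockWriteGuard::downgrade`, another writer may briefly take the lock in between; callers may only rely on what a freshly acquired read lock guarantees. This is also why `downgrade` takes the `RwLock` itself as an argument: the std emulation needs it to re-lock, while the parking_lot version accepts (and ignores) it so both backends share one signature. A minimal standalone sketch of the emulation, using plain `std` types and an illustrative function name rather than the wrapper above:

    use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

    // Emulated downgrade: drop the write guard, then take a fresh read guard.
    // parking_lot performs this step atomically; with std there is a window in
    // which another writer can slip in, so the protected state must be treated
    // as potentially changed across the call.
    fn downgrade_emulated<'a, T>(
        guard: RwLockWriteGuard<'a, T>,
        lock: &'a RwLock<T>,
    ) -> RwLockReadGuard<'a, T> {
        drop(guard);
        lock.read().unwrap()
    }

    fn main() {
        let lock = RwLock::new(0u32);

        let mut w = lock.write().unwrap();
        *w += 1;

        // Keep reading while letting other readers make progress again.
        let r = downgrade_emulated(w, &lock);
        assert_eq!(*r, 1);
    }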
From 18d4d865e3c4baaee7cfdbd3cf24e2b1526a3a90 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Tue, 16 Jan 2024 18:05:08 +0300 Subject: [PATCH 16/18] Fix loom imports --- tokio/src/loom/mocked.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tokio/src/loom/mocked.rs b/tokio/src/loom/mocked.rs index 0eba13f002b..5d96ab71f7d 100644 --- a/tokio/src/loom/mocked.rs +++ b/tokio/src/loom/mocked.rs @@ -1,9 +1,12 @@ pub(crate) use loom::*; pub(crate) mod sync { - use std::ops::{Deref, DerefMut}; + use std::{ + ops::{Deref, DerefMut}, + sync::{LockResult, PoisonError}, + }; - pub(crate) use loom::sync::{LockResult, MutexGuard, PoisonError}; + pub(crate) use loom::sync::MutexGuard; #[derive(Debug)] pub(crate) struct Mutex(loom::sync::Mutex); From 90b1c0e70bb7ea217d3af5b4924dd8fb97bc28a9 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Tue, 16 Jan 2024 19:27:41 +0300 Subject: [PATCH 17/18] Fix another bug in memory orders --- tokio/src/sync/broadcast.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 57a8b1f3335..3852e7ebab8 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -924,7 +924,7 @@ impl Shared { let queued = unsafe { &(*waiter.as_ptr()).queued }; // Release memory order is required to establish a happens-before // relationship between us writing to `waiter.waker` and - // `Receiver::recv_ref` accessing it. + // `Receiver::recv_ref`/`Recv::drop` accessing it. let prev_queued = queued.swap(false, Release); assert!(prev_queued); } @@ -1465,14 +1465,11 @@ where impl<'a, T> Drop for Recv<'a, T> { fn drop(&mut self) { // Safety: `waiter.queued` is atomic. - // Relaxed ordering is enough because, if `queued` is true, - // we will take a write lock on tail that provides the - // necessary synchronization. If `queued` is false, - // there is no way it can become true again and we - // simply don't do anything. + // Acquire ordering is required to synchronize with + // `Shared::notify_rx` before we drop the object. let queued = self .waiter - .with(|ptr| unsafe { (*ptr).queued.load(Relaxed) }); + .with(|ptr| unsafe { (*ptr).queued.load(Acquire) }); // If `queued` is false, it cannot become true again // (concurrent calls to `Shared::recv_ref` with this From 4b45a200fbf454d9124bf6480c0e2896e7f52564 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Thu, 18 Jan 2024 08:19:05 +0300 Subject: [PATCH 18/18] Rename Atomic -> ConcurrentPush, fix tail locking in recv_ref --- tokio/src/sync/broadcast.rs | 82 +++++++++---------- tokio/src/util/linked_list.rs | 6 +- .../{atomic.rs => concurrent_push.rs} | 43 ++++++---- 3 files changed, 68 insertions(+), 63 deletions(-) rename tokio/src/util/linked_list/{atomic.rs => concurrent_push.rs} (77%) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 3852e7ebab8..dab58ad49a5 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -119,7 +119,7 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; -use crate::util::linked_list::{self, AtomicLinkedList, GuardedLinkedList}; +use crate::util::linked_list::{self, ConcurrentPushLinkedList, GuardedLinkedList}; use crate::util::WakeList; use std::fmt; @@ -329,7 +329,7 @@ struct Tail { closed: bool, /// Receivers waiting for a value. 
- waiters: AtomicLinkedList::Target>, + waiters: ConcurrentPushLinkedList::Target>, } /// Slot in the buffer. @@ -526,7 +526,7 @@ impl Sender { pos: 0, rx_cnt: receiver_count, closed: false, - waiters: AtomicLinkedList::new(), + waiters: ConcurrentPushLinkedList::new(), }), num_tx: AtomicUsize::new(1), }); @@ -851,7 +851,7 @@ impl<'a, T> Drop for WaitersList<'a, T> { impl<'a, T> WaitersList<'a, T> { fn new( - unguarded_list: AtomicLinkedList::Target>, + unguarded_list: ConcurrentPushLinkedList::Target>, guard: Pin<&'a Waiter>, shared: &'a Shared, ) -> Self { @@ -1075,24 +1075,46 @@ impl Receiver { if slot.pos != self.next { // Release the `slot` lock before attempting to acquire the `tail` - // lock. This is required because `send2` acquires the tail lock + // lock. This is required because `send` acquires the tail lock // first followed by the slot lock. Acquiring the locks in reverse // order here would result in a potential deadlock: `recv_ref` // acquires the `slot` lock and attempts to acquire the `tail` lock - // while `send2` acquired the `tail` lock and attempts to acquire + // while `send` acquired the `tail` lock and attempts to acquire // the slot lock. drop(slot); let mut old_waker = None; - let tail = self.shared.tail.read().unwrap(); + let queued = waiter + .map(|(waiter, _)| { + waiter.with(|ptr| { + // Safety: waiter.queued is atomic. + // Acquire is needed to synchronize with `Shared::notify_rx`. + unsafe { (*ptr).queued.load(Acquire) } + }) + }) + .unwrap_or(false); + + // If `queued` is false, then we are the sole owner if the waiter, + // so read lock on tail suffices. + // If `queued` is true, the waiter might be accessed concurrently, + // so we need a write lock. + let mut tail_read = None; + let mut tail_write = None; + let tail = if queued { + tail_write = Some(self.shared.tail.write().unwrap()); + tail_write.as_deref().unwrap() + } else { + tail_read = Some(self.shared.tail.read().unwrap()); + tail_read.as_deref().unwrap() + }; // Acquire slot lock again slot = self.shared.buffer[idx].read().unwrap(); // Make sure the position did not change. This could happen in the // unlikely event that the buffer is wrapped between dropping the - // read lock and acquiring the tail lock. + // slot lock and acquiring the tail lock. if slot.pos != self.next { let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64); @@ -1104,36 +1126,8 @@ impl Receiver { return Err(TryRecvError::Closed); } - // We will might want to upgrade to a write lock. - let mut tail_read = Some(tail); - let mut tail_write = None; - // Store the waker if let Some((waiter, waker)) = waiter { - let queued = waiter.with(|ptr| { - // Safety: waiter.queued is atomic. - // Acquire is needed to synchronize with `Shared::notify_rx`. - unsafe { (*ptr).queued.load(Acquire) } - }); - - // Release the slot lock before reacquiring tail locks - // to avoid a deadlock. - drop(slot); - - // If waiter is already queued, then a write lock on tail is required - // since other threads may try to mutate the waiter concurrently. - // If the waiter is not queued, we are the only owner now and - // read lock suffices. - let tail_ref: &Tail = if queued { - // TODO: this is sketchy, need do make sure that - // it is safe to drop all the locks here... - tail_read = None; - tail_write = Some(self.shared.tail.write().unwrap()); - tail_write.as_deref().unwrap() - } else { - tail_read.as_deref().unwrap() - }; - // Safety: called while holding a lock on tail. 
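The `queued` flag is what makes the read-lock fast path sound: `Shared::notify_rx` clears it with `Release` only after it has finished touching the waiter, and `recv_ref`/`Recv::drop` read it with `Acquire`, so observing `false` proves the notifier's accesses happened-before and the receiver is once again the waiter's sole owner. A minimal, self-contained model of that handshake (illustrative names, not the actual tokio types):

    use std::cell::UnsafeCell;
    use std::sync::atomic::{AtomicBool, Ordering::{Acquire, Release}};
    use std::sync::Arc;
    use std::thread;

    // Stand-in for the broadcast waiter: `payload` models the waker slot that
    // only one side may touch at a time, `queued` is the handoff flag.
    struct Waiter {
        queued: AtomicBool,
        payload: UnsafeCell<u64>,
    }
    // Safety: access to `payload` is coordinated through `queued` below.
    unsafe impl Sync for Waiter {}

    fn main() {
        let waiter = Arc::new(Waiter {
            queued: AtomicBool::new(true),
            payload: UnsafeCell::new(0),
        });

        let notifier = {
            let waiter = Arc::clone(&waiter);
            thread::spawn(move || {
                // Use the waiter (stands in for extracting the waker)...
                unsafe { *waiter.payload.get() = 42 };
                // ...then hand it back with a Release swap, as notify_rx does.
                let prev = waiter.queued.swap(false, Release);
                assert!(prev);
            })
        };

        // Stand-in for Recv::drop: once `false` is observed with Acquire, the
        // notifier's write above is visible and it will never touch the waiter
        // again, so it is safe to reclaim it without locking the tail.
        while waiter.queued.load(Acquire) {
            std::hint::spin_loop();
        }
        assert_eq!(unsafe { *waiter.payload.get() }, 42);

        notifier.join().unwrap();
    }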
// If waiter is not queued, then we hold a read lock // on tail and can safely mutate `waiter` since we @@ -1156,25 +1150,24 @@ impl Receiver { } } - // If the waiter is already queued, don't do anything. - // If not, enqueue it. - // Relaxed memory order suffices because, if `waiter` - // is shared, then we hold a write lock on tail. + // If the waiter is not already queued, enqueue it. + // Relaxed memory order suffices because, if `queued` + // if `false`, then we are the sole owner of the waiter, + // and all future accesses will happen with tail lock held. if !(*ptr).queued.swap(true, Relaxed) { // Safety: // - `waiter` is not already queued, // - calling `recv_ref` with a waiter implies ownership // of it's `Recv`. As such, this waiter cannot be pushed // concurrently by some other thread. - tail_ref - .waiters - .push_front(NonNull::new_unchecked(&mut *ptr)); + tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr)); } }); } } // Drop the old waker after releasing the locks. + drop(slot); drop(tail_read); drop(tail_write); drop(old_waker); @@ -1191,7 +1184,8 @@ impl Receiver { let missed = next.wrapping_sub(self.next); - drop(tail); + drop(tail_read); + drop(tail_write); // The receiver is slow but no values have been missed if missed == 0 { diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index a742279339c..400c42573ca 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -7,9 +7,9 @@ //! specified node is actually contained by the list. #[cfg(feature = "sync")] -mod atomic; +mod concurrent_push; #[cfg(feature = "sync")] -pub(crate) use self::atomic::AtomicLinkedList; +pub(crate) use self::concurrent_push::ConcurrentPushLinkedList; use core::cell::UnsafeCell; use core::fmt; @@ -115,7 +115,7 @@ unsafe impl Sync for Pointers {} // ===== LinkedListBase ===== -// Common methods between LinkedList and AtomicLinkedList. +// Common methods between LinkedList and ConcurrentPushLinkedList. trait LinkedListBase { // NB: exclusive reference is important for AtomicLinkedList safety guarantees. fn head(&mut self) -> Option>; diff --git a/tokio/src/util/linked_list/atomic.rs b/tokio/src/util/linked_list/concurrent_push.rs similarity index 77% rename from tokio/src/util/linked_list/atomic.rs rename to tokio/src/util/linked_list/concurrent_push.rs index 3636ee7e104..74fe56bae5c 100644 --- a/tokio/src/util/linked_list/atomic.rs +++ b/tokio/src/util/linked_list/concurrent_push.rs @@ -9,9 +9,13 @@ use core::sync::atomic::{ Ordering::{AcqRel, Relaxed}, }; -/// An atomic intrusive linked list. It allows pushing new nodes concurrently. -/// Removing nodes still requires an exclusive reference. -pub(crate) struct AtomicLinkedList { +/// A linked list that supports adding new nodes concurrently. +/// Note that all other operations, e.g. node removals, +/// require external synchronization. +/// The simplest way to achieve it is to use RwLock: +/// pushing nodes only requires a read lock, +/// while removing nodes requires a write lock. +pub(crate) struct ConcurrentPushLinkedList { /// Linked list head. 
head: AtomicPtr, @@ -22,26 +26,26 @@ pub(crate) struct AtomicLinkedList { _marker: PhantomData<*const L>, } -unsafe impl Send for AtomicLinkedList where L::Target: Send {} -unsafe impl Sync for AtomicLinkedList where L::Target: Sync {} +unsafe impl Send for ConcurrentPushLinkedList where L::Target: Send {} +unsafe impl Sync for ConcurrentPushLinkedList where L::Target: Sync {} -impl Default for AtomicLinkedList { +impl Default for ConcurrentPushLinkedList { fn default() -> Self { Self::new() } } -impl AtomicLinkedList { - /// Creates an empty atomic linked list. - pub(crate) const fn new() -> AtomicLinkedList { - AtomicLinkedList { +impl ConcurrentPushLinkedList { + /// Creates an empty concurrent push linked list. + pub(crate) const fn new() -> ConcurrentPushLinkedList { + ConcurrentPushLinkedList { head: AtomicPtr::new(core::ptr::null_mut()), tail: UnsafeCell::new(None), _marker: PhantomData, } } - /// Convert an atomic linked list into a non-atomic version. + /// Convert a concurrent push LL into a regular LL. pub(crate) fn into_list(mut self) -> LinkedList { LinkedList { head: NonNull::new(*self.head.get_mut()), @@ -51,7 +55,7 @@ impl AtomicLinkedList { } } -impl LinkedListBase for AtomicLinkedList { +impl LinkedListBase for ConcurrentPushLinkedList { fn head(&mut self) -> Option> { NonNull::new(*self.head.get_mut()) } @@ -72,9 +76,9 @@ impl LinkedListBase for AtomicLinkedList { } } -impl AtomicLinkedList { +impl ConcurrentPushLinkedList { /// Atomically adds an element first in the list. - /// This method can be called concurrently from multiple threads. + /// This method can be called concurrently by multiple threads. /// /// # Safety /// @@ -135,6 +139,12 @@ impl AtomicLinkedList { } /// See [LinkedList::remove]. + /// + /// Note that `&mut self` implies that this call is somehow + /// synchronized with `push_front` (e.g. with RwLock). + /// In terms of memory model, there has to be an established + /// happens-before relationship between any given `push_front` + /// and any given `remove`. The relation can go either way. pub(crate) unsafe fn remove(&mut self, node: NonNull) -> Option { LinkedListBase::remove(self, node) } @@ -150,8 +160,9 @@ pub(crate) mod tests { use std::sync::Arc; #[test] - fn atomic_push_front() { - let atomic_list = Arc::new(AtomicLinkedList::<&Entry, <&Entry as Link>::Target>::new()); + fn concurrent_push_front() { + let atomic_list = + Arc::new(ConcurrentPushLinkedList::<&Entry, <&Entry as Link>::Target>::new()); let _entries = [5, 7] .into_iter()
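Taken together, the contract spelled out in the doc comments above — pushes are lock-free and may run concurrently, while removal needs externally provided exclusivity, e.g. the read and write sides of an RwLock — can be illustrated with a small standalone model. This is not the tokio type (a Treiber-style stack rather than an intrusive doubly linked list), but the locking discipline is the same:

    use std::ptr;
    use std::sync::atomic::{AtomicPtr, Ordering::{AcqRel, Acquire, Relaxed}};
    use std::sync::{Arc, RwLock};
    use std::thread;

    struct Node {
        value: u64,
        next: *mut Node,
    }

    #[derive(Default)]
    struct ConcurrentPushStack {
        head: AtomicPtr<Node>,
    }

    impl ConcurrentPushStack {
        // Lock-free push: a CAS loop on the head, safe to call from many
        // threads at once, which is why a shared (read) lock is enough.
        fn push(&self, value: u64) {
            let node = Box::into_raw(Box::new(Node { value, next: ptr::null_mut() }));
            let mut head = self.head.load(Relaxed);
            loop {
                unsafe { (*node).next = head };
                match self.head.compare_exchange_weak(head, node, AcqRel, Relaxed) {
                    Ok(_) => return,
                    Err(actual) => head = actual,
                }
            }
        }

        // Removal mutates shared structure, so it takes `&mut self`; callers
        // provide that exclusivity through a write lock.
        fn drain(&mut self) -> Vec<u64> {
            let mut out = Vec::new();
            let mut cur = self.head.swap(ptr::null_mut(), Acquire);
            while !cur.is_null() {
                let node = unsafe { Box::from_raw(cur) };
                out.push(node.value);
                cur = node.next;
            }
            out
        }
    }

    fn main() {
        let list = Arc::new(RwLock::new(ConcurrentPushStack::default()));

        let handles: Vec<_> = (0..4u64)
            .map(|t| {
                let list = Arc::clone(&list);
                thread::spawn(move || {
                    // Pushing only needs the shared lock.
                    let guard = list.read().unwrap();
                    for i in 0..100u64 {
                        guard.push(t * 1_000 + i);
                    }
                })
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }

        // Removing nodes needs the exclusive lock.
        let drained = list.write().unwrap().drain();
        assert_eq!(drained.len(), 400);
    }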