diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index 902b475..e7540f5 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -48,25 +48,17 @@ impl Mutex { drop(state); if !self.semaphore.is_closed() { - // This is a check for permits without any causal dependency or an - // immediate yield point! We need to do this in order to be able to - // detect a deadlock due to re-entrancy without unnecessary yield - // points. The yield(s) and causal dependency updates happen - // immediately after, in the `acquire_blocking` call. - if self.semaphore.available_permits() < 1 { - // If the lock is already held, then we are blocked. This can - // happen either because the lock is held by another thread, - // or because the lock is held by the current thread. For the - // latter case, we check the state to report a more precise - // error message. - state = self.state.borrow_mut(); - if let Some(holder) = state.holder { - if holder == me { - panic!("deadlock! task {:?} tried to acquire a Mutex it already holds", me); - } - } - drop(state); - } + // Detect deadlock due to re-entrancy. + state = self.state.borrow_mut(); + assert!( + match &state.holder { + Some(holder) => *holder != me, + None => true, + }, + "deadlock! task {me:?} tried to acquire a Mutex it already holds" + ); + drop(state); + self.semaphore.acquire_blocking(1).unwrap(); } diff --git a/src/sync/rwlock.rs b/src/sync/rwlock.rs index 2e5c8ba..2315d9f 100644 --- a/src/sync/rwlock.rs +++ b/src/sync/rwlock.rs @@ -1,7 +1,6 @@ +use crate::future::batch_semaphore::{BatchSemaphore, Fairness}; use crate::runtime::execution::ExecutionState; -use crate::runtime::task::clock::VectorClock; use crate::runtime::task::{TaskId, TaskSet}; -use crate::runtime::thread; use std::cell::RefCell; use std::fmt::{Debug, Display}; use std::ops::{Deref, DerefMut}; @@ -9,6 +8,10 @@ use std::panic::{RefUnwindSafe, UnwindSafe}; use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult}; use tracing::trace; +/// (Theoretical) max number of readers holding the same `RwLock`. Based on +/// the `tokio` implementation. +const MAX_READS: usize = (u32::MAX >> 3) as usize; + /// A reader-writer lock, the same as [`std::sync::RwLock`]. /// /// Unlike [`std::sync::RwLock`], the same thread is never allowed to acquire the read side of a @@ -16,15 +19,13 @@ use tracing::trace; /// we choose the most conservative one. pub struct RwLock { state: RefCell, + semaphore: BatchSemaphore, inner: std::sync::RwLock, } #[derive(Debug)] struct RwLockState { holder: RwLockHolder, - waiting_readers: TaskSet, - waiting_writers: TaskSet, - clock: VectorClock, } #[derive(PartialEq, Eq, Debug)] @@ -40,18 +41,26 @@ enum RwLockType { Write, } +impl RwLockType { + /// Number of semaphore permits corresponding to the given lock type. + fn num_permits(&self) -> usize { + match self { + Self::Read => 1, + Self::Write => MAX_READS, + } + } +} + impl RwLock { /// Create a new instance of an `RwLock` which is unlocked. pub const fn new(value: T) -> Self { let state = RwLockState { holder: RwLockHolder::None, - waiting_readers: TaskSet::new(), - waiting_writers: TaskSet::new(), - clock: VectorClock::new(), }; Self { inner: std::sync::RwLock::new(value), + semaphore: BatchSemaphore::const_new(MAX_READS, Fairness::Unfair), state: RefCell::new(state), } } @@ -165,10 +174,10 @@ impl RwLock { { let state = self.state.borrow(); assert_eq!(state.holder, RwLockHolder::None); + // Update the receiver's clock with the RwLock clock - ExecutionState::with(|s| { - s.update_clock(&state.clock); - }); + self.semaphore.try_acquire(MAX_READS).unwrap(); + self.inner.into_inner() } @@ -179,55 +188,30 @@ impl RwLock { let mut state = self.state.borrow_mut(); trace!( holder = ?state.holder, - waiting_readers = ?state.waiting_readers, - waiting_writers = ?state.waiting_writers, + semaphore = ?self.semaphore, "acquiring {:?} lock on rwlock {:p}", typ, self, ); - - // We are waiting for the lock - if typ == RwLockType::Write { - state.waiting_writers.insert(me); - } else { - state.waiting_readers.insert(me); - } - // Block if the lock is in a state where we can't acquire it immediately. Note that we only - // need to context switch here if we can't acquire the lock. If it's available for us to - // acquire, but there is also another thread `t` that wants to acquire it, then `t` must - // have been runnable when this thread was chosen to execute and could have been chosen - // instead. - let should_switch = match &state.holder { - RwLockHolder::Write(writer) => { - if *writer == me { - panic!("deadlock! task {:?} tried to acquire a RwLock it already holds", me); - } - ExecutionState::with(|s| s.current_mut().block(false)); - true - } - RwLockHolder::Read(readers) => { - if readers.contains(me) { - panic!("deadlock! task {:?} tried to acquire a RwLock it already holds", me); - } - if typ == RwLockType::Write { - ExecutionState::with(|s| s.current_mut().block(false)); - true - } else { - false - } - } - RwLockHolder::None => false, - }; drop(state); - if should_switch { - thread::switch(); + if !self.semaphore.is_closed() { + // Detect deadlock due to re-entrancy. + state = self.state.borrow_mut(); + assert!( + match &state.holder { + RwLockHolder::Write(writer) => *writer != me, + RwLockHolder::Read(readers) => !readers.contains(me), + RwLockHolder::None => true, + }, + "deadlock! task {me:?} tried to acquire a RwLock it already holds" + ); + drop(state); + + self.semaphore.acquire_blocking(typ.num_permits()).unwrap(); } - let mut state = self.state.borrow_mut(); - // Once the scheduler has resumed this thread, we are clear to take the lock. We might - // not actually be in the waiters, though (if the lock was uncontended). - // TODO should always be in the waiters? + state = self.state.borrow_mut(); match (typ, &mut state.holder) { (RwLockType::Write, RwLockHolder::None) => { state.holder = RwLockHolder::Write(me); @@ -238,7 +222,7 @@ impl RwLock { state.holder = RwLockHolder::Read(readers); } (RwLockType::Read, RwLockHolder::Read(readers)) => { - readers.insert(me); + debug_assert!(readers.insert(me)); } _ => { panic!( @@ -247,36 +231,14 @@ impl RwLock { ); } } - if typ == RwLockType::Write { - state.waiting_writers.remove(me); - } else { - state.waiting_readers.remove(me); - } trace!( holder = ?state.holder, - waiting_readers = ?state.waiting_readers, - waiting_writers = ?state.waiting_writers, + semaphore = ?self.semaphore, "acquired {:?} lock on rwlock {:p}", typ, self ); - - // Increment the current thread's clock and update this RwLock's clock to match. - // TODO we can likely do better here: there is no causality between multiple readers holding - // the lock at the same time. - ExecutionState::with(|s| { - s.update_clock(&state.clock); - state.clock.update(s.get_clock(me)); - }); - - // Block all other waiters, since we won the race to take this lock - Self::block_waiters(&state, me, typ); drop(state); - - // We need to let other threads in here so they may fail a `try_read` or `try_write`. This - // is the case because the current thread holding the lock might not have any further - // context switches until after releasing the lock. - thread::switch(); } /// Attempt to acquire this lock in the provided mode, but without blocking. Returns `true` if @@ -287,31 +249,35 @@ impl RwLock { let mut state = self.state.borrow_mut(); trace!( holder = ?state.holder, - waiting_readers = ?state.waiting_readers, - waiting_writers = ?state.waiting_writers, + semaphore = ?self.semaphore, "trying to acquire {:?} lock on rwlock {:p}", typ, self, ); + drop(state); - let acquired = match (typ, &mut state.holder) { - (RwLockType::Write, RwLockHolder::None) => { - state.holder = RwLockHolder::Write(me); - true - } - (RwLockType::Read, RwLockHolder::None) => { - let mut readers = TaskSet::new(); - readers.insert(me); - state.holder = RwLockHolder::Read(readers); - true - } - (RwLockType::Read, RwLockHolder::Read(readers)) => { - // If we already hold the read lock, `insert` returns false, which will cause this - // acquisition to fail with `WouldBlock` so we can diagnose potential deadlocks. - readers.insert(me) - } - _ => false, - }; + // Semaphore is never closed, so an error here is always `NoPermits`. + let mut acquired = self.semaphore.try_acquire(typ.num_permits()).is_ok(); + if acquired { + state = self.state.borrow_mut(); + match (typ, &mut state.holder) { + (RwLockType::Write, RwLockHolder::None) => { + state.holder = RwLockHolder::Write(me); + } + (RwLockType::Read, RwLockHolder::None) => { + let mut readers = TaskSet::new(); + readers.insert(me); + state.holder = RwLockHolder::Read(readers); + } + (RwLockType::Read, RwLockHolder::Read(readers)) => { + // If we already hold the read lock, `insert` returns false, which will cause this + // acquisition to fail with `WouldBlock` so we can diagnose potential deadlocks. + acquired = readers.insert(me); + } + _ => (), + }; + drop(state); + } trace!( "{} {:?} lock on rwlock {:p}", @@ -320,70 +286,8 @@ impl RwLock { self, ); - // Update this thread's clock with the clock stored in the RwLock. - // We need to do the vector clock update even in the failing case, because there's a causal - // dependency: if the `try_lock` fails, the current thread `t1` knows that the thread `t2` - // that owns the lock is not in the right state to be read/written, and therefore `t1` has a - // causal dependency on everything that happened before in `t2` (which is recorded in the - // RwLock's clock). - // TODO we can likely do better here: there is no causality between successful `try_read`s - // and other concurrent readers, and there's no need to update the clock on failed - // `try_read`s. - ExecutionState::with(|s| { - s.update_clock(&state.clock); - state.clock.update(s.get_clock(me)); - }); - - // Block all other waiters, since we won the race to take this lock - Self::block_waiters(&state, me, typ); - drop(state); - - // We need to let other threads in here so they - // (a) may fail a `try_lock` (in case we acquired), or - // (b) may release the lock (in case we failed to acquire) so we can succeed in a subsequent - // `try_lock`. - thread::switch(); - acquired } - - fn block_waiters(state: &RwLockState, me: TaskId, typ: RwLockType) { - // Only block waiting readers if the lock is being acquired by a writer - if typ == RwLockType::Write { - for tid in state.waiting_readers.iter() { - assert_ne!(tid, me); - ExecutionState::with(|s| s.get_mut(tid).block(false)); - } - } - // Always block any waiting writers - for tid in state.waiting_writers.iter() { - assert_ne!(tid, me); - ExecutionState::with(|s| s.get_mut(tid).block(false)); - } - } - - fn unblock_waiters(state: &RwLockState, me: TaskId, drop_type: RwLockType) { - for tid in state.waiting_readers.iter() { - debug_assert_ne!(tid, me); - ExecutionState::with(|s| { - let t = s.get_mut(tid); - debug_assert!(drop_type == RwLockType::Read || t.blocked()); - t.unblock(); - }); - } - - // Only unblock waiting writers if there are no exiting readers holding the lock - if state.holder == RwLockHolder::None { - for tid in state.waiting_writers.iter() { - debug_assert_ne!(tid, me); - ExecutionState::with(|s| { - let t = s.get_mut(tid); - debug_assert!(t.blocked()); - t.unblock(); - }); - } - } - } } // Safety: RwLock is never actually passed across true threads, only across continuations. The @@ -441,38 +345,23 @@ impl Drop for RwLockReadGuard<'_, T> { self.inner = None; let mut state = self.rwlock.state.borrow_mut(); - trace!( holder = ?state.holder, - waiting_readers = ?state.waiting_readers, - waiting_writers = ?state.waiting_writers, + semaphore = ?self.rwlock.semaphore, "releasing Read lock on rwlock {:p}", self.rwlock ); - - match &mut state.holder { - RwLockHolder::Read(readers) => { - let was_reader = readers.remove(self.me); - assert!(was_reader); - if readers.is_empty() { - state.holder = RwLockHolder::None; - } - } - _ => panic!("exiting a reader but rwlock is in the wrong state {:?}", state.holder), - } - - if ExecutionState::should_stop() { - return; + let RwLockHolder::Read(readers) = &mut state.holder else { + panic!("exiting a reader but rwlock is in the wrong state {:?}", state.holder); + }; + let was_reader = readers.remove(self.me); + assert!(was_reader); + if readers.is_empty() { + state.holder = RwLockHolder::None; } - - // Unblock every thread waiting on this lock. The scheduler will choose one of them to win - // the race to this lock, and that thread will re-block all the losers. - RwLock::::unblock_waiters(&state, self.me, RwLockType::Read); - drop(state); - // Releasing a lock is a yield point - thread::switch(); + self.rwlock.semaphore.release(RwLockType::Read.num_permits()); } } @@ -516,31 +405,14 @@ impl Drop for RwLockWriteGuard<'_, T> { let mut state = self.rwlock.state.borrow_mut(); trace!( holder = ?state.holder, - waiting_readers = ?state.waiting_readers, - waiting_writers = ?state.waiting_writers, + semaphore = ?self.rwlock.semaphore, "releasing Write lock on rwlock {:p}", self.rwlock ); - assert_eq!(state.holder, RwLockHolder::Write(self.me)); state.holder = RwLockHolder::None; - - // Update the RwLock clock with the owning thread's clock - ExecutionState::with(|s| { - let clock = s.increment_clock(); - state.clock.update(clock); - }); - - if ExecutionState::should_stop() { - return; - } - - // Unblock every thread waiting on this lock. The scheduler will choose one of them to win - // the race to this lock, and that thread will re-block all the losers. - RwLock::::unblock_waiters(&state, self.me, RwLockType::Write); drop(state); - // Releasing a lock is a yield point - thread::switch(); + self.rwlock.semaphore.release(RwLockType::Write.num_permits()); } }