diff --git a/cmd/zfs_object_agent/util/src/lib.rs b/cmd/zfs_object_agent/util/src/lib.rs
index 8fce5bf9120e..23e9842168a1 100644
--- a/cmd/zfs_object_agent/util/src/lib.rs
+++ b/cmd/zfs_object_agent/util/src/lib.rs
@@ -28,7 +28,7 @@ pub use lock_set::LockSet;
 pub use lock_set::LockedItem;
 pub use logging::setup_logging;
 pub use logging::SUPER_EXPENSIVE_TRACE;
-pub use mutex_ext::MutexExt;
+pub use mutex_ext::lock_non_send;
 pub use nicenum::{nice_number_count, nice_number_time, nice_p2size};
 pub use range_tree::RangeTree;
 pub use tunable::get_tunable;
diff --git a/cmd/zfs_object_agent/util/src/mutex_ext.rs b/cmd/zfs_object_agent/util/src/mutex_ext.rs
index c025af420976..78b4b610eb77 100644
--- a/cmd/zfs_object_agent/util/src/mutex_ext.rs
+++ b/cmd/zfs_object_agent/util/src/mutex_ext.rs
@@ -1,11 +1,3 @@
-//! This module contains a trait which extends tokio::sync::Mutex. It adds a
-//! new method, .lock_non_send(), which locks the mutex like .lock(), but
-//! returns a new kind of guard which can not be sent between threads. This is
-//! useful if you want to ensure that .await is not used while the mutex is
-//! locked by some callers, but .await can be used from other callers (that use
-//! tokio::sync::Mutex::lock() directly).
-
-use async_trait::async_trait;
 use std::marker::PhantomData;
 use std::ops::Deref;
 use std::ops::DerefMut;
@@ -29,17 +21,16 @@ impl<'a, T> DerefMut for NonSendMutexGuard<'a, T> {
     }
 }
 
-#[async_trait]
-pub trait MutexExt<'a, T> {
-    async fn lock_non_send(&'a self) -> NonSendMutexGuard<'a, T>;
-}
-
-#[async_trait]
-impl<'a, T: Send> MutexExt<'a, T> for tokio::sync::Mutex<T> {
-    async fn lock_non_send(&'a self) -> NonSendMutexGuard<'a, T> {
-        NonSendMutexGuard {
-            inner: self.lock().await,
-            _marker: PhantomData,
-        }
+// This locks the mutex like Mutex::lock(), but returns a new kind of guard
+// which can not be sent between threads. This is useful if you want to ensure
+// that .await is not used while the mutex is locked by some callers, but .await
+// can be used from other callers (that use tokio::sync::Mutex::lock()
+// directly).
+pub async fn lock_non_send<T>(mutex: &tokio::sync::Mutex<T>) -> NonSendMutexGuard<'_, T> {
+    // It would be nice to do this via an async_trait, but that requires a memory
+    // allocation each time it's called, which can impact performance.
+    NonSendMutexGuard {
+        inner: mutex.lock().await,
+        _marker: PhantomData,
     }
 }
diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache.rs b/cmd/zfs_object_agent/zettacache/src/zettacache.rs
index a9bd8956fe0b..2bab9d6296d6 100644
--- a/cmd/zfs_object_agent/zettacache/src/zettacache.rs
+++ b/cmd/zfs_object_agent/zettacache/src/zettacache.rs
@@ -47,6 +47,7 @@ use tokio::sync::OwnedSemaphorePermit;
 use tokio::sync::Semaphore;
 use tokio::time::{sleep_until, timeout_at};
 use util::get_tunable;
+use util::lock_non_send;
 use util::maybe_die_with;
 use util::nice_p2size;
 use util::super_trace;
@@ -57,7 +58,6 @@ use util::AlignedBytes;
 use util::From64;
 use util::LockSet;
 use util::LockedItem;
-use util::MutexExt;
 use uuid::Uuid;
 
 lazy_static! {
@@ -1283,7 +1283,7 @@ impl ZettaCache {
         let fut_or_f = {
             // We don't want to hold the state lock while reading from disk so we
             // use lock_non_send() to ensure that we can't hold it across .await.
-            let mut state = self.state.lock_non_send().await;
+            let mut state = lock_non_send(&self.state).await;
             match state.pending_changes.get(&key).copied() {
                 Some(pc) => {
                     match pc {
@@ -1369,13 +1369,13 @@ impl ZettaCache {
                 // might have hit in the index chunk cache. Same below.
                 stat_counter = CacheMissAfterIndexRead;
                 super_trace!("cache miss after reading index for {:?}", key);
-                let mut state = self.state.lock_non_send().await;
+                let mut state = lock_non_send(&self.state).await;
                 f(&mut state, None)
             }
             Some(entry) => {
                 // Again, we don't want to hold the state lock while reading from disk so
                 // we use lock_non_send() to ensure that we can't hold it across .await.
-                let mut state = self.state.lock_non_send().await;
+                let mut state = lock_non_send(&self.state).await;
                 let value = match &state.merge {
                     Some(ms) if entry.value.atime < ms.eviction_cutoff => {
                         // Block is being evicted, abort the read attempt
@@ -1483,7 +1483,7 @@ impl ZettaCache {
             // Now that we are ready to issue the write to disk, insert to the
             // cache in the current checkpoint (allocate a block, add to
             // pending_changes and outstanding_writes).
-            let fut = state.lock_non_send().await.insert(locked_key, bytes);
+            let fut = lock_non_send(&state).await.insert(locked_key, bytes);
             fut.await;
             // We want to hold onto the insert_permit until the write completes
             // because it represents the memory that's required to buffer this
@@ -1553,9 +1553,7 @@ impl ZettaCache {
             // Now that we are ready to issue the write to disk, insert to the
             // cache in the current checkpoint (allocate a block, add to
             // pending_changes and outstanding_writes).
-            let fut = cache
-                .state
-                .lock_non_send()
+            let fut = lock_non_send(&cache.state)
                 .await
                 .insert(locked_key, aligned_bytes);
             fut.await;
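
For context, a minimal self-contained sketch (not part of the change above) of why the !Send guard matters: a future that holds a non-Send value across an .await point is itself !Send, so tokio::spawn rejects it at compile time. The Guard struct, bump() function, and counter below are hypothetical stand-ins for illustration only; the real NonSendMutexGuard in util/src/mutex_ext.rs also implements Deref/DerefMut rather than exposing its inner field.

use std::marker::PhantomData;
use tokio::sync::Mutex;
use tokio::sync::MutexGuard;

// Simplified stand-in for NonSendMutexGuard: the raw-pointer PhantomData makes
// the guard !Send, so it cannot be held across an .await in a spawned task.
struct Guard<'a, T> {
    inner: MutexGuard<'a, T>,
    _not_send: PhantomData<*const ()>,
}

async fn lock_non_send<T>(mutex: &Mutex<T>) -> Guard<'_, T> {
    Guard {
        inner: mutex.lock().await,
        _not_send: PhantomData,
    }
}

async fn bump(counter: &Mutex<u64>) {
    // The guard is dropped before the function reaches another .await, so the
    // future produced by bump() stays Send and can be passed to tokio::spawn.
    let mut guard = lock_non_send(counter).await;
    *guard.inner += 1;
}

// By contrast, holding the guard across an .await makes the future !Send, so
// passing it to tokio::spawn fails to compile:
//
// async fn bad(counter: &Mutex<u64>) {
//     let guard = lock_non_send(counter).await;
//     tokio::task::yield_now().await; // error[E0277] when spawned: guard is !Send
//     drop(guard);
// }

#[tokio::main]
async fn main() {
    let counter: &'static Mutex<u64> = Box::leak(Box::new(Mutex::new(0)));
    tokio::spawn(bump(counter)).await.unwrap();
    assert_eq!(*counter.lock().await, 1);
}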