Skip to content

Commit

Permalink
improve performance of lock_non_send (openzfs#125)
Browse files Browse the repository at this point in the history
Non-send locks are implemented with an async_trait, which is ergonomic to
use, but unfortunately requires an additional memory allocation for
every call to the async trait.  This was observed to have a performance
impact (from inspecting CPU flame graphs under a moderate workload).

This change removes the use of async_trait, instead using a plain
function to do the non-send lock.
  • Loading branch information
ahrens authored Jan 25, 2022
1 parent 2b83600 commit 996b8e4
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 29 deletions.
2 changes: 1 addition & 1 deletion cmd/zfs_object_agent/util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub use lock_set::LockSet;
pub use lock_set::LockedItem;
pub use logging::setup_logging;
pub use logging::SUPER_EXPENSIVE_TRACE;
pub use mutex_ext::MutexExt;
pub use mutex_ext::lock_non_send;
pub use nicenum::{nice_number_count, nice_number_time, nice_p2size};
pub use range_tree::RangeTree;
pub use tunable::get_tunable;
Expand Down
31 changes: 11 additions & 20 deletions cmd/zfs_object_agent/util/src/mutex_ext.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
//! This module contains a trait which extends tokio::sync::Mutex. It adds a
//! new method, .lock_non_send(), which locks the mutex like .lock(), but
//! returns a new kind of guard which can not be sent between threads. This is
//! useful if you want to ensure that .await is not used while the mutex is
//! locked by some callers, but .await can be used from other callers (that use
//! tokio::sync::Mutex::lock() directly).
use async_trait::async_trait;
use std::marker::PhantomData;
use std::ops::Deref;
use std::ops::DerefMut;
Expand All @@ -29,17 +21,16 @@ impl<'a, T> DerefMut for NonSendMutexGuard<'a, T> {
}
}

#[async_trait]
pub trait MutexExt<'a, T> {
async fn lock_non_send(&'a self) -> NonSendMutexGuard<'a, T>;
}

#[async_trait]
impl<'a, T: Send> MutexExt<'a, T> for tokio::sync::Mutex<T> {
async fn lock_non_send(&'a self) -> NonSendMutexGuard<'a, T> {
NonSendMutexGuard {
inner: self.lock().await,
_marker: PhantomData,
}
/// Locks `mutex` like `tokio::sync::Mutex::lock()`, but returns a
/// `NonSendMutexGuard`, a guard that can not be sent between threads.
///
/// This is useful if you want to ensure that `.await` is not used while the
/// mutex is locked by some callers, but `.await` can still be used from other
/// callers (those that use `tokio::sync::Mutex::lock()` directly).
pub async fn lock_non_send<T>(mutex: &tokio::sync::Mutex<T>) -> NonSendMutexGuard<'_, T> {
    // It would be nice to do this via an async_trait, but that requires a memory
    // allocation each time it's called, which can impact performance.
    NonSendMutexGuard {
        inner: mutex.lock().await,
        _marker: PhantomData,
    }
}
14 changes: 6 additions & 8 deletions cmd/zfs_object_agent/zettacache/src/zettacache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use tokio::sync::OwnedSemaphorePermit;
use tokio::sync::Semaphore;
use tokio::time::{sleep_until, timeout_at};
use util::get_tunable;
use util::lock_non_send;
use util::maybe_die_with;
use util::nice_p2size;
use util::super_trace;
Expand All @@ -57,7 +58,6 @@ use util::AlignedBytes;
use util::From64;
use util::LockSet;
use util::LockedItem;
use util::MutexExt;
use uuid::Uuid;

lazy_static! {
Expand Down Expand Up @@ -1283,7 +1283,7 @@ impl ZettaCache {
let fut_or_f = {
// We don't want to hold the state lock while reading from disk so we
// use lock_non_send() to ensure that we can't hold it across .await.
let mut state = self.state.lock_non_send().await;
let mut state = lock_non_send(&self.state).await;
match state.pending_changes.get(&key).copied() {
Some(pc) => {
match pc {
Expand Down Expand Up @@ -1369,13 +1369,13 @@ impl ZettaCache {
// might have hit in the index chunk cache. Same below.
stat_counter = CacheMissAfterIndexRead;
super_trace!("cache miss after reading index for {:?}", key);
let mut state = self.state.lock_non_send().await;
let mut state = lock_non_send(&self.state).await;
f(&mut state, None)
}
Some(entry) => {
// Again, we don't want to hold the state lock while reading from disk so
// we use lock_non_send() to ensure that we can't hold it across .await.
let mut state = self.state.lock_non_send().await;
let mut state = lock_non_send(&self.state).await;
let value = match &state.merge {
Some(ms) if entry.value.atime < ms.eviction_cutoff => {
// Block is being evicted, abort the read attempt
Expand Down Expand Up @@ -1483,7 +1483,7 @@ impl ZettaCache {
// Now that we are ready to issue the write to disk, insert to the
// cache in the current checkpoint (allocate a block, add to
// pending_changes and outstanding_writes).
let fut = state.lock_non_send().await.insert(locked_key, bytes);
let fut = lock_non_send(&state).await.insert(locked_key, bytes);
fut.await;
// We want to hold onto the insert_permit until the write completes
// because it represents the memory that's required to buffer this
Expand Down Expand Up @@ -1553,9 +1553,7 @@ impl ZettaCache {
// Now that we are ready to issue the write to disk, insert to the
// cache in the current checkpoint (allocate a block, add to
// pending_changes and outstanding_writes).
let fut = cache
.state
.lock_non_send()
let fut = lock_non_send(&cache.state)
.await
.insert(locked_key, aligned_bytes);
fut.await;
Expand Down

0 comments on commit 996b8e4

Please sign in to comment.