Skip to content

Commit

Permalink
use watch::Receiver::same_channel() instead of Arc (openzfs#399)
Browse files Browse the repository at this point in the history
Use the new `watch::Receiver::same_channel()` method so that we don't
need an Arc for every in-progress async_cache load (i.e. GET from object
store).
  • Loading branch information
ahrens authored May 3, 2022
1 parent e5661e0 commit bf4b0fe
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 22 deletions.
31 changes: 9 additions & 22 deletions cmd/zfs_object_agent/util/src/async_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::sync::Arc;

use anyhow::Result;
use log::*;
Expand All @@ -19,7 +18,7 @@ pub struct AsyncCache<K: Hash + Eq + Copy + Debug, V: Clone> {

struct Inner<K: Hash + Eq + Copy + Debug, V: Clone> {
cache: LruCache<K, V>,
loading: HashMap<K, Arc<watch_once::Sender<V>>>,
loading: HashMap<K, watch_once::Receiver<V>>,
}

pub enum GetMethod {
Expand Down Expand Up @@ -69,19 +68,12 @@ impl<K: Hash + Eq + Copy + Debug, V: Clone> AsyncCache<K, V> {
match inner.loading.get(&key) {
None => {
// No cached value, loading not in progress: drop lock and initiate load

// The Arc is used only so that we can later check if an entry in
// `loading` is *our* entry, and this is the only clone that's ever
// created. Ideally there would be a Receiver::same_channel() method
// which checked the Receiver's internal Arc, in which case we could have
// `loading: HashMap<K, Receiver<V>>`.
// See https://github.com/tokio-rs/tokio/issues/4579
let tx = Arc::new(watch_once::channel().0);
inner.loading.insert(key, tx.clone());
let (tx, rx) = watch_once::channel();
inner.loading.insert(key, rx);
break tx;
}
// loading in progress: drop lock and wait for load
Some(tx) => tx.subscribe(),
Some(rx) => rx.clone(),
}
};

Expand Down Expand Up @@ -111,12 +103,11 @@ impl<K: Hash + Eq + Copy + Debug, V: Clone> AsyncCache<K, V> {

let cacheable = match inner.loading.entry(key) {
Entry::Occupied(oe) => {
if Arc::ptr_eq(oe.get(), &tx) {
let cacheable = oe.get().same_channel(&tx.subscribe());
if cacheable {
oe.remove();
true
} else {
false
}
cacheable
}
Entry::Vacant(_) => false,
};
Expand All @@ -139,11 +130,7 @@ impl<K: Hash + Eq + Copy + Debug, V: Clone> AsyncCache<K, V> {
} else {
measure!("AsyncCache load invalidated").hit();
}
// The only clone of tx is the one in `loading`, which we removed and dropped above.
Arc::try_unwrap(tx)
.unwrap_or_else(|tx| panic!("{} references for {:?}", Arc::strong_count(&tx), key))
.send(value.clone())
.ok();
tx.send(value.clone()).ok();
(value, GetMethod::Loaded)
})
}
Expand All @@ -157,7 +144,7 @@ impl<K: Hash + Eq + Copy + Debug, V: Clone> AsyncCache<K, V> {
let mut inner = self.inner.lock().unwrap();
match inner.cache.get(&key) {
Some(value) => return Some(value.clone()),
None => inner.loading.get(&key).map(|tx| tx.subscribe()),
None => inner.loading.get(&key).cloned(),
}
};
match rx {
Expand Down
4 changes: 4 additions & 0 deletions cmd/zfs_object_agent/util/src/watch_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ impl<T: Clone> Receiver<T> {
Err(e) => Err(e),
}
}

pub fn same_channel(&self, other: &Receiver<T>) -> bool {
self.0.same_channel(&other.0)
}
}

0 comments on commit bf4b0fe

Please sign in to comment.