diff --git a/examples/async_example.rs b/examples/async_example.rs index 63213fe..7667032 100644 --- a/examples/async_example.rs +++ b/examples/async_example.rs @@ -16,7 +16,7 @@ async fn main() { // when we get the value, we will get a ValueRef, which contains a RwLockReadGuard // so when we finish use this value, we must release the ValueRef - let v = c.get(&"a").unwrap(); + let v = c.get(&"a").await.unwrap(); assert_eq!(v.value(), &"a"); // release the value v.release(); // or drop(v) @@ -25,17 +25,17 @@ async fn main() { { // when we get the value, we will get a ValueRef, which contains a RwLockWriteGuard // so when we finish use this value, we must release the ValueRefMut - let mut v = c.get_mut(&"a").unwrap(); + let mut v = c.get_mut(&"a").await.unwrap(); v.write("aa"); assert_eq!(v.value(), &"aa"); // release the value } // if you just want to do one operation - let v = c.get_mut(&"a").unwrap(); + let v = c.get_mut(&"a").await.unwrap(); v.write_once("aaa"); - let v = c.get(&"a").unwrap(); + let v = c.get(&"a").await.unwrap(); println!("{}", v); assert_eq!(v.value(), &"aaa"); v.release(); @@ -45,5 +45,5 @@ async fn main() { // wait all the operations are finished c.wait().await.unwrap(); - assert!(c.get(&"a").is_none()); + assert!(c.get(&"a").await.is_none()); } diff --git a/src/cache.rs b/src/cache.rs index b951349..dfd6d0d 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -58,6 +58,19 @@ macro_rules! impl_builder { } } + /// Set the insert buffer items for the Cache. + /// + /// `buffer_items` determines the size of Get buffers. + /// + /// Unless you have a rare use case, using `64` as the BufferItems value + /// results in good performance. + #[inline] + pub fn set_buffer_items(self, sz: usize) -> Self { + Self { + inner: self.inner.set_buffer_items(sz), + } + } + /// Set whether record the metrics or not. /// /// Metrics is true when you want real-time logging of a variety of stats. @@ -213,6 +226,8 @@ macro_rules! impl_cache { let (index, conflict) = self.key_to_hash.build_key(key); + self.get_buf.push(index); + match self.store.get(&index, conflict) { None => { self.metrics.add(MetricType::Miss, index, 1); @@ -237,6 +252,9 @@ macro_rules! impl_cache { } let (index, conflict) = self.key_to_hash.build_key(key); + + self.get_buf.push(index); + match self.store.get_mut(&index, conflict) { None => { self.metrics.add(MetricType::Miss, index, 1); @@ -357,6 +375,7 @@ macro_rules! impl_cache { Self { store: self.store.clone(), policy: self.policy.clone(), + get_buf: self.get_buf.clone(), insert_buf_tx: self.insert_buf_tx.clone(), stop_tx: self.stop_tx.clone(), clear_tx: self.clear_tx.clone(), @@ -494,6 +513,200 @@ macro_rules! impl_cache_processor { }; } +#[cfg(feature = "async")] +macro_rules! impl_async_cache { + ($cache: ident, $builder: ident, $item: ident) => { + use crate::store::UpdateResult; + use crate::{ValueRef, ValueRefMut}; + + impl $cache + where + K: Hash + Eq, + V: Send + Sync + 'static, + KH: KeyBuilder, + C: Coster, + U: UpdateValidator, + CB: CacheCallback, + S: BuildHasher + Clone + 'static + Send, + { + /// `get` returns a `Option>` (if any) representing whether the + /// value was found or not. 
+ pub async fn get(&self, key: &Q) -> Option> + where + K: core::borrow::Borrow, + Q: core::hash::Hash + Eq + ?Sized, + { + if self.is_closed.load(Ordering::SeqCst) { + return None; + } + + let (index, conflict) = self.key_to_hash.build_key(key); + + self.get_buf.push(index).await; + + match self.store.get(&index, conflict) { + None => { + self.metrics.add(MetricType::Miss, index, 1); + None + } + Some(v) => { + self.metrics.add(MetricType::Hit, index, 1); + Some(v) + } + } + } + + /// `get_mut` returns a `Option>` (if any) representing whether the + /// value was found or not. + pub async fn get_mut(&self, key: &Q) -> Option> + where + K: core::borrow::Borrow, + Q: core::hash::Hash + Eq + ?Sized, + { + if self.is_closed.load(Ordering::SeqCst) { + return None; + } + + let (index, conflict) = self.key_to_hash.build_key(key); + + self.get_buf.push(index).await; + + match self.store.get_mut(&index, conflict) { + None => { + self.metrics.add(MetricType::Miss, index, 1); + None + } + Some(v) => { + self.metrics.add(MetricType::Hit, index, 1); + Some(v) + } + } + } + + /// Returns the TTL for the specified key if the + /// item was found and is not expired. + pub fn get_ttl(&self, key: &Q) -> Option + where + K: core::borrow::Borrow, + Q: core::hash::Hash + Eq + ?Sized, + { + let (index, conflict) = self.key_to_hash.build_key(key); + self.store + .get(&index, conflict) + .and_then(|_| self.store.expiration(&index).map(|time| time.get_ttl())) + } + + /// `max_cost` returns the max cost of the cache. + #[inline] + pub fn max_cost(&self) -> i64 { + self.policy.max_cost() + } + + /// `update_max_cost` updates the maxCost of an existing cache. + #[inline] + pub fn update_max_cost(&self, max_cost: i64) { + self.policy.update_max_cost(max_cost) + } + + /// Returns the number of items in the Cache + #[inline] + pub fn len(&self) -> usize { + self.store.len() + } + + /// Returns true if the cache is empty + #[inline] + pub fn is_empty(&self) -> bool { + self.store.len() == 0 + } + + #[inline] + fn try_update( + &self, + key: K, + val: V, + cost: i64, + ttl: Duration, + only_update: bool, + ) -> Result)>, CacheError> { + let expiration = if ttl.is_zero() { + Time::now() + } else { + Time::now_with_expiration(ttl) + }; + + let (index, conflict) = self.key_to_hash.build_key(&key); + + // cost is eventually updated. The expiration must also be immediately updated + // to prevent items from being prematurely removed from the map. + let external_cost = if cost == 0 { self.coster.cost(&val) } else { 0 }; + match self.store.try_update(index, val, conflict, expiration)? 
{ + UpdateResult::NotExist(v) + | UpdateResult::Reject(v) + | UpdateResult::Conflict(v) => { + if only_update { + Ok(None) + } else { + Ok(Some(( + index, + $item::new(index, conflict, cost + external_cost, v, expiration), + ))) + } + } + UpdateResult::Update(v) => { + self.callback.on_exit(Some(v)); + Ok(Some((index, $item::update(index, cost, external_cost)))) + } + } + } + } + + impl AsRef<$cache> + for $cache + where + K: Hash + Eq, + V: Send + Sync + 'static, + KH: KeyBuilder, + C: Coster, + U: UpdateValidator, + CB: CacheCallback, + S: BuildHasher + Clone + 'static, + { + fn as_ref(&self) -> &$cache { + self + } + } + + impl Clone for $cache + where + K: Hash + Eq, + V: Send + Sync + 'static, + KH: KeyBuilder, + C: Coster, + U: UpdateValidator, + CB: CacheCallback, + S: BuildHasher + Clone + 'static, + { + fn clone(&self) -> Self { + Self { + store: self.store.clone(), + policy: self.policy.clone(), + get_buf: self.get_buf.clone(), + insert_buf_tx: self.insert_buf_tx.clone(), + stop_tx: self.stop_tx.clone(), + clear_tx: self.clear_tx.clone(), + callback: self.callback.clone(), + key_to_hash: self.key_to_hash.clone(), + is_closed: self.is_closed.clone(), + coster: self.coster.clone(), + metrics: self.metrics.clone(), + _marker: self._marker, + } + } + } + }; +} + macro_rules! impl_cache_cleaner { ($cleaner: ident, $processor: ident, $item: ident) => { impl<'a, V, U, CB, S> $cleaner<'a, V, U, CB, S> @@ -548,12 +761,12 @@ pub use sync::{Cache, CacheBuilder}; #[cfg(feature = "async")] #[cfg_attr(docsrs, doc(cfg(feature = "async")))] -mod axync; +mod r#async; #[cfg(feature = "async")] #[cfg_attr(docsrs, doc(cfg(feature = "async")))] -pub use axync::{AsyncCache, AsyncCacheBuilder}; +pub use r#async::{AsyncCache, AsyncCacheBuilder}; // TODO: find the optimal value for this const DEFAULT_INSERT_BUF_SIZE: usize = 32 * 1024; -// const DEFAULT_BUFFER_ITEMS: usize = 64; -const DEFAULT_CLEANUP_DURATION: Duration = Duration::from_millis(500); +pub(crate) const DEFAULT_BUFFER_ITEMS: usize = 64; +const DEFAULT_CLEANUP_DURATION: Duration = Duration::from_secs(2); diff --git a/src/cache/axync.rs b/src/cache/async.rs similarity index 98% rename from src/cache/axync.rs rename to src/cache/async.rs index 8c20979..2ed7ee8 100644 --- a/src/cache/axync.rs +++ b/src/cache/async.rs @@ -3,6 +3,7 @@ use crate::axync::{ }; use crate::cache::builder::CacheBuilderCore; use crate::policy::AsyncLFUPolicy; +use crate::ring::AsyncRingStripe; use crate::store::ShardedMap; use crate::ttl::{ExpirationMap, Time}; use crate::{ @@ -209,7 +210,6 @@ where self.inner.update_validator.unwrap(), hasher.clone(), )); - let mut policy = AsyncLFUPolicy::with_hasher(num_counters, max_cost, hasher, spawner)?; let coster = Arc::new(self.inner.coster.unwrap()); @@ -239,9 +239,12 @@ where spawner(fut); })); + let buffer_items = self.inner.buffer_items; + let get_buf = AsyncRingStripe::new(policy.clone(), buffer_items); let this = AsyncCache { store, policy, + get_buf: Arc::new(get_buf), insert_buf_tx: buf_tx, callback, key_to_hash: Arc::new(self.inner.key_to_hash), @@ -373,6 +376,8 @@ pub struct AsyncCache< /// contention. 
pub(crate) insert_buf_tx: Sender>, + pub(crate) get_buf: Arc>, + pub(crate) stop_tx: Sender<()>, pub(crate) clear_tx: Sender<()>, @@ -776,6 +781,6 @@ where } impl_builder!(AsyncCacheBuilder); -impl_cache!(AsyncCache, AsyncCacheBuilder, Item); +impl_async_cache!(AsyncCache, AsyncCacheBuilder, Item); impl_cache_processor!(CacheProcessor, Item); impl_cache_cleaner!(CacheCleaner, CacheProcessor, Item); diff --git a/src/cache/builder.rs b/src/cache/builder.rs index 1063f7e..f171944 100644 --- a/src/cache/builder.rs +++ b/src/cache/builder.rs @@ -8,6 +8,8 @@ use std::hash::{BuildHasher, Hash}; use std::marker::PhantomData; use std::time::Duration; +use super::DEFAULT_BUFFER_ITEMS; + pub struct CacheBuilderCore< K, V, @@ -33,6 +35,8 @@ pub struct CacheBuilderCore< pub(crate) max_cost: i64, + pub(crate) buffer_items: usize, + // buffer_items determines the size of Get buffers. // // Unless you have a rare use case, using `64` as the BufferItems value @@ -76,7 +80,7 @@ impl CacheBuilderCore { Self { num_counters, max_cost, - // buffer_items: DEFAULT_BUFFER_ITEMS, + buffer_items: DEFAULT_BUFFER_ITEMS, insert_buffer_size: DEFAULT_INSERT_BUF_SIZE, metrics: false, callback: Some(DefaultCacheCallback::default()), @@ -99,7 +103,7 @@ impl> CacheBuild Self { num_counters, max_cost, - // buffer_items: DEFAULT_BUFFER_ITEMS, + buffer_items: DEFAULT_BUFFER_ITEMS, insert_buffer_size: DEFAULT_INSERT_BUF_SIZE, metrics: false, callback: Some(DefaultCacheCallback::default()), @@ -140,6 +144,7 @@ where Self { num_counters, max_cost: self.max_cost, + buffer_items: self.buffer_items, insert_buffer_size: self.insert_buffer_size, metrics: self.metrics, callback: self.callback, @@ -169,6 +174,27 @@ where Self { num_counters: self.num_counters, max_cost, + buffer_items: self.buffer_items, + insert_buffer_size: self.insert_buffer_size, + metrics: self.metrics, + callback: self.callback, + key_to_hash: self.key_to_hash, + update_validator: self.update_validator, + coster: self.coster, + ignore_internal_cost: self.ignore_internal_cost, + cleanup_duration: self.cleanup_duration, + hasher: self.hasher, + marker_k: self.marker_k, + marker_v: self.marker_v, + } + } + + #[inline] + pub fn set_buffer_items(self, sz: usize) -> Self { + Self { + num_counters: self.num_counters, + max_cost: self.max_cost, + buffer_items: sz, insert_buffer_size: self.insert_buffer_size, metrics: self.metrics, callback: self.callback, @@ -195,6 +221,7 @@ where Self { num_counters: self.num_counters, max_cost: self.max_cost, + buffer_items: self.buffer_items, insert_buffer_size: sz, metrics: self.metrics, callback: self.callback, @@ -218,6 +245,7 @@ where Self { num_counters: self.num_counters, max_cost: self.max_cost, + buffer_items: self.buffer_items, insert_buffer_size: self.insert_buffer_size, metrics: val, callback: self.callback, @@ -242,6 +270,7 @@ where Self { num_counters: self.num_counters, max_cost: self.max_cost, + buffer_items: self.buffer_items, insert_buffer_size: self.insert_buffer_size, metrics: self.metrics, callback: self.callback, @@ -262,6 +291,7 @@ where Self { num_counters: self.num_counters, max_cost: self.max_cost, + buffer_items: self.buffer_items, insert_buffer_size: self.insert_buffer_size, metrics: self.metrics, callback: self.callback, @@ -300,6 +330,7 @@ where CacheBuilderCore { num_counters: self.num_counters, max_cost: self.max_cost, + buffer_items: self.buffer_items, insert_buffer_size: self.insert_buffer_size, metrics: self.metrics, callback: self.callback, @@ -333,6 +364,7 @@ where CacheBuilderCore { num_counters: 
self.num_counters, max_cost: self.max_cost, + buffer_items: self.buffer_items, insert_buffer_size: self.insert_buffer_size, metrics: self.metrics, callback: self.callback, @@ -360,6 +392,7 @@ where CacheBuilderCore { num_counters: self.num_counters, max_cost: self.max_cost, + buffer_items: self.buffer_items, insert_buffer_size: self.insert_buffer_size, metrics: self.metrics, callback: self.callback, @@ -385,6 +418,7 @@ where CacheBuilderCore { num_counters: self.num_counters, max_cost: self.max_cost, + buffer_items: self.buffer_items, insert_buffer_size: self.insert_buffer_size, metrics: self.metrics, callback: Some(cb), @@ -409,6 +443,7 @@ where CacheBuilderCore { num_counters: self.num_counters, max_cost: self.max_cost, + buffer_items: self.buffer_items, insert_buffer_size: self.insert_buffer_size, metrics: self.metrics, callback: self.callback, diff --git a/src/cache/sync.rs b/src/cache/sync.rs index f4465e3..3c7085a 100644 --- a/src/cache/sync.rs +++ b/src/cache/sync.rs @@ -1,5 +1,6 @@ use crate::cache::builder::CacheBuilderCore; use crate::policy::LFUPolicy; +use crate::ring::RingStripe; use crate::store::ShardedMap; use crate::sync::{ bounded, select, spawn, stop_channel, unbounded, Instant, JoinHandle, Receiver, Sender, @@ -189,6 +190,7 @@ where hasher.clone(), )); + let buffer_items = self.inner.buffer_items; let mut policy = LFUPolicy::with_hasher(num_counters, max_cost, hasher)?; let coster = Arc::new(self.inner.coster.unwrap()); @@ -216,9 +218,11 @@ where ) .spawn(); + let get_buf = RingStripe::new(policy.clone(), buffer_items); let this = Cache { store, policy, + get_buf: Arc::new(get_buf), insert_buf_tx: buf_tx, callback, key_to_hash: Arc::new(self.inner.key_to_hash), @@ -341,6 +345,8 @@ pub struct Cache< /// policy determines what gets let in to the cache and what gets kicked out. pub(crate) policy: Arc>, + pub(crate) get_buf: Arc>, + /// insert_buf is a buffer allowing us to batch/drop Sets during times of high /// contention. pub(crate) insert_buf_tx: Sender>, diff --git a/src/cache/test.rs b/src/cache/test.rs index 37ffcc0..0a8cd9e 100644 --- a/src/cache/test.rs +++ b/src/cache/test.rs @@ -737,7 +737,7 @@ mod sync_test { #[cfg(feature = "async")] mod async_test { use super::*; - use crate::cache::axync::Item; + use crate::cache::r#async::Item; use crate::{ AsyncCache, AsyncCacheBuilder, CacheCallback, Coster, DefaultCacheCallback, DefaultCoster, DefaultKeyBuilder, DefaultUpdateValidator, KeyBuilder, TransparentKeyBuilder, @@ -780,7 +780,7 @@ mod async_test { continue; } sleep(Duration::from_millis(100)).await; - assert_eq!(c.get(&key).unwrap().read(), val); + assert_eq!(c.get(&key).await.unwrap().read(), val); return; } } @@ -832,7 +832,7 @@ mod async_test { sleep(Duration::from_millis(10)).await; loop { - match c.get(&1) { + match c.get(&1).await { None => continue, Some(val) => { assert_eq!(val.read(), 1); @@ -857,7 +857,7 @@ mod async_test { sleep(Duration::from_secs(1)).await; // Set is rejected because the cost of the entry is too high // when accounting for the internal cost of storing the entry. - assert!(c.get(&1).is_none()); + assert!(c.get(&1).await.is_none()); // Update the max cost of the cache and retry. 
c.update_max_cost(1000); @@ -865,7 +865,7 @@ mod async_test { assert!(c.insert(1, 1, 1).await); sleep(Duration::from_millis(200)).await; - assert_eq!(c.get(&1).unwrap().read(), 1); + assert_eq!(c.get(&1).await.unwrap().read(), 1); c.remove(&1).await; } @@ -912,7 +912,7 @@ mod async_test { assert!(c.insert(1, 1, 1).await); let _ = c.close().await; - assert!(c.get(&1).is_none()); + assert!(c.get(&1).await.is_none()); } #[tokio::test] @@ -975,19 +975,19 @@ mod async_test { c.insert(1, 1, 0).await; sleep(Duration::from_secs(1)).await; - match c.get_mut(&1) { + match c.get_mut(&1).await { None => {} Some(mut val) => { val.write(10); } } - assert!(c.get_mut(&2).is_none()); + assert!(c.get_mut(&2).await.is_none()); // 0.5 and not 1.0 because we tried Getting each item twice assert_eq!(c.metrics.ratio().unwrap(), 0.5); - assert_eq!(c.get_mut(&1).unwrap().read(), 10); + assert_eq!(c.get_mut(&1).await.unwrap().read(), 10); } #[tokio::test] @@ -1002,7 +1002,7 @@ mod async_test { retry_set(c.clone(), 1, 1, 1, Duration::ZERO).await; c.insert(1, 2, 2).await; - assert_eq!(c.get(&1).unwrap().read(), 2); + assert_eq!(c.get(&1).await.unwrap().read(), 2); assert!(c.stop_tx.send(()).await.is_ok()); for _ in 0..32768 { @@ -1025,7 +1025,7 @@ mod async_test { // when accounting for the internal cost. c.insert_with_ttl(1, 1, 1, Duration::ZERO).await; sleep(Duration::from_millis(100)).await; - assert!(c.get(&1).is_none()) + assert!(c.get(&1).await.is_none()) } #[tokio::test] @@ -1043,20 +1043,20 @@ mod async_test { sleep(Duration::from_secs(2)).await; // Get value from cache for key = 1 - assert_eq!(c.get(&1).unwrap().read(), 1); + assert_eq!(c.get(&1).await.unwrap().read(), 1); // wait for expiration sleep(Duration::from_secs(5)).await; // The cached value for key = 1 should be gone - assert!(c.get(&1).is_none()); + assert!(c.get(&1).await.is_none()); // set new value for key = 1 assert!(c.insert_with_ttl(1, 2, 1, Duration::from_secs(5)).await); sleep(Duration::from_secs(2)).await; // get value from cache for key = 1; - assert_eq!(c.get(&1).unwrap().read(), 2); + assert_eq!(c.get(&1).await.unwrap().read(), 2); } #[tokio::test] @@ -1073,7 +1073,7 @@ mod async_test { // Sleep to make sure the item has expired after execution resumes. sleep(Duration::from_secs(2)).await; - assert!(c.get(&1).is_none()); + assert!(c.get(&1).await.is_none()); // Sleep to ensure that the bucket where the item was stored has been cleared // from the expiration map. @@ -1084,13 +1084,13 @@ mod async_test { retry_set(c.clone(), 2, 1, 1, Duration::from_secs(1)).await; retry_set(c.clone(), 2, 2, 1, Duration::from_secs(100)).await; sleep(Duration::from_secs(3)).await; - assert_eq!(c.get(&2).unwrap().read(), 2); + assert_eq!(c.get(&2).await.unwrap().read(), 2); // Verify that entries with no expiration are overwritten. retry_set(c.clone(), 3, 1, 1, Duration::ZERO).await; retry_set(c.clone(), 3, 1, 1, Duration::from_secs(1)).await; sleep(Duration::from_secs(3)).await; - assert!(c.get(&3).is_none()); + assert!(c.get(&3).await.is_none()); } #[tokio::test] @@ -1104,7 +1104,7 @@ mod async_test { // that the delete is not processed before the following get is called. So // wait for a millisecond for things to be processed. 
sleep(Duration::from_millis(1)).await; - assert!(c.get(&1).is_none()); + assert!(c.get(&1).await.is_none()); } #[tokio::test] @@ -1122,7 +1122,7 @@ mod async_test { c.remove(&3).await; // ensure the key is deleted - assert!(c.get(&3).is_none()); + assert!(c.get(&3).await.is_none()); } #[tokio::test] @@ -1139,7 +1139,7 @@ mod async_test { let expiration = Duration::from_secs(5); retry_set(c.clone(), 1, 1, 1, expiration).await; - assert_eq!(c.get(&1).unwrap().read(), 1); + assert_eq!(c.get(&1).await.unwrap().read(), 1); assert!(c.get_ttl(&1).unwrap() < expiration); c.remove(&1).await; @@ -1150,7 +1150,7 @@ mod async_test { // try expiration with no ttl { retry_set(c.clone(), 2, 2, 1, Duration::ZERO).await; - assert_eq!(c.get(&2).unwrap().read(), 2); + assert_eq!(c.get(&2).await.unwrap().read(), 2); assert_eq!(c.get_ttl(&2).unwrap(), Duration::MAX); } @@ -1164,7 +1164,7 @@ mod async_test { let expiration = Duration::from_secs(1); retry_set(c.clone(), 3, 3, 1, expiration).await; - assert_eq!(c.get(&3).unwrap().read(), 3); + assert_eq!(c.get(&3).await.unwrap().read(), 3); sleep(Duration::from_secs(1)).await; assert!(c.get_ttl(&3).is_none()); } @@ -1187,9 +1187,9 @@ mod async_test { c.clear().await.unwrap(); assert_eq!(c.metrics.get_keys_added(), Some(0)); - (0..10).for_each(|i| { - assert!(c.get(&i).is_none()); - }) + for i in 0..10 { + assert!(c.get(&i).await.is_none()); + } } #[tokio::test] @@ -1209,7 +1209,7 @@ mod async_test { tokio::select! { _ = stop_rx.recv() => return, else => { - tc.get(&1); + _ = tc.get(&1); } } } @@ -1278,7 +1278,7 @@ mod async_test { sleep(Duration::from_millis(100)).await; // Get value from cache for key = 1 - match c.get(&1) { + match c.get(&1).await { None => { clean_win += 1; } @@ -1289,7 +1289,7 @@ mod async_test { // assert_eq!(c.get(&1).unwrap().read(), 1); sleep(Duration::from_millis(1200)).await; - assert!(c.get(&1).is_none()); + assert!(c.get(&1).await.is_none()); } eprintln!("process: {} cleanup: {}", process_win, clean_win); } @@ -1335,7 +1335,7 @@ mod async_test { Ok(_) => break, Err(_) => { let k = get_key(); - if c.get(&k).is_none() { + if c.get(&k).await.is_none() { let mut rng = OsRng::default(); let rv = rng.gen::() % 100; let val = if rv < 10 { @@ -1397,12 +1397,12 @@ mod async_test { assert!(c.insert_with_ttl(0, 1, 1, ttl).await); assert!(c.wait().await.is_ok()); - assert_eq!(c.get(&0).unwrap().value(), &1); + assert_eq!(c.get(&0).await.unwrap().value(), &1); assert!(c.clear().await.is_ok()); assert!(c.wait().await.is_ok()); - assert!(c.get(&0).is_none()); + assert!(c.get(&0).await.is_none()); assert!(c.insert_with_ttl(2, 3, 1, ttl).await); assert!(c.wait().await.is_ok()); - assert_eq!(c.get(&2).unwrap().value(), &3); + assert_eq!(c.get(&2).await.unwrap().value(), &3); } } diff --git a/src/lib.rs b/src/lib.rs index 0a590c1..052cdb3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -189,6 +189,7 @@ mod metrics; /// [1]: https://arxiv.org/abs/1512.00727 #[allow(dead_code)] mod policy; +mod ring; mod sketch; mod store; mod ttl; diff --git a/src/policy/sync.rs b/src/policy/sync.rs index e74f3b2..51d8a27 100644 --- a/src/policy/sync.rs +++ b/src/policy/sync.rs @@ -1,10 +1,7 @@ use crate::policy::PolicyInner; -use crate::sync::{ - select, spawn, stop_channel, unbounded, JoinHandle, Receiver, Sender, UnboundedReceiver, - UnboundedSender, -}; +use crate::sync::{select, spawn, stop_channel, JoinHandle, Receiver, Sender}; use crate::{CacheError, MetricType, Metrics}; -use crossbeam_channel::RecvError; +use crossbeam_channel::{bounded, RecvError}; use 
parking_lot::Mutex;
 use std::collections::hash_map::RandomState;
 use std::hash::BuildHasher;
@@ -13,7 +10,7 @@ use std::sync::Arc;
 
 pub(crate) struct LFUPolicy<S = RandomState> {
     pub(crate) inner: Arc<Mutex<PolicyInner<S>>>,
-    pub(crate) items_tx: UnboundedSender<Vec<u64>>,
+    pub(crate) items_tx: Sender<Vec<u64>>,
     pub(crate) stop_tx: Sender<()>,
     pub(crate) is_closed: AtomicBool,
     pub(crate) metrics: Arc<Metrics>,
@@ -31,7 +28,7 @@ impl<S: BuildHasher + Clone + 'static> LFUPolicy<S> {
     pub fn with_hasher(ctrs: usize, max_cost: i64, hasher: S) -> Result<Self, CacheError> {
         let inner = PolicyInner::with_hasher(ctrs, max_cost, hasher)?;
 
-        let (items_tx, items_rx) = unbounded();
+        let (items_tx, items_rx) = bounded(3);
 
         let (stop_tx, stop_rx) = stop_channel();
         PolicyProcessor::new(inner.clone(), items_rx, stop_rx).spawn();
@@ -91,7 +88,7 @@ impl<S: BuildHasher + Clone + 'static> LFUPolicy<S> {
 
 pub(crate) struct PolicyProcessor<S> {
     inner: Arc<Mutex<PolicyInner<S>>>,
-    items_rx: UnboundedReceiver<Vec<u64>>,
+    items_rx: Receiver<Vec<u64>>,
     stop_rx: Receiver<()>,
 }
 
@@ -99,7 +96,7 @@ impl<S: BuildHasher + Clone + 'static> PolicyProcessor<S> {
     #[inline]
     fn new(
         inner: Arc<Mutex<PolicyInner<S>>>,
-        items_rx: UnboundedReceiver<Vec<u64>>,
+        items_rx: Receiver<Vec<u64>>,
         stop_rx: Receiver<()>,
     ) -> Self {
         Self {
diff --git a/src/ring.rs b/src/ring.rs
new file mode 100644
index 0000000..1601925
--- /dev/null
+++ b/src/ring.rs
@@ -0,0 +1,71 @@
+use std::{hash::BuildHasher, sync::Arc};
+
+use parking_lot::Mutex;
+
+#[cfg(feature = "async")]
+use crate::policy::AsyncLFUPolicy;
+use crate::policy::LFUPolicy;
+
+pub struct RingStripe<S> {
+    cons: Arc<LFUPolicy<S>>,
+    data: Mutex<Vec<u64>>,
+    capa: usize,
+}
+
+impl<S> RingStripe<S>
+where
+    S: BuildHasher + Clone + 'static,
+{
+    pub(crate) fn new(cons: Arc<LFUPolicy<S>>, capa: usize) -> RingStripe<S> {
+        RingStripe {
+            cons,
+            data: Mutex::new(Vec::with_capacity(capa)),
+            capa,
+        }
+    }
+
+    pub fn push(&self, item: u64) {
+        let mut data = self.data.lock();
+        data.push(item);
+        if data.len() >= self.capa {
+            match self.cons.push(data.clone()) {
+                Ok(true) => *data = Vec::with_capacity(self.capa),
+                _ => data.clear(),
+            }
+        }
+    }
+}
+
+#[cfg(feature = "async")]
+
+pub struct AsyncRingStripe<S> {
+    cons: Arc<AsyncLFUPolicy<S>>,
+    data: Mutex<Vec<u64>>,
+    capa: usize,
+}
+
+#[cfg(feature = "async")]
+
+impl<S> AsyncRingStripe<S>
+where
+    S: BuildHasher + Clone + 'static + Send,
+{
+    pub(crate) fn new(cons: Arc<AsyncLFUPolicy<S>>, capa: usize) -> AsyncRingStripe<S> {
+        AsyncRingStripe {
+            cons,
+            data: Mutex::new(Vec::with_capacity(capa)),
+            capa,
+        }
+    }
+
+    pub async fn push(&self, item: u64) {
+        let mut data = self.data.lock();
+        data.push(item);
+        if data.len() >= self.capa {
+            match self.cons.push(data.clone()).await {
+                Ok(true) => *data = Vec::with_capacity(self.capa),
+                _ => data.clear(),
+            }
+        }
+    }
+}
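
The new `RingStripe`/`AsyncRingStripe` batch the key hashes seen by `get`/`get_mut` and hand them to the LFU policy in blocks of `buffer_items`; if the policy's now-bounded channel cannot take a block, the block is dropped rather than stalling readers. Below is a hypothetical unit test (not part of this patch) that could be appended to `src/ring.rs` to illustrate the batching; it only relies on `LFUPolicy::with_hasher` and `RingStripe::new` as shown above, with arbitrary capacities, and assumes the policy's `push` does not block.

#[cfg(test)]
mod test {
    use super::*;
    use crate::policy::LFUPolicy;
    use std::collections::hash_map::RandomState;
    use std::sync::Arc;

    #[test]
    fn push_flushes_in_batches_of_capa() {
        // Hypothetical capacities: 100 counters, max_cost of 10.
        let policy =
            Arc::new(LFUPolicy::with_hasher(100, 10, RandomState::default()).unwrap());
        let stripe = RingStripe::new(policy, 4);

        // Nothing reaches the policy until `capa` (4) hashes have accumulated; on every
        // fourth push the batch is either handed off (buffer swapped out) or dropped
        // (buffer cleared), so the local buffer never retains a full batch.
        for hash in 0..16u64 {
            stripe.push(hash);
        }
        assert!(stripe.data.lock().len() < 4);
    }
}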
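End to end, the user-visible change in this patch is that `AsyncCache::get`/`get_mut` are now `async` (they record the key hash in the get buffer before consulting the store) and the builder exposes the buffer size via `set_buffer_items`. The sketch below mirrors `examples/async_example.rs` with that knob added; it assumes a Tokio runtime, and the exact `AsyncCacheBuilder::new`/`finalize` signature (in particular whether `finalize` takes a spawner) is not shown in this patch and may differ by version.

use stretto::AsyncCacheBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // num_counters and max_cost are illustrative only.
    let c = AsyncCacheBuilder::new(12960, 1_000_000)
        // New in this patch: capacity of the lossy get buffer
        // (DEFAULT_BUFFER_ITEMS = 64 is the default).
        .set_buffer_items(64)
        .finalize(tokio::spawn)?; // spawner argument is an assumption, see above

    c.insert("a", "a", 1).await;
    c.wait().await?;

    // `get` is now awaited; the returned ValueRef must still be released.
    let v = c.get(&"a").await.unwrap();
    assert_eq!(v.value(), &"a");
    v.release();

    // `get_mut` likewise; dropping the ValueRefMut releases the write guard.
    {
        let mut v = c.get_mut(&"a").await.unwrap();
        v.write("aa");
    }

    c.close().await?;
    Ok(())
}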