diff --git a/src/cache.rs b/src/cache.rs
index aa8a0b0..caca24f 100644
--- a/src/cache.rs
+++ b/src/cache.rs
@@ -583,15 +583,19 @@ mod test;
 use crate::Item as CrateItem;
 use std::time::Duration;
 
-cfg_sync!(
-    mod sync;
-    pub use sync::{Cache, CacheBuilder};
-);
-
-cfg_async!(
-    mod axync;
-    pub use axync::{AsyncCacheBuilder, AsyncCache};
-);
+#[cfg(feature = "sync")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
+mod sync;
+#[cfg(feature = "sync")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
+pub use sync::{Cache, CacheBuilder};
+
+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
+mod axync;
+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
+pub use axync::{AsyncCache, AsyncCacheBuilder};
 
 // TODO: find the optimal value for this
 const DEFAULT_INSERT_BUF_SIZE: usize = 32 * 1024;
diff --git a/src/cache/axync.rs b/src/cache/axync.rs
index 4223a1f..31b8f2a 100644
--- a/src/cache/axync.rs
+++ b/src/cache/axync.rs
@@ -1,16 +1,22 @@
+use crate::axync::{
+    bounded, select, sleep, spawn, stop_channel, unbounded, Instant, JoinHandle, Receiver, Sender,
+    UnboundedReceiver, UnboundedSender, WaitGroup,
+};
+use crate::cache::builder::CacheBuilderCore;
+use crate::policy::AsyncLFUPolicy;
+use crate::store::ShardedMap;
+use crate::ttl::{ExpirationMap, Time};
+use crate::{
+    metrics::MetricType, CacheCallback, CacheError, Coster, DefaultCacheCallback, DefaultCoster,
+    DefaultKeyBuilder, DefaultUpdateValidator, KeyBuilder, Metrics, UpdateValidator,
+};
 use std::collections::hash_map::RandomState;
 use std::collections::HashMap;
 use std::hash::{BuildHasher, Hash};
 use std::marker::PhantomData;
-use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
 use std::time::Duration;
-use crate::cache::builder::CacheBuilderCore;
-use crate::{metrics::MetricType, CacheCallback, CacheError, Coster, DefaultCacheCallback, DefaultCoster, DefaultUpdateValidator, KeyBuilder, Metrics, UpdateValidator, DefaultKeyBuilder};
-use crate::axync::{bounded, Receiver, Sender, stop_channel, unbounded, UnboundedReceiver, UnboundedSender, WaitGroup, select, JoinHandle, sleep, Instant, spawn};
-use crate::policy::AsyncLFUPolicy;
-use crate::store::ShardedMap;
-use crate::ttl::{ExpirationMap, Time};
 
 /// The `AsyncCacheBuilder` struct is used when creating [`AsyncCache`] instances if you want to customize the [`AsyncCache`] settings.
 ///
@@ -121,9 +127,7 @@ pub struct AsyncCacheBuilder<
     inner: CacheBuilderCore,
 }
 
-impl
-AsyncCacheBuilder
-{
+impl AsyncCacheBuilder {
     /// Create a new AsyncCacheBuilder
     #[inline]
     pub fn new(num_counters: usize, max_cost: i64) -> Self {
@@ -133,9 +137,7 @@ AsyncCacheBuilder
     }
 }
 
-impl>
-AsyncCacheBuilder
-{
+impl> AsyncCacheBuilder {
     /// Create a new AsyncCacheBuilder
     #[inline]
     pub fn new_with_key_builder(num_counters: usize, max_cost: i64, kh: KH) -> Self {
@@ -146,15 +148,15 @@ AsyncCacheBuilder
 }
 
 impl AsyncCacheBuilder
-    where
-        K: Hash + Eq,
-        V: Send + Sync + 'static,
-        KH: KeyBuilder,
-        C: Coster,
-        U: UpdateValidator,
-        CB: CacheCallback,
-        S: BuildHasher + Clone + 'static + Send, {
-
+where
+    K: Hash + Eq,
+    V: Send + Sync + 'static,
+    KH: KeyBuilder,
+    C: Coster,
+    U: UpdateValidator,
+    CB: CacheCallback,
+    S: BuildHasher + Clone + 'static + Send,
+{
     /// Build Cache and start all threads needed by the Cache.
     #[inline]
     pub fn finalize(self) -> Result, CacheError> {
@@ -212,7 +214,7 @@ impl AsyncCacheBuilder
             metrics.clone(),
             callback.clone(),
         )
-            .spawn();
+        .spawn();
 
         let this = AsyncCache {
             store,
@@ -307,7 +309,6 @@ impl Item {
     }
 }
 
-
 /// AsyncCache is a thread-safe async implementation of a hashmap with a TinyLFU admission
 /// policy and a Sampled LFU eviction policy. You can use the same AsyncCache instance
 /// from as many threads as you want.
@@ -372,14 +373,14 @@ pub struct AsyncCache<
 }
 
 impl AsyncCache
-    where
-        K: Hash + Eq,
-        V: Send + Sync + 'static,
-        KH: KeyBuilder,
-        C: Coster,
-        U: UpdateValidator,
-        CB: CacheCallback,
-        S: BuildHasher + Clone + 'static,
+where
+    K: Hash + Eq,
+    V: Send + Sync + 'static,
+    KH: KeyBuilder,
+    C: Coster,
+    U: UpdateValidator,
+    CB: CacheCallback,
+    S: BuildHasher + Clone + 'static,
 {
     /// `insert` attempts to add the key-value item to the cache. If it returns false,
     /// then the `insert` was dropped and the key-value item isn't added to the cache. If
@@ -396,30 +397,47 @@ impl AsyncCache
     /// `try_insert` is the non-panicking version of [`insert`](#method.insert)
     pub async fn try_insert(&self, key: K, val: V, cost: i64) -> Result {
-        self.try_insert_with_ttl(key, val, cost, Duration::ZERO).await
+        self.try_insert_with_ttl(key, val, cost, Duration::ZERO)
+            .await
     }
 
     /// `insert_with_ttl` works like Set but adds a key-value pair to the cache that will expire
     /// after the specified TTL (time to live) has passed. A zero value means the value never
     /// expires, which is identical to calling `insert`.
     pub async fn insert_with_ttl(&self, key: K, val: V, cost: i64, ttl: Duration) -> bool {
-        self.try_insert_in(key, val, cost, ttl, false).await.unwrap()
+        self.try_insert_in(key, val, cost, ttl, false)
+            .await
+            .unwrap()
     }
 
     /// `try_insert_with_ttl` is the non-panicking version of [`insert_with_ttl`](#method.insert_with_ttl)
-    pub async fn try_insert_with_ttl(&self, key: K, val: V, cost: i64, ttl: Duration) -> Result {
+    pub async fn try_insert_with_ttl(
+        &self,
+        key: K,
+        val: V,
+        cost: i64,
+        ttl: Duration,
+    ) -> Result {
         self.try_insert_in(key, val, cost, ttl, false).await
     }
 
     /// `insert_if_present` is like `insert`, but only updates the value of an existing key. It
    /// does NOT add the key to cache if it's absent.
     pub async fn insert_if_present(&self, key: K, val: V, cost: i64) -> bool {
-        self.try_insert_in(key, val, cost, Duration::ZERO, true).await.unwrap()
+        self.try_insert_in(key, val, cost, Duration::ZERO, true)
+            .await
+            .unwrap()
     }
 
     /// `try_insert_if_present` is the non-panicking version of [`insert_if_present`](#method.insert_if_present)
-    pub async fn try_insert_if_present(&self, key: K, val: V, cost: i64) -> Result {
-        self.try_insert_in(key, val, cost, Duration::ZERO, true).await
+    pub async fn try_insert_if_present(
+        &self,
+        key: K,
+        val: V,
+        cost: i64,
+    ) -> Result {
+        self.try_insert_in(key, val, cost, Duration::ZERO, true)
+            .await
     }
 
     /// wait until the previous operations finished.
@@ -430,10 +448,12 @@ impl AsyncCache
         let wg = WaitGroup::new();
         let wait_item = Item::Wait(wg.add(1));
-        match self.insert_buf_tx
-            .try_send(wait_item) {
+        match self.insert_buf_tx.try_send(wait_item) {
             Ok(_) => Ok(wg.wait().await),
-            Err(e) => Err(CacheError::SendError(format!("cache set buf sender: {}", e.to_string()))),
+            Err(e) => Err(CacheError::SendError(format!(
+                "cache set buf sender: {}",
+                e.to_string()
+            ))),
         }
     }
 
@@ -473,19 +493,23 @@ impl AsyncCache
         self.clear()?;
         // Block until processItems thread is returned
-        self.stop_tx
-            .send(())
-            .await
-            .map_err(
-                |e| CacheError::SendError(format!("fail to send stop signal to working thread, {}", e))
-            )?;
+        self.stop_tx.send(()).await.map_err(|e| {
+            CacheError::SendError(format!("fail to send stop signal to working thread, {}", e))
+        })?;
         self.policy.close().await?;
         self.is_closed.store(true, Ordering::SeqCst);
         Ok(())
     }
 
     #[inline]
-    async fn try_insert_in(&self, key: K, val: V, cost: i64, ttl: Duration, only_update: bool) -> Result {
+    async fn try_insert_in(
+        &self,
+        key: K,
+        val: V,
+        cost: i64,
+        ttl: Duration,
+        only_update: bool,
+    ) -> Result {
         if self.is_closed.load(Ordering::SeqCst) {
             return Ok(false);
         }
@@ -494,29 +518,29 @@ impl AsyncCache
             let is_update = item.is_update();
             // Attempt to send item to policy.
             select! {
-                res = self.insert_buf_tx.send(item) => res.map_or_else(|_| {
-                    if is_update {
-                        // Return true if this was an update operation since we've already
-                        // updated the store. For all the other operations (set/delete), we
-                        // return false which means the item was not inserted.
-                        Ok(true)
-                    } else {
-                        self.metrics.add(MetricType::DropSets, index, 1);
-                        Ok(false)
-                    }
-                }, |_| Ok(true)),
-                else => {
-                    if is_update {
-                        // Return true if this was an update operation since we've already
-                        // updated the store. For all the other operations (set/delete), we
-                        // return false which means the item was not inserted.
-                        Ok(true)
-                    } else {
-                        self.metrics.add(MetricType::DropSets, index, 1);
-                        Ok(false)
-                    }
+            res = self.insert_buf_tx.send(item) => res.map_or_else(|_| {
+                if is_update {
+                    // Return true if this was an update operation since we've already
+                    // updated the store. For all the other operations (set/delete), we
+                    // return false which means the item was not inserted.
+                    Ok(true)
+                } else {
+                    self.metrics.add(MetricType::DropSets, index, 1);
+                    Ok(false)
+                }
+            }, |_| Ok(true)),
+            else => {
+                if is_update {
+                    // Return true if this was an update operation since we've already
+                    // updated the store. For all the other operations (set/delete), we
+                    // return false which means the item was not inserted.
+                    Ok(true)
+                } else {
+                    self.metrics.add(MetricType::DropSets, index, 1);
+                    Ok(false)
                 }
             }
+            }
         } else {
             Ok(false)
         }
@@ -524,11 +548,11 @@ impl AsyncCache
 }
 
 impl CacheProcessor
-    where
-        V: Send + Sync + 'static,
-        U: UpdateValidator,
-        CB: CacheCallback,
-        S: BuildHasher + Clone + 'static + Send,
+where
+    V: Send + Sync + 'static,
+    U: UpdateValidator,
+    CB: CacheCallback,
+    S: BuildHasher + Clone + 'static + Send,
 {
     pub(crate) fn new(
         num_to_keep: usize,
@@ -568,18 +592,18 @@ impl CacheProcessor
         loop {
             select! {
-                item = self.insert_buf_rx.recv() => {
-                    let _ = self.handle_insert_event(item)?;
-                }
-                _ = &mut cleanup_timer => {
-                    cleanup_timer.as_mut().reset(Instant::now() + self.cleanup_duration);
-                    let _ = self.handle_cleanup_event()?;
-                },
-                Some(_) = self.clear_rx.recv() => {
-                    let _ = CacheCleaner::new(&mut self).clean().await?;
-                },
-                _ = self.stop_rx.recv() => return self.handle_close_event(),
+            item = self.insert_buf_rx.recv() => {
+                let _ = self.handle_insert_event(item)?;
             }
+            _ = &mut cleanup_timer => {
+                cleanup_timer.as_mut().reset(Instant::now() + self.cleanup_duration);
+                let _ = self.handle_cleanup_event()?;
+            },
+            Some(_) = self.clear_rx.recv() => {
+                let _ = CacheCleaner::new(&mut self).clean().await?;
+            },
+            _ = self.stop_rx.recv() => return self.handle_close_event(),
+            }
             }
         })
     }
@@ -594,9 +618,8 @@ impl CacheProcessor
 
     #[inline]
     pub(crate) fn handle_insert_event(&mut self, res: Option>) -> Result<(), CacheError> {
-        res
-            .ok_or_else(|| CacheError::RecvError(format!("fail to receive msg from insert buffer")))
-            .and_then(|item| self.handle_item(item))
+        res.ok_or_else(|| CacheError::RecvError(format!("fail to receive msg from insert buffer")))
+            .and_then(|item| self.handle_item(item))
     }
 
     #[inline]
@@ -613,23 +636,23 @@ impl CacheProcessor
 }
 
 impl<'a, V, U, CB, S> CacheCleaner<'a, V, U, CB, S>
-    where
-        V: Send + Sync + 'static,
-        U: UpdateValidator,
-        CB: CacheCallback,
-        S: BuildHasher + Clone + 'static + Send,
+where
+    V: Send + Sync + 'static,
+    U: UpdateValidator,
+    CB: CacheCallback,
+    S: BuildHasher + Clone + 'static + Send,
 {
     #[inline]
     pub(crate) async fn clean(mut self) -> Result<(), CacheError> {
         loop {
             select! {
-                // clear out the insert buffer channel.
-                Some(item) = self.processor.insert_buf_rx.recv() => {
-                    self.handle_item(item);
-                },
-                _ = async {} => return Ok(()),
-                else => return Ok(()),
-            }
+            // clear out the insert buffer channel.
+            Some(item) = self.processor.insert_buf_rx.recv() => {
+                self.handle_item(item);
+            },
+            _ = async {} => return Ok(()),
+            else => return Ok(()),
+            }
         }
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index e102fbb..9feb2e1 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -174,11 +174,9 @@
 #![allow(clippy::too_many_arguments, clippy::type_complexity)]
 #![cfg_attr(docsrs, feature(doc_cfg))]
 #![cfg_attr(docsrs, allow(unused_attributes))]
-mod error;
-#[macro_use]
-mod macros;
 mod bbloom;
 mod cache;
+mod error;
 mod histogram;
 mod metrics;
 /// This package includes multiple probabalistic data structures needed for
@@ -204,46 +202,49 @@ extern crate log;
 #[cfg(feature = "serde")]
 extern crate serde;
 
-cfg_async!(
-    pub(crate) mod axync {
-        pub(crate) use tokio::select;
-        pub(crate) use tokio::sync::mpsc::{
-            channel as bounded, Receiver, Sender, UnboundedReceiver, UnboundedSender,
-        };
-        pub(crate) use tokio::task::{spawn, JoinHandle};
-        pub(crate) use tokio::time::{sleep, Instant};
-        pub(crate) type WaitGroup = wg::AsyncWaitGroup;
-        use tokio::sync::mpsc::unbounded_channel;
-
-        pub(crate) fn stop_channel() -> (Sender<()>, Receiver<()>) {
-            bounded(1)
-        }
-
-        pub(crate) fn unbounded() -> (UnboundedSender, UnboundedReceiver) {
-            unbounded_channel::()
-        }
-    }
-
-    pub use cache::{AsyncCache, AsyncCacheBuilder};
-);
-
-cfg_sync!(
-    pub(crate) mod sync {
-        pub(crate) use crossbeam_channel::{bounded, select, unbounded, Receiver, Sender};
-        pub(crate) use std::thread::{spawn, JoinHandle};
-        pub(crate) use std::time::Instant;
-
-        pub(crate) type UnboundedSender = Sender;
-        pub(crate) type UnboundedReceiver = Receiver;
-        pub(crate) type WaitGroup = wg::WaitGroup;
-
-        pub(crate) fn stop_channel() -> (Sender<()>, Receiver<()>) {
-            bounded(0)
-        }
-    }
-
-    pub use cache::{Cache, CacheBuilder};
-);
+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
+pub(crate) mod axync {
+    pub(crate) use tokio::select;
+    pub(crate) use tokio::sync::mpsc::{
+        channel as bounded, Receiver, Sender, UnboundedReceiver, UnboundedSender,
+    };
+    pub(crate) use tokio::task::{spawn, JoinHandle};
+    pub(crate) use tokio::time::{sleep, Instant};
+    pub(crate) type WaitGroup = wg::AsyncWaitGroup;
+    use tokio::sync::mpsc::unbounded_channel;
+
+    pub(crate) fn stop_channel() -> (Sender<()>, Receiver<()>) {
+        bounded(1)
+    }
+
+    pub(crate) fn unbounded() -> (UnboundedSender, UnboundedReceiver) {
+        unbounded_channel::()
+    }
+}
+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
+pub use cache::{AsyncCache, AsyncCacheBuilder};
+
+#[cfg(feature = "sync")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
+pub(crate) mod sync {
+    pub(crate) use crossbeam_channel::{bounded, select, unbounded, Receiver, Sender};
+    pub(crate) use std::thread::{spawn, JoinHandle};
+    pub(crate) use std::time::Instant;
+
+    pub(crate) type UnboundedSender = Sender;
+    pub(crate) type UnboundedReceiver = Receiver;
+    pub(crate) type WaitGroup = wg::WaitGroup;
+
+    pub(crate) fn stop_channel() -> (Sender<()>, Receiver<()>) {
+        bounded(0)
+    }
+}
+
+#[cfg(feature = "sync")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
+pub use cache::{Cache, CacheBuilder};
 
 pub use error::CacheError;
 pub use histogram::Histogram;
diff --git a/src/macros.rs b/src/macros.rs
deleted file mode 100644
index 9558935..0000000
--- a/src/macros.rs
+++ /dev/null
@@ -1,30 +0,0 @@
-macro_rules! cfg_async {
-    ($($item:item)*) => {
-        $(
-            #[cfg(feature = "async")]
-            #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
-            $item
-        )*
-    }
-}
-
-macro_rules! cfg_sync {
-    ($($item:item)*) => {
-        $(
-            #[cfg(feature = "sync")]
-            #[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
-            $item
-        )*
-    }
-}
-
-#[doc(hidden)]
-macro_rules! cfg_serde {
-    ($($item:item)*) => {
-        $(
-            #[cfg(feature = "serde")]
-            #[cfg_attr(docsrs, doc(cfg(feature = "serde")))]
-            $item
-        )*
-    }
-}
diff --git a/src/metrics.rs b/src/metrics.rs
index 97fcdfe..2c87c74 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -419,23 +419,45 @@ impl MetricsInner {
     }
 }
 
-cfg_serde! {
-    use serde::{Serialize, Serializer};
-    use serde::ser::{SerializeStruct, Error};
-
-    impl Serialize for MetricsInner {
-        fn serialize(&self, serializer: S) -> Result where S: Serializer {
-            let mut s = serializer.serialize_struct("MetricsInner", 13)?;
-
-            let types: [&'static str; 11] = ["hit", "miss", "keys-added", "keys-updated", "keys-evicted", "cost-added", "cost-evicted", "sets-dropped", "sets-rejected", "gets-dropped", "gets-kept"];
-
-            for (idx, typ) in METRIC_TYPES_ARRAY.iter().enumerate() {
-                s.serialize_field(types[idx], &self.get(typ))?;
-            }
-            s.serialize_field("gets-total", &(self.get(&MetricType::Hit) + self.get(&MetricType::Miss)))?;
-            s.serialize_field("hit-ratio", &self.ratio())?;
-            s.end()
+#[cfg(feature = "serde")]
+#[cfg_attr(docsrs, doc(cfg(feature = "serde")))]
+use serde::ser::{Error, SerializeStruct};
+#[cfg(feature = "serde")]
+#[cfg_attr(docsrs, doc(cfg(feature = "serde")))]
+use serde::{Serialize, Serializer};
+
+#[cfg(feature = "serde")]
+#[cfg_attr(docsrs, doc(cfg(feature = "serde")))]
+impl Serialize for MetricsInner {
+    fn serialize(&self, serializer: S) -> Result
+    where
+        S: Serializer,
+    {
+        let mut s = serializer.serialize_struct("MetricsInner", 13)?;
+
+        let types: [&'static str; 11] = [
+            "hit",
+            "miss",
+            "keys-added",
+            "keys-updated",
+            "keys-evicted",
+            "cost-added",
+            "cost-evicted",
+            "sets-dropped",
+            "sets-rejected",
+            "gets-dropped",
+            "gets-kept",
+        ];
+
+        for (idx, typ) in METRIC_TYPES_ARRAY.iter().enumerate() {
+            s.serialize_field(types[idx], &self.get(typ))?;
         }
+        s.serialize_field(
+            "gets-total",
+            &(self.get(&MetricType::Hit) + self.get(&MetricType::Miss)),
+        )?;
+        s.serialize_field("hit-ratio", &self.ratio())?;
+        s.end()
     }
 }
diff --git a/src/policy.rs b/src/policy.rs
index 1bb8d52..f8386fb 100644
--- a/src/policy.rs
+++ b/src/policy.rs
@@ -183,15 +183,19 @@ macro_rules! impl_policy {
     };
 }
 
-cfg_sync!(
-    mod sync;
-    pub(crate) use sync::LFUPolicy;
-);
-
-cfg_async!(
-    mod axync;
-    pub(crate) use axync::AsyncLFUPolicy;
-);
+#[cfg(feature = "sync")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
+mod sync;
+#[cfg(feature = "sync")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
+pub(crate) use sync::LFUPolicy;
+
+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
+mod axync;
+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
+pub(crate) use axync::AsyncLFUPolicy;
 
 pub(crate) struct PolicyInner {
     admit: TinyLFU,
diff --git a/src/policy/axync.rs b/src/policy/axync.rs
index 91b3519..21df282 100644
--- a/src/policy/axync.rs
+++ b/src/policy/axync.rs
@@ -1,11 +1,14 @@
+use crate::axync::{
+    select, spawn, stop_channel, unbounded, JoinHandle, Receiver, Sender, UnboundedReceiver,
+    UnboundedSender,
+};
+use crate::policy::PolicyInner;
+use crate::{CacheError, MetricType, Metrics};
+use parking_lot::Mutex;
 use std::collections::hash_map::RandomState;
 use std::hash::BuildHasher;
-use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
-use parking_lot::Mutex;
-use crate::axync::{Sender, stop_channel, unbounded, UnboundedSender, select, UnboundedReceiver, Receiver, JoinHandle, spawn};
-use crate::{CacheError, Metrics, MetricType};
-use crate::policy::PolicyInner;
+use std::sync::Arc;
 
 pub(crate) struct AsyncLFUPolicy {
     pub(crate) inner: Arc>>,
@@ -47,19 +50,19 @@ impl AsyncLFUPolicy {
         let first = keys[0];
 
         select! {
-            rst = async { self.items_tx.send(keys) } => rst.map(|_| {
-                self.metrics.add(MetricType::KeepGets, first, num_of_keys);
-                true
-            })
-            .map_err(|e| {
-                self.metrics.add(MetricType::DropGets, first, num_of_keys);
-                CacheError::SendError(format!("sending on a disconnected channel, msg: {:?}", e))
-            }),
-            else => {
-                self.metrics.add(MetricType::DropGets, first, num_of_keys);
-                return Ok(false);
-            }
+        rst = async { self.items_tx.send(keys) } => rst.map(|_| {
+            self.metrics.add(MetricType::KeepGets, first, num_of_keys);
+            true
+        })
+        .map_err(|e| {
+            self.metrics.add(MetricType::DropGets, first, num_of_keys);
+            CacheError::SendError(format!("sending on a disconnected channel, msg: {:?}", e))
+        }),
+        else => {
+            self.metrics.add(MetricType::DropGets, first, num_of_keys);
+            return Ok(false);
         }
+        }
     }
 
     #[inline]
@@ -98,40 +101,37 @@ impl PolicyProcessor {
         }
     }
 
-
-        #[inline]
-        fn spawn(mut self) -> JoinHandle<()> {
-            spawn(async {
-                loop {
-                    select! {
-                        items = self.items_rx.recv() => self.handle_items(items),
-                        _ = self.stop_rx.recv() => {
-                            drop(self);
-                            return;
-                        },
-                    }
+    #[inline]
+    fn spawn(mut self) -> JoinHandle<()> {
+        spawn(async {
+            loop {
+                select! {
+                    items = self.items_rx.recv() => self.handle_items(items),
+                    _ = self.stop_rx.recv() => {
+                        drop(self);
+                        return;
+                    },
                 }
             }
         })
     }
 
-        // TODO: None handle
-        #[inline]
-        fn handle_items(&self, items: Option>) {
-            match items {
-                Some(items) => {
-                    let mut inner = self.inner.lock();
-                    inner.admit.increments(items);
-                }
-                None => {
-                    // error!("policy processor error")
-                }
+            }
+        })
+    }
+
+    // TODO: None handle
+    #[inline]
+    fn handle_items(&self, items: Option>) {
+        match items {
+            Some(items) => {
+                let mut inner = self.inner.lock();
+                inner.admit.increments(items);
+            }
+            None => {
+                // error!("policy processor error")
             }
         }
-
+    }
 }
 
 unsafe impl Send for PolicyProcessor {}
 unsafe impl Sync for PolicyProcessor {}
-
-impl_policy!(AsyncLFUPolicy);
\ No newline at end of file
+impl_policy!(AsyncLFUPolicy);
diff --git a/src/policy/test.rs b/src/policy/test.rs
index 1e9572d..e1bfbd6 100644
--- a/src/policy/test.rs
+++ b/src/policy/test.rs
@@ -163,169 +163,168 @@ fn test_policy_add_after_close() {
     p.add(1, 1);
 }
 
-cfg_async! {
-    mod async_test {
-        use super::*;
-        use std::sync::Arc;
-        use tokio::time::sleep;
-        use crate::policy::AsyncLFUPolicy;
-
-        #[tokio::test]
-        async fn test_policy() {
-            let _ = AsyncLFUPolicy::new(100, 10);
-        }
+#[cfg(feature = "async")]
+mod async_test {
+    use super::*;
+    use crate::policy::AsyncLFUPolicy;
+    use std::sync::Arc;
+    use tokio::time::sleep;
+
+    #[tokio::test]
+    async fn test_policy() {
+        let _ = AsyncLFUPolicy::new(100, 10);
+    }
 
-        #[tokio::test]
-        async fn test_policy_metrics() {
-            let mut p = AsyncLFUPolicy::new(100, 10).unwrap();
-            p.collect_metrics(Arc::new(Metrics::new_op()));
-            assert!(p.metrics.is_op());
-            assert!(p.inner.lock().costs.metrics.is_op());
-        }
+    #[tokio::test]
+    async fn test_policy_metrics() {
+        let mut p = AsyncLFUPolicy::new(100, 10).unwrap();
+        p.collect_metrics(Arc::new(Metrics::new_op()));
+        assert!(p.metrics.is_op());
+        assert!(p.inner.lock().costs.metrics.is_op());
+    }
 
-        #[tokio::test]
-        async fn test_policy_process_items() {
-            let p = AsyncLFUPolicy::new(100, 10).unwrap();
+    #[tokio::test]
+    async fn test_policy_process_items() {
+        let p = AsyncLFUPolicy::new(100, 10).unwrap();
 
-            p.push(vec![1, 2, 2]).await.unwrap();
-            sleep(WAIT).await;
+        p.push(vec![1, 2, 2]).await.unwrap();
+        sleep(WAIT).await;
 
-            let inner = p.inner.lock();
-            assert_eq!(inner.admit.estimate(2), 2);
-            assert_eq!(inner.admit.estimate(1), 1);
-            drop(inner);
+        let inner = p.inner.lock();
+        assert_eq!(inner.admit.estimate(2), 2);
+        assert_eq!(inner.admit.estimate(1), 1);
+        drop(inner);
 
-            p.stop_tx.send(()).await.unwrap();
-            sleep(WAIT).await;
-            assert!(p.push(vec![3, 3, 3]).await.is_err());
-            let inner = p.inner.lock();
-            assert_eq!(inner.admit.estimate(3), 0);
-        }
+        p.stop_tx.send(()).await.unwrap();
+        sleep(WAIT).await;
+        assert!(p.push(vec![3, 3, 3]).await.is_err());
+        let inner = p.inner.lock();
+        assert_eq!(inner.admit.estimate(3), 0);
+    }
 
-        #[tokio::test]
-        async fn test_policy_push() {
-            let p = AsyncLFUPolicy::new(100, 10).unwrap();
-            assert!(p.push(vec![]).await.unwrap());
+    #[tokio::test]
+    async fn test_policy_push() {
+        let p = AsyncLFUPolicy::new(100, 10).unwrap();
+        assert!(p.push(vec![]).await.unwrap());
 
-            let mut keep_count = 0;
-            for _ in 0..10 {
-                if p.push(vec![1, 2, 3, 4, 5]).await.unwrap() {
-                    keep_count += 1;
-                }
-            }
-
-            assert_ne!(0, keep_count);
+        let mut keep_count = 0;
+        for _ in 0..10 {
+            if p.push(vec![1, 2, 3, 4, 5]).await.unwrap() {
+                keep_count += 1;
+            }
         }
 
-        #[tokio::test]
-        async fn test_policy_add() {
-            let p = AsyncLFUPolicy::new(1000, 100).unwrap();
-            let (victims, added) = p.add(1, 101);
-            assert!(victims.is_none());
-            assert!(!added);
-
-            let mut inner = p.inner.lock();
-            inner.costs.increment(1, 1);
-            inner.admit.increment(1);
-            inner.admit.increment(2);
-            inner.admit.increment(3);
-            drop(inner);
-
-            let (victims, added) = p.add(1, 1);
-            assert!(victims.is_none());
-            assert!(!added);
-
-            let (victims, added) = p.add(2, 20);
-            assert!(victims.is_none());
-            assert!(added);
-
-            let (victims, added) = p.add(3, 90);
-            assert!(victims.is_some());
-            assert!(added);
-
-            let (victims, added) = p.add(4, 20);
-            assert!(victims.is_some());
-            assert!(!added);
-        }
+        assert_ne!(0, keep_count);
+    }
 
-        #[tokio::test]
-        async fn test_policy_has() {
-            let p = AsyncLFUPolicy::new(100, 10).unwrap();
-            p.add(1, 1);
-            assert!(p.contains(&1));
-            assert!(!p.contains(&2));
-        }
+    #[tokio::test]
+    async fn test_policy_add() {
+        let p = AsyncLFUPolicy::new(1000, 100).unwrap();
+        let (victims, added) = p.add(1, 101);
+        assert!(victims.is_none());
+        assert!(!added);
+
+        let mut inner = p.inner.lock();
+        inner.costs.increment(1, 1);
+        inner.admit.increment(1);
+        inner.admit.increment(2);
+        inner.admit.increment(3);
+        drop(inner);
+
+        let (victims, added) = p.add(1, 1);
+        assert!(victims.is_none());
+        assert!(!added);
+
+        let (victims, added) = p.add(2, 20);
+        assert!(victims.is_none());
+        assert!(added);
+
+        let (victims, added) = p.add(3, 90);
+        assert!(victims.is_some());
+        assert!(added);
+
+        let (victims, added) = p.add(4, 20);
+        assert!(victims.is_some());
+        assert!(!added);
+    }
 
-        #[tokio::test]
-        async fn test_policy_del() {
-            let p = AsyncLFUPolicy::new(100, 10).unwrap();
-            p.add(1, 1);
-            p.remove(&1);
-            p.remove(&2);
-            assert!(!p.contains(&1));
-            assert!(!p.contains(&2));
-        }
+    #[tokio::test]
+    async fn test_policy_has() {
+        let p = AsyncLFUPolicy::new(100, 10).unwrap();
+        p.add(1, 1);
+        assert!(p.contains(&1));
+        assert!(!p.contains(&2));
+    }
 
-        #[tokio::test]
-        async fn test_policy_cap() {
-            let p = AsyncLFUPolicy::new(100, 10).unwrap();
-            p.add(1, 1);
-            assert_eq!(p.cap(), 9);
-        }
+    #[tokio::test]
+    async fn test_policy_del() {
+        let p = AsyncLFUPolicy::new(100, 10).unwrap();
+        p.add(1, 1);
+        p.remove(&1);
+        p.remove(&2);
+        assert!(!p.contains(&1));
+        assert!(!p.contains(&2));
+    }
 
-        #[tokio::test]
-        async fn test_policy_update() {
-            let p = AsyncLFUPolicy::new(100, 10).unwrap();
-            p.add(1, 1);
-            p.update(&1, 2);
-            let inner = p.inner.lock();
-            assert_eq!(inner.costs.key_costs.get(&1).unwrap(), &2);
-        }
+    #[tokio::test]
+    async fn test_policy_cap() {
+        let p = AsyncLFUPolicy::new(100, 10).unwrap();
+        p.add(1, 1);
+        assert_eq!(p.cap(), 9);
+    }
 
-        #[tokio::test]
-        async fn test_policy_cost() {
-            let p = AsyncLFUPolicy::new(100, 10).unwrap();
-            p.add(1, 2);
-            assert_eq!(p.cost(&1), 2);
-            assert_eq!(p.cost(&2), -1);
-        }
+    #[tokio::test]
+    async fn test_policy_update() {
+        let p = AsyncLFUPolicy::new(100, 10).unwrap();
+        p.add(1, 1);
+        p.update(&1, 2);
+        let inner = p.inner.lock();
+        assert_eq!(inner.costs.key_costs.get(&1).unwrap(), &2);
+    }
 
-        #[tokio::test]
-        async fn test_policy_clear() {
-            let p = AsyncLFUPolicy::new(100, 10).unwrap();
-            p.add(1, 1);
-            p.add(2, 2);
-            p.add(3, 3);
-            p.clear();
-
-            assert_eq!(p.cap(), 10);
-            assert!(!p.contains(&2));
-            assert!(!p.contains(&2));
-            assert!(!p.contains(&3));
-        }
+    #[tokio::test]
+    async fn test_policy_cost() {
+        let p = AsyncLFUPolicy::new(100, 10).unwrap();
+        p.add(1, 2);
+        assert_eq!(p.cost(&1), 2);
+        assert_eq!(p.cost(&2), -1);
+    }
 
-        #[tokio::test]
-        async fn test_policy_close() {
-            let p = AsyncLFUPolicy::new(100, 10).unwrap();
-            p.add(1, 1);
-            p.close().await.unwrap();
-            sleep(WAIT).await;
-            assert!(p.items_tx.send(vec![1]).is_err())
-        }
+    #[tokio::test]
+    async fn test_policy_clear() {
+        let p = AsyncLFUPolicy::new(100, 10).unwrap();
+        p.add(1, 1);
+        p.add(2, 2);
+        p.add(3, 3);
+        p.clear();
+
+        assert_eq!(p.cap(), 10);
+        assert!(!p.contains(&2));
+        assert!(!p.contains(&2));
+        assert!(!p.contains(&3));
+    }
 
-        #[tokio::test]
-        async fn test_policy_push_after_close() {
-            let p = AsyncLFUPolicy::new(100, 10).unwrap();
-            p.close().await.unwrap();
-            assert!(!p.push(vec![1, 2]).await.unwrap());
-        }
+    #[tokio::test]
+    async fn test_policy_close() {
+        let p = AsyncLFUPolicy::new(100, 10).unwrap();
+        p.add(1, 1);
+        p.close().await.unwrap();
+        sleep(WAIT).await;
+        assert!(p.items_tx.send(vec![1]).is_err())
+    }
 
-        #[tokio::test]
-        async fn test_policy_add_after_close() {
-            let p = AsyncLFUPolicy::new(100, 10).unwrap();
-            p.close().await.unwrap();
-            p.add(1, 1);
-        }
-    }
+    #[tokio::test]
+    async fn test_policy_push_after_close() {
+        let p = AsyncLFUPolicy::new(100, 10).unwrap();
+        p.close().await.unwrap();
+        assert!(!p.push(vec![1, 2]).await.unwrap());
+    }
+
+    #[tokio::test]
+    async fn test_policy_add_after_close() {
+        let p = AsyncLFUPolicy::new(100, 10).unwrap();
+        p.close().await.unwrap();
+        p.add(1, 1);
+    }
 }
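
A note for reviewers on the pattern used throughout this change: every former `cfg_sync!` / `cfg_async!` / `cfg_serde!` call site is now written out as the pair of attributes the macros expanded to, one `#[cfg(...)]` that compile-gates the item behind the Cargo feature, and one `#[cfg_attr(docsrs, doc(cfg(...)))]` that, when docs are built with `--cfg docsrs`, attaches the "available on feature ..." badge in rustdoc. A minimal, self-contained sketch of the same pattern, assuming a `sync` feature is declared in Cargo.toml (the `demo` module and `Widget` type are made-up illustration names, not items from this crate):

```rust
// lib.rs of a hypothetical crate using the same feature-gating pattern.
// Enable the nightly-only `doc_cfg` rustdoc feature only when docs are built
// with `--cfg docsrs` (typically wired up via
// `[package.metadata.docs.rs] rustdoc-args = ["--cfg", "docsrs"]`).
#![cfg_attr(docsrs, feature(doc_cfg))]

// Compile the module only when the "sync" feature is enabled, and let rustdoc
// label it as requiring that feature.
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
pub mod demo {
    /// Stand-in type used purely to illustrate the gating pattern.
    pub struct Widget;
}

// Re-exports need the same two attributes; with only the `#[cfg]` the item
// still compiles, but the feature badge is lost in the rendered docs.
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
pub use demo::Widget;
```

This is exactly what the deleted macros in `src/macros.rs` expanded to; spelling the attributes out at each site trades a little repetition for grepability and drops the `#[macro_use] mod macros;` from `lib.rs`.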