Skip to content

Commit

Permalink
Merge pull request #23 from goldwind-ting/feat-mutecfg
Browse files Browse the repository at this point in the history
delete cfg macros
  • Loading branch information
al8n authored May 22, 2022
2 parents b69afdf + 70099eb commit 1a611d2
Show file tree
Hide file tree
Showing 8 changed files with 415 additions and 392 deletions.
22 changes: 13 additions & 9 deletions src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,15 +583,19 @@ mod test;
use crate::Item as CrateItem;
use std::time::Duration;

cfg_sync!(
mod sync;
pub use sync::{Cache, CacheBuilder};
);

cfg_async!(
mod axync;
pub use axync::{AsyncCacheBuilder, AsyncCache};
);
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
mod sync;
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
pub use sync::{Cache, CacheBuilder};

#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
mod axync;
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub use axync::{AsyncCache, AsyncCacheBuilder};

// TODO: find the optimal value for this
const DEFAULT_INSERT_BUF_SIZE: usize = 32 * 1024;
Expand Down
223 changes: 123 additions & 100 deletions src/cache/axync.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
use crate::axync::{
bounded, select, sleep, spawn, stop_channel, unbounded, Instant, JoinHandle, Receiver, Sender,
UnboundedReceiver, UnboundedSender, WaitGroup,
};
use crate::cache::builder::CacheBuilderCore;
use crate::policy::AsyncLFUPolicy;
use crate::store::ShardedMap;
use crate::ttl::{ExpirationMap, Time};
use crate::{
metrics::MetricType, CacheCallback, CacheError, Coster, DefaultCacheCallback, DefaultCoster,
DefaultKeyBuilder, DefaultUpdateValidator, KeyBuilder, Metrics, UpdateValidator,
};
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::hash::{BuildHasher, Hash};
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use crate::cache::builder::CacheBuilderCore;
use crate::{metrics::MetricType, CacheCallback, CacheError, Coster, DefaultCacheCallback, DefaultCoster, DefaultUpdateValidator, KeyBuilder, Metrics, UpdateValidator, DefaultKeyBuilder};
use crate::axync::{bounded, Receiver, Sender, stop_channel, unbounded, UnboundedReceiver, UnboundedSender, WaitGroup, select, JoinHandle, sleep, Instant, spawn};
use crate::policy::AsyncLFUPolicy;
use crate::store::ShardedMap;
use crate::ttl::{ExpirationMap, Time};

/// The `AsyncCacheBuilder` struct is used when creating [`AsyncCache`] instances if you want to customize the [`AsyncCache`] settings.
///
Expand Down Expand Up @@ -121,9 +127,7 @@ pub struct AsyncCacheBuilder<
inner: CacheBuilderCore<K, V, KH, C, U, CB, S>,
}

impl<K: Hash + Eq, V: Send + Sync + 'static>
AsyncCacheBuilder<K, V>
{
impl<K: Hash + Eq, V: Send + Sync + 'static> AsyncCacheBuilder<K, V> {
/// Create a new AsyncCacheBuilder
#[inline]
pub fn new(num_counters: usize, max_cost: i64) -> Self {
Expand All @@ -133,9 +137,7 @@ AsyncCacheBuilder<K, V>
}
}

impl<K: Hash + Eq, V: Send + Sync + 'static, KH: KeyBuilder<K>>
AsyncCacheBuilder<K, V, KH>
{
impl<K: Hash + Eq, V: Send + Sync + 'static, KH: KeyBuilder<K>> AsyncCacheBuilder<K, V, KH> {
/// Create a new AsyncCacheBuilder
#[inline]
pub fn new_with_key_builder(num_counters: usize, max_cost: i64, kh: KH) -> Self {
Expand All @@ -146,15 +148,15 @@ AsyncCacheBuilder<K, V, KH>
}

impl<K, V, KH, C, U, CB, S> AsyncCacheBuilder<K, V, KH, C, U, CB, S>
where
K: Hash + Eq,
V: Send + Sync + 'static,
KH: KeyBuilder<K>,
C: Coster<V>,
U: UpdateValidator<V>,
CB: CacheCallback<V>,
S: BuildHasher + Clone + 'static + Send, {

where
K: Hash + Eq,
V: Send + Sync + 'static,
KH: KeyBuilder<K>,
C: Coster<V>,
U: UpdateValidator<V>,
CB: CacheCallback<V>,
S: BuildHasher + Clone + 'static + Send,
{
/// Build Cache and start all threads needed by the Cache.
#[inline]
pub fn finalize(self) -> Result<AsyncCache<K, V, KH, C, U, CB, S>, CacheError> {
Expand Down Expand Up @@ -212,7 +214,7 @@ impl<K, V, KH, C, U, CB, S> AsyncCacheBuilder<K, V, KH, C, U, CB, S>
metrics.clone(),
callback.clone(),
)
.spawn();
.spawn();

let this = AsyncCache {
store,
Expand Down Expand Up @@ -307,7 +309,6 @@ impl<V> Item<V> {
}
}


/// AsyncCache is a thread-safe async implementation of a hashmap with a TinyLFU admission
/// policy and a Sampled LFU eviction policy. You can use the same AsyncCache instance
/// from as many threads as you want.
Expand Down Expand Up @@ -372,14 +373,14 @@ pub struct AsyncCache<
}

impl<K, V, KH, C, U, CB, S> AsyncCache<K, V, KH, C, U, CB, S>
where
K: Hash + Eq,
V: Send + Sync + 'static,
KH: KeyBuilder<K>,
C: Coster<V>,
U: UpdateValidator<V>,
CB: CacheCallback<V>,
S: BuildHasher + Clone + 'static,
where
K: Hash + Eq,
V: Send + Sync + 'static,
KH: KeyBuilder<K>,
C: Coster<V>,
U: UpdateValidator<V>,
CB: CacheCallback<V>,
S: BuildHasher + Clone + 'static,
{
/// `insert` attempts to add the key-value item to the cache. If it returns false,
/// then the `insert` was dropped and the key-value item isn't added to the cache. If
Expand All @@ -396,30 +397,47 @@ impl<K, V, KH, C, U, CB, S> AsyncCache<K, V, KH, C, U, CB, S>

/// `try_insert` is the non-panicking version of [`insert`](#method.insert)
pub async fn try_insert(&self, key: K, val: V, cost: i64) -> Result<bool, CacheError> {
self.try_insert_with_ttl(key, val, cost, Duration::ZERO).await
self.try_insert_with_ttl(key, val, cost, Duration::ZERO)
.await
}

/// `insert_with_ttl` works like Set but adds a key-value pair to the cache that will expire
/// after the specified TTL (time to live) has passed. A zero value means the value never
/// expires, which is identical to calling `insert`.
pub async fn insert_with_ttl(&self, key: K, val: V, cost: i64, ttl: Duration) -> bool {
self.try_insert_in(key, val, cost, ttl, false).await.unwrap()
self.try_insert_in(key, val, cost, ttl, false)
.await
.unwrap()
}

/// `try_insert_with_ttl` is the non-panicking version of [`insert_with_ttl`](#method.insert_with_ttl)
pub async fn try_insert_with_ttl(&self, key: K, val: V, cost: i64, ttl: Duration) -> Result<bool, CacheError> {
pub async fn try_insert_with_ttl(
&self,
key: K,
val: V,
cost: i64,
ttl: Duration,
) -> Result<bool, CacheError> {
self.try_insert_in(key, val, cost, ttl, false).await
}

/// `insert_if_present` is like `insert`, but only updates the value of an existing key. It
/// does NOT add the key to cache if it's absent.
pub async fn insert_if_present(&self, key: K, val: V, cost: i64) -> bool {
self.try_insert_in(key, val, cost, Duration::ZERO, true).await.unwrap()
self.try_insert_in(key, val, cost, Duration::ZERO, true)
.await
.unwrap()
}

/// `try_insert_if_present` is the non-panicking version of [`insert_if_present`](#method.insert_if_present)
pub async fn try_insert_if_present(&self, key: K, val: V, cost: i64) -> Result<bool, CacheError> {
self.try_insert_in(key, val, cost, Duration::ZERO, true).await
pub async fn try_insert_if_present(
&self,
key: K,
val: V,
cost: i64,
) -> Result<bool, CacheError> {
self.try_insert_in(key, val, cost, Duration::ZERO, true)
.await
}

/// wait until the previous operations finished.
Expand All @@ -430,10 +448,12 @@ impl<K, V, KH, C, U, CB, S> AsyncCache<K, V, KH, C, U, CB, S>

let wg = WaitGroup::new();
let wait_item = Item::Wait(wg.add(1));
match self.insert_buf_tx
.try_send(wait_item) {
match self.insert_buf_tx.try_send(wait_item) {
Ok(_) => Ok(wg.wait().await),
Err(e) => Err(CacheError::SendError(format!("cache set buf sender: {}", e.to_string()))),
Err(e) => Err(CacheError::SendError(format!(
"cache set buf sender: {}",
e.to_string()
))),
}
}

Expand Down Expand Up @@ -473,19 +493,23 @@ impl<K, V, KH, C, U, CB, S> AsyncCache<K, V, KH, C, U, CB, S>

self.clear()?;
// Block until processItems thread is returned
self.stop_tx
.send(())
.await
.map_err(
|e| CacheError::SendError(format!("fail to send stop signal to working thread, {}", e))
)?;
self.stop_tx.send(()).await.map_err(|e| {
CacheError::SendError(format!("fail to send stop signal to working thread, {}", e))
})?;
self.policy.close().await?;
self.is_closed.store(true, Ordering::SeqCst);
Ok(())
}

#[inline]
async fn try_insert_in(&self, key: K, val: V, cost: i64, ttl: Duration, only_update: bool) -> Result<bool, CacheError> {
async fn try_insert_in(
&self,
key: K,
val: V,
cost: i64,
ttl: Duration,
only_update: bool,
) -> Result<bool, CacheError> {
if self.is_closed.load(Ordering::SeqCst) {
return Ok(false);
}
Expand All @@ -494,41 +518,41 @@ impl<K, V, KH, C, U, CB, S> AsyncCache<K, V, KH, C, U, CB, S>
let is_update = item.is_update();
// Attempt to send item to policy.
select! {
res = self.insert_buf_tx.send(item) => res.map_or_else(|_| {
if is_update {
// Return true if this was an update operation since we've already
// updated the store. For all the other operations (set/delete), we
// return false which means the item was not inserted.
Ok(true)
} else {
self.metrics.add(MetricType::DropSets, index, 1);
Ok(false)
}
}, |_| Ok(true)),
else => {
if is_update {
// Return true if this was an update operation since we've already
// updated the store. For all the other operations (set/delete), we
// return false which means the item was not inserted.
Ok(true)
} else {
self.metrics.add(MetricType::DropSets, index, 1);
Ok(false)
}
res = self.insert_buf_tx.send(item) => res.map_or_else(|_| {
if is_update {
// Return true if this was an update operation since we've already
// updated the store. For all the other operations (set/delete), we
// return false which means the item was not inserted.
Ok(true)
} else {
self.metrics.add(MetricType::DropSets, index, 1);
Ok(false)
}
}, |_| Ok(true)),
else => {
if is_update {
// Return true if this was an update operation since we've already
// updated the store. For all the other operations (set/delete), we
// return false which means the item was not inserted.
Ok(true)
} else {
self.metrics.add(MetricType::DropSets, index, 1);
Ok(false)
}
}
}
} else {
Ok(false)
}
}
}

impl<V, U, CB, S> CacheProcessor<V, U, CB, S>
where
V: Send + Sync + 'static,
U: UpdateValidator<V>,
CB: CacheCallback<V>,
S: BuildHasher + Clone + 'static + Send,
where
V: Send + Sync + 'static,
U: UpdateValidator<V>,
CB: CacheCallback<V>,
S: BuildHasher + Clone + 'static + Send,
{
pub(crate) fn new(
num_to_keep: usize,
Expand Down Expand Up @@ -568,18 +592,18 @@ impl<V, U, CB, S> CacheProcessor<V, U, CB, S>

loop {
select! {
item = self.insert_buf_rx.recv() => {
let _ = self.handle_insert_event(item)?;
}
_ = &mut cleanup_timer => {
cleanup_timer.as_mut().reset(Instant::now() + self.cleanup_duration);
let _ = self.handle_cleanup_event()?;
},
Some(_) = self.clear_rx.recv() => {
let _ = CacheCleaner::new(&mut self).clean().await?;
},
_ = self.stop_rx.recv() => return self.handle_close_event(),
item = self.insert_buf_rx.recv() => {
let _ = self.handle_insert_event(item)?;
}
_ = &mut cleanup_timer => {
cleanup_timer.as_mut().reset(Instant::now() + self.cleanup_duration);
let _ = self.handle_cleanup_event()?;
},
Some(_) = self.clear_rx.recv() => {
let _ = CacheCleaner::new(&mut self).clean().await?;
},
_ = self.stop_rx.recv() => return self.handle_close_event(),
}
}
})
}
Expand All @@ -594,9 +618,8 @@ impl<V, U, CB, S> CacheProcessor<V, U, CB, S>

#[inline]
pub(crate) fn handle_insert_event(&mut self, res: Option<Item<V>>) -> Result<(), CacheError> {
res
.ok_or_else(|| CacheError::RecvError(format!("fail to receive msg from insert buffer")))
.and_then(|item| self.handle_item(item))
res.ok_or_else(|| CacheError::RecvError(format!("fail to receive msg from insert buffer")))
.and_then(|item| self.handle_item(item))
}

#[inline]
Expand All @@ -613,23 +636,23 @@ impl<V, U, CB, S> CacheProcessor<V, U, CB, S>
}

impl<'a, V, U, CB, S> CacheCleaner<'a, V, U, CB, S>
where
V: Send + Sync + 'static,
U: UpdateValidator<V>,
CB: CacheCallback<V>,
S: BuildHasher + Clone + 'static + Send,
where
V: Send + Sync + 'static,
U: UpdateValidator<V>,
CB: CacheCallback<V>,
S: BuildHasher + Clone + 'static + Send,
{
#[inline]
pub(crate) async fn clean(mut self) -> Result<(), CacheError> {
loop {
select! {
// clear out the insert buffer channel.
Some(item) = self.processor.insert_buf_rx.recv() => {
self.handle_item(item);
},
_ = async {} => return Ok(()),
else => return Ok(()),
}
// clear out the insert buffer channel.
Some(item) = self.processor.insert_buf_rx.recv() => {
self.handle_item(item);
},
_ = async {} => return Ok(()),
else => return Ok(()),
}
}
}
}
Expand Down
Loading

0 comments on commit 1a611d2

Please sign in to comment.