Skip to content

Commit

Permalink
fix refcell borrow_mut error cause channel closed (#44)
Browse files Browse the repository at this point in the history
Fix #13.
  • Loading branch information
Millione authored Apr 19, 2023
1 parent 0324d16 commit 7cbf8ed
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 32 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ serde_json = {version = "1", optional = true}
seahash = "4.1"
wg = "0.3"
thiserror = "1"
tracing = "0.1"
xxhash-rust = { version = "0.8", features = ["xxh64"] }

[dev-dependencies]
Expand Down
17 changes: 9 additions & 8 deletions src/cache/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,22 +702,23 @@ where
loop {
select! {
item = self.insert_buf_rx.recv().fuse() => {
if let Err(_e) = self.handle_insert_event(item) {
return;
if let Err(e) = self.handle_insert_event(item) {
tracing::error!("fail to handle insert event, error: {}", e);
}
}
_ = cleanup_timer.next().fuse() => {
if let Err(_e) = self.handle_cleanup_event() {
return;
if let Err(e) = self.handle_cleanup_event() {
tracing::error!("fail to handle cleanup event, error: {}", e);
}
},
_ = self.clear_rx.recv().fuse() => {
let _ = CacheCleaner::new(&mut self).clean().await;
if let Err(e) = CacheCleaner::new(&mut self).clean().await {
tracing::error!("fail to handle clear event, error: {}", e);
}
},
_ = self.stop_rx.recv().fuse() => {
if let Err(_e) = self.handle_close_event() {
return;
}
_ = self.handle_close_event();
return;
},
}
}
Expand Down
12 changes: 9 additions & 3 deletions src/cache/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,13 +637,19 @@ where
spawn(move || loop {
select! {
recv(self.insert_buf_rx) -> res => {
self.handle_insert_event(res)?;
if let Err(e) = self.handle_insert_event(res) {
tracing::error!("fail to handle insert event: {}", e);
}
},
recv(self.clear_rx) -> _ => {
self.handle_clear_event()?;
if let Err(e) = self.handle_clear_event() {
tracing::error!("fail to handle clear event: {}", e);
}
},
recv(ticker) -> msg => {
self.handle_cleanup_event(msg)?;
if let Err(e) = self.handle_cleanup_event(msg) {
tracing::error!("fail to handle cleanup event: {}", e);
}
},
recv(self.stop_rx) -> _ => return Ok(()),
}
Expand Down
30 changes: 9 additions & 21 deletions src/ttl.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use parking_lot::RwLock;
use std::cell::RefCell;
use std::collections::{hash_map::RandomState, HashMap};
use std::hash::BuildHasher;
use std::ops::{Deref, DerefMut};
Expand Down Expand Up @@ -101,15 +100,15 @@ impl<S: BuildHasher> DerefMut for Bucket<S> {

#[derive(Debug)]
pub(crate) struct ExpirationMap<S = RandomState> {
buckets: RwLock<RefCell<HashMap<i64, Bucket<S>, S>>>,
buckets: RwLock<HashMap<i64, Bucket<S>, S>>,
hasher: S,
}

impl Default for ExpirationMap {
fn default() -> Self {
let hasher = RandomState::default();
Self {
buckets: RwLock::new(RefCell::new(HashMap::with_hasher(hasher.clone()))),
buckets: RwLock::new(HashMap::with_hasher(hasher.clone())),
hasher,
}
}
Expand All @@ -124,7 +123,7 @@ impl ExpirationMap {
impl<S: BuildHasher + Clone + 'static> ExpirationMap<S> {
pub(crate) fn with_hasher(hasher: S) -> ExpirationMap<S> {
ExpirationMap {
buckets: RwLock::new(RefCell::new(HashMap::with_hasher(hasher.clone()))),
buckets: RwLock::new(HashMap::with_hasher(hasher.clone())),
hasher,
}
}
Expand All @@ -137,11 +136,8 @@ impl<S: BuildHasher + Clone + 'static> ExpirationMap<S> {

let bucket_num = storage_bucket(expiration);

let m = self.buckets.read();
let mut m = self.buckets.write();

let mut m = m
.try_borrow_mut()
.map_err(|e| CacheError::InsertError(e.to_string()))?;
match m.get_mut(&bucket_num) {
None => {
let mut bucket = Bucket::with_hasher(self.hasher.clone());
Expand All @@ -167,17 +163,15 @@ impl<S: BuildHasher + Clone + 'static> ExpirationMap<S> {
return Ok(());
}

let m = self.buckets.read();
let (old_bucket_num, new_bucket_num) =
(storage_bucket(old_exp_time), storage_bucket(new_exp_time));

if old_bucket_num == new_bucket_num {
return Ok(());
}

let mut m = m
.try_borrow_mut()
.map_err(|e| CacheError::UpdateError(e.to_string()))?;
let mut m = self.buckets.write();

m.remove(&old_bucket_num);

match m.get_mut(&new_bucket_num) {
Expand All @@ -196,12 +190,8 @@ impl<S: BuildHasher + Clone + 'static> ExpirationMap<S> {

pub fn try_remove(&self, key: &u64, expiration: Time) -> Result<(), CacheError> {
let bucket_num = storage_bucket(expiration);
let m = self.buckets.read();
if let Some(bucket) = m
.try_borrow_mut()
.map_err(|e| CacheError::RemoveError(e.to_string()))?
.get_mut(&bucket_num)
{
let mut m = self.buckets.write();
if let Some(bucket) = m.get_mut(&bucket_num) {
bucket.remove(key);
};

Expand All @@ -212,9 +202,7 @@ impl<S: BuildHasher + Clone + 'static> ExpirationMap<S> {
let bucket_num = cleanup_bucket(now);
Ok(self
.buckets
.read()
.try_borrow_mut()
.map_err(|e| CacheError::CleanupError(e.to_string()))?
.write()
.remove(&bucket_num)
.map(|bucket| bucket.map))
}
Expand Down

0 comments on commit 7cbf8ed

Please sign in to comment.