diff --git a/rocketmq-store/src/consume_queue/mapped_file_queue.rs b/rocketmq-store/src/consume_queue/mapped_file_queue.rs index e48f1046..769d4df5 100644 --- a/rocketmq-store/src/consume_queue/mapped_file_queue.rs +++ b/rocketmq-store/src/consume_queue/mapped_file_queue.rs @@ -18,7 +18,10 @@ use std::{ fs, path::{Path, PathBuf}, - sync::Arc, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, }; use log::warn; @@ -42,11 +45,11 @@ pub struct MappedFileQueue { // pub(crate) mapped_files: Vec, pub(crate) allocate_mapped_file_service: Option, - pub(crate) flushed_where: Arc>, + pub(crate) flushed_where: Arc, - pub(crate) committed_where: Arc>, + pub(crate) committed_where: Arc, - pub(crate) store_timestamp: Arc>, + pub(crate) store_timestamp: Arc, } /*impl Swappable for MappedFileQueue { @@ -75,9 +78,9 @@ impl MappedFileQueue { mapped_file_size, mapped_files: Vec::new(), allocate_mapped_file_service, - flushed_where: Arc::new(parking_lot::Mutex::new(0)), - committed_where: Arc::new(parking_lot::Mutex::new(0)), - store_timestamp: Arc::new(parking_lot::Mutex::new(0)), + flushed_where: Arc::new(AtomicU64::new(0)), + committed_where: Arc::new(AtomicU64::new(0)), + store_timestamp: Arc::new(AtomicU64::new(0)), } } @@ -225,11 +228,13 @@ impl MappedFileQueue { } pub fn set_flushed_where(&mut self, flushed_where: i64) { - *self.flushed_where.lock() = flushed_where as u64; + self.flushed_where + .store(flushed_where as u64, Ordering::SeqCst); } pub fn set_committed_where(&mut self, committed_where: i64) { - *self.committed_where.lock() = committed_where as u64; + self.committed_where + .store(committed_where as u64, Ordering::SeqCst); } pub fn truncate_dirty_files(&mut self, offset: i64) {}