Skip to content

Commit

Permalink
[ISSUE #765]♻️Refactor ConsumeQueueStore🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jul 12, 2024
1 parent d94b9e0 commit 2775a6e
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 135 deletions.
108 changes: 107 additions & 1 deletion rocketmq-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

#![allow(dead_code)]
#![allow(unused_imports)]
#![feature(sync_unsafe_cell)]

use std::cell::SyncUnsafeCell;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;

pub use crate::common::attribute::topic_attributes as TopicAttributes;
pub use crate::common::message::message_accessor as MessageAccessor;
Expand All @@ -40,5 +46,105 @@ pub mod log;
mod thread_pool;
pub mod utils;

pub struct ArcCellWrapper<T: ?Sized> {
inner: Arc<SyncUnsafeCell<T>>,
}

impl<T> ArcCellWrapper<T> {
#[inline]
pub fn new(value: T) -> Self {
Self {
inner: Arc::new(SyncUnsafeCell::new(value)),
}
}
}

impl<T: ?Sized> Clone for ArcCellWrapper<T> {
fn clone(&self) -> Self {
ArcCellWrapper {
inner: Arc::clone(&self.inner),
}
}
}

impl<T> AsRef<T> for ArcCellWrapper<T> {
fn as_ref(&self) -> &T {
unsafe { &*self.inner.get() }
}
}

impl<T> AsMut<T> for ArcCellWrapper<T> {
fn as_mut(&mut self) -> &mut T {
unsafe { &mut *self.inner.get() }
}
}

impl<T> Deref for ArcCellWrapper<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.as_ref()
}
}

impl<T> DerefMut for ArcCellWrapper<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.as_mut()
}
}

#[cfg(test)]
mod tests {}
mod arc_cell_wrapper_tests {
use std::sync::Arc;

use super::*;

#[test]
fn new_creates_arc_cell_wrapper_with_provided_value() {
let wrapper = ArcCellWrapper::new(10);
assert_eq!(*wrapper.as_ref(), 10);
}

#[test]
fn clone_creates_a_new_instance_with_same_value() {
let wrapper = ArcCellWrapper::new(20);
let cloned_wrapper = wrapper.clone();
assert_eq!(*cloned_wrapper.as_ref(), 20);
}

#[test]
fn as_ref_returns_immutable_reference_to_value() {
let wrapper = ArcCellWrapper::new(30);
assert_eq!(*wrapper.as_ref(), 30);
}

#[test]
fn as_mut_returns_mutable_reference_to_value() {
let mut wrapper = ArcCellWrapper::new(40);
*wrapper.as_mut() = 50;
assert_eq!(*wrapper.as_ref(), 50);
}

#[test]
fn deref_returns_reference_to_inner_value() {
let wrapper = ArcCellWrapper::new(60);
assert_eq!(*wrapper, 60);
}

#[test]
fn deref_mut_allows_modification_of_inner_value() {
let mut wrapper = ArcCellWrapper::new(70);
*wrapper = 80;
assert_eq!(*wrapper, 80);
}

#[test]
fn multiple_clones_share_the_same_underlying_data() {
let wrapper = ArcCellWrapper::new(Arc::new(90));
let cloned_wrapper1 = wrapper.clone();
let cloned_wrapper2 = wrapper.clone();

assert_eq!(Arc::strong_count(wrapper.as_ref()), 3);
assert_eq!(Arc::strong_count(cloned_wrapper1.as_ref()), 3);
assert_eq!(Arc::strong_count(cloned_wrapper2.as_ref()), 3);
}
}
8 changes: 2 additions & 6 deletions rocketmq-store/src/log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::base::get_message_result::GetMessageResult;
use crate::base::message_result::PutMessageResult;
use crate::filter::MessageFilter;
use crate::hook::put_message_hook::BoxedPutMessageHook;
use crate::queue::ConsumeQueueTrait;
use crate::queue::ArcConsumeQueue;
use crate::stats::broker_stats_manager::BrokerStatsManager;
use crate::store::running_flags::RunningFlags;

Expand Down Expand Up @@ -113,11 +113,7 @@ pub trait RocketMQMessageStore: Clone + 'static {

fn notify_message_arrive_if_necessary(&self, dispatch_request: &mut DispatchRequest);

fn find_consume_queue(
&self,
topic: &str,
queue_id: i32,
) -> Option<Arc<parking_lot::Mutex<Box<dyn ConsumeQueueTrait>>>>;
fn find_consume_queue(&self, topic: &str, queue_id: i32) -> Option<ArcConsumeQueue>;

fn delete_topics(&self, delete_topics: Vec<String>);
}
36 changes: 16 additions & 20 deletions rocketmq-store/src/message_store/default_message_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ use crate::log_file::MessageStore;
use crate::log_file::MAX_PULL_MSG_SIZE;
use crate::queue::build_consume_queue::CommitLogDispatcherBuildConsumeQueue;
use crate::queue::local_file_consume_queue_store::ConsumeQueueStore;
use crate::queue::ArcConsumeQueue;
use crate::queue::ConsumeQueueStoreTrait;
use crate::queue::ConsumeQueueTrait;
use crate::stats::broker_stats_manager::BrokerStatsManager;
use crate::store::running_flags::RunningFlags;
use crate::store_path_config_helper::get_abort_file;
Expand Down Expand Up @@ -728,10 +728,11 @@ impl MessageStore for DefaultMessageStore {
committed: bool,
) -> i64 {
if committed {
self.consume_queue_store
.find_or_create_consume_queue(topic, queue_id)
.lock()
.get_max_offset_in_queue()
let queue = self
.consume_queue_store
.find_or_create_consume_queue(topic, queue_id);

queue.get_max_offset_in_queue()
} else {
self.consume_queue_store
.get_max_offset(topic, queue_id)
Expand Down Expand Up @@ -785,8 +786,8 @@ impl MessageStore for DefaultMessageStore {
let max_offset_py = self.commit_log.get_max_offset();
let consume_queue = self.find_consume_queue(topic, queue_id);
if let Some(consume_queue) = consume_queue {
min_offset = consume_queue.lock().get_min_offset_in_queue();
max_offset = consume_queue.lock().get_max_offset_in_queue();
min_offset = consume_queue.get_min_offset_in_queue();
max_offset = consume_queue.get_max_offset_in_queue();
if max_offset == 0 {
status = GetMessageStatus::NoMessageInQueue;
next_begin_offset = self.next_offset_correction(offset, 0);
Expand All @@ -803,7 +804,7 @@ impl MessageStore for DefaultMessageStore {
let max_filter_message_size = self
.message_store_config
.max_filter_message_size
.max(max_msg_nums * consume_queue.lock().get_unit_size());
.max(max_msg_nums * consume_queue.get_unit_size());
let disk_fall_recorded = self.message_store_config.disk_fall_recorded;
let mut max_pull_size = max_total_msg_size.max(100);
if max_pull_size > MAX_PULL_MSG_SIZE {
Expand All @@ -824,15 +825,14 @@ impl MessageStore for DefaultMessageStore {
.travel_cq_file_num_when_get_message
{
cq_file_num += 1;
let buffer_consume_queue = consume_queue
.lock()
.iterate_from_inner(next_begin_offset, max_msg_nums);
let buffer_consume_queue =
consume_queue.iterate_from_inner(next_begin_offset, max_msg_nums);
if buffer_consume_queue.is_none() {
status = GetMessageStatus::OffsetFoundNull;
next_begin_offset = self.next_offset_correction(
next_begin_offset,
self.consume_queue_store
.roll_next_file(&**consume_queue.lock(), next_begin_offset),
.roll_next_file(&**consume_queue, next_begin_offset),
);
warn!(
"consumer request topic: {}, offset: {}, minOffset: {}, maxOffset: \
Expand All @@ -856,7 +856,7 @@ impl MessageStore for DefaultMessageStore {
&self.message_store_config,
);
if (cq_unit.queue_offset - offset)
* consume_queue.lock().get_unit_size() as i64
* consume_queue.get_unit_size() as i64
> max_filter_message_size as i64
{
break;
Expand Down Expand Up @@ -996,7 +996,7 @@ impl MessageStore for DefaultMessageStore {
let consume_queue = self
.consume_queue_store
.find_or_create_consume_queue(topic, queue_id);
let first_cqitem = consume_queue.lock().get(consume_offset);
let first_cqitem = consume_queue.get(consume_offset);
if first_cqitem.is_none() {
return false;
}
Expand All @@ -1006,7 +1006,7 @@ impl MessageStore for DefaultMessageStore {
let size = cq.size;
return self.check_in_mem_by_commit_offset(start_offset_py, size);
}
let last_cqitem = consume_queue.lock().get(consume_offset + batch_size as i64);
let last_cqitem = consume_queue.get(consume_offset + batch_size as i64);
if last_cqitem.is_none() {
let size = cq.size;
return self.check_in_mem_by_commit_offset(start_offset_py, size);
Expand All @@ -1033,11 +1033,7 @@ impl MessageStore for DefaultMessageStore {
}
}

fn find_consume_queue(
&self,
topic: &str,
queue_id: i32,
) -> Option<Arc<parking_lot::Mutex<Box<dyn ConsumeQueueTrait>>>> {
fn find_consume_queue(&self, topic: &str, queue_id: i32) -> Option<ArcConsumeQueue> {
Some(
self.consume_queue_store
.find_or_create_consume_queue(topic, queue_id),
Expand Down
33 changes: 15 additions & 18 deletions rocketmq-store/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod batch_consume_queue;
pub mod build_consume_queue;
mod consume_queue_ext;
pub mod local_file_consume_queue_store;
mod queue_offset_operator;
pub mod single_consume_queue;

use std::collections::HashMap;
use std::sync::Arc;

use rocketmq_common::common::attribute::cq_type::CQType;
use rocketmq_common::common::boundary_type::BoundaryType;
use rocketmq_common::common::message::message_single::MessageExtBrokerInner;
use rocketmq_common::ArcCellWrapper;

use crate::base::dispatch_request::DispatchRequest;
use crate::base::swappable::Swappable;
Expand All @@ -35,6 +28,17 @@ use crate::filter::MessageFilter;
use crate::queue::consume_queue_ext::ConsumeQueueExt;
use crate::queue::queue_offset_operator::QueueOffsetOperator;

mod batch_consume_queue;
pub mod build_consume_queue;
mod consume_queue_ext;
pub mod local_file_consume_queue_store;
mod queue_offset_operator;
pub mod single_consume_queue;

//pub type ArcConsumeQueue = Arc<SyncUnsafeCell<Box<dyn ConsumeQueueTrait>>>;
pub type ArcConsumeQueue = ArcCellWrapper<Box<dyn ConsumeQueueTrait>>;
pub type ConsumeQueueTable = parking_lot::Mutex<HashMap<String, HashMap<i32, ArcConsumeQueue>>>;

/// FileQueueLifeCycle contains life cycle methods of ConsumerQueue that is directly implemented by
/// FILE.
pub trait FileQueueLifeCycle: Swappable {
Expand Down Expand Up @@ -263,19 +267,12 @@ pub trait ConsumeQueueStoreTrait: Send + Sync {
/// `topic`: Topic.
/// `queue_id`: Queue ID.
/// Returns the consumeQueue.
fn find_or_create_consume_queue(
&self,
topic: &str,
queue_id: i32,
) -> Arc<parking_lot::Mutex<Box<dyn ConsumeQueueTrait>>>;
fn find_or_create_consume_queue(&self, topic: &str, queue_id: i32) -> ArcConsumeQueue;

/// Find the consumeQueueMap of topic.
/// `topic`: Topic.
/// Returns the consumeQueueMap of topic.
fn find_consume_queue_map(
&self,
topic: &str,
) -> Option<HashMap<i32, Box<dyn ConsumeQueueTrait>>>;
fn find_consume_queue_map(&self, topic: &str) -> Option<HashMap<i32, ArcConsumeQueue>>;

/// Get the total size of all consumeQueue.
/// Returns the total size of all consumeQueue.
Expand All @@ -293,7 +290,7 @@ pub trait ConsumeQueueStoreTrait: Send + Sync {
/// Trait representing ConsumeQueueInterface.
pub trait ConsumeQueueTrait: Send + Sync + FileQueueLifeCycle {
/// Get the topic name.
fn get_topic(&self) -> String;
fn get_topic(&self) -> &str;

/// Get queue id.
fn get_queue_id(&self) -> i32;
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-store/src/queue/batch_consume_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl Swappable for BatchConsumeQueue {
}

impl ConsumeQueueTrait for BatchConsumeQueue {
fn get_topic(&self) -> String {
fn get_topic(&self) -> &str {
todo!()
}

Expand Down
Loading

0 comments on commit 2775a6e

Please sign in to comment.