Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #765]♻️Refactor ConsumeQueueStore🚀 #766

Merged
merged 2 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 107 additions & 1 deletion rocketmq-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

#![allow(dead_code)]
#![allow(unused_imports)]
#![feature(sync_unsafe_cell)]

use std::cell::SyncUnsafeCell;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;

pub use crate::common::attribute::topic_attributes as TopicAttributes;
pub use crate::common::message::message_accessor as MessageAccessor;
Expand All @@ -40,5 +46,105 @@ pub mod log;
mod thread_pool;
pub mod utils;

pub struct ArcCellWrapper<T: ?Sized> {
inner: Arc<SyncUnsafeCell<T>>,
}

impl<T> ArcCellWrapper<T> {
#[inline]
pub fn new(value: T) -> Self {
Self {
inner: Arc::new(SyncUnsafeCell::new(value)),
}
}
}

impl<T: ?Sized> Clone for ArcCellWrapper<T> {
fn clone(&self) -> Self {
ArcCellWrapper {
inner: Arc::clone(&self.inner),
}
}
}

impl<T> AsRef<T> for ArcCellWrapper<T> {
fn as_ref(&self) -> &T {
unsafe { &*self.inner.get() }
}
}

impl<T> AsMut<T> for ArcCellWrapper<T> {
fn as_mut(&mut self) -> &mut T {
unsafe { &mut *self.inner.get() }
}
}

impl<T> Deref for ArcCellWrapper<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.as_ref()
}
}

impl<T> DerefMut for ArcCellWrapper<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.as_mut()
}
}

#[cfg(test)]
mod tests {}
mod arc_cell_wrapper_tests {
use std::sync::Arc;

use super::*;

#[test]
fn new_creates_arc_cell_wrapper_with_provided_value() {
let wrapper = ArcCellWrapper::new(10);
assert_eq!(*wrapper.as_ref(), 10);
}

#[test]
fn clone_creates_a_new_instance_with_same_value() {
let wrapper = ArcCellWrapper::new(20);
let cloned_wrapper = wrapper.clone();
assert_eq!(*cloned_wrapper.as_ref(), 20);
}

#[test]
fn as_ref_returns_immutable_reference_to_value() {
let wrapper = ArcCellWrapper::new(30);
assert_eq!(*wrapper.as_ref(), 30);
}

#[test]
fn as_mut_returns_mutable_reference_to_value() {
let mut wrapper = ArcCellWrapper::new(40);
*wrapper.as_mut() = 50;
assert_eq!(*wrapper.as_ref(), 50);
}

#[test]
fn deref_returns_reference_to_inner_value() {
let wrapper = ArcCellWrapper::new(60);
assert_eq!(*wrapper, 60);
}

#[test]
fn deref_mut_allows_modification_of_inner_value() {
let mut wrapper = ArcCellWrapper::new(70);
*wrapper = 80;
assert_eq!(*wrapper, 80);
}

#[test]
fn multiple_clones_share_the_same_underlying_data() {
let wrapper = ArcCellWrapper::new(Arc::new(90));
let cloned_wrapper1 = wrapper.clone();
let cloned_wrapper2 = wrapper.clone();

assert_eq!(Arc::strong_count(wrapper.as_ref()), 1);
assert_eq!(Arc::strong_count(cloned_wrapper1.as_ref()), 1);
assert_eq!(Arc::strong_count(cloned_wrapper2.as_ref()), 1);
}
}
8 changes: 2 additions & 6 deletions rocketmq-store/src/log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::base::get_message_result::GetMessageResult;
use crate::base::message_result::PutMessageResult;
use crate::filter::MessageFilter;
use crate::hook::put_message_hook::BoxedPutMessageHook;
use crate::queue::ConsumeQueueTrait;
use crate::queue::ArcConsumeQueue;
use crate::stats::broker_stats_manager::BrokerStatsManager;
use crate::store::running_flags::RunningFlags;

Expand Down Expand Up @@ -113,11 +113,7 @@ pub trait RocketMQMessageStore: Clone + 'static {

fn notify_message_arrive_if_necessary(&self, dispatch_request: &mut DispatchRequest);

fn find_consume_queue(
&self,
topic: &str,
queue_id: i32,
) -> Option<Arc<parking_lot::Mutex<Box<dyn ConsumeQueueTrait>>>>;
fn find_consume_queue(&self, topic: &str, queue_id: i32) -> Option<ArcConsumeQueue>;

fn delete_topics(&self, delete_topics: Vec<String>);
}
36 changes: 16 additions & 20 deletions rocketmq-store/src/message_store/default_message_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ use crate::log_file::MessageStore;
use crate::log_file::MAX_PULL_MSG_SIZE;
use crate::queue::build_consume_queue::CommitLogDispatcherBuildConsumeQueue;
use crate::queue::local_file_consume_queue_store::ConsumeQueueStore;
use crate::queue::ArcConsumeQueue;
use crate::queue::ConsumeQueueStoreTrait;
use crate::queue::ConsumeQueueTrait;
use crate::stats::broker_stats_manager::BrokerStatsManager;
use crate::store::running_flags::RunningFlags;
use crate::store_path_config_helper::get_abort_file;
Expand Down Expand Up @@ -728,10 +728,11 @@ impl MessageStore for DefaultMessageStore {
committed: bool,
) -> i64 {
if committed {
self.consume_queue_store
.find_or_create_consume_queue(topic, queue_id)
.lock()
.get_max_offset_in_queue()
let queue = self
.consume_queue_store
.find_or_create_consume_queue(topic, queue_id);

queue.get_max_offset_in_queue()
} else {
self.consume_queue_store
.get_max_offset(topic, queue_id)
Expand Down Expand Up @@ -785,8 +786,8 @@ impl MessageStore for DefaultMessageStore {
let max_offset_py = self.commit_log.get_max_offset();
let consume_queue = self.find_consume_queue(topic, queue_id);
if let Some(consume_queue) = consume_queue {
min_offset = consume_queue.lock().get_min_offset_in_queue();
max_offset = consume_queue.lock().get_max_offset_in_queue();
min_offset = consume_queue.get_min_offset_in_queue();
max_offset = consume_queue.get_max_offset_in_queue();
if max_offset == 0 {
status = GetMessageStatus::NoMessageInQueue;
next_begin_offset = self.next_offset_correction(offset, 0);
Expand All @@ -803,7 +804,7 @@ impl MessageStore for DefaultMessageStore {
let max_filter_message_size = self
.message_store_config
.max_filter_message_size
.max(max_msg_nums * consume_queue.lock().get_unit_size());
.max(max_msg_nums * consume_queue.get_unit_size());
let disk_fall_recorded = self.message_store_config.disk_fall_recorded;
let mut max_pull_size = max_total_msg_size.max(100);
if max_pull_size > MAX_PULL_MSG_SIZE {
Expand All @@ -824,15 +825,14 @@ impl MessageStore for DefaultMessageStore {
.travel_cq_file_num_when_get_message
{
cq_file_num += 1;
let buffer_consume_queue = consume_queue
.lock()
.iterate_from_inner(next_begin_offset, max_msg_nums);
let buffer_consume_queue =
consume_queue.iterate_from_inner(next_begin_offset, max_msg_nums);
if buffer_consume_queue.is_none() {
status = GetMessageStatus::OffsetFoundNull;
next_begin_offset = self.next_offset_correction(
next_begin_offset,
self.consume_queue_store
.roll_next_file(&**consume_queue.lock(), next_begin_offset),
.roll_next_file(&**consume_queue, next_begin_offset),
);
warn!(
"consumer request topic: {}, offset: {}, minOffset: {}, maxOffset: \
Expand All @@ -856,7 +856,7 @@ impl MessageStore for DefaultMessageStore {
&self.message_store_config,
);
if (cq_unit.queue_offset - offset)
* consume_queue.lock().get_unit_size() as i64
* consume_queue.get_unit_size() as i64
> max_filter_message_size as i64
{
break;
Expand Down Expand Up @@ -996,7 +996,7 @@ impl MessageStore for DefaultMessageStore {
let consume_queue = self
.consume_queue_store
.find_or_create_consume_queue(topic, queue_id);
let first_cqitem = consume_queue.lock().get(consume_offset);
let first_cqitem = consume_queue.get(consume_offset);
if first_cqitem.is_none() {
return false;
}
Expand All @@ -1006,7 +1006,7 @@ impl MessageStore for DefaultMessageStore {
let size = cq.size;
return self.check_in_mem_by_commit_offset(start_offset_py, size);
}
let last_cqitem = consume_queue.lock().get(consume_offset + batch_size as i64);
let last_cqitem = consume_queue.get(consume_offset + batch_size as i64);
if last_cqitem.is_none() {
let size = cq.size;
return self.check_in_mem_by_commit_offset(start_offset_py, size);
Expand All @@ -1033,11 +1033,7 @@ impl MessageStore for DefaultMessageStore {
}
}

fn find_consume_queue(
&self,
topic: &str,
queue_id: i32,
) -> Option<Arc<parking_lot::Mutex<Box<dyn ConsumeQueueTrait>>>> {
fn find_consume_queue(&self, topic: &str, queue_id: i32) -> Option<ArcConsumeQueue> {
Some(
self.consume_queue_store
.find_or_create_consume_queue(topic, queue_id),
Expand Down
33 changes: 15 additions & 18 deletions rocketmq-store/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod batch_consume_queue;
pub mod build_consume_queue;
mod consume_queue_ext;
pub mod local_file_consume_queue_store;
mod queue_offset_operator;
pub mod single_consume_queue;

use std::collections::HashMap;
use std::sync::Arc;

use rocketmq_common::common::attribute::cq_type::CQType;
use rocketmq_common::common::boundary_type::BoundaryType;
use rocketmq_common::common::message::message_single::MessageExtBrokerInner;
use rocketmq_common::ArcCellWrapper;

use crate::base::dispatch_request::DispatchRequest;
use crate::base::swappable::Swappable;
Expand All @@ -35,6 +28,17 @@ use crate::filter::MessageFilter;
use crate::queue::consume_queue_ext::ConsumeQueueExt;
use crate::queue::queue_offset_operator::QueueOffsetOperator;

mod batch_consume_queue;
pub mod build_consume_queue;
mod consume_queue_ext;
pub mod local_file_consume_queue_store;
mod queue_offset_operator;
pub mod single_consume_queue;

//pub type ArcConsumeQueue = Arc<SyncUnsafeCell<Box<dyn ConsumeQueueTrait>>>;
pub type ArcConsumeQueue = ArcCellWrapper<Box<dyn ConsumeQueueTrait>>;
pub type ConsumeQueueTable = parking_lot::Mutex<HashMap<String, HashMap<i32, ArcConsumeQueue>>>;

/// FileQueueLifeCycle contains life cycle methods of ConsumerQueue that is directly implemented by
/// FILE.
pub trait FileQueueLifeCycle: Swappable {
Expand Down Expand Up @@ -263,19 +267,12 @@ pub trait ConsumeQueueStoreTrait: Send + Sync {
/// `topic`: Topic.
/// `queue_id`: Queue ID.
/// Returns the consumeQueue.
fn find_or_create_consume_queue(
&self,
topic: &str,
queue_id: i32,
) -> Arc<parking_lot::Mutex<Box<dyn ConsumeQueueTrait>>>;
fn find_or_create_consume_queue(&self, topic: &str, queue_id: i32) -> ArcConsumeQueue;

/// Find the consumeQueueMap of topic.
/// `topic`: Topic.
/// Returns the consumeQueueMap of topic.
fn find_consume_queue_map(
&self,
topic: &str,
) -> Option<HashMap<i32, Box<dyn ConsumeQueueTrait>>>;
fn find_consume_queue_map(&self, topic: &str) -> Option<HashMap<i32, ArcConsumeQueue>>;

/// Get the total size of all consumeQueue.
/// Returns the total size of all consumeQueue.
Expand All @@ -293,7 +290,7 @@ pub trait ConsumeQueueStoreTrait: Send + Sync {
/// Trait representing ConsumeQueueInterface.
pub trait ConsumeQueueTrait: Send + Sync + FileQueueLifeCycle {
/// Get the topic name.
fn get_topic(&self) -> String;
fn get_topic(&self) -> &str;

/// Get queue id.
fn get_queue_id(&self) -> i32;
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-store/src/queue/batch_consume_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl Swappable for BatchConsumeQueue {
}

impl ConsumeQueueTrait for BatchConsumeQueue {
fn get_topic(&self) -> String {
fn get_topic(&self) -> &str {
todo!()
}

Expand Down
Loading
Loading