Skip to content

Commit

Permalink
[ISSUE #735] 🔥Optimize pull message logic
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jul 5, 2024
1 parent e5330d1 commit f16c97c
Show file tree
Hide file tree
Showing 17 changed files with 448 additions and 259 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,4 @@ trait-variant = "0.1.2"
once_cell = "1.19.0"

mockall = "0.12.1"
cfg-if = "1.0.0"
2 changes: 1 addition & 1 deletion rocketmq-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ dirs.workspace = true
local-ip-address = "0.6.1"
dns-lookup = "2.0"
log = "0.4.22"

cfg-if = { workspace = true }
[dev-dependencies]
mockall = "0.12.1"
static_assertions = { version = "1" }
Expand Down
14 changes: 14 additions & 0 deletions rocketmq-broker/src/client/manager/consumer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,20 @@ impl ConsumerManager {
.insert(topic.to_string(), subscription_data.clone());
}

pub fn compensate_basic_consumer_info(
&self,
group: &str,
consume_type: ConsumeType,
message_model: MessageModel,
) {
let mut write_guard = self.consumer_compensation_table.write();
let consumer_group_info = write_guard
.entry(group.to_string())
.or_insert_with(|| ConsumerGroupInfo::with_group_name(group.to_string()));
consumer_group_info.set_consume_type(consume_type);
consumer_group_info.set_message_model(message_model);
}

pub fn register_consumer(
&self,
group: &str,
Expand Down
8 changes: 7 additions & 1 deletion rocketmq-broker/src/coldctr/cold_data_cg_ctr_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,10 @@
*/

#[derive(Default)]
pub struct ColdDataCgCtrService {}
pub struct ColdDataCgCtrService;

impl ColdDataCgCtrService {
pub fn is_cg_need_cold_data_flow_ctr(&self, _consumer_group: &str) -> bool {
false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ impl MessageFilter for ExpressionForRetryMessageFilter {
tags_code: Option<i64>,
cq_ext_unit: Option<&CqExtUnit>,
) -> bool {
todo!()
true
}

fn is_matched_by_commit_log(
&self,
msg_buffer: Option<&[u8]>,
properties: Option<&HashMap<String, String>>,
) -> bool {
todo!()
true
}
}
4 changes: 2 additions & 2 deletions rocketmq-broker/src/filter/expression_message_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ impl MessageFilter for ExpressionMessageFilter {
tags_code: Option<i64>,
cq_ext_unit: Option<&CqExtUnit>,
) -> bool {
todo!()
true
}

fn is_matched_by_commit_log(
&self,
msg_buffer: Option<&[u8]>,
properties: Option<&HashMap<String, String>>,
) -> bool {
todo!()
true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use bytes::BytesMut;
use rocketmq_common::common::broker::broker_config::BrokerConfig;
use rocketmq_common::common::mix_all::MASTER_ID;
use rocketmq_common::common::sys_flag::pull_sys_flag::PullSysFlag;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_remoting::code::response_code::RemotingSysResponseCode;
use rocketmq_remoting::code::response_code::ResponseCode;
use rocketmq_remoting::net::channel::Channel;
Expand All @@ -44,6 +45,7 @@ use tracing::debug;
use tracing::info;

use crate::client::manager::consumer_manager::ConsumerManager;
use crate::long_polling::pull_request::PullRequest;
use crate::mqtrace::consume_message_context::ConsumeMessageContext;
use crate::mqtrace::consume_message_hook::ConsumeMessageHook;
use crate::offset::manager::broadcast_offset_manager::BroadcastOffsetManager;
Expand Down Expand Up @@ -166,18 +168,20 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
request_header.topic.as_str(),
get_message_result.message_count(),
);

ctx.upgrade()?;

if self.broker_config.transfer_msg_by_heap {
let body = self.read_get_message_result(
&get_message_result,
request_header.consumer_group.as_str(),
request_header.topic.as_str(),
request_header.queue_id.unwrap(),
);
return Some(response.set_body(body));
} /*else {
None
}*/
None
Some(response.set_body(body))
} else {
None
}
}
ResponseCode::PullNotFound => {
let has_suspend_flag =
Expand All @@ -196,17 +200,17 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
let queue_id = request_header.queue_id.unwrap();
let offset = request_header.queue_offset;

/* PullRequest::new(
let pull_request = PullRequest::new(
request,
channel,
ctx,
polling_time_mills,
begin_time_mills,
get_current_millis(),
offset,
subscription_data,
message_filter,
);*/
Arc::new(message_filter),
);
}

None
}
ResponseCode::PullOffsetMoved => Some(response),
Expand Down
101 changes: 88 additions & 13 deletions rocketmq-broker/src/processor/pull_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use tracing::warn;

use crate::client::consumer_group_info::ConsumerGroupInfo;
use crate::client::manager::consumer_manager::ConsumerManager;
use crate::coldctr::cold_data_cg_ctr_service::ColdDataCgCtrService;
use crate::coldctr::cold_data_pull_request_hold_service::NO_SUSPEND_KEY;
use crate::filter::expression_for_retry_message_filter::ExpressionForRetryMessageFilter;
use crate::filter::expression_message_filter::ExpressionMessageFilter;
Expand All @@ -73,6 +74,7 @@ pub struct PullMessageProcessor<MS> {
consumer_offset_manager: Arc<ConsumerOffsetManager>,
broadcast_offset_manager: Arc<BroadcastOffsetManager>,
message_store: Arc<MS>,
cold_data_cg_ctr_service: Arc<ColdDataCgCtrService>,
}

impl<MS> PullMessageProcessor<MS> {
Expand All @@ -99,6 +101,7 @@ impl<MS> PullMessageProcessor<MS> {
consumer_offset_manager,
broadcast_offset_manager,
message_store,
cold_data_cg_ctr_service: Arc::new(Default::default()),
}
}

Expand Down Expand Up @@ -462,9 +465,17 @@ where
);
}
match RequestSource::parse_integer(request_header.request_source) {
RequestSource::ProxyForBroadcast => {}
RequestSource::ProxyForStream => {}
_ => {}
RequestSource::ProxyForBroadcast => {
unimplemented!("ProxyForBroadcast not implement")
}
RequestSource::ProxyForStream => {
unimplemented!("ProxyForStream not implement")
}
_ => self.consumer_manager.compensate_basic_consumer_info(
request_header.consumer_group.as_str(),
ConsumeType::ConsumePassively,
MessageModel::Clustering,
),
}
let has_subscription_flag =
PullSysFlag::has_subscription_flag(request_header.sys_flag as u32);
Expand Down Expand Up @@ -540,7 +551,7 @@ where
);
}
let sgc_ref = subscription_group_config.as_ref().unwrap();
if sgc_ref.consume_broadcast_enable()
if !sgc_ref.consume_broadcast_enable()
&& consumer_group_info.as_ref().unwrap().get_message_model()
== MessageModel::Broadcasting
{
Expand Down Expand Up @@ -671,6 +682,15 @@ where
};

//ColdDataFlow not implement

cfg_if::cfg_if! {
if #[cfg(feature = "local_file_store")] {
if self.cold_data_cg_ctr_service.is_cg_need_cold_data_flow_ctr(request_header.consumer_group.as_str()) {
unimplemented!("ColdDataFlow not implement")
}
}
}

let use_reset_offset_feature = self.broker_config.use_server_side_reset_offset;
let topic = request_header.topic.as_str();
let group = request_header.consumer_group.as_str();
Expand Down Expand Up @@ -754,8 +774,8 @@ where
return -1;
}
let consumer_group_info = self.consumer_manager.get_consumer_group_info(group);
let proxy_pull_broadcast = RequestSource::ProxyForBroadcast.get_value()
== request_header.request_source.unwrap_or(-2);
let proxy_pull_broadcast = RequestSource::ProxyForBroadcast
== From::from(request_header.request_source.unwrap_or(-2));

if is_broadcast(proxy_pull_broadcast, consumer_group_info.as_ref()) {
let client_id = if proxy_pull_broadcast {
Expand Down Expand Up @@ -821,12 +841,67 @@ pub(crate) fn is_broadcast(
proxy_pull_broadcast: bool,
consumer_group_info: Option<&ConsumerGroupInfo>,
) -> bool {
match consumer_group_info {
Some(info) => {
proxy_pull_broadcast
|| (info.get_message_model() == MessageModel::Broadcasting
&& info.get_consume_type() == ConsumeType::ConsumePassively)
}
None => proxy_pull_broadcast,
proxy_pull_broadcast
|| consumer_group_info.map_or(false, |info| {
matches!(info.get_message_model(), MessageModel::Broadcasting)
&& matches!(info.get_consume_type(), ConsumeType::ConsumePassively)
})
}

#[cfg(test)]
mod tests {
use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType;
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;

use super::*;
use crate::client::consumer_group_info::ConsumerGroupInfo;

#[test]
fn returns_true_for_proxy_pull_broadcast() {
let result = is_broadcast(true, None);
assert!(
result,
"Should return true when proxy_pull_broadcast is true"
);
}

#[test]
fn returns_false_for_non_broadcast_and_active_consumption() {
let consumer_group_info = ConsumerGroupInfo::new(
"test_group".to_string(),
ConsumeType::ConsumeActively,
MessageModel::Clustering,
ConsumeFromWhere::ConsumeFromLastOffset,
);
let result = is_broadcast(false, Some(&consumer_group_info));
assert!(
!result,
"Should return false for non-broadcast and active consumption"
);
}

#[test]
fn returns_true_for_broadcast_and_passive_consumption() {
let consumer_group_info = ConsumerGroupInfo::new(
"test_group".to_string(),
ConsumeType::ConsumePassively,
MessageModel::Broadcasting,
ConsumeFromWhere::ConsumeFromLastOffset,
);
let result = is_broadcast(false, Some(&consumer_group_info));
assert!(
result,
"Should return true for broadcast and passive consumption"
);
}

#[test]
fn returns_false_when_no_consumer_group_info_provided() {
let result = is_broadcast(false, None);
assert!(
!result,
"Should return false when no consumer group info is provided"
);
}
}
Loading

0 comments on commit f16c97c

Please sign in to comment.