Skip to content

Commit

Permalink
[ISSUE #1004]🚀optimize and improve consume logic⚡️
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Sep 27, 2024
1 parent 34cd0ec commit 7f983fd
Show file tree
Hide file tree
Showing 10 changed files with 372 additions and 37 deletions.
5 changes: 5 additions & 0 deletions rocketmq-client/src/consumer/consumer_impl/process_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ impl ProcessQueue {
.store(last_pull_timestamp, std::sync::atomic::Ordering::Release);
}

pub(crate) fn set_last_lock_timestamp(&self, last_lock_timestamp: u64) {
self.last_lock_timestamp
.store(last_lock_timestamp, std::sync::atomic::Ordering::Release);
}

pub fn msg_count(&self) -> u64 {
self.msg_count.load(std::sync::atomic::Ordering::Acquire)
}
Expand Down
204 changes: 167 additions & 37 deletions rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

use std::collections::HashMap;
use std::collections::HashSet;
use std::ops::DerefMut;
use std::sync::Arc;

use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::mix_all;
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_common::WeakCellWrapper;
use rocketmq_remoting::protocol::body::request::lock_batch_request_body::LockBatchRequestBody;
use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType;
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
Expand Down Expand Up @@ -173,45 +176,55 @@ where
let mut changed = false;
let mut remove_queue_map = HashMap::new();
let process_queue_table_cloned = self.process_queue_table.clone();
let mut process_queue_table = process_queue_table_cloned.write().await;
// Drop process queues no longer belong to me
for (mq, pq) in process_queue_table.iter() {
if mq.get_topic() == topic {
if !mq_set.contains(mq) {
pq.set_dropped(true);
remove_queue_map.insert(mq.clone(), pq.clone());
} else if pq.is_pull_expired() {
if let Some(sub_rebalance) = self.sub_rebalance_impl.as_mut().unwrap().upgrade()
{
if sub_rebalance.consume_type() == ConsumeType::ConsumePassively {
pq.set_dropped(true);
remove_queue_map.insert(mq.clone(), pq.clone());
error!(
"[BUG]doRebalance, {:?}, try remove unnecessary mq, {}, because \
pull is pause, so try to fixed it",
self.consumer_group,
mq.get_topic()
);
{
let process_queue_table = process_queue_table_cloned.read().await;
// Drop process queues no longer belong to me
for (mq, pq) in process_queue_table.iter() {
if mq.get_topic() == topic {
if !mq_set.contains(mq) {
pq.set_dropped(true);
remove_queue_map.insert(mq.clone(), pq.clone());
} else if pq.is_pull_expired() {
if let Some(sub_rebalance) =
self.sub_rebalance_impl.as_mut().unwrap().upgrade()
{
if sub_rebalance.consume_type() == ConsumeType::ConsumePassively {
pq.set_dropped(true);
remove_queue_map.insert(mq.clone(), pq.clone());
error!(
"[BUG]doRebalance, {:?}, try remove unnecessary mq, {}, \
because pull is pause, so try to fixed it",
self.consumer_group,
mq.get_topic()
);
}
}
}
}
}
}

// Remove message queues no longer belong to me
for (mq, pq) in remove_queue_map {
if let Some(mut sub_rebalance) = self.sub_rebalance_impl.as_mut().unwrap().upgrade() {
if sub_rebalance
.remove_unnecessary_message_queue(&mq, &pq)
.await
{
process_queue_table.remove(&mq);
changed = true;
info!(
"doRebalance, {:?}, remove unnecessary mq, {}",
self.consumer_group,
mq.get_topic()
);
{
if !remove_queue_map.is_empty() {
let mut process_queue_table = process_queue_table_cloned.write().await;
// Remove message queues no longer belong to me
for (mq, pq) in remove_queue_map {
if let Some(mut sub_rebalance) =
self.sub_rebalance_impl.as_mut().unwrap().upgrade()
{
if sub_rebalance
.remove_unnecessary_message_queue(&mq, &pq)
.await
{
process_queue_table.remove(&mq);
changed = true;
info!(
"doRebalance, {:?}, remove unnecessary mq, {}",
self.consumer_group,
mq.get_topic()
);
}
}
}
}
}
Expand All @@ -223,9 +236,10 @@ where
return false;
}
let mut sub_rebalance_impl = sub_rebalance_impl.unwrap();
let mut process_queue_table = process_queue_table_cloned.write().await;
for mq in mq_set {
if !process_queue_table.contains_key(mq) {
if is_order && !self.lock(mq) {
if is_order && !self.lock(mq, process_queue_table.deref_mut()).await {
warn!(
"doRebalance, {:?}, add a new mq failed, {}, because lock failed",
self.consumer_group,
Expand All @@ -236,7 +250,7 @@ where
}

sub_rebalance_impl.remove_dirty_offset(mq).await;
let pq = Arc::new(ProcessQueue::new());
let pq = Arc::new(sub_rebalance_impl.create_process_queue());
pq.set_locked(true);
let next_offset = sub_rebalance_impl.compute_pull_from_where(mq).await;
if next_offset >= 0 {
Expand Down Expand Up @@ -404,7 +418,123 @@ where
queue_set
}

pub fn lock(&self, mq: &MessageQueue) -> bool {
unimplemented!()
pub async fn lock(
&mut self,
mq: &MessageQueue,
process_queue_table: &mut HashMap<MessageQueue, Arc<ProcessQueue>>,
) -> bool {
let client = self.client_instance.as_mut().unwrap();
let broker_name = client.get_broker_name_from_message_queue(mq).await;
let find_broker_result = client
.find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, true)
.await;
if let Some(find_broker_result) = find_broker_result {
let mut request_body = LockBatchRequestBody {
consumer_group: Some(self.consumer_group.clone().unwrap()),
client_id: Some(client.client_id.clone()),
..Default::default()
};
request_body.mq_set.insert(mq.clone());
let result = client
.mq_client_api_impl
.as_mut()
.unwrap()
.lock_batch_mq(find_broker_result.broker_addr.as_str(), request_body, 1_000)
.await;
match result {
Ok(locked_mq) => {
for mq in &locked_mq {
if let Some(pq) = process_queue_table.get(mq) {
pq.set_locked(true);
pq.set_last_pull_timestamp(get_current_millis());
}
}
let lock_ok = locked_mq.contains(mq);
info!(
"message queue lock {}, {:?} {}",
lock_ok, self.consumer_group, mq
);
lock_ok
}
Err(e) => {
error!("lockBatchMQ exception {},{}", mq, e);
false
}
}
} else {
false
}
}

pub async fn lock_all(&mut self) {
let broker_mqs = self.build_process_queue_table_by_broker_name().await;
for (broker_name, mqs) in broker_mqs {
if mqs.is_empty() {
continue;
}
let client = self.client_instance.as_mut().unwrap();
let find_broker_result = client
.find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, true)
.await;
if let Some(find_broker_result) = find_broker_result {
let request_body = LockBatchRequestBody {
consumer_group: Some(self.consumer_group.clone().unwrap()),
client_id: Some(client.client_id.clone()),
mq_set: mqs.clone(),
..Default::default()
};
let result = client
.mq_client_api_impl
.as_mut()
.unwrap()
.lock_batch_mq(find_broker_result.broker_addr.as_str(), request_body, 1_000)
.await;
match result {
Ok(lock_okmqset) => {
let process_queue_table = self.process_queue_table.read().await;
for mq in &mqs {
if let Some(pq) = process_queue_table.get(mq) {
if lock_okmqset.contains(mq) {
if pq.is_locked() {
info!(
"the message queue locked OK, Group: {:?} {}",
self.consumer_group, mq
);
}
pq.set_locked(true);
pq.set_last_lock_timestamp(get_current_millis());
} else {
pq.set_locked(false);
warn!(
"the message queue locked Failed, Group: {:?} {}",
self.consumer_group, mq
);
}
}
}
}
Err(e) => {
error!("lockBatchMQ exception {}", e);
}
}
}
}
}

async fn build_process_queue_table_by_broker_name(
&self,
) -> HashMap<String /* brokerName */, HashSet<MessageQueue>> {
let mut result = HashMap::new();
let process_queue_table = self.process_queue_table.read().await;
let client = self.client_instance.as_ref().unwrap();
for (mq, pq) in process_queue_table.iter() {
if pq.is_dropped() {
continue;
}
let broker_name = client.get_broker_name_from_message_queue(mq).await;
let entry = result.entry(broker_name).or_insert(HashSet::new());
entry.insert(mq.clone());
}
result
}
}
39 changes: 39 additions & 0 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashSet;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::time::Instant;
Expand All @@ -40,11 +41,14 @@ use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::code::response_code::ResponseCode;
use rocketmq_remoting::protocol::body::check_client_request_body::CheckClientRequestBody;
use rocketmq_remoting::protocol::body::get_consumer_listby_group_response_body::GetConsumerListByGroupResponseBody;
use rocketmq_remoting::protocol::body::request::lock_batch_request_body::LockBatchRequestBody;
use rocketmq_remoting::protocol::body::response::lock_batch_response_body::LockBatchResponseBody;
use rocketmq_remoting::protocol::body::unlock_batch_request_body::UnlockBatchRequestBody;
use rocketmq_remoting::protocol::header::client_request_header::GetRouteInfoRequestHeader;
use rocketmq_remoting::protocol::header::consumer_send_msg_back_request_header::ConsumerSendMsgBackRequestHeader;
use rocketmq_remoting::protocol::header::get_consumer_listby_group_request_header::GetConsumerListByGroupRequestHeader;
use rocketmq_remoting::protocol::header::heartbeat_request_header::HeartbeatRequestHeader;
use rocketmq_remoting::protocol::header::lock_batch_mq_request_header::LockBatchMqRequestHeader;
use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header::SendMessageRequestHeader;
use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header_v2::SendMessageRequestHeaderV2;
use rocketmq_remoting::protocol::header::message_operation_header::send_message_response_header::SendMessageResponseHeader;
Expand Down Expand Up @@ -1078,4 +1082,39 @@ impl MQClientAPIImpl {
}
}
}

pub async fn lock_batch_mq(
&mut self,
addr: &str,
request_body: LockBatchRequestBody,
timeout_millis: u64,
) -> Result<HashSet<MessageQueue>> {
let mut request = RemotingCommand::create_request_command(
RequestCode::LockBatchMq,
LockBatchMqRequestHeader::default(),
);
request.set_body_mut_ref(Some(request_body.encode()));
let response = self
.remoting_client
.invoke_async(
Some(mix_all::broker_vip_channel(
self.client_config.vip_channel_enabled,
addr,
)),
request,
timeout_millis,
)
.await?;
if ResponseCode::from(response.code()) == ResponseCode::Success {
LockBatchResponseBody::decode(response.body().as_ref().unwrap().as_ref())
.map(|body| body.lock_ok_mq_set)
.map_err(|e| MQBrokerError(response.code(), e.to_string(), addr.to_string()))
} else {
Err(MQBrokerError(
response.code(),
response.remark().map_or("".to_string(), |s| s.to_string()),
addr.to_string(),
))
}
}
}
2 changes: 2 additions & 0 deletions rocketmq-remoting/src/protocol/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub mod group_list;
pub mod kv_table;
pub mod pop_process_queue_info;
pub mod process_queue_info;
pub mod request;
pub mod response;
pub mod topic;
pub mod topic_info_wrapper;
pub mod unlock_batch_request_body;
17 changes: 17 additions & 0 deletions rocketmq-remoting/src/protocol/body/request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
pub mod lock_batch_request_body;
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashSet;
use std::fmt::Display;

use rocketmq_common::common::message::message_queue::MessageQueue;
use serde::Deserialize;
use serde::Serialize;

#[derive(Serialize, Deserialize, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct LockBatchRequestBody {
pub consumer_group: Option<String>,
pub client_id: Option<String>,
pub only_this_broker: bool,
pub mq_set: HashSet<MessageQueue>,
}

impl Display for LockBatchRequestBody {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"UnlockBatchRequestBody [consumer_group={}, client_id={}, only_this_broker={}, \
mq_set={:?}]",
self.consumer_group.as_ref().unwrap_or(&"".to_string()),
self.client_id.as_ref().unwrap_or(&"".to_string()),
self.only_this_broker,
self.mq_set
)
}
}
Loading

0 comments on commit 7f983fd

Please sign in to comment.