Skip to content

Commit

Permalink
[ISSUE #793]🔥Implement SendMessageProcessor#handleRetryAndDLQ ✨
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jul 16, 2024
1 parent 93135eb commit c64b65a
Show file tree
Hide file tree
Showing 9 changed files with 728 additions and 322 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rocketmq-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ local-ip-address = "0.6.1"
dns-lookup = "2.0"
log = "0.4.22"
cfg-if = { workspace = true }
lazy_static.workspace = true
[dev-dependencies]
mockall = "0.12.1"
static_assertions = { version = "1" }
Expand Down
6 changes: 6 additions & 0 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use crate::broker::broker_hook::BrokerShutdownHook;
use crate::client::default_consumer_ids_change_listener::DefaultConsumerIdsChangeListener;
use crate::client::manager::consumer_manager::ConsumerManager;
use crate::client::manager::producer_manager::ProducerManager;
use crate::client::rebalance::rebalance_lock_manager::RebalanceLockManager;
use crate::filter::manager::consumer_filter_manager::ConsumerFilterManager;
use crate::hook::batch_check_before_put_message::BatchCheckBeforePutMessageHook;
use crate::hook::check_before_put_message::CheckBeforePutMessageHook;
Expand Down Expand Up @@ -110,6 +111,7 @@ pub(crate) struct BrokerRuntime {
is_isolated: Arc<AtomicBool>,
#[cfg(feature = "local_file_store")]
pull_request_hold_service: Option<PullRequestHoldService<DefaultMessageStore>>,
rebalance_lock_manager: Arc<RebalanceLockManager>,
}

impl Clone for BrokerRuntime {
Expand Down Expand Up @@ -142,6 +144,7 @@ impl Clone for BrokerRuntime {
should_start_time: self.should_start_time.clone(),
is_isolated: self.is_isolated.clone(),
pull_request_hold_service: self.pull_request_hold_service.clone(),
rebalance_lock_manager: self.rebalance_lock_manager.clone(),

Check warning on line 147 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L147

Added line #L147 was not covered by tests
}
}
}
Expand Down Expand Up @@ -216,6 +219,7 @@ impl BrokerRuntime {
should_start_time: Arc::new(AtomicU64::new(0)),
is_isolated: Arc::new(AtomicBool::new(false)),
pull_request_hold_service: None,
rebalance_lock_manager: Arc::new(Default::default()),

Check warning on line 222 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L222

Added line #L222 was not covered by tests
}
}

Expand Down Expand Up @@ -372,9 +376,11 @@ impl BrokerRuntime {
fn init_processor(&mut self) -> BrokerRequestProcessor<DefaultMessageStore> {
let send_message_processor = SendMessageProcessor::<DefaultMessageStore>::new(
self.topic_queue_mapping_manager.clone(),
self.subscription_group_manager.clone(),

Check warning on line 379 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L379

Added line #L379 was not covered by tests
self.topic_config_manager.clone(),
self.broker_config.clone(),
self.message_store.as_ref().unwrap(),
self.rebalance_lock_manager.clone(),

Check warning on line 383 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L383

Added line #L383 was not covered by tests
);
let mut pull_message_result_handler =
ArcCellWrapper::new(Box::new(DefaultPullMessageResultHandler::new(
Expand Down
1 change: 1 addition & 0 deletions rocketmq-broker/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ pub(crate) mod consumer_ids_change_listener;
pub(crate) mod default_consumer_ids_change_listener;
pub(crate) mod manager;
pub(crate) mod net;
pub(crate) mod rebalance;
17 changes: 17 additions & 0 deletions rocketmq-broker/src/client/rebalance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
pub mod rebalance_lock_manager;
289 changes: 289 additions & 0 deletions rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashMap;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;

use lazy_static::lazy_static;
use parking_lot::RwLock;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::TimeUtils::get_current_millis;
use tracing::info;
use tracing::warn;

lazy_static! {
pub static ref REBALANCE_LOCK_MAX_LIVE_TIME: i64 = {
std::env::var("rocketmq.broker.rebalance.lockMaxLiveTime")
.unwrap_or("60000".to_string())
.parse::<i64>()
.unwrap_or(60000)
};
}

type MessageQueueLockTable = HashMap<String, HashMap<Arc<MessageQueue>, LockEntry>>;

#[derive(Clone, Default)]
pub struct RebalanceLockManager {
mq_lock_table: Arc<RwLock<MessageQueueLockTable>>,
}

impl RebalanceLockManager {
pub fn is_lock_all_expired(&self, group: &str) -> bool {
let lock_table = self.mq_lock_table.read();
let lock_entry = lock_table.get(group);
if lock_entry.is_none() {
return true;
}
let lock_entry = lock_entry.unwrap();
for (_, entry) in lock_entry.iter() {
if !entry.is_expired() {
return false;
}
}
true

Check warning on line 57 in rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs#L57

Added line #L57 was not covered by tests
}

pub fn try_lock_batch(
&self,
group: &str,
mqs: Vec<Arc<MessageQueue>>,
client_id: &str,
) -> Vec<Arc<MessageQueue>> {
let mut lock_mqs = Vec::new();
let mut not_locked_mqs = Vec::new();
for mq in mqs.iter() {
if self.is_locked(group, mq, client_id) {
lock_mqs.push(mq.clone());

Check warning on line 70 in rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs#L70

Added line #L70 was not covered by tests
} else {
not_locked_mqs.push(mq.clone());
}
}
if !not_locked_mqs.is_empty() {
let mut write_guard = self.mq_lock_table.write();
let mut group_value = write_guard.get_mut(group);
if group_value.is_none() {
group_value = Some(write_guard.entry(group.to_string()).or_default());
}
let group_value = group_value.unwrap();
for mq in not_locked_mqs.iter() {
let lock_entry = group_value.entry(mq.clone()).or_insert_with(|| {
info!(
"RebalanceLockManager#tryLockBatch: lock a message which has not been \
locked yet, group={}, clientId={}, mq={:?}",
group, client_id, mq
);
LockEntry {
client_id: client_id.to_string(),
last_update_timestamp: AtomicI64::new(get_current_millis() as i64),
}

Check warning on line 92 in rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs#L92

Added line #L92 was not covered by tests
});
if lock_entry.is_locked(client_id) {
lock_entry.last_update_timestamp.store(
get_current_millis() as i64,
std::sync::atomic::Ordering::Relaxed,
);
lock_mqs.push(mq.clone());
continue;
}
let old_client_id = lock_entry.client_id.as_str().to_string();
if lock_entry.is_expired() {
lock_entry.client_id = client_id.to_string();
lock_entry.last_update_timestamp.store(
get_current_millis() as i64,
std::sync::atomic::Ordering::Relaxed,

Check warning on line 107 in rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs#L104-L107

Added lines #L104 - L107 were not covered by tests
);
lock_mqs.push(mq.clone());
warn!(

Check warning on line 110 in rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs#L109-L110

Added lines #L109 - L110 were not covered by tests
"RebalanceLockManager#tryLockBatch: try to lock a expired message queue, \
group={} mq={:?}, old client id={}, new client id={}",
group, mq, old_client_id, client_id
);
continue;
}
warn!(
"RebalanceLockManager#tryLockBatch: message queue has been locked by other \
group={}, mq={:?}, locked client id={}, current client id={}",
group, mq, old_client_id, client_id
);
}
}
lock_mqs
}

pub fn unlock_batch(&self, group: &str, mqs: Vec<Arc<MessageQueue>>, client_id: &str) {
let mut write_guard = self.mq_lock_table.write();
let group_value = write_guard.get_mut(group);
if group_value.is_none() {
warn!(

Check warning on line 131 in rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs#L131

Added line #L131 was not covered by tests
"RebalanceLockManager#unlockBatch: group not exist, group={}, clientId={}, \
mqs={:?}",
group, client_id, mqs
);
return;
}
let group_value = group_value.unwrap();
for mq in mqs.iter() {
let lock_entry = group_value.get(mq);
if lock_entry.is_none() {
warn!(

Check warning on line 142 in rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs#L142

Added line #L142 was not covered by tests
"RebalanceLockManager#unlockBatch: mq not locked, group={}, clientId={}, mq={}",
group, client_id, mq
);
continue;
}
let lock_entry = lock_entry.unwrap();
if lock_entry.client_id == *client_id {
group_value.remove(mq);
info!(
"RebalanceLockManager#unlockBatch: unlock a message queue, group={}, \
clientId={}, mq={:?}",
group, client_id, mq
);
} else {
warn!(
"RebalanceLockManager#unlockBatch: unlock a message queue, but the client id \
is not matched, group={}, clientId={}, mq={:?}",
group, client_id, mq
);
}
}
}

fn is_locked(&self, group: &str, mq: &Arc<MessageQueue>, client_id: &str) -> bool {
let lock_table = self.mq_lock_table.read();
let group_value = lock_table.get(group);
if group_value.is_none() {
return false;
}
let group_value = group_value.unwrap();
let lock_entry = group_value.get(mq);
if lock_entry.is_none() {
return false;
}
let lock_entry = lock_entry.unwrap();
let locked = lock_entry.is_locked(client_id);
if locked {
lock_entry.last_update_timestamp.store(
get_current_millis() as i64,
std::sync::atomic::Ordering::Relaxed,
);
}
locked
}
}

struct LockEntry {
client_id: String,
last_update_timestamp: AtomicI64,
}

impl LockEntry {
pub fn new() -> LockEntry {
Self {
client_id: "".to_string(),
last_update_timestamp: AtomicI64::new(get_current_millis() as i64),
}
}

Check warning on line 200 in rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs#L195-L200

Added lines #L195 - L200 were not covered by tests

#[inline]
pub fn is_expired(&self) -> bool {
let now = get_current_millis() as i64;
let last_update_timestamp = self
.last_update_timestamp
.load(std::sync::atomic::Ordering::Relaxed);
(now - last_update_timestamp) > *REBALANCE_LOCK_MAX_LIVE_TIME
}

#[inline]
pub fn is_locked(&self, client_id: &str) -> bool {
self.client_id == client_id && !self.is_expired()
}
}

#[cfg(test)]
mod rebalance_lock_manager_tests {
use std::sync::Arc;

use rocketmq_common::common::message::message_queue::MessageQueue;

use super::*;

#[test]
fn lock_all_expired_returns_true_when_no_locks_exist() {
let manager = RebalanceLockManager::default();
assert!(manager.is_lock_all_expired("test_group"));
}

#[test]
fn lock_all_expired_returns_false_when_active_locks_exist() {
let manager = RebalanceLockManager::default();
let mq = Arc::new(MessageQueue::default());
manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
assert!(!manager.is_lock_all_expired("test_group"));
}

#[test]
fn try_lock_batch_locks_message_queues_for_new_group() {
let manager = RebalanceLockManager::default();
let mq = Arc::new(MessageQueue::default());
let locked_mqs = manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
assert_eq!(locked_mqs.len(), 1);
}

#[test]
fn try_lock_batch_does_not_lock_already_locked_message_queues() {
let manager = RebalanceLockManager::default();
let mq = Arc::new(MessageQueue::default());
manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
let locked_mqs = manager.try_lock_batch("test_group", vec![mq.clone()], "client_2");
assert!(locked_mqs.is_empty());
}

#[test]
fn unlock_batch_unlocks_message_queues_locked_by_client() {
let manager = RebalanceLockManager::default();
let mq = Arc::new(MessageQueue::default());
manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
manager.unlock_batch("test_group", vec![mq.clone()], "client_1");
let locked_mqs = manager.try_lock_batch("test_group", vec![mq.clone()], "client_2");
assert_eq!(locked_mqs.len(), 1);
}

#[test]
fn unlock_batch_does_not_unlock_message_queues_locked_by_other_clients() {
let manager = RebalanceLockManager::default();
let mq = Arc::new(MessageQueue::default());
manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
manager.unlock_batch("test_group", vec![mq.clone()], "client_2");
assert!(!manager.is_lock_all_expired("test_group"));
}

#[test]
fn is_locked_returns_true_for_locked_message_queue() {
let manager = RebalanceLockManager::default();
let mq = Arc::new(MessageQueue::default());
manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
assert!(manager.is_locked("test_group", &mq, "client_1"));
}

#[test]
fn is_locked_returns_false_for_unlocked_message_queue() {
let manager = RebalanceLockManager::default();
let mq = Arc::new(MessageQueue::default());
assert!(!manager.is_locked("test_group", &mq, "client_1"));
}
}
Loading

0 comments on commit c64b65a

Please sign in to comment.