diff --git a/Cargo.lock b/Cargo.lock index 2db10ddd..ca0a6689 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1484,6 +1484,7 @@ dependencies = [ "futures-io", "futures-sink", "futures-util", + "lazy_static", "local-ip-address", "log", "mockall", diff --git a/rocketmq-broker/Cargo.toml b/rocketmq-broker/Cargo.toml index 29927303..1c00581a 100644 --- a/rocketmq-broker/Cargo.toml +++ b/rocketmq-broker/Cargo.toml @@ -58,6 +58,7 @@ local-ip-address = "0.6.1" dns-lookup = "2.0" log = "0.4.22" cfg-if = { workspace = true } +lazy_static.workspace = true [dev-dependencies] mockall = "0.12.1" static_assertions = { version = "1" } diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index 14848622..b68e7143 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -51,6 +51,7 @@ use crate::broker::broker_hook::BrokerShutdownHook; use crate::client::default_consumer_ids_change_listener::DefaultConsumerIdsChangeListener; use crate::client::manager::consumer_manager::ConsumerManager; use crate::client::manager::producer_manager::ProducerManager; +use crate::client::rebalance::rebalance_lock_manager::RebalanceLockManager; use crate::filter::manager::consumer_filter_manager::ConsumerFilterManager; use crate::hook::batch_check_before_put_message::BatchCheckBeforePutMessageHook; use crate::hook::check_before_put_message::CheckBeforePutMessageHook; @@ -110,6 +111,7 @@ pub(crate) struct BrokerRuntime { is_isolated: Arc, #[cfg(feature = "local_file_store")] pull_request_hold_service: Option>, + rebalance_lock_manager: Arc, } impl Clone for BrokerRuntime { @@ -142,6 +144,7 @@ impl Clone for BrokerRuntime { should_start_time: self.should_start_time.clone(), is_isolated: self.is_isolated.clone(), pull_request_hold_service: self.pull_request_hold_service.clone(), + rebalance_lock_manager: self.rebalance_lock_manager.clone(), } } } @@ -216,6 +219,7 @@ impl BrokerRuntime { should_start_time: Arc::new(AtomicU64::new(0)), is_isolated: Arc::new(AtomicBool::new(false)), pull_request_hold_service: None, + rebalance_lock_manager: Arc::new(Default::default()), } } @@ -372,9 +376,11 @@ impl BrokerRuntime { fn init_processor(&mut self) -> BrokerRequestProcessor { let send_message_processor = SendMessageProcessor::::new( self.topic_queue_mapping_manager.clone(), + self.subscription_group_manager.clone(), self.topic_config_manager.clone(), self.broker_config.clone(), self.message_store.as_ref().unwrap(), + self.rebalance_lock_manager.clone(), ); let mut pull_message_result_handler = ArcCellWrapper::new(Box::new(DefaultPullMessageResultHandler::new( diff --git a/rocketmq-broker/src/client.rs b/rocketmq-broker/src/client.rs index 49a907b1..56922a5a 100644 --- a/rocketmq-broker/src/client.rs +++ b/rocketmq-broker/src/client.rs @@ -22,3 +22,4 @@ pub(crate) mod consumer_ids_change_listener; pub(crate) mod default_consumer_ids_change_listener; pub(crate) mod manager; pub(crate) mod net; +pub(crate) mod rebalance; diff --git a/rocketmq-broker/src/client/rebalance.rs b/rocketmq-broker/src/client/rebalance.rs new file mode 100644 index 00000000..819b0105 --- /dev/null +++ b/rocketmq-broker/src/client/rebalance.rs @@ -0,0 +1,17 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +pub mod rebalance_lock_manager; diff --git a/rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs b/rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs new file mode 100644 index 00000000..a2b2de4e --- /dev/null +++ b/rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::collections::HashMap; +use std::sync::atomic::AtomicI64; +use std::sync::Arc; + +use lazy_static::lazy_static; +use parking_lot::RwLock; +use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_common::TimeUtils::get_current_millis; +use tracing::info; +use tracing::warn; + +lazy_static! 
{
+    pub static ref REBALANCE_LOCK_MAX_LIVE_TIME: i64 = {
+        std::env::var("rocketmq.broker.rebalance.lockMaxLiveTime")
+            .unwrap_or("60000".to_string())
+            .parse::<i64>()
+            .unwrap_or(60000)
+    };
+}
+
+type MessageQueueLockTable = HashMap<String, HashMap<Arc<MessageQueue>, LockEntry>>;
+
+#[derive(Clone, Default)]
+pub struct RebalanceLockManager {
+    mq_lock_table: Arc<RwLock<MessageQueueLockTable>>,
+}
+
+impl RebalanceLockManager {
+    pub fn is_lock_all_expired(&self, group: &str) -> bool {
+        let lock_table = self.mq_lock_table.read();
+        let lock_entry = lock_table.get(group);
+        if lock_entry.is_none() {
+            return true;
+        }
+        let lock_entry = lock_entry.unwrap();
+        for (_, entry) in lock_entry.iter() {
+            if !entry.is_expired() {
+                return false;
+            }
+        }
+        true
+    }
+
+    pub fn try_lock_batch(
+        &self,
+        group: &str,
+        mqs: Vec<Arc<MessageQueue>>,
+        client_id: &str,
+    ) -> Vec<Arc<MessageQueue>> {
+        let mut lock_mqs = Vec::new();
+        let mut not_locked_mqs = Vec::new();
+        for mq in mqs.iter() {
+            if self.is_locked(group, mq, client_id) {
+                lock_mqs.push(mq.clone());
+            } else {
+                not_locked_mqs.push(mq.clone());
+            }
+        }
+        if !not_locked_mqs.is_empty() {
+            let mut write_guard = self.mq_lock_table.write();
+            let mut group_value = write_guard.get_mut(group);
+            if group_value.is_none() {
+                group_value = Some(write_guard.entry(group.to_string()).or_default());
+            }
+            let group_value = group_value.unwrap();
+            for mq in not_locked_mqs.iter() {
+                let lock_entry = group_value.entry(mq.clone()).or_insert_with(|| {
+                    info!(
+                        "RebalanceLockManager#tryLockBatch: lock a message queue which has not \
+                         been locked yet, group={}, clientId={}, mq={:?}",
+                        group, client_id, mq
+                    );
+                    LockEntry {
+                        client_id: client_id.to_string(),
+                        last_update_timestamp: AtomicI64::new(get_current_millis() as i64),
+                    }
+                });
+                if lock_entry.is_locked(client_id) {
+                    lock_entry.last_update_timestamp.store(
+                        get_current_millis() as i64,
+                        std::sync::atomic::Ordering::Relaxed,
+                    );
+                    lock_mqs.push(mq.clone());
+                    continue;
+                }
+                let old_client_id = lock_entry.client_id.as_str().to_string();
+                if lock_entry.is_expired() {
+                    lock_entry.client_id = client_id.to_string();
+                    lock_entry.last_update_timestamp.store(
+                        get_current_millis() as i64,
+                        std::sync::atomic::Ordering::Relaxed,
+                    );
+                    lock_mqs.push(mq.clone());
+                    warn!(
+                        "RebalanceLockManager#tryLockBatch: try to lock an expired message \
+                         queue, group={} mq={:?}, old client id={}, new client id={}",
+                        group, mq, old_client_id, client_id
+                    );
+                    continue;
+                }
+                warn!(
+                    "RebalanceLockManager#tryLockBatch: message queue has been locked by another \
+                     client, group={}, mq={:?}, locked client id={}, current client id={}",
+                    group, mq, old_client_id, client_id
+                );
+            }
+        }
+        lock_mqs
+    }
+
+    pub fn unlock_batch(&self, group: &str, mqs: Vec<Arc<MessageQueue>>, client_id: &str) {
+        let mut write_guard = self.mq_lock_table.write();
+        let group_value = write_guard.get_mut(group);
+        if group_value.is_none() {
+            warn!(
+                "RebalanceLockManager#unlockBatch: group not exist, group={}, clientId={}, \
+                 mqs={:?}",
+                group, client_id, mqs
+            );
+            return;
+        }
+        let group_value = group_value.unwrap();
+        for mq in mqs.iter() {
+            let lock_entry = group_value.get(mq);
+            if lock_entry.is_none() {
+                warn!(
+                    "RebalanceLockManager#unlockBatch: mq not locked, group={}, clientId={}, mq={:?}",
+                    group, client_id, mq
+                );
+                continue;
+            }
+            let lock_entry = lock_entry.unwrap();
+            if lock_entry.client_id == *client_id {
+                group_value.remove(mq);
+                info!(
+                    "RebalanceLockManager#unlockBatch: unlock a message queue, group={}, \
+                     clientId={}, mq={:?}",
+                    group, client_id, mq
+                );
+            } else {
+                warn!(
+                    "RebalanceLockManager#unlockBatch: unlock a message queue, but the client id \
+                     is not matched, group={}, clientId={}, mq={:?}",
+                    group, client_id, mq
+                );
+            }
+        }
+    }
+
+    fn is_locked(&self, group: &str, mq: &Arc<MessageQueue>, client_id: &str) -> bool {
+        let lock_table = self.mq_lock_table.read();
+        let group_value = lock_table.get(group);
+        if group_value.is_none() {
+            return false;
+        }
+        let group_value = group_value.unwrap();
+        let lock_entry = group_value.get(mq);
+        if lock_entry.is_none() {
+            return false;
+        }
+        let lock_entry = lock_entry.unwrap();
+        let locked = lock_entry.is_locked(client_id);
+        if locked {
+            lock_entry.last_update_timestamp.store(
+                get_current_millis() as i64,
+                std::sync::atomic::Ordering::Relaxed,
+            );
+        }
+        locked
+    }
+}
+
+struct LockEntry {
+    client_id: String,
+    last_update_timestamp: AtomicI64,
+}
+
+impl LockEntry {
+    pub fn new() -> LockEntry {
+        Self {
+            client_id: "".to_string(),
+            last_update_timestamp: AtomicI64::new(get_current_millis() as i64),
+        }
+    }
+
+    #[inline]
+    pub fn is_expired(&self) -> bool {
+        let now = get_current_millis() as i64;
+        let last_update_timestamp = self
+            .last_update_timestamp
+            .load(std::sync::atomic::Ordering::Relaxed);
+        (now - last_update_timestamp) > *REBALANCE_LOCK_MAX_LIVE_TIME
+    }
+
+    #[inline]
+    pub fn is_locked(&self, client_id: &str) -> bool {
+        self.client_id == client_id && !self.is_expired()
+    }
+}
+
+#[cfg(test)]
+mod rebalance_lock_manager_tests {
+    use std::sync::Arc;
+
+    use rocketmq_common::common::message::message_queue::MessageQueue;
+
+    use super::*;
+
+    #[test]
+    fn lock_all_expired_returns_true_when_no_locks_exist() {
+        let manager = RebalanceLockManager::default();
+        assert!(manager.is_lock_all_expired("test_group"));
+    }
+
+    #[test]
+    fn lock_all_expired_returns_false_when_active_locks_exist() {
+        let manager = RebalanceLockManager::default();
+        let mq = Arc::new(MessageQueue::default());
+        manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
+        assert!(!manager.is_lock_all_expired("test_group"));
+    }
+
+    #[test]
+    fn try_lock_batch_locks_message_queues_for_new_group() {
+        let manager = RebalanceLockManager::default();
+        let mq = Arc::new(MessageQueue::default());
+        let locked_mqs = manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
+        assert_eq!(locked_mqs.len(), 1);
+    }
+
+    #[test]
+    fn try_lock_batch_does_not_lock_already_locked_message_queues() {
+        let manager = RebalanceLockManager::default();
+        let mq = Arc::new(MessageQueue::default());
+        manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
+        let locked_mqs = manager.try_lock_batch("test_group", vec![mq.clone()], "client_2");
+        assert!(locked_mqs.is_empty());
+    }
+
+    #[test]
+    fn unlock_batch_unlocks_message_queues_locked_by_client() {
+        let manager = RebalanceLockManager::default();
+        let mq = Arc::new(MessageQueue::default());
+        manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
+        manager.unlock_batch("test_group", vec![mq.clone()], "client_1");
+        let locked_mqs = manager.try_lock_batch("test_group", vec![mq.clone()], "client_2");
+        assert_eq!(locked_mqs.len(), 1);
+    }
+
+    #[test]
+    fn unlock_batch_does_not_unlock_message_queues_locked_by_other_clients() {
+        let manager = RebalanceLockManager::default();
+        let mq = Arc::new(MessageQueue::default());
+        manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
+        manager.unlock_batch("test_group", vec![mq.clone()], "client_2");
+        assert!(!manager.is_lock_all_expired("test_group"));
+    }
+
+    #[test]
+    fn is_locked_returns_true_for_locked_message_queue() {
+        let manager =
RebalanceLockManager::default(); + let mq = Arc::new(MessageQueue::default()); + manager.try_lock_batch("test_group", vec![mq.clone()], "client_1"); + assert!(manager.is_locked("test_group", &mq, "client_1")); + } + + #[test] + fn is_locked_returns_false_for_unlocked_message_queue() { + let manager = RebalanceLockManager::default(); + let mq = Arc::new(MessageQueue::default()); + assert!(!manager.is_locked("test_group", &mq, "client_1")); + } +} diff --git a/rocketmq-broker/src/processor.rs b/rocketmq-broker/src/processor.rs index 8a9df972..a587e486 100644 --- a/rocketmq-broker/src/processor.rs +++ b/rocketmq-broker/src/processor.rs @@ -15,38 +15,15 @@ * limitations under the License. */ -use std::sync::Arc; - -use rand::Rng; -use rocketmq_common::common::broker::broker_config::BrokerConfig; -use rocketmq_common::common::constant::PermName; -use rocketmq_common::common::message::message_enum::MessageType; -use rocketmq_common::common::message::MessageConst; -use rocketmq_common::common::mix_all::RETRY_GROUP_TOPIC_PREFIX; -use rocketmq_common::common::topic::TopicValidator; -use rocketmq_common::common::TopicSysFlag::build_sys_flag; -use rocketmq_common::ArcCellWrapper; -use rocketmq_common::MessageDecoder; -use rocketmq_common::TimeUtils; use rocketmq_remoting::code::request_code::RequestCode; -use rocketmq_remoting::code::response_code::RemotingSysResponseCode::SystemError; -use rocketmq_remoting::code::response_code::ResponseCode; use rocketmq_remoting::net::channel::Channel; -use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header::SendMessageRequestHeader; -use rocketmq_remoting::protocol::header::message_operation_header::send_message_response_header::SendMessageResponseHeader; -use rocketmq_remoting::protocol::header::message_operation_header::TopicRequestHeaderTrait; use rocketmq_remoting::protocol::remoting_command::RemotingCommand; -use rocketmq_remoting::protocol::NamespaceUtil; use rocketmq_remoting::runtime::processor::RequestProcessor; use rocketmq_remoting::runtime::server::ConnectionHandlerContext; use rocketmq_store::log_file::MessageStore; -use rocketmq_store::stats::broker_stats_manager::BrokerStatsManager; use tracing::info; -use tracing::warn; use self::client_manage_processor::ClientManageProcessor; -use crate::mqtrace::send_message_context::SendMessageContext; -use crate::mqtrace::send_message_hook::SendMessageHook; use crate::processor::ack_message_processor::AckMessageProcessor; use crate::processor::admin_broker_processor::AdminBrokerProcessor; use crate::processor::change_invisible_time_processor::ChangeInvisibleTimeProcessor; @@ -61,7 +38,6 @@ use crate::processor::query_assignment_processor::QueryAssignmentProcessor; use crate::processor::query_message_processor::QueryMessageProcessor; use crate::processor::reply_message_processor::ReplyMessageProcessor; use crate::processor::send_message_processor::SendMessageProcessor; -use crate::topic::manager::topic_config_manager::TopicConfigManager; pub(crate) mod ack_message_processor; pub(crate) mod admin_broker_processor; @@ -176,218 +152,3 @@ impl RequestProcessor for BrokerReques } } } - -#[derive(Clone)] -pub(crate) struct SendMessageProcessorInner { - pub(crate) broker_config: Arc, - pub(crate) topic_config_manager: TopicConfigManager, - pub(crate) send_message_hook_vec: ArcCellWrapper>>, -} - -impl SendMessageProcessorInner { - pub(crate) fn execute_send_message_hook_before(&self, context: &SendMessageContext) { - for hook in self.send_message_hook_vec.iter() { - 
hook.send_message_before(context); - } - } - - pub(crate) fn execute_send_message_hook_after( - &self, - response: Option<&mut RemotingCommand>, - context: &mut SendMessageContext, - ) { - for hook in self.send_message_hook_vec.iter() { - if let Some(ref response) = response { - if let Some(ref header) = - response.decode_command_custom_header::() - { - context.msg_id = header.msg_id().to_string(); - context.queue_id = Some(header.queue_id()); - context.queue_offset = Some(header.queue_offset()); - context.code = response.code(); - context.error_msg = response.remark().unwrap_or(&"".to_string()).to_string(); - } - } - - hook.send_message_after(context); - } - } - - pub(crate) fn consumer_send_msg_back( - &self, - _channel: &Channel, - _ctx: &ConnectionHandlerContext, - _request: &RemotingCommand, - ) -> Option { - todo!() - } - - pub(crate) fn build_msg_context( - &self, - channel: &Channel, - _ctx: &ConnectionHandlerContext, - request_header: &mut SendMessageRequestHeader, - request: &RemotingCommand, - ) -> SendMessageContext { - let namespace = NamespaceUtil::get_namespace_from_resource(request_header.topic.as_str()); - - let mut send_message_context = SendMessageContext { - namespace, - producer_group: request_header.producer_group.clone(), - ..Default::default() - }; - send_message_context.topic(request_header.topic.clone()); - send_message_context.body_length( - request - .body() - .as_ref() - .map_or_else(|| 0, |b| b.len() as i32), - ); - send_message_context.msg_props(request_header.properties.clone().unwrap_or("".to_string())); - send_message_context.born_host(channel.remote_address().to_string()); - send_message_context.broker_addr(self.broker_config.get_broker_addr()); - send_message_context.queue_id(request_header.queue_id); - send_message_context.broker_region_id(self.broker_config.region_id().to_string()); - send_message_context.born_time_stamp(request_header.born_timestamp); - send_message_context.request_time_stamp(TimeUtils::get_current_millis() as i64); - - if let Some(owner) = request.ext_fields() { - if let Some(value) = owner.get(BrokerStatsManager::COMMERCIAL_OWNER) { - send_message_context.commercial_owner(value.clone()); - } - } - let mut properties = - MessageDecoder::string_to_message_properties(request_header.properties.as_ref()); - properties.insert( - MessageConst::PROPERTY_MSG_REGION.to_string(), - self.broker_config.region_id().to_string(), - ); - properties.insert( - MessageConst::PROPERTY_TRACE_SWITCH.to_string(), - self.broker_config.trace_on.to_string(), - ); - request_header.properties = Some(MessageDecoder::message_properties_to_string(&properties)); - - if let Some(unique_key) = - properties.get(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) - { - send_message_context.msg_unique_key.clone_from(unique_key); - } else { - send_message_context.msg_unique_key = "".to_string(); - } - - if properties.contains_key(MessageConst::PROPERTY_SHARDING_KEY) { - send_message_context.msg_type = MessageType::OrderMsg; - } else { - send_message_context.msg_type = MessageType::NormalMsg; - } - send_message_context - } - - pub(crate) fn msg_check( - &mut self, - channel: &Channel, - _ctx: &ConnectionHandlerContext, - _request: &RemotingCommand, - request_header: &SendMessageRequestHeader, - response: &mut RemotingCommand, - ) { - //check broker permission - if !PermName::is_writeable(self.broker_config.broker_permission()) - && self - .topic_config_manager - .is_order_topic(request_header.topic.as_str()) - { - response.with_code(ResponseCode::NoPermission); - 
response.with_remark(Some(format!( - "the broker[{}] sending message is forbidden", - self.broker_config.broker_ip1.clone() - ))); - return; - } - - //check Topic - let result = TopicValidator::validate_topic(request_header.topic.as_str()); - if !result.valid() { - response.with_code(SystemError); - response.with_remark(Some(result.remark().to_string())); - return; - } - - if TopicValidator::is_not_allowed_send_topic(request_header.topic.as_str()) { - response.with_code(ResponseCode::NoPermission); - response.with_remark(Some(format!( - "Sending message to topic[{}] is forbidden.", - request_header.topic.as_str() - ))); - return; - } - let mut topic_config = self - .topic_config_manager - .select_topic_config(request_header.topic.as_str()); - if topic_config.is_none() { - let mut topic_sys_flag = 0; - if request_header.unit_mode.unwrap_or(false) { - if request_header.topic.starts_with(RETRY_GROUP_TOPIC_PREFIX) { - topic_sys_flag = build_sys_flag(false, true); - } else { - topic_sys_flag = build_sys_flag(true, false); - } - } - warn!( - "the topic {} not exist, producer: {}", - request_header.topic(), - channel.remote_address(), - ); - topic_config = self - .topic_config_manager - .create_topic_in_send_message_method( - request_header.topic.as_str(), - request_header.default_topic.as_str(), - channel.remote_address(), - request_header.default_topic_queue_nums, - topic_sys_flag, - ); - - if topic_config.is_none() && request_header.topic.starts_with(RETRY_GROUP_TOPIC_PREFIX) - { - topic_config = self - .topic_config_manager - .create_topic_in_send_message_back_method( - request_header.topic.as_str(), - 1, - PermName::PERM_WRITE | PermName::PERM_READ, - false, - topic_sys_flag, - ); - } - - if topic_config.is_none() { - response.with_code(ResponseCode::TopicNotExist); - response.with_remark(Some(format!( - "topic[{}] not exist, apply first please!", - request_header.topic.as_str() - ))); - } - } - - let queue_id_int = request_header.queue_id.unwrap(); - let topic_config_inner = topic_config.as_ref().unwrap(); - let id_valid = topic_config_inner - .write_queue_nums - .max(topic_config_inner.read_queue_nums); - if queue_id_int >= id_valid as i32 { - response.with_code(ResponseCode::SystemError); - response.with_remark(Some(format!( - "request queueId[{}] is illegal, {:?} Producer: {}", - queue_id_int, - topic_config_inner, - channel.remote_address() - ))); - } - } - - pub(crate) fn random_queue_id(&self, write_queue_nums: u32) -> u32 { - rand::thread_rng().gen_range(0..=99999999) % write_queue_nums - } -} diff --git a/rocketmq-broker/src/processor/send_message_processor.rs b/rocketmq-broker/src/processor/send_message_processor.rs index 05f45cb1..d0c4b6ad 100644 --- a/rocketmq-broker/src/processor/send_message_processor.rs +++ b/rocketmq-broker/src/processor/send_message_processor.rs @@ -19,19 +19,27 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; +use rand::Rng; use rocketmq_common::common::attribute::cleanup_policy::CleanupPolicy; use rocketmq_common::common::broker::broker_config::BrokerConfig; +use rocketmq_common::common::constant::PermName; +use rocketmq_common::common::key_builder::KeyBuilder; use rocketmq_common::common::message::message_batch::MessageExtBatch; use rocketmq_common::common::message::message_client_id_setter; use rocketmq_common::common::message::message_client_id_setter::get_uniq_id; +use rocketmq_common::common::message::message_enum::MessageType; use rocketmq_common::common::message::message_single::MessageExt; use 
rocketmq_common::common::message::message_single::MessageExtBrokerInner; use rocketmq_common::common::message::MessageConst; use rocketmq_common::common::message::MessageTrait; +use rocketmq_common::common::mix_all; use rocketmq_common::common::mix_all::RETRY_GROUP_TOPIC_PREFIX; +use rocketmq_common::common::mq_version::RocketMqVersion; use rocketmq_common::common::sys_flag::message_sys_flag::MessageSysFlag; use rocketmq_common::common::topic::TopicValidator; +use rocketmq_common::common::FAQUrl; use rocketmq_common::common::TopicFilterType; +use rocketmq_common::common::TopicSysFlag::build_sys_flag; use rocketmq_common::utils::message_utils; use rocketmq_common::utils::queue_type_utils::QueueTypeUtils; use rocketmq_common::utils::util_all; @@ -40,9 +48,11 @@ use rocketmq_common::CleanupPolicyUtils; use rocketmq_common::MessageDecoder; use rocketmq_common::MessageDecoder::message_properties_to_string; use rocketmq_common::MessageDecoder::string_to_message_properties; +use rocketmq_common::TimeUtils; use rocketmq_remoting::code::request_code::RequestCode; use rocketmq_remoting::code::response_code::RemotingSysResponseCode; use rocketmq_remoting::code::response_code::ResponseCode; +use rocketmq_remoting::code::response_code::ResponseCode::SystemError; use rocketmq_remoting::net::channel::Channel; use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header::parse_request_header; use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header::SendMessageRequestHeader; @@ -50,13 +60,18 @@ use rocketmq_remoting::protocol::header::message_operation_header::send_message_ use rocketmq_remoting::protocol::header::message_operation_header::TopicRequestHeaderTrait; use rocketmq_remoting::protocol::remoting_command::RemotingCommand; use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_context::TopicQueueMappingContext; +use rocketmq_remoting::protocol::NamespaceUtil; use rocketmq_remoting::runtime::server::ConnectionHandlerContext; use rocketmq_store::base::message_result::PutMessageResult; use rocketmq_store::log_file::MessageStore; use rocketmq_store::stats::broker_stats_manager::BrokerStatsManager; +use tracing::info; +use tracing::warn; +use crate::client::rebalance::rebalance_lock_manager::RebalanceLockManager; use crate::mqtrace::send_message_context::SendMessageContext; -use crate::processor::SendMessageProcessorInner; +use crate::mqtrace::send_message_hook::SendMessageHook; +use crate::subscription::manager::subscription_group_manager::SubscriptionGroupManager; use crate::topic::manager::topic_config_manager::TopicConfigManager; use crate::topic::manager::topic_queue_mapping_manager::TopicQueueMappingManager; @@ -64,10 +79,7 @@ pub struct SendMessageProcessor where MS: Clone, { - inner: SendMessageProcessorInner, - topic_queue_mapping_manager: Arc, - broker_config: Arc, - message_store: MS, + inner: Inner, store_host: SocketAddr, } @@ -75,9 +87,6 @@ impl Clone for SendMessageProcessor { fn clone(&self) -> Self { Self { inner: self.inner.clone(), - topic_queue_mapping_manager: self.topic_queue_mapping_manager.clone(), - broker_config: self.broker_config.clone(), - message_store: self.message_store.clone(), store_host: self.store_host, } } @@ -112,6 +121,7 @@ impl SendMessageProcessor { _ => { let mut request_header = parse_request_header(&request, request_code)?; let mapping_context = self + .inner .topic_queue_mapping_manager .build_topic_queue_mapping_context(&request_header, true); let rewrite_result = 
TopicQueueMappingManager::rewrite_request_for_static_topic( @@ -167,22 +177,25 @@ impl SendMessageProcessor { impl SendMessageProcessor { pub fn new( topic_queue_mapping_manager: Arc, + subscription_group_manager: Arc>, topic_config_manager: TopicConfigManager, broker_config: Arc, message_store: &MS, + rebalance_lock_manager: Arc, ) -> Self { let store_host = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port) .parse::() .unwrap(); Self { - inner: SendMessageProcessorInner { - broker_config: broker_config.clone(), + inner: Inner { + broker_config, topic_config_manager, send_message_hook_vec: ArcCellWrapper::new(Vec::new()), + topic_queue_mapping_manager, + subscription_group_manager, + message_store: message_store.clone(), + rebalance_lock_manager, }, - topic_queue_mapping_manager, - broker_config, - message_store: message_store.clone(), store_host, } } @@ -262,6 +275,7 @@ impl SendMessageProcessor { message_ext.message_ext_inner.store_host = self.store_host; message_ext.message_ext_inner.reconsume_times = request_header.reconsume_times.unwrap_or(0); let cluster_name = self + .inner .broker_config .broker_identity .broker_cluster_name @@ -322,8 +336,8 @@ impl SendMessageProcessor { response_header.set_batch_uniq_id(batch_uniq_id); is_inner_batch = true; } - if self.broker_config.async_send_enable { - let mut message_store = self.message_store.clone(); + if self.inner.broker_config.async_send_enable { + let mut message_store = self.inner.message_store.clone(); let put_message_result = tokio::spawn(async move { if is_inner_batch { message_store @@ -345,11 +359,12 @@ impl SendMessageProcessor { ) } else { let put_message_result = if is_inner_batch { - self.message_store + self.inner + .message_store .put_message(batch_message.message_ext_broker_inner) .await } else { - self.message_store.put_messages(batch_message).await + self.inner.message_store.put_messages(batch_message).await }; self.handle_put_message_result( put_message_result, @@ -399,7 +414,7 @@ impl SendMessageProcessor { &request_header, &mut response, &request, - &message_ext.message_ext_inner, + &mut message_ext.message_ext_inner, &mut topic_config, &mut ori_props, ) { @@ -412,7 +427,7 @@ impl SendMessageProcessor { .clone_from(request.body()); message_ext.message_ext_inner.message.flag = request_header.flag; - let uniq_key = ori_props.get_mut(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); + let uniq_key = ori_props.get(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if let Some(uniq_key_inner) = uniq_key { if uniq_key_inner.is_empty() { let uniq_key_inner = message_client_id_setter::create_uniq_id(); @@ -421,7 +436,14 @@ impl SendMessageProcessor { uniq_key_inner, ); } + } else { + let uniq_key_inner = message_client_id_setter::create_uniq_id(); + ori_props.insert( + MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX.to_string(), + uniq_key_inner, + ); } + let tra_flag = ori_props .get(MessageConst::PROPERTY_TRANSACTION_PREPARED) .cloned(); @@ -456,7 +478,8 @@ impl SendMessageProcessor { message_ext.message_ext_inner.message.properties.insert( MessageConst::PROPERTY_CLUSTER.to_string(), - self.broker_config + self.inner + .broker_config .broker_identity .broker_cluster_name .clone(), @@ -473,13 +496,13 @@ impl SendMessageProcessor { && !(message_ext.reconsume_times() > 0 && message_ext.message_ext_inner.message.get_delay_time_level() > 0) { - if self.broker_config.reject_transaction_message { + if self.inner.broker_config.reject_transaction_message { return Some( response 
.set_code(ResponseCode::NoPermission) .set_remark(Some(format!( "the broker[{}] sending transaction message is forbidden", - self.broker_config.broker_ip1 + self.inner.broker_config.broker_ip1 ))), ); } @@ -488,12 +511,12 @@ impl SendMessageProcessor { false }; - if self.broker_config.async_send_enable { + if self.inner.broker_config.async_send_enable { let topic = message_ext.topic().to_string(); let put_message_handle = if send_transaction_prepare_message { unimplemented!() } else { - let mut message_store = self.message_store.clone(); + let mut message_store = self.inner.message_store.clone(); tokio::spawn(async move { message_store.put_message(message_ext).await }) }; let put_message_result = put_message_handle.await.unwrap(); @@ -507,7 +530,7 @@ impl SendMessageProcessor { ) } else { let topic = message_ext.topic().to_string(); - let put_message_result = self.message_store.put_message(message_ext).await; + let put_message_result = self.inner.message_store.put_message(message_ext).await; self.handle_put_message_result( put_message_result, response, @@ -533,57 +556,57 @@ impl SendMessageProcessor { let mut response = response; let mut send_ok = false; match put_message_result.put_message_status() { - rocketmq_store::base::message_status_enum::PutMessageStatus::PutOk => { - send_ok = true; - response = response.set_code(RemotingSysResponseCode::Success); - }, - rocketmq_store::base::message_status_enum::PutMessageStatus::FlushDiskTimeout => { - send_ok = true; - response = response.set_code(ResponseCode::FlushDiskTimeout); - }, - rocketmq_store::base::message_status_enum::PutMessageStatus::FlushSlaveTimeout => { - send_ok = true; - response = response.set_code(ResponseCode::FlushSlaveTimeout); - }, - rocketmq_store::base::message_status_enum::PutMessageStatus::SlaveNotAvailable =>{ - send_ok = true; - response = response.set_code(ResponseCode::SlaveNotAvailable); - }, - rocketmq_store::base::message_status_enum::PutMessageStatus::ServiceNotAvailable => { - response = response.set_code(ResponseCode::ServiceNotAvailable).set_remark(Some("service not available now. It may be caused by one of the following reasons: the broker's disk is full %s, messages are put to the slave, message store has been shut down, etc.".to_string())); - }, - rocketmq_store::base::message_status_enum::PutMessageStatus::CreateMappedFileFailed => { - response = response.set_code(RemotingSysResponseCode::SystemError).set_remark(Some("create mapped file failed, server is busy or broken.".to_string())); - }, - rocketmq_store::base::message_status_enum::PutMessageStatus::MessageIllegal | - rocketmq_store::base::message_status_enum::PutMessageStatus::PropertiesSizeExceeded => { - response = response.set_code(ResponseCode::MessageIllegal).set_remark(Some("the message is illegal, maybe msg body or properties length not matched. 
msg body length limit B, msg properties length limit 32KB.".to_string())); - }, - rocketmq_store::base::message_status_enum::PutMessageStatus::OsPageCacheBusy =>{ - response = response.set_code(RemotingSysResponseCode::SystemError).set_remark(Some("[PC_SYNCHRONIZED]broker busy, start flow control for a while".to_string())); - }, - rocketmq_store::base::message_status_enum::PutMessageStatus::UnknownError => { - response = response.set_code(RemotingSysResponseCode::SystemError).set_remark(Some("UNKNOWN_ERROR".to_string())); - }, - rocketmq_store::base::message_status_enum::PutMessageStatus::InSyncReplicasNotEnough => { - response = response.set_code(RemotingSysResponseCode::SystemError).set_remark(Some("in-sync replicas not enough".to_string())); - }, - rocketmq_store::base::message_status_enum::PutMessageStatus::LmqConsumeQueueNumExceeded => { - response = response.set_code(RemotingSysResponseCode::SystemError).set_remark(Some("[LMQ_CONSUME_QUEUE_NUM_EXCEEDED]broker config enableLmq and enableMultiDispatch, lmq consumeQueue num exceed maxLmqConsumeQueueNum config num, default limit 2w.".to_string())); - }, - rocketmq_store::base::message_status_enum::PutMessageStatus::WheelTimerFlowControl => { - response = response.set_code(RemotingSysResponseCode::SystemError).set_remark(Some("timer message is under flow control, max num limit is %d or the current value is greater than %d and less than %d, trigger random flow control".to_string())); - }, - rocketmq_store::base::message_status_enum::PutMessageStatus::WheelTimerMsgIllegal => { - response = response.set_code(ResponseCode::MessageIllegal).set_remark(Some("timer message illegal, the delay time should not be bigger than the max delay %dms; or if set del msg, the delay time should be bigger than the current time".to_string())); - }, - rocketmq_store::base::message_status_enum::PutMessageStatus::WheelTimerNotEnable => { - response = response.set_code(RemotingSysResponseCode::SystemError).set_remark(Some("accurate timer message is not enabled, timerWheelEnable is %s".to_string())); - }, - _ => { - response = response.set_code(RemotingSysResponseCode::SystemError).set_remark(Some("UNKNOWN_ERROR DEFAULT".to_string())); - } - } + rocketmq_store::base::message_status_enum::PutMessageStatus::PutOk => { + send_ok = true; + response = response.set_code(RemotingSysResponseCode::Success); + }, + rocketmq_store::base::message_status_enum::PutMessageStatus::FlushDiskTimeout => { + send_ok = true; + response = response.set_code(ResponseCode::FlushDiskTimeout); + }, + rocketmq_store::base::message_status_enum::PutMessageStatus::FlushSlaveTimeout => { + send_ok = true; + response = response.set_code(ResponseCode::FlushSlaveTimeout); + }, + rocketmq_store::base::message_status_enum::PutMessageStatus::SlaveNotAvailable =>{ + send_ok = true; + response = response.set_code(ResponseCode::SlaveNotAvailable); + }, + rocketmq_store::base::message_status_enum::PutMessageStatus::ServiceNotAvailable => { + response = response.set_code(ResponseCode::ServiceNotAvailable).set_remark(Some("service not available now. 
It may be caused by one of the following reasons: the broker's disk is full %s, messages are put to the slave, message store has been shut down, etc.".to_string())); + }, + rocketmq_store::base::message_status_enum::PutMessageStatus::CreateMappedFileFailed => { + response = response.set_code(RemotingSysResponseCode::SystemError).set_remark(Some("create mapped file failed, server is busy or broken.".to_string())); + }, + rocketmq_store::base::message_status_enum::PutMessageStatus::MessageIllegal | + rocketmq_store::base::message_status_enum::PutMessageStatus::PropertiesSizeExceeded => { + response = response.set_code(ResponseCode::MessageIllegal).set_remark(Some("the message is illegal, maybe msg body or properties length not matched. msg body length limit B, msg properties length limit 32KB.".to_string())); + }, + rocketmq_store::base::message_status_enum::PutMessageStatus::OsPageCacheBusy =>{ + response = response.set_code(RemotingSysResponseCode::SystemError).set_remark(Some("[PC_SYNCHRONIZED]broker busy, start flow control for a while".to_string())); + }, + rocketmq_store::base::message_status_enum::PutMessageStatus::UnknownError => { + response = response.set_code(RemotingSysResponseCode::SystemError).set_remark(Some("UNKNOWN_ERROR".to_string())); + }, + rocketmq_store::base::message_status_enum::PutMessageStatus::InSyncReplicasNotEnough => { + response = response.set_code(RemotingSysResponseCode::SystemError).set_remark(Some("in-sync replicas not enough".to_string())); + }, + rocketmq_store::base::message_status_enum::PutMessageStatus::LmqConsumeQueueNumExceeded => { + response = response.set_code(RemotingSysResponseCode::SystemError).set_remark(Some("[LMQ_CONSUME_QUEUE_NUM_EXCEEDED]broker config enableLmq and enableMultiDispatch, lmq consumeQueue num exceed maxLmqConsumeQueueNum config num, default limit 2w.".to_string())); + }, + rocketmq_store::base::message_status_enum::PutMessageStatus::WheelTimerFlowControl => { + response = response.set_code(RemotingSysResponseCode::SystemError).set_remark(Some("timer message is under flow control, max num limit is %d or the current value is greater than %d and less than %d, trigger random flow control".to_string())); + }, + rocketmq_store::base::message_status_enum::PutMessageStatus::WheelTimerMsgIllegal => { + response = response.set_code(ResponseCode::MessageIllegal).set_remark(Some("timer message illegal, the delay time should not be bigger than the max delay %dms; or if set del msg, the delay time should be bigger than the current time".to_string())); + }, + rocketmq_store::base::message_status_enum::PutMessageStatus::WheelTimerNotEnable => { + response = response.set_code(RemotingSysResponseCode::SystemError).set_remark(Some("accurate timer message is not enabled, timerWheelEnable is %s".to_string())); + }, + _ => { + response = response.set_code(RemotingSysResponseCode::SystemError).set_remark(Some("UNKNOWN_ERROR DEFAULT".to_string())); + } + } let binding = HashMap::new(); let ext_fields = request.ext_fields().unwrap_or(&binding); @@ -591,7 +614,7 @@ impl SendMessageProcessor { let auth_type = ext_fields.get(BrokerStatsManager::ACCOUNT_AUTH_TYPE); let owner_parent = ext_fields.get(BrokerStatsManager::ACCOUNT_OWNER_PARENT); let owner_self = ext_fields.get(BrokerStatsManager::ACCOUNT_OWNER_SELF); - let commercial_size_per_msg = self.broker_config.commercial_size_per_msg; + let commercial_size_per_msg = self.inner.broker_config.commercial_size_per_msg; if send_ok { if TopicValidator::RMQ_SYS_SCHEDULE_TOPIC == topic { @@ -630,14 +653,17 @@ 
impl SendMessageProcessor {
         response.with_opaque(request.opaque());
         response.add_ext_field(
             MessageConst::PROPERTY_MSG_REGION,
-            self.broker_config.region_id(),
+            self.inner.broker_config.region_id(),
         );
         response.add_ext_field(
             MessageConst::PROPERTY_TRACE_SWITCH,
-            self.broker_config.trace_on.to_string(),
+            self.inner.broker_config.trace_on.to_string(),
         );
-        let start_timestamp = self.broker_config.start_accept_send_request_time_stamp;
-        if self.message_store.now() < (start_timestamp as u64) {
+        let start_timestamp = self
+            .inner
+            .broker_config
+            .start_accept_send_request_time_stamp;
+        if self.inner.message_store.now() < (start_timestamp as u64) {
             response = response
                 .set_code(RemotingSysResponseCode::SystemError)
                 .set_remark(Some(format!(
@@ -653,14 +679,314 @@ impl SendMessageProcessor {
     }

     fn handle_retry_and_dlq(
-        &self,
+        &mut self,
         request_header: &SendMessageRequestHeader,
         response: &mut RemotingCommand,
         request: &RemotingCommand,
-        msg: &MessageExt,
+        msg: &mut MessageExt,
         topic_config: &mut rocketmq_common::common::config::TopicConfig,
         properties: &mut HashMap<String, String>,
     ) -> bool {
+        let mut new_topic = request_header.topic().to_string();
+        if !new_topic.is_empty() && new_topic.starts_with(RETRY_GROUP_TOPIC_PREFIX) {
+            let group_name = KeyBuilder::parse_group(new_topic.as_str());
+            let subscription_group_config = self
+                .inner
+                .subscription_group_manager
+                .find_subscription_group_config(group_name.as_str());
+            if subscription_group_config.is_none() {
+                response
+                    .with_code(ResponseCode::SubscriptionNotExist)
+                    .with_remark(Some(format!(
+                        "subscription group not exist, {} {}",
+                        group_name.as_str(),
+                        FAQUrl::suggest_todo(FAQUrl::SUBSCRIPTION_GROUP_NOT_EXIST)
+                    )));
+                return false;
+            }
+            let subscription_group_config = subscription_group_config.unwrap();
+
+            let mut max_reconsume_times = subscription_group_config.retry_max_times();
+            if request.version() >= From::from(RocketMqVersion::V349)
+                && request_header.max_reconsume_times.is_some()
+            {
+                max_reconsume_times = request_header.max_reconsume_times.unwrap();
+            }
+            let reconsume_times = request_header.reconsume_times.unwrap_or(0);
+            let mut send_retry_message_to_dead_letter_queue_directly = false;
+            if !self
+                .inner
+                .rebalance_lock_manager
+                .is_lock_all_expired(group_name.as_str())
+            {
+                info!(
+                    "Group has an unexpired lock record, which shows it is an ordered message; \
+                     send it to DLQ right now. group={}, topic={}, reconsumeTimes={}, \
+                     maxReconsumeTimes={}.",
+                    group_name, new_topic, reconsume_times, max_reconsume_times
+                );
+                send_retry_message_to_dead_letter_queue_directly = true;
+            }
+            if reconsume_times > max_reconsume_times
+                || send_retry_message_to_dead_letter_queue_directly
+            {
+                properties.insert(
+                    MessageConst::PROPERTY_DELAY_TIME_LEVEL.to_string(),
+                    "-1".to_string(),
+                );
+                new_topic = mix_all::get_dlq_topic(group_name.as_str());
+                let queue_id_int = self.inner.random_queue_id(DLQ_NUMS_PER_GROUP) as i32;
+                let new_topic_config = self
+                    .inner
+                    .topic_config_manager
+                    .create_topic_in_send_message_back_method(
+                        new_topic.as_str(),
+                        DLQ_NUMS_PER_GROUP as i32,
+                        PermName::PERM_WRITE | PermName::PERM_READ,
+                        false,
+                        0,
+                    );
+                msg.message.topic = new_topic.to_string();
+                msg.queue_id = queue_id_int;
+                msg.message.set_delay_time_level(0);
+                if new_topic_config.is_none() {
+                    response
+                        .with_code(ResponseCode::SystemError)
+                        .with_remark(Some(format!(
+                            "topic {} not exist, apply DLQ failed",
+                            new_topic
+                        )));
+                    return false;
+                }
+                *topic_config = new_topic_config.unwrap();
+            }
+        }
+
+        let mut sys_flag =
request_header.sys_flag; + if TopicFilterType::MultiTag == topic_config.topic_filter_type { + sys_flag |= MessageSysFlag::MULTI_TAGS_FLAG; + } + msg.sys_flag = sys_flag; true } } + +const DLQ_NUMS_PER_GROUP: u32 = 1; + +#[derive(Clone)] +pub(crate) struct Inner { + topic_config_manager: TopicConfigManager, + send_message_hook_vec: ArcCellWrapper>>, + topic_queue_mapping_manager: Arc, + subscription_group_manager: Arc>, + broker_config: Arc, + message_store: MS, + rebalance_lock_manager: Arc, +} + +impl Inner { + pub(crate) fn execute_send_message_hook_before(&self, context: &SendMessageContext) { + for hook in self.send_message_hook_vec.iter() { + hook.send_message_before(context); + } + } + + pub(crate) fn execute_send_message_hook_after( + &self, + response: Option<&mut RemotingCommand>, + context: &mut SendMessageContext, + ) { + for hook in self.send_message_hook_vec.iter() { + if let Some(ref response) = response { + if let Some(ref header) = + response.decode_command_custom_header::() + { + context.msg_id = header.msg_id().to_string(); + context.queue_id = Some(header.queue_id()); + context.queue_offset = Some(header.queue_offset()); + context.code = response.code(); + context.error_msg = response.remark().unwrap_or(&"".to_string()).to_string(); + } + } + + hook.send_message_after(context); + } + } + + pub(crate) fn consumer_send_msg_back( + &self, + _channel: &Channel, + _ctx: &ConnectionHandlerContext, + _request: &RemotingCommand, + ) -> Option { + todo!() + } + + pub(crate) fn build_msg_context( + &self, + channel: &Channel, + _ctx: &ConnectionHandlerContext, + request_header: &mut SendMessageRequestHeader, + request: &RemotingCommand, + ) -> SendMessageContext { + let namespace = NamespaceUtil::get_namespace_from_resource(request_header.topic.as_str()); + + let mut send_message_context = SendMessageContext { + namespace, + producer_group: request_header.producer_group.clone(), + ..Default::default() + }; + send_message_context.topic(request_header.topic.clone()); + send_message_context.body_length( + request + .body() + .as_ref() + .map_or_else(|| 0, |b| b.len() as i32), + ); + send_message_context.msg_props(request_header.properties.clone().unwrap_or("".to_string())); + send_message_context.born_host(channel.remote_address().to_string()); + send_message_context.broker_addr(self.broker_config.get_broker_addr()); + send_message_context.queue_id(request_header.queue_id); + send_message_context.broker_region_id(self.broker_config.region_id().to_string()); + send_message_context.born_time_stamp(request_header.born_timestamp); + send_message_context.request_time_stamp(TimeUtils::get_current_millis() as i64); + + if let Some(owner) = request.ext_fields() { + if let Some(value) = owner.get(BrokerStatsManager::COMMERCIAL_OWNER) { + send_message_context.commercial_owner(value.clone()); + } + } + let mut properties = + MessageDecoder::string_to_message_properties(request_header.properties.as_ref()); + properties.insert( + MessageConst::PROPERTY_MSG_REGION.to_string(), + self.broker_config.region_id().to_string(), + ); + properties.insert( + MessageConst::PROPERTY_TRACE_SWITCH.to_string(), + self.broker_config.trace_on.to_string(), + ); + request_header.properties = Some(MessageDecoder::message_properties_to_string(&properties)); + + if let Some(unique_key) = + properties.get(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) + { + send_message_context.msg_unique_key.clone_from(unique_key); + } else { + send_message_context.msg_unique_key = "".to_string(); + } + + if 
properties.contains_key(MessageConst::PROPERTY_SHARDING_KEY) { + send_message_context.msg_type = MessageType::OrderMsg; + } else { + send_message_context.msg_type = MessageType::NormalMsg; + } + send_message_context + } + + pub(crate) fn msg_check( + &mut self, + channel: &Channel, + _ctx: &ConnectionHandlerContext, + _request: &RemotingCommand, + request_header: &SendMessageRequestHeader, + response: &mut RemotingCommand, + ) { + //check broker permission + if !PermName::is_writeable(self.broker_config.broker_permission()) + && self + .topic_config_manager + .is_order_topic(request_header.topic.as_str()) + { + response.with_code(ResponseCode::NoPermission); + response.with_remark(Some(format!( + "the broker[{}] sending message is forbidden", + self.broker_config.broker_ip1.clone() + ))); + return; + } + + //check Topic + let result = TopicValidator::validate_topic(request_header.topic.as_str()); + if !result.valid() { + response.with_code(SystemError); + response.with_remark(Some(result.remark().to_string())); + return; + } + + if TopicValidator::is_not_allowed_send_topic(request_header.topic.as_str()) { + response.with_code(ResponseCode::NoPermission); + response.with_remark(Some(format!( + "Sending message to topic[{}] is forbidden.", + request_header.topic.as_str() + ))); + return; + } + let mut topic_config = self + .topic_config_manager + .select_topic_config(request_header.topic.as_str()); + if topic_config.is_none() { + let mut topic_sys_flag = 0; + if request_header.unit_mode.unwrap_or(false) { + if request_header.topic.starts_with(RETRY_GROUP_TOPIC_PREFIX) { + topic_sys_flag = build_sys_flag(false, true); + } else { + topic_sys_flag = build_sys_flag(true, false); + } + } + warn!( + "the topic {} not exist, producer: {}", + request_header.topic(), + channel.remote_address(), + ); + topic_config = self + .topic_config_manager + .create_topic_in_send_message_method( + request_header.topic.as_str(), + request_header.default_topic.as_str(), + channel.remote_address(), + request_header.default_topic_queue_nums, + topic_sys_flag, + ); + + if topic_config.is_none() && request_header.topic.starts_with(RETRY_GROUP_TOPIC_PREFIX) + { + topic_config = self + .topic_config_manager + .create_topic_in_send_message_back_method( + request_header.topic.as_str(), + 1, + PermName::PERM_WRITE | PermName::PERM_READ, + false, + topic_sys_flag, + ); + } + + if topic_config.is_none() { + response.with_code(ResponseCode::TopicNotExist); + response.with_remark(Some(format!( + "topic[{}] not exist, apply first please!", + request_header.topic.as_str() + ))); + } + } + + let queue_id_int = request_header.queue_id.unwrap(); + let topic_config_inner = topic_config.as_ref().unwrap(); + let id_valid = topic_config_inner + .write_queue_nums + .max(topic_config_inner.read_queue_nums); + if queue_id_int >= id_valid as i32 { + response.with_code(ResponseCode::SystemError); + response.with_remark(Some(format!( + "request queueId[{}] is illegal, {:?} Producer: {}", + queue_id_int, + topic_config_inner, + channel.remote_address() + ))); + } + } + + pub(crate) fn random_queue_id(&self, write_queue_nums: u32) -> u32 { + rand::thread_rng().gen_range(0..=99999999) % write_queue_nums + } +} diff --git a/rocketmq-common/src/common/mix_all.rs b/rocketmq-common/src/common/mix_all.rs index f70b9e93..6ad06d6e 100644 --- a/rocketmq-common/src/common/mix_all.rs +++ b/rocketmq-common/src/common/mix_all.rs @@ -101,6 +101,10 @@ pub fn get_retry_topic(consumer_group: &str) -> String { format!("{}{}", RETRY_GROUP_TOPIC_PREFIX, 
consumer_group) } +pub fn get_dlq_topic(consumer_group: &str) -> String { + format!("{}{}", DLQ_GROUP_TOPIC_PREFIX, consumer_group) +} + pub fn is_lmq(lmq_meta_data: Option<&str>) -> bool { match lmq_meta_data { Some(data) => data.starts_with(LMQ_PREFIX),
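
A minimal usage sketch (not part of the patch) of how the topic-name helpers above fit together with the new DLQ routing in handle_retry_and_dlq. Only get_retry_topic and get_dlq_topic come from this diff; the prefix values "%RETRY%" and "%DLQ%" are the usual upstream RocketMQ conventions and are assumed here, and the group name is illustrative.

use rocketmq_common::common::mix_all::{get_dlq_topic, get_retry_topic};

fn main() {
    // A consumer group's failed messages are redelivered via its retry topic...
    let group = "order_consumer_group";
    let retry_topic = get_retry_topic(group); // e.g. "%RETRY%order_consumer_group" (assumed prefix)

    // ...and handle_retry_and_dlq reroutes a retry message to the group's DLQ topic
    // once reconsume_times exceeds the group's retry_max_times, or immediately when
    // the group still holds an unexpired rebalance lock (ordered consumption).
    let dlq_topic = get_dlq_topic(group); // e.g. "%DLQ%order_consumer_group" (assumed prefix)

    println!("{retry_topic} -> {dlq_topic}");
}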