diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index 45bac789..35f8c54c 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -72,6 +72,7 @@ use crate::processor::default_pull_message_result_handler::DefaultPullMessageRes use crate::processor::end_transaction_processor::EndTransactionProcessor; use crate::processor::pull_message_processor::PullMessageProcessor; use crate::processor::pull_message_result_handler::PullMessageResultHandler; +use crate::processor::query_assignment_processor::QueryAssignmentProcessor; use crate::processor::query_message_processor::QueryMessageProcessor; use crate::processor::reply_message_processor::ReplyMessageProcessor; use crate::processor::send_message_processor::SendMessageProcessor; @@ -537,7 +538,9 @@ impl BrokerRuntime { self.subscription_group_manager.clone(), )), consumer_manage_processor: ArcMut::new(consumer_manage_processor), - query_assignment_processor: Default::default(), + query_assignment_processor: ArcMut::new(QueryAssignmentProcessor::new( + self.message_store_config.clone(), + )), query_message_processor: ArcMut::new(query_message_processor), end_transaction_processor: ArcMut::new(EndTransactionProcessor::new( self.message_store_config.clone(), diff --git a/rocketmq-broker/src/load_balance/message_request_mode_manager.rs b/rocketmq-broker/src/load_balance/message_request_mode_manager.rs index f9b89fb3..ad54bcc6 100644 --- a/rocketmq-broker/src/load_balance/message_request_mode_manager.rs +++ b/rocketmq-broker/src/load_balance/message_request_mode_manager.rs @@ -21,14 +21,13 @@ use cheetah_string::CheetahString; use rocketmq_common::common::config_manager::ConfigManager; use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils; use rocketmq_remoting::protocol::body::set_message_request_mode_request_body::SetMessageRequestModeRequestBody; -use rocketmq_rust::ArcMut; use rocketmq_store::config::message_store_config::MessageStoreConfig; use tracing::info; use crate::broker_path_config_helper; pub(crate) struct MessageRequestModeManager { - message_store_config: ArcMut, + message_store_config: Arc, message_request_mode_map: Arc< parking_lot::Mutex< HashMap< @@ -40,7 +39,7 @@ pub(crate) struct MessageRequestModeManager { } impl MessageRequestModeManager { - pub fn new(message_store_config: ArcMut) -> Self { + pub fn new(message_store_config: Arc) -> Self { Self { message_store_config, message_request_mode_map: Arc::new(parking_lot::Mutex::new(HashMap::new())), @@ -113,14 +112,13 @@ mod tests { use cheetah_string::CheetahString; use rocketmq_common::common::message::message_enum::MessageRequestMode; use rocketmq_remoting::protocol::body::set_message_request_mode_request_body::SetMessageRequestModeRequestBody; - use rocketmq_rust::ArcMut; use rocketmq_store::config::message_store_config::MessageStoreConfig; use super::*; #[test] fn set_message_request_mode_adds_entry() { - let message_store_config = ArcMut::new(MessageStoreConfig::default()); + let message_store_config = Arc::new(MessageStoreConfig::default()); let manager = MessageRequestModeManager::new(message_store_config); let topic = CheetahString::from("test_topic"); let consumer_group = CheetahString::from("test_group"); @@ -138,7 +136,7 @@ mod tests { #[test] fn get_message_request_mode_returns_none_for_nonexistent_entry() { - let message_store_config = ArcMut::new(MessageStoreConfig::default()); + let message_store_config = Arc::new(MessageStoreConfig::default()); let manager = MessageRequestModeManager::new(message_store_config); let topic = CheetahString::from("nonexistent_topic"); let consumer_group = CheetahString::from("nonexistent_group"); @@ -150,7 +148,7 @@ mod tests { #[test] fn encode_pretty_returns_pretty_json() { - let message_store_config = ArcMut::new(MessageStoreConfig::default()); + let message_store_config = Arc::new(MessageStoreConfig::default()); let manager = MessageRequestModeManager::new(message_store_config); let topic = CheetahString::from("test_topic"); let consumer_group = CheetahString::from("test_group"); @@ -169,18 +167,18 @@ mod tests { #[test] fn decode_populates_message_request_mode_map() { - let message_store_config = ArcMut::new(MessageStoreConfig::default()); + let message_store_config = Arc::new(MessageStoreConfig::default()); let manager = MessageRequestModeManager::new(message_store_config); let json = r#"{ - "test_topic": { - "test_group": { - "topic": "test_topic", - "consumerGroup": "test_group", - "mode": "PULL", - "popShareQueueNum": 0 - } - } - }"#; + "test_topic": { + "test_group": { + "topic": "test_topic", + "consumerGroup": "test_group", + "mode": "PULL", + "popShareQueueNum": 0 + } + } + }"#; manager.decode(json); let result = manager.get_message_request_mode( diff --git a/rocketmq-broker/src/processor/query_assignment_processor.rs b/rocketmq-broker/src/processor/query_assignment_processor.rs index 248a7aeb..bc8f0a56 100644 --- a/rocketmq-broker/src/processor/query_assignment_processor.rs +++ b/rocketmq-broker/src/processor/query_assignment_processor.rs @@ -14,19 +14,83 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use crate::load_balance::message_request_mode_manager::MessageRequestModeManager; + use cheetah_string::CheetahString; + use rocketmq_client_rust::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy; + use rocketmq_client_rust::consumer::rebalance_strategy::allocate_message_queue_averagely::AllocateMessageQueueAveragely; + use rocketmq_client_rust::consumer::rebalance_strategy::allocate_message_queue_averagely_by_circle::AllocateMessageQueueAveragelyByCircle; + use rocketmq_common::common::config_manager::ConfigManager; + use rocketmq_remoting::code::request_code::RequestCode; + use rocketmq_remoting::net::channel::Channel; + use rocketmq_remoting::protocol::remoting_command::RemotingCommand; + use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext; + use rocketmq_store::config::message_store_config::MessageStoreConfig; + use std::collections::HashMap; + use std::sync::Arc; -use rocketmq_remoting::protocol::remoting_command::RemotingCommand; -use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext; +pub struct QueryAssignmentProcessor { + message_request_mode_manager: MessageRequestModeManager, + load_strategy: HashMap>, + message_store_config: Arc, +} -#[derive(Default)] -pub struct QueryAssignmentProcessor {} +impl QueryAssignmentProcessor { + pub fn new(message_store_config: Arc) -> Self { + let allocate_message_queue_averagely: Arc = + Arc::new(AllocateMessageQueueAveragely); + let allocate_message_queue_averagely_by_circle: Arc = + Arc::new(AllocateMessageQueueAveragelyByCircle); + let mut load_strategy = HashMap::new(); + load_strategy.insert( + CheetahString::from_static_str(allocate_message_queue_averagely.get_name()), + allocate_message_queue_averagely, + ); + load_strategy.insert( + CheetahString::from_static_str(allocate_message_queue_averagely_by_circle.get_name()), + allocate_message_queue_averagely_by_circle, + ); + let manager = MessageRequestModeManager::new(message_store_config.clone()); + let _ = manager.load(); + Self { + message_request_mode_manager: manager, + load_strategy, + message_store_config, + } + } +} impl QueryAssignmentProcessor { - fn process_request( - &self, + pub async fn process_request( + &mut self, + channel: Channel, + ctx: ConnectionHandlerContext, + request_code: RequestCode, + request: RemotingCommand, + ) -> Option { + match request_code { + RequestCode::QueryAssignment => self.query_assignment(channel, ctx, request).await, + RequestCode::SetMessageRequestMode => { + self.set_message_request_mode(channel, ctx, request).await + } + _ => None, + } + } + + async fn query_assignment( + &mut self, + _channel: Channel, + _ctx: ConnectionHandlerContext, + _request: RemotingCommand, + ) -> Option { + unimplemented!() + } + + async fn set_message_request_mode( + &mut self, + _channel: Channel, _ctx: ConnectionHandlerContext, _request: RemotingCommand, - ) -> RemotingCommand { - todo!() + ) -> Option { + unimplemented!() } } diff --git a/rocketmq-client/src/consumer/rebalance_strategy.rs b/rocketmq-client/src/consumer/rebalance_strategy.rs index 642501ce..e5099b88 100644 --- a/rocketmq-client/src/consumer/rebalance_strategy.rs +++ b/rocketmq-client/src/consumer/rebalance_strategy.rs @@ -15,7 +15,7 @@ * limitations under the License. */ pub mod allocate_message_queue_averagely; -mod allocate_message_queue_averagely_by_circle; +pub mod allocate_message_queue_averagely_by_circle; use std::collections::HashSet;