From ba333d06fe4d7578029b8425b6b3f83f061c5e5f Mon Sep 17 00:00:00 2001 From: TeslaRustor <77013810+TeslaRustor@users.noreply.github.com> Date: Thu, 27 Jun 2024 12:19:14 +0000 Subject: [PATCH 1/2] =?UTF-8?q?[ISSUE=20#699]=F0=9F=9A=80Support=20pull=20?= =?UTF-8?q?message=20result=20handle?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../manager/broadcast_offset_manager.rs | 15 + .../default_pull_message_result_handler.rs | 18 +- .../src/processor/pull_message_processor.rs | 288 +++++++++++++++++- .../manager/topic_queue_mapping_manager.rs | 2 +- .../src/common/broker/broker_config.rs | 2 + rocketmq-remoting/src/error.rs | 4 + .../header/pull_message_response_header.rs | 4 +- .../src/protocol/remoting_command.rs | 10 + .../src/protocol/static_topic.rs | 1 + .../static_topic/logic_queue_mapping_item.rs | 20 ++ .../topic_queue_mapping_detail.rs | 25 +- .../static_topic/topic_queue_mapping_utils.rs | 80 +++++ rocketmq-remoting/src/rpc.rs | 6 +- rocketmq-remoting/src/rpc/rpc_client_utils.rs | 72 +++++ rocketmq-remoting/src/rpc/rpc_request.rs | 11 +- rocketmq-remoting/src/rpc/rpc_response.rs | 69 +++++ 16 files changed, 608 insertions(+), 19 deletions(-) create mode 100644 rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_utils.rs create mode 100644 rocketmq-remoting/src/rpc/rpc_client_utils.rs create mode 100644 rocketmq-remoting/src/rpc/rpc_response.rs diff --git a/rocketmq-broker/src/offset/manager/broadcast_offset_manager.rs b/rocketmq-broker/src/offset/manager/broadcast_offset_manager.rs index af496509..4b459303 100644 --- a/rocketmq-broker/src/offset/manager/broadcast_offset_manager.rs +++ b/rocketmq-broker/src/offset/manager/broadcast_offset_manager.rs @@ -16,3 +16,18 @@ */ #[derive(Debug, Default)] pub struct BroadcastOffsetManager {} + +#[allow(unused_variables)] +impl BroadcastOffsetManager { + pub fn query_init_offset( + &self, + topic: &str, + group_id: &str, + queue_id: i32, + client_id: &str, + request_offset: i64, + from_proxy: bool, + ) -> i64 { + unimplemented!() + } +} diff --git a/rocketmq-broker/src/processor/default_pull_message_result_handler.rs b/rocketmq-broker/src/processor/default_pull_message_result_handler.rs index 448ea134..c994eed2 100644 --- a/rocketmq-broker/src/processor/default_pull_message_result_handler.rs +++ b/rocketmq-broker/src/processor/default_pull_message_result_handler.rs @@ -38,6 +38,7 @@ use tracing::info; use crate::mqtrace::consume_message_context::ConsumeMessageContext; use crate::mqtrace::consume_message_hook::ConsumeMessageHook; +use crate::processor::pull_message_processor::PullMessageProcessor; use crate::processor::pull_message_result_handler::PullMessageResultHandler; use crate::topic::manager::topic_config_manager::TopicConfigManager; @@ -74,7 +75,7 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler { broker_allow_suspend: bool, message_filter: Box, mut response: RemotingCommand, - mapping_context: TopicQueueMappingContext, + mut mapping_context: TopicQueueMappingContext, begin_time_mills: u64, ) -> Option { let client_address = channel.remote_address().to_string(); @@ -90,14 +91,23 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler { &mut response, client_address.as_str(), ); - /* self.execute_consume_message_hook_before( + self.execute_consume_message_hook_before( &request, &request_header, &get_message_result, broker_allow_suspend, From::from(response.code()), - );*/ - + ); + /*let response_header = response.read_custom_header_mut::(); + let rewrite_result = PullMessageProcessor::rewrite_response_for_static_topic( + &request_header, + response_header.unwrap(), + &mut mapping_context, + ResponseCode::from(response.code()), + ); + if rewrite_result.is_some() { + return rewrite_result; + }*/ None } } diff --git a/rocketmq-broker/src/processor/pull_message_processor.rs b/rocketmq-broker/src/processor/pull_message_processor.rs index b4b254c3..b0e03a96 100644 --- a/rocketmq-broker/src/processor/pull_message_processor.rs +++ b/rocketmq-broker/src/processor/pull_message_processor.rs @@ -30,10 +30,15 @@ use rocketmq_remoting::protocol::filter::filter_api::FilterAPI; use rocketmq_remoting::protocol::forbidden_type::ForbiddenType; use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader; use rocketmq_remoting::protocol::header::pull_message_response_header::PullMessageResponseHeader; +use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType; use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel; use rocketmq_remoting::protocol::remoting_command::RemotingCommand; use rocketmq_remoting::protocol::request_source::RequestSource; use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_context::TopicQueueMappingContext; +use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail; +use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_utils::TopicQueueMappingUtils; +use rocketmq_remoting::rpc::rpc_client_utils::RpcClientUtils; +use rocketmq_remoting::rpc::rpc_response::RpcResponse; use rocketmq_remoting::runtime::server::ConnectionHandlerContext; use rocketmq_store::base::get_message_result::GetMessageResult; use rocketmq_store::base::message_status_enum::GetMessageStatus; @@ -43,10 +48,13 @@ use rocketmq_store::log_file::MAX_PULL_MSG_SIZE; use tracing::error; use tracing::warn; +use crate::client::client_channel_info::ClientChannelInfo; +use crate::client::consumer_group_info::ConsumerGroupInfo; use crate::client::manager::consumer_manager::ConsumerManager; use crate::filter::expression_for_retry_message_filter::ExpressionForRetryMessageFilter; use crate::filter::expression_message_filter::ExpressionMessageFilter; use crate::filter::manager::consumer_filter_manager::ConsumerFilterManager; +use crate::offset::manager::broadcast_offset_manager::BroadcastOffsetManager; use crate::offset::manager::consumer_offset_manager::ConsumerOffsetManager; use crate::processor::pull_message_result_handler::PullMessageResultHandler; use crate::subscription::manager::subscription_group_manager::SubscriptionGroupManager; @@ -63,6 +71,7 @@ pub struct PullMessageProcessor { consumer_manager: Arc, consumer_filter_manager: Arc, consumer_offset_manager: Arc, + broadcast_offset_manager: Arc, message_store: MS, } @@ -76,6 +85,7 @@ impl PullMessageProcessor { consumer_manager: Arc, consumer_filter_manager: Arc, consumer_offset_manager: Arc, + broadcast_offset_manager: Arc, message_store: MS, ) -> Self { Self { @@ -87,9 +97,224 @@ impl PullMessageProcessor { consumer_manager, consumer_filter_manager, consumer_offset_manager, + broadcast_offset_manager, message_store, } } + + pub fn rewrite_request_for_static_topic( + request_header: &mut PullMessageRequestHeader, + mapping_context: &mut TopicQueueMappingContext, + ) -> Option { + mapping_context.mapping_detail.as_ref()?; + let mapping_detail = mapping_context.mapping_detail.as_ref().unwrap(); + let topic = mapping_context.topic.as_str(); + let global_id = mapping_context.global_id; + if !mapping_context.is_leader() { + return Some(RemotingCommand::create_response_command_with_code_remark( + ResponseCode::NotLeaderForQueue, + format!( + "{}-{} cannot find mapping item in request process of current broker {}", + topic, + global_id.unwrap_or_default(), + mapping_detail + .topic_queue_mapping_info + .bname + .clone() + .unwrap_or_default() + ), + )); + } + + let global_offset = request_header.queue_offset; + let mapping_item = TopicQueueMappingUtils::find_logic_queue_mapping_item( + &mapping_context.mapping_item_list, + global_offset, + true, + )?; + mapping_context.current_item = Some(mapping_item.clone()); + + if global_offset < mapping_item.logic_offset { + // Handle offset moved... + } + + let bname = &mapping_item.bname; + let phy_queue_id = mapping_item.queue_id; + let phy_queue_offset = mapping_item.compute_physical_queue_offset(global_offset); + request_header.queue_id = Some(phy_queue_id); + request_header.queue_offset = phy_queue_offset; + if mapping_item.check_if_end_offset_decided() + /* && request_header.max_msg_nums.is_some() */ + { + request_header.max_msg_nums = std::cmp::min( + (mapping_item.end_offset - mapping_item.start_offset) as i32, + request_header.max_msg_nums, + ); + } + + if &mapping_detail.topic_queue_mapping_info.bname == bname { + return None; + } + + let mut sys_flag = request_header.sys_flag; + let topic_request = request_header.topic_request.as_mut().unwrap(); + topic_request.lo = Some(false); + topic_request + .rpc + .as_mut() + .unwrap() + .broker_name + .clone_from(bname); + sys_flag = PullSysFlag::clear_suspend_flag(sys_flag as u32) as i32; + sys_flag = PullSysFlag::clear_commit_offset_flag(sys_flag as u32) as i32; + request_header.sys_flag = sys_flag; + /* let rpc_request = RpcRequest::new(RequestCode::PullMessage, request_header.clone(), None); + let rpc_response = broker_controller + .broker_outer_api + .rpc_client + .invoke(rpc_request, broker_controller.broker_config.forward_timeout)?; + if rpc_response.exception.is_some() { + return Err(rpc_response.exception.unwrap()); + }*/ + + let rpc_response = RpcResponse::default(); + let response_header = rpc_response.get_header_mut::(); + let rewrite_result = Self::rewrite_response_for_static_topic( + request_header, + response_header.unwrap(), + mapping_context, + ResponseCode::from(rpc_response.code), + ); + if rewrite_result.is_some() { + return rewrite_result; + } + Some(RpcClientUtils::create_command_for_rpc_response( + rpc_response, + )) + } + + pub fn rewrite_response_for_static_topic( + request_header: &PullMessageRequestHeader, + response_header: &mut PullMessageResponseHeader, + mapping_context: &mut TopicQueueMappingContext, + code: ResponseCode, + ) -> Option { + mapping_context.mapping_detail.as_ref()?; + let mapping_detail = mapping_context.mapping_detail.as_ref().unwrap(); + let leader_item = mapping_context.leader_item.as_ref().unwrap(); + let current_item = mapping_context.current_item.as_ref().unwrap(); + let mapping_items = &mut mapping_context.mapping_item_list; + let earlist_item = + TopicQueueMappingUtils::find_logic_queue_mapping_item(mapping_items, 0, true).unwrap(); + + assert!(current_item.logic_offset >= 0); + + let request_offset = request_header.queue_offset; + let mut next_begin_offset = response_header.next_begin_offset; + let mut min_offset = response_header.min_offset; + let mut max_offset = response_header.max_offset; + let mut response_code = code; + + if code != ResponseCode::Success { + let mut is_revised = false; + if leader_item.gen == current_item.gen { + if request_offset > max_offset.unwrap() { + if code == ResponseCode::PullOffsetMoved { + response_code = ResponseCode::PullOffsetMoved; + next_begin_offset = max_offset; + } else { + response_code = code; + } + } else if request_offset < min_offset.unwrap() { + next_begin_offset = min_offset; + response_code = ResponseCode::PullRetryImmediately; + } else { + response_code = code; + } + } + + if earlist_item.gen == current_item.gen { + if request_offset < min_offset.unwrap() { + /*if code == ResponseCode::PullOffsetMoved { + response_code = ResponseCode::PullOffsetMoved; + next_begin_offset = min_offset; + } else { + response_code = ResponseCode::PullOffsetMoved; + next_begin_offset = min_offset; + }*/ + response_code = ResponseCode::PullOffsetMoved; + next_begin_offset = min_offset; + } else if request_offset >= max_offset.unwrap() { + if let Some(next_item) = + TopicQueueMappingUtils::find_next(mapping_items, Some(current_item), true) + { + is_revised = true; + next_begin_offset = Some(next_item.start_offset); + min_offset = Some(next_item.start_offset); + max_offset = min_offset; + response_code = ResponseCode::PullRetryImmediately; + } else { + response_code = ResponseCode::PullNotFound; + } + } else { + response_code = code; + } + } + + if !is_revised + && leader_item.gen != current_item.gen + && earlist_item.gen != current_item.gen + { + if request_offset < min_offset? { + next_begin_offset = min_offset; + response_code = ResponseCode::PullRetryImmediately; + } else if request_offset >= max_offset? { + if let Some(next_item) = + TopicQueueMappingUtils::find_next(mapping_items, Some(current_item), true) + { + next_begin_offset = Some(next_item.start_offset); + min_offset = Some(next_item.start_offset); + max_offset = min_offset; + response_code = ResponseCode::PullRetryImmediately; + } else { + response_code = ResponseCode::PullNotFound; + } + } else { + response_code = code; + } + } + } + + if current_item.check_if_end_offset_decided() + && next_begin_offset.unwrap() >= current_item.end_offset + { + next_begin_offset = Some(current_item.end_offset); + } + + response_header.next_begin_offset = + Some(current_item.compute_static_queue_offset_strictly(next_begin_offset.unwrap())); + response_header.min_offset = Some(current_item.compute_static_queue_offset_strictly( + min_offset.unwrap().max(current_item.start_offset), + )); + response_header.max_offset = Some( + current_item + .compute_static_queue_offset_strictly(max_offset.unwrap()) + .max(TopicQueueMappingDetail::compute_max_offset_from_mapping( + mapping_detail, + mapping_context.global_id, + )), + ); + response_header.offset_delta = Some(current_item.compute_offset_delta()); + + if code != ResponseCode::Success { + Some( + RemotingCommand::create_response_command_with_header(response_header.clone()) + .set_code(response_code), + ) + } else { + None + } + } } #[allow(unused_variables)] @@ -118,7 +343,7 @@ where let begin_time_mills = get_current_millis(); let mut response = RemotingCommand::create_response_command(); response.set_opaque_mut(request.opaque()); - let request_header = request + let mut request_header = request .decode_command_custom_header_fast::() .unwrap(); let mut response_header = PullMessageResponseHeader::default(); @@ -206,12 +431,13 @@ where ))), ); } - let topic_queue_mapping_context = self + let mut topic_queue_mapping_context = self .topic_queue_mapping_manager .build_topic_queue_mapping_context(&request_header, false); - if let Some(resp) = - self.rewrite_request_for_static_topic(&request_header, &topic_queue_mapping_context) - { + if let Some(resp) = Self::rewrite_request_for_static_topic( + &mut request_header, + &mut topic_queue_mapping_context, + ) { return Some(resp); } if request_header.queue_id.is_none() @@ -504,13 +730,13 @@ where None } - fn rewrite_request_for_static_topic( + /* fn rewrite_request_for_static_topic( &mut self, _request_header: &PullMessageRequestHeader, _mapping_context: &TopicQueueMappingContext, ) -> Option { unimplemented!() - } + }*/ fn query_broadcast_pull_init_offset( &mut self, @@ -520,6 +746,52 @@ where request_header: &PullMessageRequestHeader, channel: &Channel, ) -> i64 { - unimplemented!() + if !self.broker_config.enable_broadcast_offset_store { + return -1; + } + let consumer_group_info = self.consumer_manager.get_consumer_group_info(group); + let proxy_pull_broadcast = RequestSource::ProxyForBroadcast.get_value() + == request_header.request_source.unwrap_or(-2); + + if Self::is_broadcast(proxy_pull_broadcast, consumer_group_info.as_ref()) { + let client_id = if proxy_pull_broadcast { + request_header.proxy_forward_client_id.as_ref().cloned() + } else { + match consumer_group_info + .as_ref() + .unwrap() + .find_channel_by_channel(channel) + { + None => { + return -1; + } + Some(value) => Some(value.client_id().clone()), + } + }; + return self.broadcast_offset_manager.query_init_offset( + topic, + group, + queue_id, + client_id.as_ref().unwrap().as_str(), + request_header.queue_offset, + proxy_pull_broadcast, + ); + } + + -1 + } + + fn is_broadcast( + proxy_pull_broadcast: bool, + consumer_group_info: Option<&ConsumerGroupInfo>, + ) -> bool { + match consumer_group_info { + Some(info) => { + proxy_pull_broadcast + || (info.get_message_model() == MessageModel::Broadcasting + && info.get_consume_type() == ConsumeType::ConsumePassively) + } + None => proxy_pull_broadcast, + } } } diff --git a/rocketmq-broker/src/topic/manager/topic_queue_mapping_manager.rs b/rocketmq-broker/src/topic/manager/topic_queue_mapping_manager.rs index 2f1e6b88..26200762 100644 --- a/rocketmq-broker/src/topic/manager/topic_queue_mapping_manager.rs +++ b/rocketmq-broker/src/topic/manager/topic_queue_mapping_manager.rs @@ -174,7 +174,7 @@ impl TopicQueueMappingManager { topic: topic.clone(), global_id, mapping_detail: Some(mapping_detail.clone()), - mapping_item_list, + mapping_item_list: mapping_item_list.clone(), leader_item, current_item: None, }; diff --git a/rocketmq-common/src/common/broker/broker_config.rs b/rocketmq-common/src/common/broker/broker_config.rs index 57a51112..ba23aabd 100644 --- a/rocketmq-common/src/common/broker/broker_config.rs +++ b/rocketmq-common/src/common/broker/broker_config.rs @@ -153,6 +153,7 @@ pub struct BrokerConfig { pub commercial_base_count: i32, pub reject_pull_consumer_enable: bool, pub consumer_offset_update_version_step: i64, + pub enable_broadcast_offset_store: bool, } impl Default for BrokerConfig { @@ -217,6 +218,7 @@ impl Default for BrokerConfig { commercial_base_count: 1, reject_pull_consumer_enable: false, consumer_offset_update_version_step: 500, + enable_broadcast_offset_store: true, } } } diff --git a/rocketmq-remoting/src/error.rs b/rocketmq-remoting/src/error.rs index 20cdcadd..0d9552b7 100644 --- a/rocketmq-remoting/src/error.rs +++ b/rocketmq-remoting/src/error.rs @@ -31,6 +31,10 @@ pub enum RemotingError { RemoteException(String), } +#[derive(Debug, Error)] +#[error("RpcException: code: {0}, message: {1}")] +pub struct RpcException(pub i32, pub String); + #[derive(Debug, Error)] #[error("{0}")] pub struct RemotingCommandDecoderError(pub String); diff --git a/rocketmq-remoting/src/protocol/header/pull_message_response_header.rs b/rocketmq-remoting/src/protocol/header/pull_message_response_header.rs index e57fd35a..071a259c 100644 --- a/rocketmq-remoting/src/protocol/header/pull_message_response_header.rs +++ b/rocketmq-remoting/src/protocol/header/pull_message_response_header.rs @@ -25,7 +25,9 @@ use serde::Serialize; use crate::protocol::FastCodesHeader; -#[derive(Serialize, Deserialize, Debug, Default, RemotingSerializable, RequestHeaderCodec)] +#[derive( + Serialize, Deserialize, Debug, Default, RemotingSerializable, RequestHeaderCodec, Clone, +)] #[serde(rename_all = "camelCase")] pub struct PullMessageResponseHeader { pub suggest_which_broker_id: Option, diff --git a/rocketmq-remoting/src/protocol/remoting_command.rs b/rocketmq-remoting/src/protocol/remoting_command.rs index b519458f..26d9ce11 100644 --- a/rocketmq-remoting/src/protocol/remoting_command.rs +++ b/rocketmq-remoting/src/protocol/remoting_command.rs @@ -217,6 +217,16 @@ impl RemotingCommand { self } + pub fn set_command_custom_header_origin( + mut self, + command_custom_header: Option< + Arc>, + >, + ) -> Self { + self.command_custom_header = command_custom_header; + self + } + pub fn set_command_custom_header_ref(&mut self, command_custom_header: T) where T: CommandCustomHeader + Sync + Send + 'static, diff --git a/rocketmq-remoting/src/protocol/static_topic.rs b/rocketmq-remoting/src/protocol/static_topic.rs index 28b41a1b..70459d67 100644 --- a/rocketmq-remoting/src/protocol/static_topic.rs +++ b/rocketmq-remoting/src/protocol/static_topic.rs @@ -19,3 +19,4 @@ pub mod logic_queue_mapping_item; pub mod topic_queue_info; pub mod topic_queue_mapping_context; pub mod topic_queue_mapping_detail; +pub mod topic_queue_mapping_utils; diff --git a/rocketmq-remoting/src/protocol/static_topic/logic_queue_mapping_item.rs b/rocketmq-remoting/src/protocol/static_topic/logic_queue_mapping_item.rs index 2625aefe..4e1cca1a 100644 --- a/rocketmq-remoting/src/protocol/static_topic/logic_queue_mapping_item.rs +++ b/rocketmq-remoting/src/protocol/static_topic/logic_queue_mapping_item.rs @@ -42,4 +42,24 @@ impl LogicQueueMappingItem { } self.logic_offset + (physical_queue_offset - self.start_offset) } + + pub fn compute_physical_queue_offset(&self, static_queue_offset: i64) -> i64 { + self.start_offset + (static_queue_offset - self.logic_offset) + } + + pub fn compute_offset_delta(&self) -> i64 { + self.logic_offset - self.start_offset + } + + pub fn check_if_end_offset_decided(&self) -> bool { + self.end_offset > self.start_offset + } + + pub fn compute_max_static_queue_offset(&self) -> i64 { + if self.end_offset >= self.start_offset { + self.logic_offset + self.end_offset - self.start_offset + } else { + self.logic_offset + } + } } diff --git a/rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_detail.rs b/rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_detail.rs index cb47ec30..ac69e68b 100644 --- a/rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_detail.rs +++ b/rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_detail.rs @@ -16,7 +16,7 @@ pub struct TopicQueueMappingDetail { } impl TopicQueueMappingDetail { - pub fn get_mapping_info( + /* pub fn get_mapping_info( mapping_detail: &TopicQueueMappingDetail, global_id: i32, ) -> Option> { @@ -26,6 +26,29 @@ impl TopicQueueMappingDetail { } } None + }*/ + + pub fn get_mapping_info( + mapping_detail: &TopicQueueMappingDetail, + global_id: i32, + ) -> Option<&Vec> { + mapping_detail.hosted_queues.as_ref()?.get(&global_id) + } + + pub fn compute_max_offset_from_mapping( + mapping_detail: &TopicQueueMappingDetail, + global_id: Option, + ) -> i64 { + match Self::get_mapping_info(mapping_detail, global_id.unwrap()) { + Some(mapping_items) => { + if mapping_items.is_empty() { + return -1; + } + let item = mapping_items.last().unwrap(); + item.compute_max_static_queue_offset() + } + None => -1, + } } } diff --git a/rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_utils.rs b/rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_utils.rs new file mode 100644 index 00000000..0fb085af --- /dev/null +++ b/rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_utils.rs @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use crate::protocol::static_topic::logic_queue_mapping_item::LogicQueueMappingItem; + +pub struct TopicQueueMappingUtils; + +impl TopicQueueMappingUtils { + pub fn find_logic_queue_mapping_item( + mapping_items: &[LogicQueueMappingItem], + logic_offset: i64, + ignore_negative: bool, + ) -> Option<&LogicQueueMappingItem> { + if mapping_items.is_empty() { + return None; + } + // Could use binary search to improve performance + for i in (0..mapping_items.len()).rev() { + let item = &mapping_items[i]; + if ignore_negative && item.logic_offset < 0 { + continue; + } + if logic_offset >= item.logic_offset { + return Some(item); + } + } + // If not found, maybe out of range, return the first one + for item in mapping_items.iter() { + if ignore_negative && item.logic_offset < 0 { + continue; + } else { + return Some(item); + } + } + None + } + + pub fn find_next<'a>( + items: &'a [LogicQueueMappingItem], + current_item: Option<&'a LogicQueueMappingItem>, + ignore_negative: bool, + ) -> Option<&'a LogicQueueMappingItem> { + if items.is_empty() || current_item.is_none() { + return None; + } + let current_item = current_item.unwrap(); + for i in 0..items.len() { + let item = &items[i]; + if ignore_negative && item.logic_offset < 0 { + continue; + } + if item.gen == current_item.gen { + if i < items.len() - 1 { + let next_item = &items[i + 1]; + if ignore_negative && next_item.logic_offset < 0 { + return None; + } else { + return Some(next_item); + } + } else { + return None; + } + } + } + None + } +} diff --git a/rocketmq-remoting/src/rpc.rs b/rocketmq-remoting/src/rpc.rs index 61df3c21..fb4dcc49 100644 --- a/rocketmq-remoting/src/rpc.rs +++ b/rocketmq-remoting/src/rpc.rs @@ -14,6 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -mod rpc_request; +pub mod rpc_client_utils; +pub mod rpc_request; pub mod rpc_request_header; -mod topic_request_header; +pub mod rpc_response; +pub mod topic_request_header; diff --git a/rocketmq-remoting/src/rpc/rpc_client_utils.rs b/rocketmq-remoting/src/rpc/rpc_client_utils.rs new file mode 100644 index 00000000..0266971b --- /dev/null +++ b/rocketmq-remoting/src/rpc/rpc_client_utils.rs @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::any::Any; + +use bytes::Bytes; + +use crate::protocol::remoting_command::RemotingCommand; +use crate::rpc::rpc_request::RpcRequest; +use crate::rpc::rpc_response::RpcResponse; + +pub struct RpcClientUtils; + +impl RpcClientUtils { + pub fn create_command_for_rpc_request(rpc_request: RpcRequest) -> RemotingCommand { + let cmd = RemotingCommand::create_request_command(rpc_request.code, rpc_request.header); + cmd.set_body(Self::encode_body(rpc_request.body)) + } + + pub fn create_command_for_rpc_response(mut rpc_response: RpcResponse) -> RemotingCommand { + let mut cmd = match rpc_response.header.take() { + None => RemotingCommand::create_response_command_with_code(rpc_response.code), + Some(value) => RemotingCommand::create_response_command() + .set_command_custom_header_origin(Some(value)), + }; + match rpc_response.exception { + None => {} + Some(value) => cmd.set_remark_ref(Some(value.1.clone())), + } + cmd.set_body(Self::encode_body(rpc_response.body)) + } + + pub fn encode_body(_body: Option>) -> Option { + /*if body.is_none() { + return None; + } + let body = body.unwrap(); + if body.is::() { + return Some(body.downcast_ref::().unwrap().clone()); + } else if body.is::>() { + return Some(Bytes::from( + body.downcast_ref::>().unwrap().as_ref(), + )); + } + /*else if body.is::>() { + return Some(Bytes::from( + body.downcast_ref::>() + .unwrap() + .encode(), + )); + + }*/ + else { + None + }*/ + None + } +} diff --git a/rocketmq-remoting/src/rpc/rpc_request.rs b/rocketmq-remoting/src/rpc/rpc_request.rs index 32997337..4cadb898 100644 --- a/rocketmq-remoting/src/rpc/rpc_request.rs +++ b/rocketmq-remoting/src/rpc/rpc_request.rs @@ -14,12 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -use bytes::Bytes; +use std::any::Any; use crate::rpc::rpc_request_header::RpcRequestHeader; +#[derive(Default)] pub struct RpcRequest { pub code: i32, pub header: RpcRequestHeader, - pub body: Option, + pub body: Option>, +} + +impl RpcRequest { + pub fn new(code: i32, header: RpcRequestHeader, body: Option>) -> Self { + Self { code, header, body } + } } diff --git a/rocketmq-remoting/src/rpc/rpc_response.rs b/rocketmq-remoting/src/rpc/rpc_response.rs new file mode 100644 index 00000000..4a4182a9 --- /dev/null +++ b/rocketmq-remoting/src/rpc/rpc_response.rs @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::any::Any; +use std::cell::SyncUnsafeCell; +use std::sync::Arc; + +use crate::error::RpcException; +use crate::protocol::command_custom_header::CommandCustomHeader; + +#[derive(Default)] +pub struct RpcResponse { + pub code: i32, + pub header: Option>>, + pub body: Option>, + pub exception: Option, +} + +impl RpcResponse { + pub fn new(exception: Option) -> Self { + Self { + code: exception.as_ref().map_or(0, |e| e.0), + header: None, + body: None, + exception, + } + } + + pub fn get_header(&self) -> Option<&T> + where + T: CommandCustomHeader + Send + Sync + 'static, + { + match self.header.as_ref() { + None => None, + Some(value) => { + let value = value.get(); + let value = value as *const dyn CommandCustomHeader as *const T; + unsafe { Some(&*value) } + } + } + } + + pub fn get_header_mut(&self) -> Option<&mut T> + where + T: CommandCustomHeader + Send + Sync + 'static, + { + match self.header.as_ref() { + None => None, + Some(value) => { + let value = value.get(); + let value = value as *const dyn CommandCustomHeader as *mut T; + unsafe { Some(&mut *value) } + } + } + } +} From 4dfc094fdc94f25e8f30b3ea14cb48a497fd586b Mon Sep 17 00:00:00 2001 From: TeslaRustor <77013810+TeslaRustor@users.noreply.github.com> Date: Thu, 27 Jun 2024 15:21:06 +0000 Subject: [PATCH 2/2] =?UTF-8?q?[ISSUE=20#699]=F0=9F=9A=80Support=20pull=20?= =?UTF-8?q?message=20result=20handle?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rocketmq-broker/src/broker_runtime.rs | 2 ++ .../src/processor/default_pull_message_result_handler.rs | 3 +-- rocketmq-broker/src/processor/pull_message_processor.rs | 1 - 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index 8a142b39..fbd5097e 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -54,6 +54,7 @@ use crate::client::manager::producer_manager::ProducerManager; use crate::filter::manager::consumer_filter_manager::ConsumerFilterManager; use crate::hook::batch_check_before_put_message::BatchCheckBeforePutMessageHook; use crate::hook::check_before_put_message::CheckBeforePutMessageHook; +use crate::offset::manager::broadcast_offset_manager::BroadcastOffsetManager; use crate::offset::manager::consumer_offset_manager::ConsumerOffsetManager; use crate::offset::manager::consumer_order_info_manager::ConsumerOrderInfoManager; use crate::out_api::broker_outer_api::BrokerOuterAPI; @@ -372,6 +373,7 @@ impl BrokerRuntime { self.consumer_manager.clone(), self.consumer_filter_manager.clone(), Arc::new(self.consumer_offset_manager.clone()), + Arc::new(BroadcastOffsetManager::default()), self.message_store.as_ref().unwrap().clone(), ); diff --git a/rocketmq-broker/src/processor/default_pull_message_result_handler.rs b/rocketmq-broker/src/processor/default_pull_message_result_handler.rs index c994eed2..c32c5be1 100644 --- a/rocketmq-broker/src/processor/default_pull_message_result_handler.rs +++ b/rocketmq-broker/src/processor/default_pull_message_result_handler.rs @@ -38,7 +38,6 @@ use tracing::info; use crate::mqtrace::consume_message_context::ConsumeMessageContext; use crate::mqtrace::consume_message_hook::ConsumeMessageHook; -use crate::processor::pull_message_processor::PullMessageProcessor; use crate::processor::pull_message_result_handler::PullMessageResultHandler; use crate::topic::manager::topic_config_manager::TopicConfigManager; @@ -75,7 +74,7 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler { broker_allow_suspend: bool, message_filter: Box, mut response: RemotingCommand, - mut mapping_context: TopicQueueMappingContext, + mapping_context: TopicQueueMappingContext, begin_time_mills: u64, ) -> Option { let client_address = channel.remote_address().to_string(); diff --git a/rocketmq-broker/src/processor/pull_message_processor.rs b/rocketmq-broker/src/processor/pull_message_processor.rs index b0e03a96..581d3d0e 100644 --- a/rocketmq-broker/src/processor/pull_message_processor.rs +++ b/rocketmq-broker/src/processor/pull_message_processor.rs @@ -48,7 +48,6 @@ use rocketmq_store::log_file::MAX_PULL_MSG_SIZE; use tracing::error; use tracing::warn; -use crate::client::client_channel_info::ClientChannelInfo; use crate::client::consumer_group_info::ConsumerGroupInfo; use crate::client::manager::consumer_manager::ConsumerManager; use crate::filter::expression_for_retry_message_filter::ExpressionForRetryMessageFilter;