-
Notifications
You must be signed in to change notification settings - Fork 111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #702]🚀Support pull message result handle-2🚀 #706
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,17 +14,22 @@ | |
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
use std::net::SocketAddr; | ||
use std::sync::Arc; | ||
|
||
use bytes::Bytes; | ||
use bytes::BytesMut; | ||
use rocketmq_common::common::broker::broker_config::BrokerConfig; | ||
use rocketmq_common::common::mix_all::MASTER_ID; | ||
use rocketmq_common::common::sys_flag::pull_sys_flag::PullSysFlag; | ||
use rocketmq_remoting::code::response_code::RemotingSysResponseCode; | ||
use rocketmq_remoting::code::response_code::ResponseCode; | ||
use rocketmq_remoting::net::channel::Channel; | ||
use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader; | ||
use rocketmq_remoting::protocol::header::pull_message_response_header::PullMessageResponseHeader; | ||
use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; | ||
use rocketmq_remoting::protocol::remoting_command::RemotingCommand; | ||
use rocketmq_remoting::protocol::request_source::RequestSource; | ||
use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_context::TopicQueueMappingContext; | ||
use rocketmq_remoting::protocol::subscription::subscription_group_config::SubscriptionGroupConfig; | ||
use rocketmq_remoting::protocol::NamespaceUtil; | ||
|
@@ -36,25 +41,42 @@ | |
use tracing::debug; | ||
use tracing::info; | ||
|
||
use crate::client::manager::consumer_manager::ConsumerManager; | ||
use crate::mqtrace::consume_message_context::ConsumeMessageContext; | ||
use crate::mqtrace::consume_message_hook::ConsumeMessageHook; | ||
use crate::offset::manager::broadcast_offset_manager::BroadcastOffsetManager; | ||
use crate::offset::manager::consumer_offset_manager::ConsumerOffsetManager; | ||
use crate::processor::pull_message_processor::is_broadcast; | ||
use crate::processor::pull_message_processor::rewrite_response_for_static_topic; | ||
use crate::processor::pull_message_result_handler::PullMessageResultHandler; | ||
use crate::topic::manager::topic_config_manager::TopicConfigManager; | ||
|
||
pub struct DefaultPullMessageResultHandler { | ||
topic_config_manager: Arc<TopicConfigManager>, | ||
consumer_offset_manager: Arc<ConsumerOffsetManager>, | ||
consumer_manager: Arc<ConsumerManager>, | ||
broadcast_offset_manager: Arc<BroadcastOffsetManager>, | ||
broker_stats_manager: Arc<BrokerStatsManager>, | ||
broker_config: Arc<BrokerConfig>, | ||
consume_message_hook_list: Arc<Vec<Box<dyn ConsumeMessageHook>>>, | ||
} | ||
|
||
impl DefaultPullMessageResultHandler { | ||
pub fn new( | ||
topic_config_manager: Arc<TopicConfigManager>, | ||
consumer_offset_manager: Arc<ConsumerOffsetManager>, | ||
consumer_manager: Arc<ConsumerManager>, | ||
broadcast_offset_manager: Arc<BroadcastOffsetManager>, | ||
broker_stats_manager: Arc<BrokerStatsManager>, | ||
broker_config: Arc<BrokerConfig>, | ||
consume_message_hook_list: Arc<Vec<Box<dyn ConsumeMessageHook>>>, | ||
) -> Self { | ||
Self { | ||
topic_config_manager, | ||
consumer_offset_manager, | ||
consumer_manager, | ||
broadcast_offset_manager, | ||
broker_stats_manager, | ||
broker_config, | ||
consume_message_hook_list, | ||
} | ||
|
@@ -74,7 +96,7 @@ | |
broker_allow_suspend: bool, | ||
message_filter: Box<dyn MessageFilter>, | ||
mut response: RemotingCommand, | ||
mapping_context: TopicQueueMappingContext, | ||
mut mapping_context: TopicQueueMappingContext, | ||
begin_time_mills: u64, | ||
) -> Option<RemotingCommand> { | ||
let client_address = channel.remote_address().to_string(); | ||
|
@@ -90,28 +112,95 @@ | |
&mut response, | ||
client_address.as_str(), | ||
); | ||
let code = From::from(response.code()); | ||
self.execute_consume_message_hook_before( | ||
&request, | ||
&request_header, | ||
&get_message_result, | ||
broker_allow_suspend, | ||
From::from(response.code()), | ||
code, | ||
); | ||
/*let response_header = response.read_custom_header_mut::<PullMessageResponseHeader>(); | ||
let rewrite_result = PullMessageProcessor::rewrite_response_for_static_topic( | ||
let response_header = response.read_custom_header_mut::<PullMessageResponseHeader>(); | ||
let rewrite_result = rewrite_response_for_static_topic( | ||
&request_header, | ||
response_header.unwrap(), | ||
&mut mapping_context, | ||
ResponseCode::from(response.code()), | ||
code, | ||
); | ||
if rewrite_result.is_some() { | ||
return rewrite_result; | ||
}*/ | ||
None | ||
} | ||
self.update_broadcast_pulled_offset( | ||
request_header.topic.as_str(), | ||
request_header.consumer_group.as_str(), | ||
request_header.queue_id.unwrap(), | ||
&request_header, | ||
&channel, | ||
Some(&response), | ||
get_message_result.next_begin_offset(), | ||
); | ||
Comment on lines
+133
to
+141
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Complex Logic in The methods - Existing complex methods
+ Proposed simplified versions (details depend on further context) Also applies to: 143-147 |
||
self.try_commit_offset( | ||
broker_allow_suspend, | ||
&request_header, | ||
get_message_result.next_begin_offset(), | ||
channel.remote_address(), | ||
); | ||
|
||
match code { | ||
ResponseCode::Success => { | ||
self.broker_stats_manager.inc_group_get_nums( | ||
request_header.consumer_group.as_str(), | ||
request_header.topic.as_str(), | ||
get_message_result.message_count(), | ||
); | ||
self.broker_stats_manager.inc_group_get_size( | ||
request_header.consumer_group.as_str(), | ||
request_header.topic.as_str(), | ||
get_message_result.buffer_total_size(), | ||
); | ||
self.broker_stats_manager.inc_broker_get_nums( | ||
request_header.topic.as_str(), | ||
get_message_result.message_count(), | ||
); | ||
if self.broker_config.transfer_msg_by_heap { | ||
let body = self.read_get_message_result( | ||
&get_message_result, | ||
request_header.consumer_group.as_str(), | ||
request_header.topic.as_str(), | ||
request_header.queue_id.unwrap(), | ||
); | ||
return Some(response.set_body(body)); | ||
} /*else { | ||
None | ||
}*/ | ||
None | ||
} | ||
ResponseCode::PullNotFound => Some(response), | ||
ResponseCode::PullOffsetMoved => Some(response), | ||
ResponseCode::PullRetryImmediately => Some(response), | ||
_ => None, | ||
} | ||
} | ||
} | ||
|
||
impl DefaultPullMessageResultHandler { | ||
fn read_get_message_result( | ||
&self, | ||
get_message_result: &GetMessageResult, | ||
_group: &str, | ||
_topic: &str, | ||
_queue_id: i32, | ||
) -> Option<Bytes> { | ||
let mut bytes_mut = | ||
BytesMut::with_capacity(get_message_result.buffer_total_size() as usize); | ||
for msg in get_message_result.message_mapped_list() { | ||
let data = &msg.mapped_file.as_ref().unwrap().get_mapped_file() | ||
[msg.start_offset as usize..(msg.start_offset + msg.size as u64) as usize]; | ||
bytes_mut.extend_from_slice(data); | ||
} | ||
Some(bytes_mut.freeze()) | ||
} | ||
|
||
fn execute_consume_message_hook_before( | ||
&self, | ||
request: &RemotingCommand, | ||
|
@@ -306,4 +395,86 @@ | |
} | ||
response.set_command_custom_header_ref(response_header) | ||
} | ||
|
||
fn try_commit_offset( | ||
&self, | ||
broker_allow_suspend: bool, | ||
request_header: &PullMessageRequestHeader, | ||
next_offset: i64, | ||
client_address: SocketAddr, | ||
) { | ||
self.consumer_offset_manager.commit_pull_offset( | ||
client_address, | ||
request_header.consumer_group.as_str(), | ||
request_header.topic.as_str(), | ||
request_header.queue_id.unwrap(), | ||
next_offset, | ||
); | ||
|
||
let mut store_offset_enable = broker_allow_suspend; | ||
let has_commit_offset_flag = | ||
PullSysFlag::has_commit_offset_flag(request_header.sys_flag as u32); | ||
store_offset_enable = store_offset_enable && has_commit_offset_flag; | ||
if store_offset_enable { | ||
self.consumer_offset_manager.commit_offset( | ||
client_address, | ||
request_header.consumer_group.as_str(), | ||
request_header.topic.as_str(), | ||
request_header.queue_id.unwrap(), | ||
request_header.commit_offset, | ||
); | ||
} | ||
} | ||
|
||
fn update_broadcast_pulled_offset( | ||
&self, | ||
topic: &str, | ||
group: &str, | ||
queue_id: i32, | ||
request_header: &PullMessageRequestHeader, | ||
channel: &Channel, | ||
response: Option<&RemotingCommand>, | ||
next_begin_offset: i64, | ||
) { | ||
if response.is_none() || !self.broker_config.enable_broadcast_offset_store { | ||
return; | ||
} | ||
let proxy_pull_broadcast = | ||
request_header.request_source == Some(RequestSource::ProxyForBroadcast.get_value()); | ||
let consumer_group_info = self.consumer_manager.get_consumer_group_info(group); | ||
|
||
if is_broadcast(proxy_pull_broadcast, consumer_group_info.as_ref()) { | ||
let mut offset = request_header.queue_offset; | ||
if let Some(response) = response { | ||
if ResponseCode::from(response.code()) == ResponseCode::PullOffsetMoved { | ||
offset = next_begin_offset; | ||
} | ||
} | ||
|
||
let client_id = if proxy_pull_broadcast { | ||
request_header | ||
.proxy_forward_client_id | ||
.clone() | ||
.unwrap_or_default() | ||
} else if let Some(ref consumer_group_info) = consumer_group_info { | ||
if let Some(ref client_channel_info) = | ||
consumer_group_info.find_channel_by_channel(channel) | ||
{ | ||
client_channel_info.client_id().clone() | ||
} else { | ||
return; | ||
} | ||
} else { | ||
return; | ||
}; | ||
self.broadcast_offset_manager.update_offset( | ||
topic, | ||
group, | ||
queue_id, | ||
offset, | ||
client_id.as_str(), | ||
proxy_pull_broadcast, | ||
); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implementation needed for
update_offset
method.The method
update_offset
is currently marked asunimplemented!()
. This suggests that the functionality is not yet complete.Do you have a timeline for implementing this? It might be beneficial to add a
TODO
comment or create a tracking issue to ensure this does not get overlooked.