Skip to content

Commit

Permalink
[ISSUE #702]🚀Support pull message result handle-2🚀 (#706)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Jun 28, 2024
1 parent 24c6f65 commit 01cd6f7
Show file tree
Hide file tree
Showing 10 changed files with 369 additions and 136 deletions.
7 changes: 7 additions & 0 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ pub(crate) struct BrokerRuntime {
broker_runtime: Option<RocketMQRuntime>,
producer_manager: Arc<ProducerManager>,
consumer_manager: Arc<ConsumerManager>,
broadcast_offset_manager: Arc<BroadcastOffsetManager>,
drop: Arc<AtomicBool>,
shutdown: Arc<AtomicBool>,
shutdown_hook: Option<BrokerShutdownHook>,
Expand Down Expand Up @@ -124,6 +125,7 @@ impl Clone for BrokerRuntime {
broker_runtime: None,
producer_manager: self.producer_manager.clone(),
consumer_manager: self.consumer_manager.clone(),
broadcast_offset_manager: self.broadcast_offset_manager.clone(),
drop: self.drop.clone(),
shutdown: self.shutdown.clone(),
shutdown_hook: self.shutdown_hook.clone(),
Expand Down Expand Up @@ -196,6 +198,7 @@ impl BrokerRuntime {
broker_runtime: Some(runtime),
producer_manager,
consumer_manager,
broadcast_offset_manager: Arc::new(Default::default()),
drop: Arc::new(AtomicBool::new(false)),
shutdown: Arc::new(AtomicBool::new(false)),
shutdown_hook: None,
Expand Down Expand Up @@ -361,6 +364,10 @@ impl BrokerRuntime {
);
let pull_message_result_handler = DefaultPullMessageResultHandler::new(
Arc::new(self.topic_config_manager.clone()),
Arc::new(self.consumer_offset_manager.clone()),
self.consumer_manager.clone(),
self.broadcast_offset_manager.clone(),
self.broker_stats_manager.clone(),
self.broker_config.clone(),
Arc::new(Default::default()),
);
Expand Down
12 changes: 12 additions & 0 deletions rocketmq-broker/src/offset/manager/broadcast_offset_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,16 @@ impl BroadcastOffsetManager {
) -> i64 {
unimplemented!()
}

pub fn update_offset(
&self,
topic: &str,
group: &str,
queue_id: i32,
offset: i64,
client_id: &str,
from_proxy: bool,
) {
unimplemented!()
}
}
17 changes: 17 additions & 0 deletions rocketmq-broker/src/offset/manager/consumer_offset_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,23 @@ impl ConfigManager for ConsumerOffsetManager {

#[allow(unused_variables)]
impl ConsumerOffsetManager {
pub fn commit_pull_offset(
&self,
_client_host: SocketAddr,
group: &str,
topic: &str,
queue_id: i32,
offset: i64,
) {
let key = format!("{}{}{}", topic, TOPIC_GROUP_SEPARATOR, group);
self.consumer_offset_wrapper
.lock()
.pull_offset_table
.entry(key)
.or_default()
.insert(queue_id, offset);
}

pub fn query_then_erase_reset_offset(
&self,
topic: &str,
Expand Down
185 changes: 178 additions & 7 deletions rocketmq-broker/src/processor/default_pull_message_result_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::net::SocketAddr;
use std::sync::Arc;

use bytes::Bytes;
use bytes::BytesMut;
use rocketmq_common::common::broker::broker_config::BrokerConfig;
use rocketmq_common::common::mix_all::MASTER_ID;
use rocketmq_common::common::sys_flag::pull_sys_flag::PullSysFlag;
use rocketmq_remoting::code::response_code::RemotingSysResponseCode;
use rocketmq_remoting::code::response_code::ResponseCode;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader;
use rocketmq_remoting::protocol::header::pull_message_response_header::PullMessageResponseHeader;
use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::protocol::request_source::RequestSource;
use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_context::TopicQueueMappingContext;
use rocketmq_remoting::protocol::subscription::subscription_group_config::SubscriptionGroupConfig;
use rocketmq_remoting::protocol::NamespaceUtil;
Expand All @@ -36,25 +41,42 @@ use rocketmq_store::stats::stats_type::StatsType;
use tracing::debug;
use tracing::info;

use crate::client::manager::consumer_manager::ConsumerManager;
use crate::mqtrace::consume_message_context::ConsumeMessageContext;
use crate::mqtrace::consume_message_hook::ConsumeMessageHook;
use crate::offset::manager::broadcast_offset_manager::BroadcastOffsetManager;
use crate::offset::manager::consumer_offset_manager::ConsumerOffsetManager;
use crate::processor::pull_message_processor::is_broadcast;
use crate::processor::pull_message_processor::rewrite_response_for_static_topic;
use crate::processor::pull_message_result_handler::PullMessageResultHandler;
use crate::topic::manager::topic_config_manager::TopicConfigManager;

pub struct DefaultPullMessageResultHandler {
topic_config_manager: Arc<TopicConfigManager>,
consumer_offset_manager: Arc<ConsumerOffsetManager>,
consumer_manager: Arc<ConsumerManager>,
broadcast_offset_manager: Arc<BroadcastOffsetManager>,
broker_stats_manager: Arc<BrokerStatsManager>,
broker_config: Arc<BrokerConfig>,
consume_message_hook_list: Arc<Vec<Box<dyn ConsumeMessageHook>>>,
}

impl DefaultPullMessageResultHandler {
pub fn new(
topic_config_manager: Arc<TopicConfigManager>,
consumer_offset_manager: Arc<ConsumerOffsetManager>,
consumer_manager: Arc<ConsumerManager>,
broadcast_offset_manager: Arc<BroadcastOffsetManager>,
broker_stats_manager: Arc<BrokerStatsManager>,
broker_config: Arc<BrokerConfig>,
consume_message_hook_list: Arc<Vec<Box<dyn ConsumeMessageHook>>>,
) -> Self {
Self {
topic_config_manager,
consumer_offset_manager,
consumer_manager,
broadcast_offset_manager,
broker_stats_manager,
broker_config,
consume_message_hook_list,
}
Expand All @@ -74,7 +96,7 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
broker_allow_suspend: bool,
message_filter: Box<dyn MessageFilter>,
mut response: RemotingCommand,
mapping_context: TopicQueueMappingContext,
mut mapping_context: TopicQueueMappingContext,
begin_time_mills: u64,
) -> Option<RemotingCommand> {
let client_address = channel.remote_address().to_string();
Expand All @@ -90,28 +112,95 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
&mut response,
client_address.as_str(),
);
let code = From::from(response.code());
self.execute_consume_message_hook_before(
&request,
&request_header,
&get_message_result,
broker_allow_suspend,
From::from(response.code()),
code,
);
/*let response_header = response.read_custom_header_mut::<PullMessageResponseHeader>();
let rewrite_result = PullMessageProcessor::rewrite_response_for_static_topic(
let response_header = response.read_custom_header_mut::<PullMessageResponseHeader>();
let rewrite_result = rewrite_response_for_static_topic(
&request_header,
response_header.unwrap(),
&mut mapping_context,
ResponseCode::from(response.code()),
code,
);
if rewrite_result.is_some() {
return rewrite_result;
}*/
None
}
self.update_broadcast_pulled_offset(
request_header.topic.as_str(),
request_header.consumer_group.as_str(),
request_header.queue_id.unwrap(),
&request_header,
&channel,
Some(&response),
get_message_result.next_begin_offset(),
);
self.try_commit_offset(
broker_allow_suspend,
&request_header,
get_message_result.next_begin_offset(),
channel.remote_address(),
);

match code {
ResponseCode::Success => {
self.broker_stats_manager.inc_group_get_nums(
request_header.consumer_group.as_str(),
request_header.topic.as_str(),
get_message_result.message_count(),
);
self.broker_stats_manager.inc_group_get_size(
request_header.consumer_group.as_str(),
request_header.topic.as_str(),
get_message_result.buffer_total_size(),
);
self.broker_stats_manager.inc_broker_get_nums(
request_header.topic.as_str(),
get_message_result.message_count(),
);
if self.broker_config.transfer_msg_by_heap {
let body = self.read_get_message_result(
&get_message_result,
request_header.consumer_group.as_str(),
request_header.topic.as_str(),
request_header.queue_id.unwrap(),
);
return Some(response.set_body(body));
} /*else {
None
}*/
None
}
ResponseCode::PullNotFound => Some(response),
ResponseCode::PullOffsetMoved => Some(response),
ResponseCode::PullRetryImmediately => Some(response),
_ => None,
}
}
}

impl DefaultPullMessageResultHandler {
fn read_get_message_result(
&self,
get_message_result: &GetMessageResult,
_group: &str,
_topic: &str,
_queue_id: i32,
) -> Option<Bytes> {
let mut bytes_mut =
BytesMut::with_capacity(get_message_result.buffer_total_size() as usize);
for msg in get_message_result.message_mapped_list() {
let data = &msg.mapped_file.as_ref().unwrap().get_mapped_file()
[msg.start_offset as usize..(msg.start_offset + msg.size as u64) as usize];
bytes_mut.extend_from_slice(data);
}
Some(bytes_mut.freeze())
}

fn execute_consume_message_hook_before(
&self,
request: &RemotingCommand,
Expand Down Expand Up @@ -306,4 +395,86 @@ impl DefaultPullMessageResultHandler {
}
response.set_command_custom_header_ref(response_header)
}

fn try_commit_offset(
&self,
broker_allow_suspend: bool,
request_header: &PullMessageRequestHeader,
next_offset: i64,
client_address: SocketAddr,
) {
self.consumer_offset_manager.commit_pull_offset(
client_address,
request_header.consumer_group.as_str(),
request_header.topic.as_str(),
request_header.queue_id.unwrap(),
next_offset,
);

let mut store_offset_enable = broker_allow_suspend;
let has_commit_offset_flag =
PullSysFlag::has_commit_offset_flag(request_header.sys_flag as u32);
store_offset_enable = store_offset_enable && has_commit_offset_flag;
if store_offset_enable {
self.consumer_offset_manager.commit_offset(
client_address,
request_header.consumer_group.as_str(),
request_header.topic.as_str(),
request_header.queue_id.unwrap(),
request_header.commit_offset,
);
}
}

fn update_broadcast_pulled_offset(
&self,
topic: &str,
group: &str,
queue_id: i32,
request_header: &PullMessageRequestHeader,
channel: &Channel,
response: Option<&RemotingCommand>,
next_begin_offset: i64,
) {
if response.is_none() || !self.broker_config.enable_broadcast_offset_store {
return;
}
let proxy_pull_broadcast =
request_header.request_source == Some(RequestSource::ProxyForBroadcast.get_value());
let consumer_group_info = self.consumer_manager.get_consumer_group_info(group);

if is_broadcast(proxy_pull_broadcast, consumer_group_info.as_ref()) {
let mut offset = request_header.queue_offset;
if let Some(response) = response {
if ResponseCode::from(response.code()) == ResponseCode::PullOffsetMoved {
offset = next_begin_offset;
}
}

let client_id = if proxy_pull_broadcast {
request_header
.proxy_forward_client_id
.clone()
.unwrap_or_default()
} else if let Some(ref consumer_group_info) = consumer_group_info {
if let Some(ref client_channel_info) =
consumer_group_info.find_channel_by_channel(channel)
{
client_channel_info.client_id().clone()
} else {
return;
}
} else {
return;
};
self.broadcast_offset_manager.update_offset(
topic,
group,
queue_id,
offset,
client_id.as_str(),
proxy_pull_broadcast,
);
}
}
}
Loading

0 comments on commit 01cd6f7

Please sign in to comment.