Skip to content

Commit

Permalink
[ISSUE mxsm#699]🚀Support pull message result handle
Browse files Browse the repository at this point in the history
  • Loading branch information
TeslaRustor committed Jun 27, 2024
1 parent f60b39c commit ba333d0
Show file tree
Hide file tree
Showing 16 changed files with 608 additions and 19 deletions.
15 changes: 15 additions & 0 deletions rocketmq-broker/src/offset/manager/broadcast_offset_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,18 @@
*/
#[derive(Debug, Default)]
pub struct BroadcastOffsetManager {}

#[allow(unused_variables)]
impl BroadcastOffsetManager {
pub fn query_init_offset(
&self,
topic: &str,
group_id: &str,
queue_id: i32,
client_id: &str,
request_offset: i64,
from_proxy: bool,
) -> i64 {
unimplemented!()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use tracing::info;

use crate::mqtrace::consume_message_context::ConsumeMessageContext;
use crate::mqtrace::consume_message_hook::ConsumeMessageHook;
use crate::processor::pull_message_processor::PullMessageProcessor;
use crate::processor::pull_message_result_handler::PullMessageResultHandler;
use crate::topic::manager::topic_config_manager::TopicConfigManager;

Expand Down Expand Up @@ -74,7 +75,7 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
broker_allow_suspend: bool,
message_filter: Box<dyn MessageFilter>,
mut response: RemotingCommand,
mapping_context: TopicQueueMappingContext,
mut mapping_context: TopicQueueMappingContext,
begin_time_mills: u64,
) -> Option<RemotingCommand> {
let client_address = channel.remote_address().to_string();
Expand All @@ -90,14 +91,23 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
&mut response,
client_address.as_str(),
);
/* self.execute_consume_message_hook_before(
self.execute_consume_message_hook_before(
&request,
&request_header,
&get_message_result,
broker_allow_suspend,
From::from(response.code()),
);*/

);
/*let response_header = response.read_custom_header_mut::<PullMessageResponseHeader>();
let rewrite_result = PullMessageProcessor::rewrite_response_for_static_topic(
&request_header,
response_header.unwrap(),
&mut mapping_context,
ResponseCode::from(response.code()),
);
if rewrite_result.is_some() {
return rewrite_result;
}*/
None
}
}
Expand Down
Loading

0 comments on commit ba333d0

Please sign in to comment.