Skip to content

Commit

Permalink
[ISSUE #699]🚀Support pull message result handle
Browse files Browse the repository at this point in the history
  • Loading branch information
TeslaRustor committed Jun 27, 2024
1 parent ba333d0 commit 4dfc094
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 3 deletions.
2 changes: 2 additions & 0 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use crate::client::manager::producer_manager::ProducerManager;
use crate::filter::manager::consumer_filter_manager::ConsumerFilterManager;
use crate::hook::batch_check_before_put_message::BatchCheckBeforePutMessageHook;
use crate::hook::check_before_put_message::CheckBeforePutMessageHook;
use crate::offset::manager::broadcast_offset_manager::BroadcastOffsetManager;
use crate::offset::manager::consumer_offset_manager::ConsumerOffsetManager;
use crate::offset::manager::consumer_order_info_manager::ConsumerOrderInfoManager;
use crate::out_api::broker_outer_api::BrokerOuterAPI;
Expand Down Expand Up @@ -372,6 +373,7 @@ impl BrokerRuntime {
self.consumer_manager.clone(),
self.consumer_filter_manager.clone(),
Arc::new(self.consumer_offset_manager.clone()),
Arc::new(BroadcastOffsetManager::default()),

Check warning on line 376 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L376

Added line #L376 was not covered by tests
self.message_store.as_ref().unwrap().clone(),
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use tracing::info;

use crate::mqtrace::consume_message_context::ConsumeMessageContext;
use crate::mqtrace::consume_message_hook::ConsumeMessageHook;
use crate::processor::pull_message_processor::PullMessageProcessor;
use crate::processor::pull_message_result_handler::PullMessageResultHandler;
use crate::topic::manager::topic_config_manager::TopicConfigManager;

Expand Down Expand Up @@ -75,7 +74,7 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
broker_allow_suspend: bool,
message_filter: Box<dyn MessageFilter>,
mut response: RemotingCommand,
mut mapping_context: TopicQueueMappingContext,
mapping_context: TopicQueueMappingContext,
begin_time_mills: u64,
) -> Option<RemotingCommand> {
let client_address = channel.remote_address().to_string();
Expand Down
1 change: 0 additions & 1 deletion rocketmq-broker/src/processor/pull_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ use rocketmq_store::log_file::MAX_PULL_MSG_SIZE;
use tracing::error;
use tracing::warn;

use crate::client::client_channel_info::ClientChannelInfo;
use crate::client::consumer_group_info::ConsumerGroupInfo;
use crate::client::manager::consumer_manager::ConsumerManager;
use crate::filter::expression_for_retry_message_filter::ExpressionForRetryMessageFilter;
Expand Down

0 comments on commit 4dfc094

Please sign in to comment.