Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1037]Optimize DefaultMQPushConsumer #1038

Merged
merged 1 commit into from
Oct 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion rocketmq-client/src/base/validators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ impl Validators {
),
));
}

Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ impl DefaultMQPushConsumerImpl {
self.consumer_config.unit_mode
);
*self.service_state = ServiceState::StartFailed;
// check all config
self.check_config()?;
//copy_subscription is can be removed
self.copy_subscription().await?;
if self.consumer_config.message_model() == MessageModel::Clustering {
self.client_config.change_instance_name_to_pid();
Expand Down Expand Up @@ -474,20 +476,6 @@ impl DefaultMQPushConsumerImpl {
));
}

/* if !util_all::parse_date(self.consumer_config.consume_timestamp.as_str(), DATE_FORMAT)
.is_ok()
{
return Err(MQClientError::MQClientException(
-1,
format!(
"consumeTimestamp is invalid, the valid format is {}, but received {}{}",
DATE_FORMAT,
self.consumer_config.consume_timestamp,
FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL)
),
));
}*/

if self
.consumer_config
.allocate_message_queue_strategy
Expand All @@ -502,16 +490,6 @@ impl DefaultMQPushConsumerImpl {
));
}

/* if self.consumer_config.subscription.is_empty() {
return Err(MQClientError::MQClientErr(
-1,
format!(
"subscription is null{}",
FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL)
),
));
}*/

if self.consumer_config.message_listener.is_none() {
return Err(MQClientError::MQClientErr(
-1,
Expand All @@ -528,14 +506,14 @@ impl DefaultMQPushConsumerImpl {
.as_ref()
.unwrap()
.message_listener_orderly
.is_some()
.is_none()
&& self
.consumer_config
.message_listener
.as_ref()
.unwrap()
.message_listener_concurrently
.is_some()
.is_none()
Comment on lines +509 to +516
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Simplify conditional checks to improve readability

The conditional check uses multiple unwrap() calls on message_listener, which can be simplified for better readability and to avoid potential panics. Consider using pattern matching or if let constructs to handle the Option safely.

For example:

if let Some(message_listener) = &self.consumer_config.message_listener {
    if message_listener.message_listener_orderly.is_none()
        && message_listener.message_listener_concurrently.is_none()
    {
        return Err(MQClientError::MQClientErr(
            -1,
            format!(
                "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently{}",
                FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL)
            ),
        ));
    }
} else {
    return Err(MQClientError::MQClientErr(
        -1,
        format!(
            "messageListener is null{}",
            FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL)
        ),
    ));
}

{
return Err(MQClientError::MQClientErr(
-1,
Expand Down Expand Up @@ -701,7 +679,18 @@ impl DefaultMQPushConsumerImpl {
async fn copy_subscription(&mut self) -> Result<()> {
let sub = self.consumer_config.subscription();
if !sub.is_empty() {
unimplemented!()
for (topic, sub_expression) in sub.as_ref() {
let subscription_data = FilterAPI::build_subscription_data(topic, sub_expression)
.map_err(|e| {
MQClientError::MQClientErr(
-1,
format!("buildSubscriptionData exception, {}", e),
)
})?;
self.rebalance_impl
.put_subscription_data(topic, subscription_data)
.await;
}
Comment on lines +682 to +693
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Simplify error handling for improved readability

The error handling within the map_err closure can be streamlined for better clarity. Instead of the multi-line closure, consider a more concise expression.

For example:

let subscription_data = FilterAPI::build_subscription_data(topic, sub_expression)
    .map_err(|e| MQClientError::MQClientErr(-1, format!("buildSubscriptionData exception, {}", e)))?;

}
if self.message_listener.is_none() {
self.message_listener = self.consumer_config.message_listener.clone();
Expand Down
25 changes: 24 additions & 1 deletion rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl PullAPIWrapper {
}
msg_vec = inner_msg_vec;
}

// filter message
let mut msg_list_filter_again =
if !subscription_data.tags_set.is_empty() && !subscription_data.class_filter_mode {
let mut msg_vec_again = Vec::with_capacity(msg_vec.len());
Expand Down Expand Up @@ -219,6 +219,29 @@ impl PullAPIWrapper {
}
}

/// Pulls messages from the broker asynchronously.
///
/// # Arguments
///
/// * `mq` - A reference to the `MessageQueue` from which to pull messages.
/// * `sub_expression` - The subscription expression.
/// * `expression_type` - The type of the subscription expression.
/// * `sub_version` - The version of the subscription.
/// * `offset` - The offset from which to start pulling messages.
/// * `max_nums` - The maximum number of messages to pull.
/// * `max_size_in_bytes` - The maximum size of messages to pull in bytes.
/// * `sys_flag` - The system flag for the pull request.
/// * `commit_offset` - The commit offset.
/// * `broker_suspend_max_time_millis` - The maximum time in milliseconds for which the broker
/// can suspend the pull request.
/// * `timeout_millis` - The timeout for the pull request in milliseconds.
/// * `communication_mode` - The communication mode (e.g., sync, async).
/// * `pull_callback` - The callback to execute when the pull request completes.
///
/// # Returns
///
/// A `Result` containing an `Option` with the `PullResultExt` if successful, or an
/// `MQClientError` if an error occurs.
pub async fn pull_kernel_impl<PCB>(
&mut self,
mq: &MessageQueue,
Expand Down
93 changes: 43 additions & 50 deletions rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,86 +14,79 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use rocketmq_common::common::message::message_enum::MessageRequestMode;
use rocketmq_common::ArcRefCellWrapper;
use tracing::info;
use tracing::warn;

use crate::consumer::consumer_impl::message_request::MessageRequest;
use crate::consumer::consumer_impl::pop_request::PopRequest;
use crate::consumer::consumer_impl::pull_request::PullRequest;
use crate::factory::mq_client_instance::MQClientInstance;

#[derive(Clone)]
pub struct PullMessageService {
pop_tx: Option<tokio::sync::mpsc::Sender<PopRequest>>,
pull_tx: Option<tokio::sync::mpsc::Sender<PullRequest>>,
tx: Option<tokio::sync::mpsc::Sender<Box<dyn MessageRequest + Send + 'static>>>,

Check warning on line 29 in rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs#L29

Added line #L29 was not covered by tests
}

impl PullMessageService {
pub fn new() -> Self {
PullMessageService {
pop_tx: None,
pull_tx: None,
}
PullMessageService { tx: None }

Check warning on line 34 in rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs#L34

Added line #L34 was not covered by tests
}
pub async fn start(&mut self, instance: ArcRefCellWrapper<MQClientInstance>) {
let (pop_tx, mut pop_rx) = tokio::sync::mpsc::channel(1024 * 4);
let (pull_tx, mut pull_rx) = tokio::sync::mpsc::channel(1024 * 4);
self.pop_tx = Some(pop_tx);
self.pull_tx = Some(pull_tx);
//let instance_wrapper = ArcRefCellWrapper::new(instance);
let instance_wrapper_clone = instance.clone();
tokio::spawn(async move {
info!(
">>>>>>>>>>>>>>>>>>>>>>>PullMessageService [PopRequest] \
started<<<<<<<<<<<<<<<<<<<<<<<<<<<<"
);
while let Some(request) = pop_rx.recv().await {
if let Some(mut consumer) =
instance.select_consumer(request.get_consumer_group()).await
{
consumer.pop_message(request).await;
} else {
warn!(
"No matched consumer for the PopRequest {}, drop it",
request
)
}
}
});
pub async fn start(&mut self, mut instance: ArcRefCellWrapper<MQClientInstance>) {

Check warning on line 36 in rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs#L36

Added line #L36 was not covered by tests
let (tx, mut rx) =
tokio::sync::mpsc::channel::<Box<dyn MessageRequest + Send + 'static>>(1024 * 4);
self.tx = Some(tx);
tokio::spawn(async move {
info!(
">>>>>>>>>>>>>>>>>>>>>>>PullMessageService [PullRequest] \
started<<<<<<<<<<<<<<<<<<<<<<<<<<<<"
);
while let Some(request) = pull_rx.recv().await {
if let Some(mut consumer) = instance_wrapper_clone
.select_consumer(request.get_consumer_group())
.await
{
consumer.pull_message(request).await;
info!(">>>>>>>>>>>>>>>>>>>>>>>PullMessageService started<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
while let Some(request) = rx.recv().await {
if request.get_message_request_mode() == MessageRequestMode::Pull {
let pull_request =
unsafe { *Box::from_raw(Box::into_raw(request) as *mut PullRequest) };
PullMessageService::pull_message(pull_request, instance.as_mut()).await;
} else {
warn!(
"No matched consumer for the PullRequest {},drop it",
request
)
let pop_request =
unsafe { *Box::from_raw(Box::into_raw(request) as *mut PopRequest) };
PullMessageService::pop_message(pop_request, instance.as_mut()).await;
Comment on lines +42 to +50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Avoid unsafe downcasting to prevent undefined behavior

The use of unsafe blocks to downcast Box<dyn MessageRequest> to PullRequest or PopRequest using pointer casting is risky and can lead to undefined behavior if the actual type does not match the expected type. This might cause application crashes or security vulnerabilities.

Consider implementing safe downcasting by extending the MessageRequest trait with the Any trait and providing an as_any method for downcasting.

Apply these changes to enable safe downcasting:

  1. Extend MessageRequest with Any and add as_any method:

    +use std::any::Any;
    
    -pub trait MessageRequest {
    +pub trait MessageRequest: Any {
        // Existing methods...
    +   fn as_any(&self) -> &dyn Any;
    }
  2. Implement as_any in PullRequest and PopRequest:

    impl MessageRequest for PullRequest {
        // Existing implementations...
        fn as_any(&self) -> &dyn Any {
            self
        }
    }
    
    impl MessageRequest for PopRequest {
        // Existing implementations...
        fn as_any(&self) -> &dyn Any {
            self
        }
    }
  3. Modify the message handling loop to use safe downcasting:

    while let Some(request) = rx.recv().await {
        if let Some(pull_request) = request.as_any().downcast_ref::<PullRequest>() {
            Self::pull_message(pull_request.clone(), instance.as_mut()).await;
        } else if let Some(pop_request) = request.as_any().downcast_ref::<PopRequest>() {
            Self::pop_message(pop_request.clone(), instance.as_mut()).await;
        } else {
            warn!("Received unknown MessageRequest type");
        }
    }

This approach eliminates the need for unsafe code and ensures type safety when downcasting.

}
}
});
}

async fn pull_message(request: PullRequest, instance: &mut MQClientInstance) {

Check warning on line 56 in rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs#L56

Added line #L56 was not covered by tests
if let Some(mut consumer) = instance.select_consumer(request.get_consumer_group()).await {
consumer.pull_message(request).await;
} else {
warn!(
"No matched consumer for the PullRequest {},drop it",
request
)
}
}

Check warning on line 65 in rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs#L65

Added line #L65 was not covered by tests

async fn pop_message(request: PopRequest, instance: &mut MQClientInstance) {

Check warning on line 67 in rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs#L67

Added line #L67 was not covered by tests
if let Some(mut consumer) = instance.select_consumer(request.get_consumer_group()).await {
consumer.pop_message(request).await;
} else {
warn!(
"No matched consumer for the PopRequest {}, drop it",
request
)
}
}

Check warning on line 76 in rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs#L76

Added line #L76 was not covered by tests

pub fn execute_pull_request_later(&self, pull_request: PullRequest, time_delay: u64) {
let this = self.clone();
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(time_delay)).await;
if let Err(e) = this.pull_tx.as_ref().unwrap().send(pull_request).await {
if let Err(e) = this.tx.as_ref().unwrap().send(Box::new(pull_request)).await {

Check warning on line 82 in rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs#L82

Added line #L82 was not covered by tests
warn!("Failed to send pull request to pull_tx, error: {:?}", e);
}
Comment on lines +82 to 84
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Outdated channel name in error messages for pull requests

In execute_pull_request_later and execute_pull_request_immediately, the error messages reference pull_tx, which no longer exists after consolidating channels into tx.

Update the error messages to reflect the current channel name to avoid confusion.

Apply this diff to update the error messages:

 if let Err(e) = self.tx.as_ref().unwrap().send(Box::new(pull_request)).await {
-    warn!("Failed to send pull request to pull_tx, error: {:?}", e);
+    warn!("Failed to send pull request to tx, error: {:?}", e);
 }

Ensure all mentions of pull_tx are updated accordingly.

Also applies to: 89-91

});
}

pub async fn execute_pull_request_immediately(&self, pull_request: PullRequest) {
if let Err(e) = self.pull_tx.as_ref().unwrap().send(pull_request).await {
if let Err(e) = self.tx.as_ref().unwrap().send(Box::new(pull_request)).await {
warn!("Failed to send pull request to pull_tx, error: {:?}", e);
}
}
Expand All @@ -102,14 +95,14 @@
let this = self.clone();
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(time_delay)).await;
if let Err(e) = this.pop_tx.as_ref().unwrap().send(pop_request).await {
if let Err(e) = this.tx.as_ref().unwrap().send(Box::new(pop_request)).await {

Check warning on line 98 in rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs#L98

Added line #L98 was not covered by tests
warn!("Failed to send pull request to pull_tx, error: {:?}", e);
}
Comment on lines +98 to 100
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Incorrect error messages for pop requests

In execute_pop_pull_request_later and execute_pop_pull_request_immediately, the error messages incorrectly mention "pull request" and "pull_tx" when they should refer to "pop request" and "tx".

Update the error messages to accurately reflect the operation and current channel name.

Apply this diff to correct the error messages:

 if let Err(e) = this.tx.as_ref().unwrap().send(Box::new(pop_request)).await {
-    warn!("Failed to send pull request to pull_tx, error: {:?}", e);
+    warn!("Failed to send pop request to tx, error: {:?}", e);
 }

Repeat the change for both methods to maintain consistency.

Also applies to: 105-107

});
}

pub async fn execute_pop_pull_request_immediately(&self, pop_request: PopRequest) {
if let Err(e) = self.pop_tx.as_ref().unwrap().send(pop_request).await {
if let Err(e) = self.tx.as_ref().unwrap().send(Box::new(pop_request)).await {
warn!("Failed to send pull request to pull_tx, error: {:?}", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,7 @@ where
let mut balanced = true;
let sub_table = self.subscription_inner.read().await;
if !sub_table.is_empty() {
let topics = sub_table
.keys()
.map(|item| item.to_string())
.collect::<HashSet<String>>();
let topics = sub_table.keys().cloned().collect::<HashSet<String>>();
drop(sub_table);
for topic in &topics {
//try_query_assignment unimplemented
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl RebalancePushImpl {
self.rebalance_impl_inner.client_instance = Some(client_instance);
}

#[inline]
pub async fn put_subscription_data(
&mut self,
topic: &str,
Expand Down
6 changes: 5 additions & 1 deletion rocketmq-client/src/consumer/default_mq_push_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub struct ConsumerConfig {
pub(crate) consume_from_where: ConsumeFromWhere,
pub(crate) consume_timestamp: Option<String>,
pub(crate) allocate_message_queue_strategy: Option<Arc<dyn AllocateMessageQueueStrategy>>,
//this field will be removed in a certain version after April 5, 2020
pub(crate) subscription: ArcRefCellWrapper<HashMap<String, String>>,
pub(crate) message_listener: Option<ArcRefCellWrapper<MessageListener>>,
pub(crate) message_queue_listener: Option<Arc<Box<dyn MessageQueueListener>>>,
Expand Down Expand Up @@ -243,6 +244,10 @@ impl ConsumerConfig {
self.allocate_message_queue_strategy = Some(allocate_message_queue_strategy);
}

/**
* This method will be removed in a certain version after April 5, 2020, so please do not
* use this method.
*/
pub fn set_subscription(&mut self, subscription: ArcRefCellWrapper<HashMap<String, String>>) {
self.subscription = subscription;
}
Expand Down Expand Up @@ -414,7 +419,6 @@ impl Default for ConsumerConfig {
}
}

#[derive(Clone)]
pub struct DefaultMQPushConsumer {
client_config: ClientConfig,
consumer_config: ArcRefCellWrapper<ConsumerConfig>,
Expand Down
12 changes: 10 additions & 2 deletions rocketmq-client/src/factory/mq_client_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,16 @@ impl MQClientInstance {
if self.client_config.namesrv_addr.is_none() {
self.mq_client_api_impl
.as_mut()
.unwrap()
.expect("mq_client_api_impl is None")
.fetch_name_server_addr()
.await;
}
// Start request-response channel
self.mq_client_api_impl.as_mut().unwrap().start().await;
self.mq_client_api_impl
.as_mut()
.expect("mq_client_api_impl is None")
.start()
.await;
Comment on lines +285 to +294
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Refactor repeated as_mut().expect() calls for clarity

In the start method, the repeated calls to self.mq_client_api_impl.as_mut().expect("mq_client_api_impl is None") can be refactored to improve readability and reduce redundancy.

Consider assigning the unwrapped mq_client_api_impl to a variable:

+            let mq_client_api_impl = self.mq_client_api_impl.as_mut().expect("mq_client_api_impl is None");
             if self.client_config.namesrv_addr.is_none() {
-                self.mq_client_api_impl
-                    .as_mut()
-                    .expect("mq_client_api_impl is None")
-                    .fetch_name_server_addr()
-                    .await;
+                mq_client_api_impl.fetch_name_server_addr().await;
             }
-            self.mq_client_api_impl
-                .as_mut()
-                .expect("mq_client_api_impl is None")
-                .start()
-                .await;
+            mq_client_api_impl.start().await;

This change reduces code repetition and enhances readability.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
.expect("mq_client_api_impl is None")
.fetch_name_server_addr()
.await;
}
// Start request-response channel
self.mq_client_api_impl.as_mut().unwrap().start().await;
self.mq_client_api_impl
.as_mut()
.expect("mq_client_api_impl is None")
.start()
.await;
let mq_client_api_impl = self.mq_client_api_impl.as_mut().expect("mq_client_api_impl is None");
if self.client_config.namesrv_addr.is_none() {
mq_client_api_impl.fetch_name_server_addr().await;
}
// Start request-response channel
mq_client_api_impl.start().await;

// Start various schedule tasks
self.start_scheduled_task(this.clone());
// Start pull service
Expand Down Expand Up @@ -337,6 +341,7 @@ impl MQClientInstance {

fn start_scheduled_task(&mut self, this: ArcRefCellWrapper<Self>) {
if self.client_config.namesrv_addr.is_none() {
// Fetch name server address
let mut mq_client_api_impl = self.mq_client_api_impl.as_ref().unwrap().clone();
self.instance_runtime.get_handle().spawn(async move {
info!("ScheduledTask fetchNameServerAddr started");
Expand All @@ -352,6 +357,7 @@ impl MQClientInstance {
});
}

// Update topic route info from name server
let mut client_instance = this.clone();
let poll_name_server_interval = self.client_config.poll_name_server_interval;
self.instance_runtime.get_handle().spawn(async move {
Expand All @@ -370,6 +376,7 @@ impl MQClientInstance {
}
});

// Clean offline broker and send heartbeat to all broker
let mut client_instance = this.clone();
let heartbeat_broker_interval = self.client_config.heartbeat_broker_interval;
self.instance_runtime.get_handle().spawn(async move {
Expand All @@ -389,6 +396,7 @@ impl MQClientInstance {
}
});

// Persist all consumer offset
let mut client_instance = this;
let persist_consumer_offset_interval =
self.client_config.persist_consumer_offset_interval as u64;
Expand Down
Loading
Loading