[ISSUE #981]🔥Optimize client clustering consume⚡️ #985
Conversation
Walkthrough
The changes in this pull request involve significant modifications to the RocketMQ client, primarily focusing on the consumer functionality.
Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Consumer
    participant Broker
    Client->>Consumer: Request to consume messages
    Consumer->>Broker: Pull messages
    Broker-->>Consumer: Return messages
    Consumer->>Client: Deliver messages
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥
Actionable comments posted: 17
Outside diff range and nitpick comments (2)
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (1)

276-280: Improve logging statement.

Consider making the following changes to the logging statement (a sketch follows this list):

- Replace `println!` with a proper logging macro like `debug!`, `info!`, `warn!`, or `error!` from the `tracing` crate for consistency with the rest of the codebase.
- Improve the formatting of the log message to make it more readable. For example: `info!("Dispatching {} pull requests for topic {}", pull_request_list.len(), topic);`
- Guard the logging statement with a feature flag or log level check to allow controlling verbosity in production builds. For example: `#[cfg(debug_assertions)] info!(...);` or `if log_enabled!(Level::Info) { info!(...); }`
rocketmq-client/src/implementation/mq_client_api_impl.rs (1)
836-837: Reconsider the change to the method signature of `pull_message`

Receiver change: Changing the method receiver from `&mut self` to `mut this: ArcRefCellWrapper<Self>` introduces additional complexity due to reference counting and interior mutability. Ensure this change is necessary and that it doesn't introduce unintended side effects. If the goal is to allow the method to be called in contexts requiring ownership or to be sent across threads, consider documenting this rationale.

Parameter type change: Changing `addr` from `&str` to `String` results in an ownership transfer. If the function does not need to own `addr`, consider keeping it as `&str` to avoid unnecessary cloning and improve performance (see the sketch below).

Also applies to: 844-844
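As a hedged illustration of the borrowing point (not the actual `pull_message` signature), the difference between an owned `String` parameter and a borrowed `&str` looks like this:

```rust
// Borrowing: callers pass &str (or &String) without giving up ownership.
fn send_borrowed(addr: &str) {
    println!("sending to {addr}");
}

// Owning: callers must move or clone a String into the function.
fn send_owned(addr: String) {
    println!("sending to {addr}");
}

fn main() {
    let addr = String::from("127.0.0.1:10911");
    send_borrowed(&addr);     // no allocation or clone needed
    send_owned(addr.clone()); // clone required because addr is used again below
    send_borrowed(&addr);
}
```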
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (17)
- rocketmq-client/examples/quickstart/consumer.rs (1 hunks)
- rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs (7 hunks)
- rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs (1 hunks)
- rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs (1 hunks)
- rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs (1 hunks)
- rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs (1 hunks)
- rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs (5 hunks)
- rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs (5 hunks)
- rocketmq-client/src/consumer/consumer_impl/re_balance.rs (1 hunks)
- rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (4 hunks)
- rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs (1 hunks)
- rocketmq-client/src/consumer/mq_consumer_inner.rs (1 hunks)
- rocketmq-client/src/factory/mq_client_instance.rs (3 hunks)
- rocketmq-client/src/implementation/mq_client_api_impl.rs (5 hunks)
- rocketmq-remoting/src/clients/rocketmq_default_impl.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/unregister_client_request_header.rs (2 hunks)
- rocketmq-remoting/src/protocol/remoting_command.rs (1 hunks)
Additional comments not posted (25)
rocketmq-client/src/consumer/mq_consumer_inner.rs (1)

44-44: LGTM! Verify the asynchronous behavior is handled correctly.

The change from a synchronous to an asynchronous method signature for `persist_consumer_offset` aligns with the overall objective of optimizing the client's message consumption and handling processes. Please ensure that the implementation of this method and its usage throughout the codebase are updated to properly handle the asynchronous behavior. Run the following script to verify the usage of `persist_consumer_offset`:

rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs (1)
36-37: Verify the impact of the function signature change.

The function signature change from `fn shutdown(&self, await_terminate_millis: u64)` to `fn shutdown(&mut self, await_terminate_millis: u64)` is a breaking change. Please ensure that all calls to this function have been updated to pass a mutable reference to the `ConsumeMessageOrderlyService` instance. Run the following script to verify the function usage:

Verification successful

No existing calls to `ConsumeMessageOrderlyService::shutdown` were found. Verification successful.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify all function calls to `shutdown` pass a mutable reference.
# Test: Search for the function usage. Expect: Only occurrences of the new signature.
rg --type rust -A 5 $'ConsumeMessageOrderlyService::shutdown'

Length of output: 60

Script:

#!/bin/bash
# Description: Find all calls to the `shutdown` method and verify they use a mutable reference.
# Using ast-grep to find method calls to `shutdown` on mutable references.
ast-grep --lang rust --pattern '
impl _ {
  $_:shutdown($_) {
    $$$
  }
}
'
# Alternatively, using ripgrep to search for `shutdown` method calls with context.
rg --type rust 'shutdown\s*\(' -A 5

Length of output: 36430
rocketmq-client/examples/quickstart/consumer.rs (1)

35-35: Verify the reason for disabling the logger initialization.

The logger initialization has been commented out. This change disables logging during the execution of the `main` function, which could lead to missing important log messages that are useful for debugging or monitoring the application. Please verify the reason for disabling the logger initialization. If it's intentional, consider adding a comment to explain the change.
rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs (1)

48-48: LGTM!

The change to the `shutdown` method signature in the `ConsumeMessageServiceTrait` trait is appropriate and consistent with the method's purpose. Allowing the method to modify the state of the implementing struct during the shutdown process is a reasonable update. Ensure that all implementations of the `ConsumeMessageServiceTrait` trait are updated to match the new method signature (a small illustrative sketch follows).
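The following is an illustrative sketch, with placeholder names rather than the real `ConsumeMessageServiceTrait` definition, of why the `&mut self` receiver matters: the implementer mutates its own state during shutdown.

```rust
trait ConsumeMessageService {
    fn shutdown(&mut self, await_terminate_millis: u64);
}

struct ConcurrentService {
    running: bool,
}

impl ConsumeMessageService for ConcurrentService {
    fn shutdown(&mut self, _await_terminate_millis: u64) {
        // With a &self receiver this assignment would require interior
        // mutability; &mut self makes the state change explicit.
        self.running = false;
    }
}

fn main() {
    let mut service = ConcurrentService { running: true };
    service.shutdown(1000);
    assert!(!service.running);
}
```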
rocketmq-remoting/src/protocol/header/unregister_client_request_header.rs (3)
34-34: LGTM!

The change to make `rpc_request_header` an optional field in the `UnregisterClientRequestHeader` struct is a good design decision. It provides flexibility to construct instances of the struct without always requiring an `RpcRequestHeader`.

58-58: LGTM!

The update to the `from` method in the `FromMap` implementation for `UnregisterClientRequestHeader` correctly handles the optional `RpcRequestHeader` value. It is consistent with the field type change and assigns the value obtained from the map appropriately.

65-85: LGTM!

The modifications to the `to_map` method in the `CommandCustomHeader` implementation for `UnregisterClientRequestHeader` are well-structured and handle the optional fields correctly. The method initializes a new `HashMap`, inserts the `client_id`, `producer_group`, and `consumer_group` fields conditionally, and extends the `rpc_request_header` map into the main map if present. The changes are consistent with the field type update and ensure that the method returns the expected `Option<HashMap<String, String>>`.
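A simplified, hedged sketch of the `to_map` pattern described above; the field types, map keys, and the flattened `rpc_request_header` representation are placeholders rather than the real `UnregisterClientRequestHeader` definition.

```rust
use std::collections::HashMap;

struct UnregisterClientRequestHeader {
    client_id: String,
    producer_group: Option<String>,
    consumer_group: Option<String>,
    // Stand-in for the nested RpcRequestHeader, already flattened to a map.
    rpc_request_header: Option<HashMap<String, String>>,
}

impl UnregisterClientRequestHeader {
    fn to_map(&self) -> Option<HashMap<String, String>> {
        let mut map = HashMap::new();
        map.insert("clientID".to_string(), self.client_id.clone());
        // Optional fields are inserted only when present.
        if let Some(pg) = &self.producer_group {
            map.insert("producerGroup".to_string(), pg.clone());
        }
        if let Some(cg) = &self.consumer_group {
            map.insert("consumerGroup".to_string(), cg.clone());
        }
        // The nested header's map is merged into the main map if present.
        if let Some(rpc) = &self.rpc_request_header {
            map.extend(rpc.clone());
        }
        Some(map)
    }
}

fn main() {
    let header = UnregisterClientRequestHeader {
        client_id: "client-1".to_string(),
        producer_group: None,
        consumer_group: Some("group-a".to_string()),
        rpc_request_header: None,
    };
    println!("{:?}", header.to_map());
}
```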
rocketmq-remoting/src/clients/rocketmq_default_impl.rs (1)
376-377: LGTM!

The code change correctly marks the request as a one-way RPC before sending it. This optimization is beneficial for scenarios where a response is not expected, as it avoids unnecessary response handling and can improve performance.
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (3)
297-298: LGTM!

The variable renaming from `ci_all` to `cid_all` improves code clarity.

Line range hint 304-318: LGTM!

The changes in this code segment, including the variable renaming and the warning log message, are appropriate and improve code clarity.

Line range hint 325-378: LGTM!

The changes in this code segment, including the variable renaming and the enhanced logging statement, are beneficial and improve code clarity and observability.
rocketmq-remoting/src/protocol/remoting_command.rs (1)

360-363: LGTM!

The `mark_oneway_rpc_ref` function correctly marks the command as a one-way RPC reference by setting the corresponding bit in the `flag` field. The implementation is consistent with the existing `mark_oneway_rpc` function and modifies the `flag` field appropriately using a mutable reference.
field appropriately using a mutable reference.rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs (3)
46-46: Import statement for `MQClientAPIImpl` is appropriate

The import of `MQClientAPIImpl` is necessary for the updated method calls and is correctly added.

Line range hint 111-196: Efficient handling of inner batched messages

The added logic correctly identifies and processes messages that require inner decoding. This ensures that messages with `INNER_BATCH_FLAG` and `NEED_UNWRAP_FLAG` are appropriately decoded.

196-196: Proper assignment to `msg_found_list`

The assignment of `msg_found_list` with messages wrapped in `ArcRefCellWrapper` aligns with the expected data structures and usage patterns.

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs (3)
263-263: Review the logic change in batch size condition

The condition has been updated from `msgs.len() < consume_batch_size` to `msgs.len() <= consume_batch_size`. This change affects when messages are processed in batches. Confirm that this adjustment is intentional and that it correctly handles cases when `msgs.len()` equals `consume_batch_size` (see the boundary sketch below).
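The boundary can be illustrated with a small standalone sketch (placeholder message type and batch-splitting helper, not the real service code): with `<=`, a list whose length exactly equals `consume_batch_size` is submitted as a single batch instead of being split.

```rust
fn split_into_batches(msgs: Vec<u32>, consume_batch_size: usize) -> Vec<Vec<u32>> {
    if msgs.len() <= consume_batch_size {
        // At or below the batch size: submit everything as one request.
        vec![msgs]
    } else {
        // Above the batch size: split into chunks of at most consume_batch_size.
        msgs.chunks(consume_batch_size)
            .map(|chunk| chunk.to_vec())
            .collect()
    }
}

fn main() {
    // Exactly at the boundary: one batch with `<=`, two batches if `<` were used.
    assert_eq!(split_into_batches(vec![1, 2, 3, 4], 4).len(), 1);
    // Above the boundary: splitting still happens.
    assert_eq!(split_into_batches(vec![1, 2, 3, 4, 5], 4).len(), 2);
}
```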
275-277: Task spawning with `tokio::spawn` is appropriate

The use of `tokio::spawn` to run the `consume_request` task aligns with asynchronous best practices in Rust. This change simplifies task management and is suitable within the Tokio runtime environment.
178-181: Verify appropriate use of `tokio::spawn` for task scheduling

The code now uses `tokio::spawn` directly to spawn asynchronous tasks. Ensure that the application is correctly configured with a Tokio runtime and that this change aligns with the overall task management strategy. Run the following script to check for remaining usages of custom runtimes and confirm consistent task spawning:

Verification successful

Tokio::spawn Usage Verified

The use of `tokio::spawn` in `consume_message_concurrently_service.rs` is appropriate and aligns with the Tokio runtime configuration.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Search for any occurrences of `RocketMQRuntime` in the codebase.
rg --type rust 'RocketMQRuntime'

Length of output: 2989
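For reference, a minimal self-contained sketch of the `tokio::spawn` pattern being verified here; the `consume_request` function and message type are placeholders, and a Tokio runtime is assumed via `#[tokio::main]`.

```rust
use tokio::task::JoinHandle;

async fn consume_request(msgs: Vec<String>) {
    for msg in msgs {
        println!("consumed: {msg}");
    }
}

#[tokio::main]
async fn main() {
    // tokio::spawn schedules the future on the runtime and returns a JoinHandle;
    // the task runs concurrently with the caller.
    let handle: JoinHandle<()> = tokio::spawn(consume_request(vec![
        "msg-1".to_string(),
        "msg-2".to_string(),
    ]));

    // Awaiting the handle waits for the spawned task to complete.
    handle.await.expect("consume task panicked");
}
```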
rocketmq-client/src/implementation/mq_client_api_impl.rs (2)

54-54: Import `UnregisterClientRequestHeader` correctly

The `UnregisterClientRequestHeader` is correctly imported to support the new `unregister_client` method.

1011-1041: `unregister_client` method implemented correctly

The new `unregister_client` method is correctly implemented, constructing the request header and handling the response appropriately. Parameters are appropriately utilized.

rocketmq-client/src/factory/mq_client_instance.rs (1)
944-949: Methods `unregister_consumer` and `unregister_producer` are correctly implemented

The methods `unregister_consumer` and `unregister_producer` correctly delegate to `unregister_client` with the appropriate parameters.

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs (4)
50-50: Importing `tokio::sync::Mutex`

The addition of `tokio::sync::Mutex` is appropriate for introducing asynchronous mutex functionality.

100-100: Introduction of `global_lock` field for thread safety

The new field `global_lock: Arc<Mutex<()>>` in `DefaultMQPushConsumerImpl` enhances concurrency control during shutdown operations.
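A minimal sketch of this guard pattern, assuming Tokio's async mutex; the struct and method are placeholders, not the actual `DefaultMQPushConsumerImpl`.

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

struct Consumer {
    // The lock guards no data; it only serializes entry into shutdown.
    global_lock: Arc<Mutex<()>>,
}

impl Consumer {
    fn new() -> Self {
        Self {
            global_lock: Arc::new(Mutex::new(())),
        }
    }

    async fn shutdown(&self) {
        // Concurrent shutdown calls queue here; only one proceeds at a time.
        let _guard = self.global_lock.lock().await;
        println!("shutting down");
        // _guard is released when it goes out of scope.
    }
}

#[tokio::main]
async fn main() {
    let consumer = Consumer::new();
    consumer.shutdown().await;
}
```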
144-144: Initializing `global_lock` correctly

The `global_lock` is correctly initialized in the constructor using `Arc::new(Default::default())`.

1291-1311: Ensure all calls to `persist_consumer_offset` are updated for asynchronous execution

The `persist_consumer_offset` method has been changed from a synchronous to an asynchronous function. Please verify that all calls to this method have been updated to `await` its result to prevent any runtime errors. Run the following script to identify all calls to `persist_consumer_offset` that need to be updated:

Verification successful

All calls to `persist_consumer_offset` are correctly updated with `.await`. No issues found.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find all calls to `persist_consumer_offset` that may need to be updated.
# Search for invocations of `persist_consumer_offset` in the codebase
rg --type rust 'persist_consumer_offset\('
# Expected: All calls should now use `.await` on `persist_consumer_offset()`.

Length of output: 492
fn shutdown(&mut self, await_terminate_millis: u64) {
    todo!()
}
Breaking change: `shutdown` now requires a mutable reference to `self`.

The function signature has been modified to take a mutable reference to `self`, which is a breaking change for any existing code that calls this function. Please ensure that this change is necessary and update the calling code accordingly.

Additionally, the function body is not implemented and contains a `todo!()` macro. Please provide more context on the planned implementation and ensure that the function is fully implemented before merging this pull request.
fn shutdown(&mut self, await_terminate_millis: u64) {
    unimplemented!("shutdown")
Implement the shutdown logic.

The function signature change to take a mutable reference to `self` is appropriate, as the `shutdown` function is expected to modify the state of the `ConsumeMessageOrderlyService` instance.

However, the function is currently unimplemented. Please implement the shutdown logic, considering the following (a generic sketch follows this list):

- The function should wait for the specified `await_terminate_millis` duration before terminating.
- The function should gracefully shut down any running tasks or processes.
- The function should update the state of the `ConsumeMessageOrderlyService` instance to reflect the shutdown.
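One possible shape for those steps, as a generic hedged sketch with placeholder types; it is not the actual `ConsumeMessageOrderlyService` and ignores its real task bookkeeping.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

struct OrderlyService {
    stopped: Arc<AtomicBool>,
}

impl OrderlyService {
    fn shutdown(&mut self, await_terminate_millis: u64) {
        // 1. Signal running consume tasks to stop picking up new work.
        self.stopped.store(true, Ordering::SeqCst);
        // 2. Give in-flight tasks a grace period before terminating.
        std::thread::sleep(Duration::from_millis(await_terminate_millis));
        // 3. The stopped flag now also records the shut-down state.
    }
}

fn main() {
    let mut service = OrderlyService {
        stopped: Arc::new(AtomicBool::new(false)),
    };
    service.shutdown(5);
    assert!(service.stopped.load(Ordering::SeqCst));
}
```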
@@ -70,4 +70,5 @@ pub trait RebalanceLocal {
    async fn do_rebalance(&mut self, is_order: bool) -> bool;

    fn client_rebalance(&mut self, topic: &str) -> bool;
    fn destroy(&mut self);
Consider adding a default implementation for the `destroy` method.
Adding a new method to the trait is a breaking change, as it requires all existing implementers of the trait to implement this method. To avoid breaking changes and provide a fallback behavior for implementers who may not need any cleanup, consider adding a default implementation to the trait.
Here's an example of how you can add a default implementation:
fn destroy(&mut self) {
// Default implementation that does nothing
}
This way, implementers who need cleanup behavior can override the default implementation, while others can rely on the default implementation.
fn shutdown(&mut self, await_terminate_millis: u64) {
    todo!()
}
Complete the `shutdown` function implementation.

The function signature change to take a mutable reference to `self` is appropriate, as it allows the function to modify the state of the `ConsumeMessagePopConcurrentlyService` instance during shutdown.

However, the function body is currently unimplemented and contains a `todo!()` macro. Please complete the implementation to ensure the `shutdown` functionality works as intended.
fn destroy(&mut self) {
    unimplemented!()
}
Implement the `destroy` method.

The `destroy` method is currently unimplemented. Please add the necessary cleanup logic or resource deallocation code to ensure proper cleanup when the `RebalancePushImpl` instance is no longer needed.
Consider adding a TODO comment to track the pending implementation:
fn destroy(&mut self) {
+ // TODO: Implement cleanup logic
unimplemented!()
}
Committable suggestion was skipped due to low confidence.
println!(
    "++++++++++++++++++++++++pull_message_async response: {}",
    response
);
Replace `println!` with Logging Macros for Response Logging

Replace the `println!` statement with a logging macro to adhere to consistent logging practices.
Apply this diff:
+use tracing::debug;
...
- println!(
+ debug!(
"++++++++++++++++++++++++pull_message_async response: {}",
response
);
Committable suggestion was skipped due to low confidence.
@@ -253,6 +253,8 @@ where
    Ok(())
}

pub async fn shutdown(&mut self) {}
Implement the `shutdown` method or mark it as unimplemented

The `shutdown` method currently has an empty implementation. This could lead to confusion or runtime errors if the method is called. Consider implementing the necessary shutdown logic or explicitly marking it as unimplemented to indicate that it's a work in progress.
Apply this diff to address the issue:
pub async fn shutdown(&mut self) {
- // Empty implementation
+ unimplemented!("Shutdown not implemented yet");
}
let consumer_table = self.consumer_table.read().await;
for (_, value) in consumer_table.iter() {
    value.persist_consumer_offset().await;
}
Avoid holding a lock across an `.await` point

Holding the read lock on `consumer_table` while awaiting `persist_consumer_offset().await` can lead to deadlocks or performance bottlenecks because the lock is held for the duration of the asynchronous call. To prevent this, collect the consumers into a list, release the lock, and then perform the asynchronous operations (a self-contained sketch of this pattern follows the diff below).
Apply this diff to fix the issue:
- let consumer_table = self.consumer_table.read().await;
- for (_, value) in consumer_table.iter() {
- value.persist_consumer_offset().await;
- }
+ let consumers = {
+ let consumer_table = self.consumer_table.read().await;
+ consumer_table.values().cloned().collect::<Vec<_>>()
+ };
+ for consumer in consumers {
+ consumer.persist_consumer_offset().await;
+ }
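For context, a self-contained sketch of the collect-then-release pattern recommended above, with placeholder types instead of the real `consumer_table` entries.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

#[derive(Clone)]
struct Consumer {
    name: String,
}

impl Consumer {
    async fn persist_consumer_offset(&self) {
        println!("persisting offsets for {}", self.name);
    }
}

#[tokio::main]
async fn main() {
    let table: Arc<RwLock<HashMap<String, Consumer>>> =
        Arc::new(RwLock::new(HashMap::new()));
    table.write().await.insert(
        "group-a".to_string(),
        Consumer { name: "group-a".to_string() },
    );

    // Take a snapshot under the read lock, then drop the guard before awaiting.
    let consumers: Vec<Consumer> = {
        let guard = table.read().await;
        guard.values().cloned().collect()
    }; // read guard released here

    for consumer in consumers {
        consumer.persist_consumer_offset().await;
    }
}
```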
async fn unregister_client(
    &mut self,
    producer_group: Option<String>,
    consumer_group: Option<String>,
) {
    let broker_addr_table = self.broker_addr_table.read().await;
    for (broker_name, broker_addrs) in broker_addr_table.iter() {
        for (id, addr) in broker_addrs.iter() {
            if let Err(err) = self
                .mq_client_api_impl
                .unregister_client(
                    addr,
                    self.client_id.as_str(),
                    producer_group.clone(),
                    consumer_group.clone(),
                    self.client_config.mq_client_api_timeout,
                )
                .await
            {
            } else {
                info!(
                    "unregister client[Producer: {:?} Consumer: {:?}] from broker[{} {} {}] \
                     success",
                    producer_group, consumer_group, broker_name, id, addr,
                );
            }
        }
    }
}
Handle errors in the `unregister_client` call

The error handling block in the `if let Err(err)` statement is empty. This means that if an error occurs during the `unregister_client` call, it will be silently ignored, which can make debugging difficult. Consider logging the error to provide visibility into potential issues.
Apply this diff to handle the error:
)
.await
{
+ error!(
+ "Failed to unregister client[Producer: {:?} Consumer: {:?}] from broker[{} {} {}]: {}",
+ producer_group, consumer_group, broker_name, id, addr, err
+ );
} else {
info!(
"unregister client[Producer: {:?} Consumer: {:?}] from broker[{} {} {}] success",
pub async fn shutdown(&mut self, await_terminate_millis: u64) {
    let _lock = self.global_lock.lock().await;
    match *self.service_state {
        ServiceState::CreateJust => {
            warn!(
                "the consumer [{}] do not start, so do nothing",
                self.consumer_config.consumer_group
            );
        }
        ServiceState::Running => {
            if let Some(consume_message_concurrently_service) =
                self.consume_message_concurrently_service.as_mut()
            {
                consume_message_concurrently_service
                    .consume_message_concurrently_service
                    .shutdown(await_terminate_millis);
            }
            self.persist_consumer_offset().await;
            let client = self.client_instance.as_mut().unwrap();
            client
                .unregister_consumer(self.consumer_config.consumer_group.as_str())
                .await;
            client.shutdown().await;
            info!(
                "the consumer [{}] shutdown OK",
                self.consumer_config.consumer_group.as_str()
            );
            self.rebalance_impl.destroy();
            *self.service_state = ServiceState::ShutdownAlready;
        }
        ServiceState::ShutdownAlready => {
            warn!(
                "the consumer [{}] has been shutdown, do nothing",
                self.consumer_config.consumer_group
            );
        }
        ServiceState::StartFailed => {
            warn!(
                "the consumer [{}] start failed, do nothing",
                self.consumer_config.consumer_group
            );
        }
    }
    drop(_lock);
}
Avoid holding the `global_lock` across `.await` points in the `shutdown` method

In the `shutdown` method, the `global_lock` is acquired and held across several `.await` calls. Holding an async mutex lock across `.await` points can lead to performance bottlenecks and potential deadlocks, as it prevents other tasks from acquiring the lock while the current task is suspended.

Suggestion: Refactor the `shutdown` method to limit the scope of the `global_lock`. Acquire the lock only for the critical section that modifies the `service_state`, and release it before making any asynchronous calls.
Proposed change:
pub async fn shutdown(&mut self, await_terminate_millis: u64) {
- let _lock = self.global_lock.lock().await;
+ {
+ let _lock = self.global_lock.lock().await;
+ match *self.service_state {
+ ServiceState::CreateJust => {
+ warn!(
+ "the consumer [{}] do not start, so do nothing",
+ self.consumer_config.consumer_group
+ );
+ return;
+ }
+ ServiceState::Running => {
+ *self.service_state = ServiceState::ShutdownAlready;
+ }
+ ServiceState::ShutdownAlready => {
+ warn!(
+ "the consumer [{}] has been shutdown, do nothing",
+ self.consumer_config.consumer_group
+ );
+ return;
+ }
+ ServiceState::StartFailed => {
+ warn!(
+ "the consumer [{}] start failed, do nothing",
+ self.consumer_config.consumer_group
+ );
+ return;
+ }
+ }
+ } // The lock is released here
if let Some(consume_message_concurrently_service) =
self.consume_message_concurrently_service.as_mut()
{
consume_message_concurrently_service
.consume_message_concurrently_service
.shutdown(await_terminate_millis);
}
self.persist_consumer_offset().await;
let client = self.client_instance.as_mut().unwrap();
client
.unregister_consumer(self.consumer_config.consumer_group.as_str())
.await;
client.shutdown().await;
info!(
"the consumer [{}] shutdown OK",
self.consumer_config.consumer_group.as_str()
);
self.rebalance_impl.destroy();
}
Which Issue(s) This PR Fixes(Closes)
Fixes #981
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
- `unregister_consumer` and `unregister_producer`.

Bug Fixes
- Changed `shutdown` across multiple services to allow mutable references, enabling state modifications during shutdown.
PullAPIWrapper
for clearer structure and execution.Documentation