-
Notifications
You must be signed in to change notification settings - Fork 113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #981]🔥Optimize client clusting consume⚡️ #985
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -30,7 +30,6 @@ | |||||||||||||||
use rocketmq_common::WeakCellWrapper; | ||||||||||||||||
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult; | ||||||||||||||||
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel; | ||||||||||||||||
use rocketmq_runtime::RocketMQRuntime; | ||||||||||||||||
use tracing::info; | ||||||||||||||||
use tracing::warn; | ||||||||||||||||
|
||||||||||||||||
|
@@ -53,7 +52,7 @@ | |||||||||||||||
pub(crate) consumer_config: ArcRefCellWrapper<ConsumerConfig>, | ||||||||||||||||
pub(crate) consumer_group: Arc<String>, | ||||||||||||||||
pub(crate) message_listener: ArcBoxMessageListenerConcurrently, | ||||||||||||||||
pub(crate) consume_runtime: Arc<RocketMQRuntime>, | ||||||||||||||||
// pub(crate) consume_runtime: Arc<RocketMQRuntime>, | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
impl ConsumeMessageConcurrentlyService { | ||||||||||||||||
|
@@ -71,10 +70,10 @@ | |||||||||||||||
consumer_config, | ||||||||||||||||
consumer_group: Arc::new(consumer_group), | ||||||||||||||||
message_listener, | ||||||||||||||||
consume_runtime: Arc::new(RocketMQRuntime::new_multi( | ||||||||||||||||
/*consume_runtime: Arc::new(RocketMQRuntime::new_multi( | ||||||||||||||||
consume_thread as usize, | ||||||||||||||||
"ConsumeMessageThread_", | ||||||||||||||||
)), | ||||||||||||||||
)),*/ | ||||||||||||||||
Comment on lines
+73
to
+76
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Eliminate commented-out initialization code The initialization of Apply this diff to remove the commented-out code: -/*consume_runtime: Arc::new(RocketMQRuntime::new_multi(
- consume_thread as usize,
- "ConsumeMessageThread_",
-)),*/ Committable suggestion
Suggested change
|
||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
@@ -140,12 +139,6 @@ | |||||||||||||||
} | ||||||||||||||||
if !msg_back_failed.is_empty() { | ||||||||||||||||
consume_request.msgs.append(&mut msg_back_success); | ||||||||||||||||
/* let msg_back_failed_switched = msg_back_failed | ||||||||||||||||
.into_iter() | ||||||||||||||||
.map(|msg| MessageClientExt { | ||||||||||||||||
message_ext_inner: msg, | ||||||||||||||||
}) | ||||||||||||||||
.collect();*/ | ||||||||||||||||
self.submit_consume_request_later( | ||||||||||||||||
msg_back_failed, | ||||||||||||||||
consume_request.process_queue.clone(), | ||||||||||||||||
|
@@ -182,7 +175,7 @@ | |||||||||||||||
message_queue: MessageQueue, | ||||||||||||||||
) { | ||||||||||||||||
let this = self.clone(); | ||||||||||||||||
self.consume_runtime.get_handle().spawn(async move { | ||||||||||||||||
tokio::spawn(async move { | ||||||||||||||||
Check warning on line 178 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs Codecov / codecov/patchrocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L178
|
||||||||||||||||
tokio::time::sleep(Duration::from_secs(5)).await; | ||||||||||||||||
this.submit_consume_request(msgs, process_queue, message_queue, true) | ||||||||||||||||
.await; | ||||||||||||||||
|
@@ -231,8 +224,8 @@ | |||||||||||||||
}); | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
fn shutdown(&self, await_terminate_millis: u64) { | ||||||||||||||||
todo!() | ||||||||||||||||
fn shutdown(&mut self, await_terminate_millis: u64) { | ||||||||||||||||
Check warning on line 227 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs Codecov / codecov/patchrocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L227
|
||||||||||||||||
// todo!() | ||||||||||||||||
Comment on lines
+227
to
+228
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Confirm necessity of mutable reference in The |
||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
fn update_core_pool_size(&self, core_pool_size: usize) { | ||||||||||||||||
|
@@ -267,7 +260,7 @@ | |||||||||||||||
dispatch_to_consume: bool, | ||||||||||||||||
) { | ||||||||||||||||
let consume_batch_size = self.consumer_config.consume_message_batch_max_size; | ||||||||||||||||
if msgs.len() < consume_batch_size as usize { | ||||||||||||||||
if msgs.len() <= consume_batch_size as usize { | ||||||||||||||||
Check warning on line 263 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs Codecov / codecov/patchrocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L263
|
||||||||||||||||
let mut consume_request = ConsumeRequest { | ||||||||||||||||
msgs: msgs.clone(), | ||||||||||||||||
message_listener: self.message_listener.clone(), | ||||||||||||||||
|
@@ -278,7 +271,8 @@ | |||||||||||||||
default_mqpush_consumer_impl: self.default_mqpush_consumer_impl.clone(), | ||||||||||||||||
}; | ||||||||||||||||
let consume_message_concurrently_service = self.clone(); | ||||||||||||||||
self.consume_runtime.get_handle().spawn(async move { | ||||||||||||||||
|
||||||||||||||||
tokio::spawn(async move { | ||||||||||||||||
Check warning on line 275 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs Codecov / codecov/patchrocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L275
|
||||||||||||||||
consume_request | ||||||||||||||||
.run(consume_message_concurrently_service) | ||||||||||||||||
.await | ||||||||||||||||
|
@@ -301,7 +295,12 @@ | |||||||||||||||
default_mqpush_consumer_impl: self.default_mqpush_consumer_impl.clone(), | ||||||||||||||||
}; | ||||||||||||||||
let consume_message_concurrently_service = self.clone(); | ||||||||||||||||
self.consume_runtime.get_handle().spawn(async move { | ||||||||||||||||
/* self.consume_runtime.get_handle().spawn(async move { | ||||||||||||||||
consume_request | ||||||||||||||||
.run(consume_message_concurrently_service) | ||||||||||||||||
.await | ||||||||||||||||
});*/ | ||||||||||||||||
tokio::spawn(async move { | ||||||||||||||||
Check warning on line 303 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs Codecov / codecov/patchrocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L303
|
||||||||||||||||
Comment on lines
+298
to
+303
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove obsolete commented-out task spawning code The previous code using Apply this diff to remove the commented-out spawning code: -/* self.consume_runtime.get_handle().spawn(async move {
- consume_request
- .run(consume_message_concurrently_service)
- .await
-});*/ Committable suggestion
Suggested change
|
||||||||||||||||
consume_request | ||||||||||||||||
.run(consume_message_concurrently_service) | ||||||||||||||||
.await | ||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,8 +33,8 @@ | |
todo!() | ||
} | ||
|
||
fn shutdown(&self, await_terminate_millis: u64) { | ||
todo!() | ||
fn shutdown(&mut self, await_terminate_millis: u64) { | ||
unimplemented!("shutdown") | ||
Comment on lines
+36
to
+37
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Implement the shutdown logic. The function signature change to take a mutable reference to However, the function is currently unimplemented. Please implement the shutdown logic, considering the following:
|
||
} | ||
|
||
fn update_core_pool_size(&self, core_pool_size: usize) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,7 +61,7 @@ | |
// nothing to do | ||
} | ||
|
||
fn shutdown(&self, await_terminate_millis: u64) { | ||
fn shutdown(&mut self, await_terminate_millis: u64) { | ||
Check warning on line 64 in rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs Codecov / codecov/patchrocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs#L64
|
||
todo!() | ||
} | ||
Comment on lines
+64
to
66
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Complete the The function signature change to take a mutable reference to However, the function body is currently unimplemented and contains a |
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,7 +33,7 @@ | |
todo!() | ||
} | ||
|
||
fn shutdown(&self, await_terminate_millis: u64) { | ||
fn shutdown(&mut self, await_terminate_millis: u64) { | ||
todo!() | ||
} | ||
Comment on lines
+36
to
38
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Breaking change: The function signature has been modified to take a mutable reference to Additionally, the function body is not implemented and contains a |
||
|
||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -47,6 +47,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use rocketmq_remoting::protocol::namespace_util::NamespaceUtil; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use rocketmq_remoting::runtime::RPCHook; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use tokio::runtime::Handle; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use tokio::sync::Mutex; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use tracing::error; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use tracing::info; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use tracing::warn; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -96,6 +97,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
#[derive(Clone)] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pub struct DefaultMQPushConsumerImpl { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pub(crate) global_lock: Arc<Mutex<()>>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pub(crate) pull_time_delay_mills_when_exception: u64, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pub(crate) client_config: ArcRefCellWrapper<ClientConfig>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pub(crate) consumer_config: ArcRefCellWrapper<ConsumerConfig>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -139,6 +141,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
rpc_hook: Option<Arc<Box<dyn RPCHook>>>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) -> Self { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let mut this = Self { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
global_lock: Arc::new(Default::default()), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pull_time_delay_mills_when_exception: 3_000, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
client_config: ArcRefCellWrapper::new(client_config.clone()), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
consumer_config: consumer_config.clone(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -372,6 +375,52 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Ok(()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pub async fn shutdown(&mut self, await_terminate_millis: u64) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let _lock = self.global_lock.lock().await; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
match *self.service_state { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ServiceState::CreateJust => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
warn!( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"the consumer [{}] do not start, so do nothing", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.consumer_config.consumer_group | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ServiceState::Running => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if let Some(consume_message_concurrently_service) = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.consume_message_concurrently_service.as_mut() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
consume_message_concurrently_service | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.consume_message_concurrently_service | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.shutdown(await_terminate_millis); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.persist_consumer_offset().await; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let client = self.client_instance.as_mut().unwrap(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
client | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.unregister_consumer(self.consumer_config.consumer_group.as_str()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.await; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
client.shutdown().await; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
info!( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"the consumer [{}] shutdown OK", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.consumer_config.consumer_group.as_str() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.rebalance_impl.destroy(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
*self.service_state = ServiceState::ShutdownAlready; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ServiceState::ShutdownAlready => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
warn!( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"the consumer [{}] has been shutdown, do nothing", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.consumer_config.consumer_group | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ServiceState::StartFailed => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
warn!( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"the consumer [{}] start failed, do nothing", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.consumer_config.consumer_group | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
drop(_lock); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+378
to
+422
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid holding the In the Suggestion: Refactor the Proposed change: pub async fn shutdown(&mut self, await_terminate_millis: u64) {
- let _lock = self.global_lock.lock().await;
+ {
+ let _lock = self.global_lock.lock().await;
+ match *self.service_state {
+ ServiceState::CreateJust => {
+ warn!(
+ "the consumer [{}] do not start, so do nothing",
+ self.consumer_config.consumer_group
+ );
+ return;
+ }
+ ServiceState::Running => {
+ *self.service_state = ServiceState::ShutdownAlready;
+ }
+ ServiceState::ShutdownAlready => {
+ warn!(
+ "the consumer [{}] has been shutdown, do nothing",
+ self.consumer_config.consumer_group
+ );
+ return;
+ }
+ ServiceState::StartFailed => {
+ warn!(
+ "the consumer [{}] start failed, do nothing",
+ self.consumer_config.consumer_group
+ );
+ return;
+ }
+ }
+ } // The lock is released here
if let Some(consume_message_concurrently_service) =
self.consume_message_concurrently_service.as_mut()
{
consume_message_concurrently_service
.consume_message_concurrently_service
.shutdown(await_terminate_millis);
}
self.persist_consumer_offset().await;
let client = self.client_instance.as_mut().unwrap();
client
.unregister_consumer(self.consumer_config.consumer_group.as_str())
.await;
client.shutdown().await;
info!(
"the consumer [{}] shutdown OK",
self.consumer_config.consumer_group.as_str()
);
self.rebalance_impl.destroy();
} Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async fn update_topic_subscribe_info_when_subscription_changed(&mut self) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if DO_NOT_UPDATE_TOPIC_SUBSCRIBE_INFO_WHEN_SUBSCRIPTION_CHANGED { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -1239,8 +1288,27 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Ok(false) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
fn persist_consumer_offset(&self) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
todo!() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async fn persist_consumer_offset(&self) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if let Err(err) = self.make_sure_state_ok() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
error!( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"group: {} persistConsumerOffset exception:{}", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.consumer_config.consumer_group, err | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let guard = self | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.rebalance_impl | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.rebalance_impl_inner | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.process_queue_table | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.read() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.await; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let allocate_mq = guard.keys().cloned().collect::<HashSet<_>>(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.offset_store | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.as_ref() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.unwrap() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.mut_from_ref() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.persist_all(&allocate_mq) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.await; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet<MessageQueue>) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,6 +43,7 @@ use crate::factory::mq_client_instance::MQClientInstance; | |
use crate::hook::filter_message_context::FilterMessageContext; | ||
use crate::hook::filter_message_hook::FilterMessageHook; | ||
use crate::implementation::communication_mode::CommunicationMode; | ||
use crate::implementation::mq_client_api_impl::MQClientAPIImpl; | ||
use crate::Result; | ||
|
||
#[derive(Clone)] | ||
|
@@ -107,6 +108,7 @@ impl PullAPIWrapper { | |
self.client_instance.client_config.decode_read_body, | ||
self.client_instance.client_config.decode_decompress_body, | ||
); | ||
|
||
let mut need_decode_inner_message = false; | ||
for msg in &msg_vec { | ||
if MessageSysFlag::check( | ||
|
@@ -191,6 +193,7 @@ impl PullAPIWrapper { | |
msg.message_ext_inner.queue_offset += offset_delta; | ||
} | ||
} | ||
|
||
pull_result_ext.pull_result.msg_found_list = msg_list_filter_again | ||
.into_iter() | ||
.map(ArcRefCellWrapper::new) | ||
|
@@ -233,7 +236,7 @@ impl PullAPIWrapper { | |
pull_callback: PCB, | ||
) -> Result<Option<PullResultExt>> | ||
where | ||
PCB: PullCallback, | ||
PCB: PullCallback + 'static, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Potential over-restriction with Adding a Consider refactoring to remove the |
||
{ | ||
let broker_name = self | ||
.client_instance | ||
|
@@ -320,16 +323,16 @@ impl PullAPIWrapper { | |
.compute_pull_from_which_filter_server(mq.get_topic(), broker_addr.as_str()) | ||
.await?; | ||
} | ||
self.client_instance | ||
.get_mq_client_api_impl() | ||
.pull_message( | ||
broker_addr.as_str(), | ||
request_header, | ||
timeout_millis, | ||
communication_mode, | ||
pull_callback, | ||
) | ||
.await | ||
|
||
MQClientAPIImpl::pull_message( | ||
self.client_instance.get_mq_client_api_impl(), | ||
broker_addr, | ||
request_header, | ||
timeout_millis, | ||
communication_mode, | ||
pull_callback, | ||
) | ||
.await | ||
Comment on lines
+326
to
+335
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure correct invocation of The If - MQClientAPIImpl::pull_message(
- self.client_instance.get_mq_client_api_impl(),
+ self.client_instance.get_mq_client_api_impl().pull_message(
broker_addr,
request_header,
timeout_millis,
communication_mode,
pull_callback,
)
.await
|
||
} else { | ||
Err(MQClientErr( | ||
-1, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,4 +70,5 @@ pub trait RebalanceLocal { | |
async fn do_rebalance(&mut self, is_order: bool) -> bool; | ||
|
||
fn client_rebalance(&mut self, topic: &str) -> bool; | ||
fn destroy(&mut self); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider adding a default implementation for the Adding a new method to the trait is a breaking change, as it requires all existing implementers of the trait to implement this method. To avoid breaking changes and provide a fallback behavior for implementers who may not need any cleanup, consider adding a default implementation to the trait. Here's an example of how you can add a default implementation: fn destroy(&mut self) {
// Default implementation that does nothing
} This way, implementers who need cleanup behavior can override the default implementation, while others can rely on the default implementation. |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove commented-out field
consume_runtime
for cleaner codeThe field
consume_runtime
is commented out in the struct definition. If it's no longer required, consider removing it entirely to maintain code cleanliness and readability.Apply this diff to remove the commented-out field:
-// pub(crate) consume_runtime: Arc<RocketMQRuntime>,
Committable suggestion