Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1076]PullMessageService and RebalanceService add shutdown method #1077

Merged
merged 4 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
use rocketmq_common::common::message::message_enum::MessageRequestMode;
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_rust::Shutdown;
use tracing::info;
use tracing::warn;

Expand All @@ -27,19 +28,25 @@ use crate::factory::mq_client_instance::MQClientInstance;
#[derive(Clone)]
pub struct PullMessageService {
tx: Option<tokio::sync::mpsc::Sender<Box<dyn MessageRequest + Send + 'static>>>,
tx_shutdown: Option<tokio::sync::broadcast::Sender<()>>,
}

impl PullMessageService {
pub fn new() -> Self {
PullMessageService { tx: None }
PullMessageService {
tx: None,
tx_shutdown: None,
}
}
pub async fn start(&mut self, mut instance: ArcRefCellWrapper<MQClientInstance>) {
let (tx, mut rx) =
tokio::sync::mpsc::channel::<Box<dyn MessageRequest + Send + 'static>>(1024 * 4);
let (mut shutdown, tx_shutdown) = Shutdown::new(1);
self.tx = Some(tx);
self.tx_shutdown = Some(tx_shutdown);
tokio::spawn(async move {
info!(">>>>>>>>>>>>>>>>>>>>>>>PullMessageService started<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
while let Some(request) = rx.recv().await {
/*while let Some(request) = rx.recv().await {
if request.get_message_request_mode() == MessageRequestMode::Pull {
let pull_request =
unsafe { *Box::from_raw(Box::into_raw(request) as *mut PullRequest) };
Expand All @@ -49,6 +56,32 @@ impl PullMessageService {
unsafe { *Box::from_raw(Box::into_raw(request) as *mut PopRequest) };
PullMessageService::pop_message(pop_request, instance.as_mut()).await;
}
}*/
if shutdown.is_shutdown() {
info!("PullMessageService shutdown");
return;
}
loop {
tokio::select! {
_ = shutdown.recv() => {
info!("PullMessageService shutdown");
}
Some(request) = rx.recv() => {
if request.get_message_request_mode() == MessageRequestMode::Pull {
let pull_request =
unsafe { *Box::from_raw(Box::into_raw(request) as *mut PullRequest) };
PullMessageService::pull_message(pull_request, instance.as_mut()).await;
} else {
let pop_request =
unsafe { *Box::from_raw(Box::into_raw(request) as *mut PopRequest) };
PullMessageService::pop_message(pop_request, instance.as_mut()).await;
}
}
}
if shutdown.is_shutdown() {
info!("PullMessageService shutdown");
break;
}
Comment on lines +81 to +84
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Simplify shutdown handling after the loop

After the tokio::select! loop, there's a check for shutdown.is_shutdown() (lines 81-84). Since the loop exits when a shutdown signal is received, this check and the associated break statement may be unnecessary. Consider restructuring to eliminate redundant code.

Apply this diff to simplify the loop:

                     }
-                    if shutdown.is_shutdown() {
-                        info!("PullMessageService shutdown");
-                        break;
-                    }
                 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if shutdown.is_shutdown() {
info!("PullMessageService shutdown");
break;
}
if shutdown.is_shutdown() {
info!("PullMessageService shutdown");
break;
}

}
});
}
Expand Down Expand Up @@ -106,4 +139,17 @@ impl PullMessageService {
warn!("Failed to send pull request to pull_tx, error: {:?}", e);
}
}

pub fn shutdown(&self) {
if let Some(tx_shutdown) = &self.tx_shutdown {
if let Err(e) = tx_shutdown.send(()) {
warn!("Failed to send shutdown signal to pull_tx, error: {:?}", e);
}
} else {
warn!(
"Attempted to shutdown but tx_shutdown is None. Ensure `start` is called before \
`shutdown`."
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ use std::time::Duration;

use once_cell::sync::Lazy;
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_rust::Shutdown;
use tokio::select;
use tokio::sync::Notify;
use tokio::time::Instant;
use tracing::info;
use tracing::warn;

use crate::factory::mq_client_instance::MQClientInstance;

Expand All @@ -47,17 +49,21 @@ static MIN_INTERVAL: Lazy<Duration> = Lazy::new(|| {
#[derive(Clone)]
pub struct RebalanceService {
notify: Arc<Notify>,
tx_shutdown: Option<tokio::sync::broadcast::Sender<()>>,
}

impl RebalanceService {
pub fn new() -> Self {
RebalanceService {
notify: Arc::new(Notify::new()),
tx_shutdown: None,
}
}

pub async fn start(&mut self, mut instance: ArcRefCellWrapper<MQClientInstance>) {
let notify = self.notify.clone();
let (mut shutdown, tx_shutdown) = Shutdown::new(1);
self.tx_shutdown = Some(tx_shutdown);
tokio::spawn(async move {
let mut last_rebalance_timestamp = Instant::now();
let min_interval = *MIN_INTERVAL;
Expand All @@ -66,8 +72,12 @@ impl RebalanceService {
loop {
select! {
_ = notify.notified() => {}
_ = shutdown.recv() => {info!("RebalanceService shutdown");}
_ = tokio::time::sleep(real_wait_interval) => {}
}
if shutdown.is_shutdown() {
return;
}
let interval = Instant::now() - last_rebalance_timestamp;
if interval < min_interval {
real_wait_interval = min_interval - interval;
Expand All @@ -87,4 +97,17 @@ impl RebalanceService {
pub fn wakeup(&self) {
self.notify.notify_waiters();
}

pub fn shutdown(&self) {
if let Some(tx_shutdown) = &self.tx_shutdown {
if let Err(e) = tx_shutdown.send(()) {
warn!(
"Failed to send shutdown signal to RebalanceService, error: {:?}",
e
);
}
} else {
warn!("Shutdown called before start; no shutdown signal sent");
}
}
}
Loading