From 3523aa2773d2261f6968f1b1594c0d64ecef4e35 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Mon, 29 May 2023 15:40:36 +0800 Subject: [PATCH] fix: deadlock when stop keepalive bg task --- cluster/src/shard_lock_manager.rs | 16 +++++++++++----- common_util/src/runtime/mod.rs | 6 ++++++ 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/cluster/src/shard_lock_manager.rs b/cluster/src/shard_lock_manager.rs index 7844c7434c..c1b7f957b9 100644 --- a/cluster/src/shard_lock_manager.rs +++ b/cluster/src/shard_lock_manager.rs @@ -466,10 +466,13 @@ impl ShardLock { // Wait for the lease check worker to stop. if let Some(handle) = self.lease_check_handle.take() { - if let Err(e) = handle.await { - warn!("Failed to wait for the lease check worker to stop, maybe it has exited so ignore it, shard_id:{}, err:{e}", self.shard_id); - } + handle.abort(); } + + info!( + "Finish exiting from background keepalive task, shard_id:{}", + self.shard_id + ); } async fn acquire_lock_with_lease(&self, lease_id: i64, etcd_client: &mut Client) -> Result<()> { @@ -711,7 +714,7 @@ impl ShardLockManager { let mut shard_locks = self.shard_locks.write().await; let shard_lock = shard_locks.remove(&shard_id); - match shard_lock { + let res = match shard_lock { Some(mut v) => { let mut etcd_client = self.etcd_client.clone(); v.revoke(&mut etcd_client).await?; @@ -723,7 +726,10 @@ impl ShardLockManager { warn!("The lock is not exist, shard_id:{shard_id}"); Ok(false) } - } + }; + + info!("Finish revoke lock for shard, shard_id:{shard_id}"); + res } } diff --git a/common_util/src/runtime/mod.rs b/common_util/src/runtime/mod.rs index e864513c7b..407577b097 100644 --- a/common_util/src/runtime/mod.rs +++ b/common_util/src/runtime/mod.rs @@ -101,6 +101,12 @@ pin_project! { } } +impl JoinHandle { + pub fn abort(&self) { + self.inner.abort(); + } +} + impl Future for JoinHandle { type Output = Result;