From c4661750b813f6d38c44ba927007764172e060c0 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Mon, 4 Mar 2024 08:38:01 -0500 Subject: [PATCH 1/2] Get rid of panics in logs --- .../core/src/execution_plans/shuffle_reader.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs index ec7f5cdfb..953b4d026 100644 --- a/ballista/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/core/src/execution_plans/shuffle_reader.rs @@ -432,7 +432,10 @@ fn send_fetch_partitions_with_fallback( join_handles.push(tokio::spawn(async move { for p in remote_locations.iter() { let now = Instant::now(); - let permit = sender_to_remote.reserve().await.unwrap(); + let Ok(permit) = sender_to_remote.reserve().await else { + // receiver has been dropped so we can return here + return; + }; SHUFFLE_READER_FETCH_PARTITION_TOTAL .with_label_values(&["remote"]) .inc(); @@ -475,7 +478,10 @@ fn send_fetch_partitions_with_fallback( join_handles.push(tokio::spawn(async move { while let Some(partition) = failed_partition_receiver.recv().await { let now = Instant::now(); - let permit = response_sender.reserve().await.unwrap(); + let Ok(permit) = response_sender.reserve().await else { + // receiver has been dropped so we can return here + return; + }; SHUFFLE_READER_FETCH_PARTITION_TOTAL .with_label_values(&["object_store"]) .inc(); @@ -547,7 +553,11 @@ fn send_fetch_partitions( join_handles.push(tokio::spawn(async move { for p in remote_locations.iter() { let now = Instant::now(); - let permit = response_sender.reserve().await.unwrap(); + let Ok(permit) = response_sender.reserve().await else { + //receiver has been dropped so we can return + return; + }; + SHUFFLE_READER_FETCH_PARTITION_TOTAL .with_label_values(&["remote"]) .inc(); From 665b134e9ca7b3ffa14c4f394737e3b2859a81d7 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Tue, 5 Mar 2024 09:45:18 -0500 Subject: [PATCH 2/2] empty commit