From cad91c2305a92a2fab070e34eb8ebf5d286fe9b3 Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 16 Oct 2024 09:33:59 -0700 Subject: [PATCH] Move Left/LeftAnti filtered SMJ join out of join partial stage --- .../src/joins/sort_merge_join.rs | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index de8c57278c56..2baeb29df635 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -817,7 +817,12 @@ impl Stream for SMJStream { match self.current_ordering { Ordering::Less | Ordering::Equal => { if !streamed_exhausted { - if self.filter.is_some() && matches!(self.join_type, JoinType::Left | JoinType::LeftSemi) { + if self.filter.is_some() + && matches!( + self.join_type, + JoinType::Left | JoinType::LeftSemi + ) + { self.freeze_all()?; if !self.output_record_batches.batches.is_empty() @@ -883,7 +888,11 @@ impl Stream for SMJStream { self.freeze_all()?; if !self.output_record_batches.batches.is_empty() { let record_batch = self.output_record_batch_and_reset()?; - let record_batch = if !(self.filter.is_some() && matches!(self.join_type, JoinType::Left | JoinType::LeftSemi)) { + let record_batch = if !(self.filter.is_some() + && matches!( + self.join_type, + JoinType::Left | JoinType::LeftSemi + )) { record_batch } else { RecordBatch::new_empty(Arc::clone(&self.schema)) @@ -898,7 +907,12 @@ impl Stream for SMJStream { self.freeze_all()?; if !self.output_record_batches.batches.is_empty() { - if self.filter.is_some() && matches!(self.join_type, JoinType::Left | JoinType::LeftSemi) { + if self.filter.is_some() + && matches!( + self.join_type, + JoinType::Left | JoinType::LeftSemi + ) + { let out = self.filter_joined_batch()?; return Poll::Ready(Some(Ok(out))); } else { @@ -1627,7 +1641,9 @@ impl SMJStream { self.output_size -= record_batch.num_rows(); } - if !(self.filter.is_some() && matches!(self.join_type, JoinType::Left | JoinType::LeftSemi)) { + if !(self.filter.is_some() + && matches!(self.join_type, JoinType::Left | JoinType::LeftSemi)) + { self.output_record_batches.batches.clear(); } Ok(record_batch)