Skip to content

Commit

Permalink
Move Left/LeftAnti filtered SMJ join out of join partial stage
Browse files Browse the repository at this point in the history
  • Loading branch information
comphead committed Oct 18, 2024
1 parent 227ab72 commit cad91c2
Showing 1 changed file with 20 additions and 4 deletions.
24 changes: 20 additions & 4 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,12 @@ impl Stream for SMJStream {
match self.current_ordering {
Ordering::Less | Ordering::Equal => {
if !streamed_exhausted {
if self.filter.is_some() && matches!(self.join_type, JoinType::Left | JoinType::LeftSemi) {
if self.filter.is_some()
&& matches!(
self.join_type,
JoinType::Left | JoinType::LeftSemi
)
{
self.freeze_all()?;

if !self.output_record_batches.batches.is_empty()
Expand Down Expand Up @@ -883,7 +888,11 @@ impl Stream for SMJStream {
self.freeze_all()?;
if !self.output_record_batches.batches.is_empty() {
let record_batch = self.output_record_batch_and_reset()?;
let record_batch = if !(self.filter.is_some() && matches!(self.join_type, JoinType::Left | JoinType::LeftSemi)) {
let record_batch = if !(self.filter.is_some()
&& matches!(
self.join_type,
JoinType::Left | JoinType::LeftSemi
)) {
record_batch
} else {
RecordBatch::new_empty(Arc::clone(&self.schema))
Expand All @@ -898,7 +907,12 @@ impl Stream for SMJStream {
self.freeze_all()?;

if !self.output_record_batches.batches.is_empty() {
if self.filter.is_some() && matches!(self.join_type, JoinType::Left | JoinType::LeftSemi) {
if self.filter.is_some()
&& matches!(
self.join_type,
JoinType::Left | JoinType::LeftSemi
)
{
let out = self.filter_joined_batch()?;
return Poll::Ready(Some(Ok(out)));
} else {
Expand Down Expand Up @@ -1627,7 +1641,9 @@ impl SMJStream {
self.output_size -= record_batch.num_rows();
}

if !(self.filter.is_some() && matches!(self.join_type, JoinType::Left | JoinType::LeftSemi)) {
if !(self.filter.is_some()
&& matches!(self.join_type, JoinType::Left | JoinType::LeftSemi))
{
self.output_record_batches.batches.clear();
}
Ok(record_batch)
Expand Down

0 comments on commit cad91c2

Please sign in to comment.