Fix NestedLoopJoin performance regression #12531

Merged
111 changes: 82 additions & 29 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -42,10 +42,9 @@ use crate::{

use arrow::array::{BooleanBufferBuilder, UInt32Array, UInt64Array};
use arrow::compute::concat_batches;
use arrow::datatypes::{Schema, SchemaRef, UInt64Type};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
use arrow_array::PrimitiveArray;
use datafusion_common::{exec_datafusion_err, JoinSide, Result, Statistics};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
@@ -348,6 +347,11 @@ impl ExecutionPlan for NestedLoopJoinExec {

let outer_table = self.right.execute(partition, context)?;

let indices_cache = (UInt64Array::new_null(0), UInt32Array::new_null(0));

// Right side has an order and it is maintained during operation.
let right_side_ordered =
self.maintains_input_order()[1] && self.right.output_ordering().is_some();
Ok(Box::pin(NestedLoopJoinStream {
schema: Arc::clone(&self.schema),
filter: self.filter.clone(),
@@ -357,6 +361,8 @@ impl ExecutionPlan for NestedLoopJoinExec {
is_exhausted: false,
column_indices: self.column_indices.clone(),
join_metrics,
indices_cache,
right_side_ordered,
}))
}

@@ -456,21 +462,74 @@ struct NestedLoopJoinStream {
// null_equals_null: bool
/// Join execution metrics
join_metrics: BuildProbeJoinMetrics,
/// Cache for join indices calculations
indices_cache: (UInt64Array, UInt32Array),
/// Whether the right side is ordered
right_side_ordered: bool,
}

/// Creates a Cartesian product of two input batches, preserving the order of the right batch,
/// and applying a join filter if provided.
///
/// # Example
/// Input:
/// left = [0, 1], right = [0, 1, 2]
///
/// Output:
/// left_indices = [0, 1, 0, 1, 0, 1], right_indices = [0, 0, 1, 1, 2, 2]
///
/// Input:
/// left = [0, 1, 2], right = [0, 1, 2, 3], filter = left.a != right.a
///
/// Output:
/// left_indices = [1, 2, 0, 2, 0, 1, 0, 1, 2], right_indices = [0, 0, 1, 1, 2, 2, 3, 3, 3]
fn build_join_indices(
right_row_index: usize,
left_batch: &RecordBatch,
right_batch: &RecordBatch,
filter: Option<&JoinFilter>,
indices_cache: &mut (UInt64Array, UInt32Array),
) -> Result<(UInt64Array, UInt32Array)> {
// left indices: [0, 1, 2, 3, 4, ..., left_row_count]
// right indices: [right_index, right_index, ..., right_index]

let left_row_count = left_batch.num_rows();
let left_indices = UInt64Array::from_iter_values(0..(left_row_count as u64));
let right_indices = UInt32Array::from(vec![right_row_index as u32; left_row_count]);
// in the nested loop join, the filter can contain non-equal and equal condition.
let right_row_count = right_batch.num_rows();
let output_row_count = left_row_count * right_row_count;

// We always use the same indices before applying the filter, so we can cache them
let (left_indices_cache, right_indices_cache) = indices_cache;
let cached_output_row_count = left_indices_cache.len();
Contributor:

In the case of a 25-row build side, these cached arrays hold ~200k entries; for 500 rows, about 4 million, and so on (I suppose we don't need that much data on the build side for these arrays to reach GBs in size).

I understand that we will still have to create intermediate batches to apply the filter and produce output batches, but I suppose that, starting from some point, the size of these caches will become meaningful.
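For scale, a rough back-of-envelope sketch of those figures. The 8192-row probe batch size is an assumption (DataFusion's default `batch_size`), and each cached entry costs 8 bytes for the `u64` left index plus 4 bytes for the `u32` right index:

```rust
// Rough estimate of the cached index array sizes discussed above.
// Assumes 8192-row probe batches (DataFusion's default batch_size);
// each entry costs 8 bytes (u64 left index) + 4 bytes (u32 right index).
fn cache_bytes(build_rows: usize, probe_rows: usize) -> usize {
    build_rows * probe_rows * (8 + 4)
}

fn main() {
    // 25 build rows -> 204,800 entries per array (~2.3 MiB across both caches)
    println!("25 build rows: {} bytes", cache_bytes(25, 8192));
    // 500 build rows -> 4,096,000 entries (~47 MiB across both caches)
    println!("500 build rows: {} bytes", cache_bytes(500, 8192));
}
```

So the caches stay modest for small build sides but grow linearly with build-side rows, which is the concern raised here.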

Contributor Author (@alihan-synnada), Sep 20, 2024:

I guess we can do away with the cache or make it optional. If we remove the cache, we could create the indices and apply the filter in chunks, similar to before. If we pass in a range that we then use to calculate the indices, instead of creating right_batch.num_rows() chunks, we can control the size of the intermediate batches too. Something like (0..output_row_count).chunks(CHUNK_SIZE) should do the trick, now that we create the indices by mapping the current row index.

I believe it can bring the performance without a cache down to a level similar to before the regression, maybe even better. I'll run a few benchmarks with this setup without a cache and update the benchmarks table.
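A minimal sketch of that chunked idea, using plain `Vec`s rather than Arrow arrays (`CHUNK_SIZE` and the helper function are hypothetical, not DataFusion API): because each join index is a pure function of the output row index, any sub-range of the Cartesian product can be materialized independently, keeping intermediate batches bounded.

```rust
// Hypothetical sketch of the chunked approach: compute join indices for a
// bounded sub-range of the Cartesian product instead of materializing it all.
// CHUNK_SIZE and plain Vecs are illustrative; the real code uses Arrow arrays.
const CHUNK_SIZE: usize = 4;

fn indices_for_range(
    range: std::ops::Range<usize>,
    left_row_count: usize,
) -> (Vec<u64>, Vec<u32>) {
    // Same layout as the cache: the left index varies fastest.
    let left = range.clone().map(|i| (i % left_row_count) as u64).collect();
    let right = range.map(|i| (i / left_row_count) as u32).collect();
    (left, right)
}

fn main() {
    let (left_rows, right_rows) = (2usize, 3usize);
    let output_row_count = left_rows * right_rows;
    let mut start = 0;
    while start < output_row_count {
        let end = usize::min(start + CHUNK_SIZE, output_row_count);
        let (l, r) = indices_for_range(start..end, left_rows);
        // The join filter would be applied per chunk here, so intermediate
        // batch size never exceeds CHUNK_SIZE rows.
        println!("chunk {start}..{end}: left={l:?} right={r:?}");
        start = end;
    }
}
```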

Contributor Author (@alihan-synnada):
The chunks approach didn't change the performance, but it helped reduce the sizes of the intermediate batches. The 10% performance hit without a cache comes from the way the arrays are constructed, and I couldn't find a faster approach for now. I suggest we go with the cached approach for now; when the issue that enables NLJ to emit massive batches is implemented, we can choose between the cached and chunked approaches depending on NLJ's output size. I'll open an issue about it.

Contributor:

Sounds good. I will merge this soon to avoid performance issues in any upcoming release, unless there is more feedback. We seem to gain 20% performance relative to before with the caches, and we can migrate to a cached-vs-chunked-depending-on-output-batch-size approach in the future.

Contributor:
> The chunks approach didn't change the performance, but it helped reduce the sizes of the intermediate batches.

Thank you for checking this option.


let (left_indices, right_indices) =
match output_row_count.cmp(&cached_output_row_count) {
std::cmp::Ordering::Equal => {
// Reuse the cached indices
(left_indices_cache.clone(), right_indices_cache.clone())
}
std::cmp::Ordering::Less => {
// Left_row_count never changes because it's the build side. The changes to the
// right_row_count can be handled trivially by taking the first output_row_count
// elements of the cache because of how the indices are generated.
// (See the Ordering::Greater match arm)
(
left_indices_cache.slice(0, output_row_count),
right_indices_cache.slice(0, output_row_count),
)
}
std::cmp::Ordering::Greater => {
// Rebuild the indices cache

// Produces 0, 1, 2, 0, 1, 2, 0, 1, 2, ...
*left_indices_cache = UInt64Array::from_iter_values(
(0..output_row_count as u64).map(|i| i % left_row_count as u64),
);

// Produces 0, 0, 0, 1, 1, 1, 2, 2, 2, ...
*right_indices_cache = UInt32Array::from_iter_values(
(0..output_row_count as u32).map(|i| i / left_row_count as u32),
);

(left_indices_cache.clone(), right_indices_cache.clone())
}
};

if let Some(filter) = filter {
apply_join_filter_to_indices(
left_batch,
Expand Down Expand Up @@ -524,6 +583,8 @@ impl NestedLoopJoinStream {
&self.column_indices,
&self.schema,
visited_left_side,
&mut self.indices_cache,
self.right_side_ordered,
);

// Recording time & updating output metrics
@@ -587,6 +648,7 @@ impl NestedLoopJoinStream {
}
}

#[allow(clippy::too_many_arguments)]
fn join_left_and_right_batch(
left_batch: &RecordBatch,
right_batch: &RecordBatch,
@@ -595,27 +657,18 @@ fn join_left_and_right_batch(
column_indices: &[ColumnIndex],
schema: &Schema,
visited_left_side: &SharedBitmapBuilder,
indices_cache: &mut (UInt64Array, UInt32Array),
right_side_ordered: bool,
) -> Result<RecordBatch> {
let indices = (0..right_batch.num_rows())
.map(|right_row_index| {
build_join_indices(right_row_index, left_batch, right_batch, filter)
})
.collect::<Result<Vec<(UInt64Array, UInt32Array)>>>()
.map_err(|e| {
exec_datafusion_err!(
"Fail to build join indices in NestedLoopJoinExec, error:{e}"
)
})?;

let mut left_indices_builder: Vec<u64> = vec![];
let mut right_indices_builder: Vec<u32> = vec![];
for (left_side, right_side) in indices {
left_indices_builder.extend(left_side.values());
right_indices_builder.extend(right_side.values());
}
let (left_side, right_side) =
build_join_indices(left_batch, right_batch, filter, indices_cache).map_err(
|e| {
exec_datafusion_err!(
"Fail to build join indices in NestedLoopJoinExec, error: {e}"
)
},
)?;

let left_side: PrimitiveArray<UInt64Type> = left_indices_builder.into();
let right_side = right_indices_builder.into();
// set the left bitmap
// and only full join need the left bitmap
if need_produce_result_in_final(join_type) {
@@ -630,7 +683,7 @@ fn join_left_and_right_batch(
right_side,
0..right_batch.num_rows(),
join_type,
true,
right_side_ordered,
);

build_batch_from_indices(
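To illustrate the slicing trick in the `Ordering::Less` arm above, here is a plain-Rust sketch (`Vec`s instead of Arrow arrays) of the prefix property the cache relies on: because the left index varies fastest, the index arrays for a smaller probe batch are exactly a prefix of the cached arrays, so `slice(0, output_row_count)` is enough.

```rust
// Simplified model of the index cache (Vecs instead of Arrow arrays).
// The left index varies fastest: 0, 1, .., L-1, 0, 1, .., L-1, ...
fn build_indices(left_rows: usize, right_rows: usize) -> (Vec<u64>, Vec<u32>) {
    let n = left_rows * right_rows;
    let left = (0..n).map(|i| (i % left_rows) as u64).collect();
    let right = (0..n).map(|i| (i / left_rows) as u32).collect();
    (left, right)
}

fn main() {
    // Cache built for a 3-row probe batch...
    let (cached_l, cached_r) = build_indices(2, 3);
    // ...a later 2-row probe batch needs only a prefix of it.
    let (small_l, small_r) = build_indices(2, 2);
    assert_eq!(&cached_l[..small_l.len()], &small_l[..]);
    assert_eq!(&cached_r[..small_r.len()], &small_r[..]);
    println!("smaller probe batches reuse a prefix of the cache");
}
```

This is why only the `Ordering::Greater` case pays the cost of rebuilding the cache: the build side never changes, so a shrinking probe batch can always be served from a slice.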
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/joins.slt
@@ -2136,10 +2136,10 @@ FROM (select t1_id from join_t1 where join_t1.t1_id > 22) as join_t1
RIGHT JOIN (select t2_id from join_t2 where join_t2.t2_id > 11) as join_t2
ON join_t1.t1_id < join_t2.t2_id
----
NULL 22
33 44
33 55
44 55
NULL 22

#####
# Configuration teardown