Skip to content

Commit

Permalink
Sliding Nested Join Algorithm (#142)
Browse files Browse the repository at this point in the history
  • Loading branch information
metesynnada committed Dec 15, 2023
1 parent 41290a6 commit cef0a6d
Show file tree
Hide file tree
Showing 9 changed files with 2,413 additions and 64 deletions.
3 changes: 3 additions & 0 deletions datafusion/physical-plan/src/joins/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
pub use cross_join::CrossJoinExec;
pub use hash_join::HashJoinExec;
pub use nested_loop_join::NestedLoopJoinExec;
pub use sliding_nested_loop_join::SlidingNestedLoopJoinExec;
// Note: SortMergeJoin is not used in plans yet
pub use sort_merge_join::SortMergeJoinExec;
pub use symmetric_hash_join::SymmetricHashJoinExec;
mod cross_join;
mod hash_join;
mod nested_loop_join;
mod sliding_nested_loop_join;
mod sliding_window_join_utils;
mod sort_merge_join;
mod stream_join_utils;
mod symmetric_hash_join;
Expand Down
78 changes: 18 additions & 60 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl ExecutionPlan for NestedLoopJoinExec {

// For the nested loop join, different join types need different distributions for
// the left and right nodes.
fn distribution_from_join_type(join_type: &JoinType) -> Vec<Distribution> {
pub fn distribution_from_join_type(join_type: &JoinType) -> Vec<Distribution> {
match join_type {
JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
// need the left data, and the right should be one partition
Expand Down Expand Up @@ -745,7 +745,10 @@ mod tests {

use super::*;
use crate::{
common, expressions::Column, memory::MemoryExec, repartition::RepartitionExec,
assert_batches_sorted_eq,
common::assert_contains,
execution::runtime_env::{RuntimeConfig, RuntimeEnv},
physical_plan::{expressions::Column, memory::MemoryExec},
test::build_table_i32,
};

Expand All @@ -754,6 +757,10 @@ mod tests {
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};

use crate::physical_plan::joins::test_utils::partitioned_nested_join_with_filter;
use crate::physical_plan::joins::utils::JoinSide;
use datafusion_common::ScalarValue;
use datafusion_physical_expr::PhysicalExpr;

fn build_table(
Expand Down Expand Up @@ -826,62 +833,13 @@ mod tests {
JoinFilter::new(filter_expression, column_indices, intermediate_schema)
}

/// Runs a [`NestedLoopJoinExec`] over `left` and `right` using the input
/// distribution that `distribution_from_join_type` reports for `join_type`,
/// and gathers the output of every partition.
///
/// A side whose required distribution is `Distribution::SinglePartition` is
/// passed through untouched; any other side is wrapped in a
/// [`RepartitionExec`] with 4 round-robin partitions. When at least one side
/// is repartitioned, all 4 output partitions of the join are executed;
/// otherwise only partition 0 is executed.
///
/// Returns the value of `columns(..)` for the join schema (presumably the
/// output column names -- confirm against the helper) together with every
/// non-empty [`RecordBatch`] produced, in partition order.
///
/// # Errors
///
/// Propagates any error from building the repartition or join operators,
/// from starting a partition stream, or from collecting that stream.
async fn multi_partitioned_join_collect(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    join_type: &JoinType,
    join_filter: Option<JoinFilter>,
    context: Arc<TaskContext>,
) -> Result<(Vec<String>, Vec<RecordBatch>)> {
    let partition_count = 4;
    let distribution = distribution_from_join_type(join_type);

    // Left input: keep as-is when a single partition is required, otherwise
    // spread it round-robin over `partition_count` partitions.
    let left_is_single = matches!(distribution[0], Distribution::SinglePartition);
    let left: Arc<dyn ExecutionPlan> = if left_is_single {
        left
    } else {
        Arc::new(RepartitionExec::try_new(
            left,
            Partitioning::RoundRobinBatch(partition_count),
        )?)
    };

    // Right input: same rule as the left input.
    let right_is_single = matches!(distribution[1], Distribution::SinglePartition);
    let right: Arc<dyn ExecutionPlan> = if right_is_single {
        right
    } else {
        Arc::new(RepartitionExec::try_new(
            right,
            Partitioning::RoundRobinBatch(partition_count),
        )?)
    };

    // If either side was repartitioned, every one of the `partition_count`
    // output partitions is polled; otherwise only a single one.
    let output_partition = if left_is_single && right_is_single {
        1
    } else {
        partition_count
    };

    // Use the required distribution for nested loop join to test partition data
    let nested_loop_join =
        NestedLoopJoinExec::try_new(left, right, join_filter, join_type)?;
    let columns = columns(&nested_loop_join.schema());
    let mut batches = vec![];
    for i in 0..output_partition {
        let stream = nested_loop_join.execute(i, context.clone())?;
        let more_batches = common::collect(stream).await?;
        // Feed the filtered iterator straight into `extend`; no temporary
        // `Vec` is needed just to drop the empty batches.
        batches.extend(more_batches.into_iter().filter(|b| b.num_rows() > 0));
    }
    Ok((columns, batches))
}

#[tokio::test]
async fn join_inner_with_filter() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let left = build_left_table();
let right = build_right_table();
let filter = prepare_join_filter();
let (columns, batches) = multi_partitioned_join_collect(
let (columns, batches) = partitioned_nested_join_with_filter(
left,
right,
&JoinType::Inner,
Expand Down Expand Up @@ -910,7 +868,7 @@ mod tests {
let right = build_right_table();

let filter = prepare_join_filter();
let (columns, batches) = multi_partitioned_join_collect(
let (columns, batches) = partitioned_nested_join_with_filter(
left,
right,
&JoinType::Left,
Expand Down Expand Up @@ -941,7 +899,7 @@ mod tests {
let right = build_right_table();

let filter = prepare_join_filter();
let (columns, batches) = multi_partitioned_join_collect(
let (columns, batches) = partitioned_nested_join_with_filter(
left,
right,
&JoinType::Right,
Expand Down Expand Up @@ -972,7 +930,7 @@ mod tests {
let right = build_right_table();

let filter = prepare_join_filter();
let (columns, batches) = multi_partitioned_join_collect(
let (columns, batches) = partitioned_nested_join_with_filter(
left,
right,
&JoinType::Full,
Expand Down Expand Up @@ -1005,7 +963,7 @@ mod tests {
let right = build_right_table();

let filter = prepare_join_filter();
let (columns, batches) = multi_partitioned_join_collect(
let (columns, batches) = partitioned_nested_join_with_filter(
left,
right,
&JoinType::LeftSemi,
Expand Down Expand Up @@ -1034,7 +992,7 @@ mod tests {
let right = build_right_table();

let filter = prepare_join_filter();
let (columns, batches) = multi_partitioned_join_collect(
let (columns, batches) = partitioned_nested_join_with_filter(
left,
right,
&JoinType::LeftAnti,
Expand Down Expand Up @@ -1064,7 +1022,7 @@ mod tests {
let right = build_right_table();

let filter = prepare_join_filter();
let (columns, batches) = multi_partitioned_join_collect(
let (columns, batches) = partitioned_nested_join_with_filter(
left,
right,
&JoinType::RightSemi,
Expand Down Expand Up @@ -1093,7 +1051,7 @@ mod tests {
let right = build_right_table();

let filter = prepare_join_filter();
let (columns, batches) = multi_partitioned_join_collect(
let (columns, batches) = partitioned_nested_join_with_filter(
left,
right,
&JoinType::RightAnti,
Expand Down Expand Up @@ -1147,7 +1105,7 @@ mod tests {
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

let err = multi_partitioned_join_collect(
let err = partitioned_nested_join_with_filter(
left.clone(),
right.clone(),
&join_type,
Expand Down
Loading

0 comments on commit cef0a6d

Please sign in to comment.