From 45926ab28d8a7962f2fec41bc23ff89eda83c0e4 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Tue, 10 Dec 2024 04:26:19 +0800 Subject: [PATCH] Fix hash join with sort push down (#13560) * fix: join with sort push down * chore: insert some value * apply suggestion * recover handle_costom_pushdown change * apply suggestion * add more test * add partition --- .../src/physical_optimizer/sort_pushdown.rs | 101 +++++++++++ datafusion/sqllogictest/test_files/joins.slt | 171 +++++++++++++----- 2 files changed, 228 insertions(+), 44 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index d48c7118cb8e..6c761f674b3b 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -28,6 +28,7 @@ use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::tree_node::PlanContext; use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; +use arrow_schema::SchemaRef; use datafusion_common::tree_node::{ ConcreteTreeNode, Transformed, TreeNode, TreeNodeRecursion, @@ -38,6 +39,8 @@ use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::PhysicalSortRequirement; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; +use datafusion_physical_plan::joins::utils::ColumnIndex; +use datafusion_physical_plan::joins::HashJoinExec; /// This is a "data class" we use within the [`EnforceSorting`] rule to push /// down [`SortExec`] in the plan. In some cases, we can reduce the total @@ -294,6 +297,8 @@ fn pushdown_requirement_to_children( .then(|| LexRequirement::new(parent_required.to_vec())); Ok(Some(vec![req])) } + } else if let Some(hash_join) = plan.as_any().downcast_ref::() { + handle_hash_join(hash_join, parent_required) } else { handle_custom_pushdown(plan, parent_required, maintains_input_order) } @@ -606,6 +611,102 @@ fn handle_custom_pushdown( } } +// For hash join we only maintain the input order for the right child +// for join type: Inner, Right, RightSemi, RightAnti +fn handle_hash_join( + plan: &HashJoinExec, + parent_required: &LexRequirement, +) -> Result>>> { + // If there's no requirement from the parent or the plan has no children + // or the join type is not Inner, Right, RightSemi, RightAnti, return early + if parent_required.is_empty() || !plan.maintains_input_order()[1] { + return Ok(None); + } + + // Collect all unique column indices used in the parent-required sorting expression + let all_indices: HashSet = parent_required + .iter() + .flat_map(|order| { + collect_columns(&order.expr) + .into_iter() + .map(|col| col.index()) + .collect::>() + }) + .collect(); + + let column_indices = build_join_column_index(plan); + let projected_indices: Vec<_> = if let Some(projection) = &plan.projection { + projection.iter().map(|&i| &column_indices[i]).collect() + } else { + column_indices.iter().collect() + }; + let len_of_left_fields = projected_indices + .iter() + .filter(|ci| ci.side == JoinSide::Left) + .count(); + + let all_from_right_child = all_indices.iter().all(|i| *i >= len_of_left_fields); + + // If all columns are from the right child, update the parent requirements + if all_from_right_child { + // Transform the parent-required expression for the child schema by adjusting columns + let updated_parent_req = parent_required + .iter() + .map(|req| { + let child_schema = plan.children()[1].schema(); + let updated_columns = Arc::clone(&req.expr) + .transform_up(|expr| { + if let Some(col) = expr.as_any().downcast_ref::() { + let index = projected_indices[col.index()].index; + Ok(Transformed::yes(Arc::new(Column::new( + child_schema.field(index).name(), + index, + )))) + } else { + Ok(Transformed::no(expr)) + } + })? + .data; + Ok(PhysicalSortRequirement::new(updated_columns, req.options)) + }) + .collect::>>()?; + + // Populating with the updated requirements for children that maintain order + Ok(Some(vec![ + None, + Some(LexRequirement::new(updated_parent_req)), + ])) + } else { + Ok(None) + } +} + +// this function is used to build the column index for the hash join +// push down sort requirements to the right child +fn build_join_column_index(plan: &HashJoinExec) -> Vec { + let map_fields = |schema: SchemaRef, side: JoinSide| { + schema + .fields() + .iter() + .enumerate() + .map(|(index, _)| ColumnIndex { index, side }) + .collect::>() + }; + + match plan.join_type() { + JoinType::Inner | JoinType::Right => { + map_fields(plan.left().schema(), JoinSide::Left) + .into_iter() + .chain(map_fields(plan.right().schema(), JoinSide::Right)) + .collect::>() + } + JoinType::RightSemi | JoinType::RightAnti => { + map_fields(plan.right().schema(), JoinSide::Right) + } + _ => unreachable!("unexpected join type: {}", plan.join_type()), + } +} + /// Define the Requirements Compatibility #[derive(Debug)] enum RequirementsCompatibility { diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index e636e93007a4..62f625119897 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -2864,13 +2864,13 @@ explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id I ---- physical_plan 01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] -02)--SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----CoalesceBatchesExec: target_batch_size=2 -04)------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)] -05)--------CoalesceBatchesExec: target_batch_size=2 -06)----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 -07)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -08)--------------MemoryExec: partitions=1, partition_sizes=[1] +02)--CoalesceBatchesExec: target_batch_size=2 +03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)] +04)------CoalesceBatchesExec: target_batch_size=2 +05)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 +06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +07)------------MemoryExec: partitions=1, partition_sizes=[1] +08)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] 09)--------CoalesceBatchesExec: target_batch_size=2 10)----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 11)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 @@ -2905,13 +2905,13 @@ explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOI ---- physical_plan 01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] -02)--SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----CoalesceBatchesExec: target_batch_size=2 -04)------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)] -05)--------CoalesceBatchesExec: target_batch_size=2 -06)----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 -07)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -08)--------------MemoryExec: partitions=1, partition_sizes=[1] +02)--CoalesceBatchesExec: target_batch_size=2 +03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)] +04)------CoalesceBatchesExec: target_batch_size=2 +05)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 +06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +07)------------MemoryExec: partitions=1, partition_sizes=[1] +08)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] 09)--------CoalesceBatchesExec: target_batch_size=2 10)----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 11)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 @@ -2967,10 +2967,10 @@ explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id I ---- physical_plan 01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] -02)--SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----CoalesceBatchesExec: target_batch_size=2 -04)------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)] -05)--------MemoryExec: partitions=1, partition_sizes=[1] +02)--CoalesceBatchesExec: target_batch_size=2 +03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)] +04)------MemoryExec: partitions=1, partition_sizes=[1] +05)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] 06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 07)----------MemoryExec: partitions=1, partition_sizes=[1] @@ -3003,10 +3003,10 @@ explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOI ---- physical_plan 01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] -02)--SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----CoalesceBatchesExec: target_batch_size=2 -04)------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)] -05)--------MemoryExec: partitions=1, partition_sizes=[1] +02)--CoalesceBatchesExec: target_batch_size=2 +03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)] +04)------MemoryExec: partitions=1, partition_sizes=[1] +05)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] 06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 07)----------MemoryExec: partitions=1, partition_sizes=[1] @@ -3061,13 +3061,13 @@ explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHER ---- physical_plan 01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] -02)--SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----CoalesceBatchesExec: target_batch_size=2 -04)------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@1 != t1_name@0 -05)--------CoalesceBatchesExec: target_batch_size=2 -06)----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 -07)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -08)--------------MemoryExec: partitions=1, partition_sizes=[1] +02)--CoalesceBatchesExec: target_batch_size=2 +03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@1 != t1_name@0 +04)------CoalesceBatchesExec: target_batch_size=2 +05)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 +06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +07)------------MemoryExec: partitions=1, partition_sizes=[1] +08)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] 09)--------CoalesceBatchesExec: target_batch_size=2 10)----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 11)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 @@ -3083,13 +3083,13 @@ explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGH ---- physical_plan 01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] -02)--SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----CoalesceBatchesExec: target_batch_size=2 -04)------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@0 != t1_name@1 -05)--------CoalesceBatchesExec: target_batch_size=2 -06)----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 -07)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -08)--------------MemoryExec: partitions=1, partition_sizes=[1] +02)--CoalesceBatchesExec: target_batch_size=2 +03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@0 != t1_name@1 +04)------CoalesceBatchesExec: target_batch_size=2 +05)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 +06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +07)------------MemoryExec: partitions=1, partition_sizes=[1] +08)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] 09)--------CoalesceBatchesExec: target_batch_size=2 10)----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 11)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 @@ -3143,10 +3143,10 @@ explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHER ---- physical_plan 01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] -02)--SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----CoalesceBatchesExec: target_batch_size=2 -04)------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@1 != t1_name@0 -05)--------MemoryExec: partitions=1, partition_sizes=[1] +02)--CoalesceBatchesExec: target_batch_size=2 +03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@1 != t1_name@0 +04)------MemoryExec: partitions=1, partition_sizes=[1] +05)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] 06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 07)----------MemoryExec: partitions=1, partition_sizes=[1] @@ -3160,10 +3160,10 @@ explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGH ---- physical_plan 01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] -02)--SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----CoalesceBatchesExec: target_batch_size=2 -04)------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@0 != t1_name@1 -05)--------MemoryExec: partitions=1, partition_sizes=[1] +02)--CoalesceBatchesExec: target_batch_size=2 +03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@0 != t1_name@1 +04)------MemoryExec: partitions=1, partition_sizes=[1] +05)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] 06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 07)----------MemoryExec: partitions=1, partition_sizes=[1] @@ -4313,3 +4313,86 @@ physical_plan 04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(binary_col@0, binary_col@0)] 05)--------MemoryExec: partitions=1, partition_sizes=[1] 06)--------MemoryExec: partitions=1, partition_sizes=[1] + +# Test hash join sort push down +# Issue: https://github.com/apache/datafusion/issues/13559 +statement ok +CREATE TABLE test(a INT, b INT, c INT) + +statement ok +insert into test values (1,2,3), (4,5,6), (null, 7, 8), (8, null, 9), (9, 10, null) + +statement ok +set datafusion.execution.target_partitions = 2; + +query TT +explain select * from test where a in (select a from test where b > 3) order by c desc nulls first; +---- +logical_plan +01)Sort: test.c DESC NULLS FIRST +02)--LeftSemi Join: test.a = __correlated_sq_1.a +03)----TableScan: test projection=[a, b, c] +04)----SubqueryAlias: __correlated_sq_1 +05)------Projection: test.a +06)--------Filter: test.b > Int32(3) +07)----------TableScan: test projection=[a, b] +physical_plan +01)SortPreservingMergeExec: [c@2 DESC] +02)--CoalesceBatchesExec: target_batch_size=3 +03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(a@0, a@0)] +04)------CoalesceBatchesExec: target_batch_size=3 +05)--------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2 +06)----------CoalesceBatchesExec: target_batch_size=3 +07)------------FilterExec: b@1 > 3, projection=[a@0] +08)--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +09)----------------MemoryExec: partitions=1, partition_sizes=[1] +10)------SortExec: expr=[c@2 DESC], preserve_partitioning=[true] +11)--------CoalesceBatchesExec: target_batch_size=3 +12)----------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2 +13)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +14)--------------MemoryExec: partitions=1, partition_sizes=[1] + +query TT +explain select * from test where a in (select a from test where b > 3) order by c desc nulls last; +---- +logical_plan +01)Sort: test.c DESC NULLS LAST +02)--LeftSemi Join: test.a = __correlated_sq_1.a +03)----TableScan: test projection=[a, b, c] +04)----SubqueryAlias: __correlated_sq_1 +05)------Projection: test.a +06)--------Filter: test.b > Int32(3) +07)----------TableScan: test projection=[a, b] +physical_plan +01)SortPreservingMergeExec: [c@2 DESC NULLS LAST] +02)--CoalesceBatchesExec: target_batch_size=3 +03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(a@0, a@0)] +04)------CoalesceBatchesExec: target_batch_size=3 +05)--------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2 +06)----------CoalesceBatchesExec: target_batch_size=3 +07)------------FilterExec: b@1 > 3, projection=[a@0] +08)--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +09)----------------MemoryExec: partitions=1, partition_sizes=[1] +10)------SortExec: expr=[c@2 DESC NULLS LAST], preserve_partitioning=[true] +11)--------CoalesceBatchesExec: target_batch_size=3 +12)----------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2 +13)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +14)--------------MemoryExec: partitions=1, partition_sizes=[1] + +query III +select * from test where a in (select a from test where b > 3) order by c desc nulls first; +---- +9 10 NULL +4 5 6 + +query III +select * from test where a in (select a from test where b > 3) order by c desc nulls last; +---- +4 5 6 +9 10 NULL + +statement ok +DROP TABLE test + +statement ok +set datafusion.execution.target_partitions = 1;