Skip to content

Commit

Permalink
Make benefits_from_input_partitioning Default in SHJ (apache#8801)
Browse files Browse the repository at this point in the history
* SHJ order fixing

* Update join_selection.rs

* Change proto type for sort exprs

---------

Co-authored-by: Mustafa Akur <[email protected]>
  • Loading branch information
metesynnada and mustafasrepo authored Jan 10, 2024
1 parent e78f4bd commit 78d3314
Show file tree
Hide file tree
Showing 11 changed files with 370 additions and 41 deletions.
88 changes: 82 additions & 6 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::ExecutionPlan;

use arrow_schema::Schema;
use datafusion_common::internal_err;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{internal_err, JoinSide};
use datafusion_common::{DataFusionError, JoinType};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::sort_properties::SortProperties;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};

/// The [`JoinSelection`] rule tries to modify a given plan so that it can
/// accommodate infinite sources and optimize joins in the plan according to
Expand Down Expand Up @@ -425,31 +426,106 @@ pub type PipelineFixerSubrule = dyn Fn(
&ConfigOptions,
) -> Option<Result<PipelineStatePropagator>>;

/// This subrule checks if we can replace a hash join with a symmetric hash
/// join when we are dealing with infinite inputs on both sides. This change
/// avoids pipeline breaking and preserves query runnability. If possible,
/// this subrule makes this replacement; otherwise, it has no effect.
/// Converts a hash join to a symmetric hash join in the case of infinite inputs on both sides.
///
/// This subrule checks if a hash join can be replaced with a symmetric hash join when dealing
/// with unbounded (infinite) inputs on both sides. This replacement avoids pipeline breaking and
/// preserves query runnability. If the replacement is applicable, this subrule makes this change;
/// otherwise, it leaves the input unchanged.
///
/// # Arguments
/// * `input` - The current state of the pipeline, including the execution plan.
/// * `config_options` - Configuration options that might affect the transformation logic.
///
/// # Returns
/// An `Option` that contains the `Result` of the transformation. If the transformation is not applicable,
/// it returns `None`. If applicable, it returns `Some(Ok(...))` with the modified pipeline state,
/// or `Some(Err(...))` if an error occurs during the transformation.
fn hash_join_convert_symmetric_subrule(
mut input: PipelineStatePropagator,
config_options: &ConfigOptions,
) -> Option<Result<PipelineStatePropagator>> {
// Check if the current plan node is a HashJoinExec.
if let Some(hash_join) = input.plan.as_any().downcast_ref::<HashJoinExec>() {
// Determine if left and right children are unbounded.
let ub_flags = input.children_unbounded();
let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
// Update the unbounded flag of the input.
input.unbounded = left_unbounded || right_unbounded;
// Process only if both left and right sides are unbounded.
let result = if left_unbounded && right_unbounded {
// Determine the partition mode based on configuration.
let mode = if config_options.optimizer.repartition_joins {
StreamJoinPartitionMode::Partitioned
} else {
StreamJoinPartitionMode::SinglePartition
};
// A closure to determine the required sort order for each side of the join in the SymmetricHashJoinExec.
// This function checks if the columns involved in the filter have any specific ordering requirements.
// If the child nodes (left or right side of the join) already have a defined order and the columns used in the
// filter predicate are ordered, this function captures that ordering requirement. The identified order is then
// used in the SymmetricHashJoinExec to maintain bounded memory during join operations.
// However, if the child nodes do not have an inherent order, or if the filter columns are unordered,
// the function concludes that no specific order is required for the SymmetricHashJoinExec. This approach
// ensures that the symmetric hash join operation only imposes ordering constraints when necessary,
// based on the properties of the child nodes and the filter condition.
let determine_order = |side: JoinSide| -> Option<Vec<PhysicalSortExpr>> {
hash_join
.filter()
.map(|filter| {
filter.column_indices().iter().any(
|ColumnIndex {
index,
side: column_side,
}| {
// Skip if column side does not match the join side.
if *column_side != side {
return false;
}
// Retrieve equivalence properties and schema based on the side.
let (equivalence, schema) = match side {
JoinSide::Left => (
hash_join.left().equivalence_properties(),
hash_join.left().schema(),
),
JoinSide::Right => (
hash_join.right().equivalence_properties(),
hash_join.right().schema(),
),
};

let name = schema.field(*index).name();
let col = Arc::new(Column::new(name, *index)) as _;
// Check if the column is ordered.
equivalence.get_expr_ordering(col).state
!= SortProperties::Unordered
},
)
})
.unwrap_or(false)
.then(|| {
match side {
JoinSide::Left => hash_join.left().output_ordering(),
JoinSide::Right => hash_join.right().output_ordering(),
}
.map(|p| p.to_vec())
})
.flatten()
};

// Determine the sort order for both left and right sides.
let left_order = determine_order(JoinSide::Left);
let right_order = determine_order(JoinSide::Right);

SymmetricHashJoinExec::try_new(
hash_join.left().clone(),
hash_join.right().clone(),
hash_join.on().to_vec(),
hash_join.filter().cloned(),
hash_join.join_type(),
hash_join.null_equals_null(),
left_order,
right_order,
mode,
)
.map(|exec| {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,8 @@ fn try_swapping_with_sym_hash_join(
new_filter,
sym_join.join_type(),
sym_join.null_equals_null(),
sym_join.right().output_ordering().map(|p| p.to_vec()),
sym_join.left().output_ordering().map(|p| p.to_vec()),
sym_join.partition_mode(),
)?)))
}
Expand Down Expand Up @@ -2048,6 +2050,8 @@ mod tests {
)),
&JoinType::Inner,
true,
None,
None,
StreamJoinPartitionMode::SinglePartition,
)?);
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
Expand Down
68 changes: 68 additions & 0 deletions datafusion/core/tests/sql/joins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,74 @@ async fn join_change_in_planner() -> Result<()> {
[
"SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a1@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a1@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
]
};
let mut actual: Vec<&str> = formatted.trim().lines().collect();
// Remove CSV lines
actual.remove(4);
actual.remove(7);

assert_eq!(
expected,
actual[..],
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);
Ok(())
}

#[tokio::test]
async fn join_no_order_on_filter() -> Result<()> {
let config = SessionConfig::new().with_target_partitions(8);
let ctx = SessionContext::new_with_config(config);
let tmp_dir = TempDir::new().unwrap();
let left_file_path = tmp_dir.path().join("left.csv");
File::create(left_file_path.clone()).unwrap();
// Create schema
let schema = Arc::new(Schema::new(vec![
Field::new("a1", DataType::UInt32, false),
Field::new("a2", DataType::UInt32, false),
Field::new("a3", DataType::UInt32, false),
]));
// Specify the ordering:
let file_sort_order = vec![[datafusion_expr::col("a1")]
.into_iter()
.map(|e| {
let ascending = true;
let nulls_first = false;
e.sort(ascending, nulls_first)
})
.collect::<Vec<_>>()];
register_unbounded_file_with_ordering(
&ctx,
schema.clone(),
&left_file_path,
"left",
file_sort_order.clone(),
)?;
let right_file_path = tmp_dir.path().join("right.csv");
File::create(right_file_path.clone()).unwrap();
register_unbounded_file_with_ordering(
&ctx,
schema,
&right_file_path,
"right",
file_sort_order,
)?;
let sql = "SELECT * FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a3 > t2.a3 + 3 AND t1.a3 < t2.a3 + 10";
let dataframe = ctx.sql(sql).await?;
let physical_plan = dataframe.create_physical_plan().await?;
let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
let expected = {
[
"SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a3@0 AS Int64) > CAST(a3@1 AS Int64) + 3 AND CAST(a3@0 AS Int64) < CAST(a3@1 AS Int64) + 10",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
Expand Down
70 changes: 48 additions & 22 deletions datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use datafusion_physical_expr::equivalence::join_equivalence_properties;
use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;

use ahash::RandomState;
use datafusion_physical_expr::PhysicalSortRequirement;
use futures::Stream;
use hashbrown::HashSet;
use parking_lot::Mutex;
Expand Down Expand Up @@ -181,6 +182,10 @@ pub struct SymmetricHashJoinExec {
column_indices: Vec<ColumnIndex>,
/// If null_equals_null is true, null == null else null != null
pub(crate) null_equals_null: bool,
/// Left side sort expression(s)
pub(crate) left_sort_exprs: Option<Vec<PhysicalSortExpr>>,
/// Right side sort expression(s)
pub(crate) right_sort_exprs: Option<Vec<PhysicalSortExpr>>,
/// Partition Mode
mode: StreamJoinPartitionMode,
}
Expand All @@ -192,13 +197,16 @@ impl SymmetricHashJoinExec {
/// - It is not possible to join the left and right sides on keys `on`, or
/// - It fails to construct `SortedFilterExpr`s, or
/// - It fails to create the [ExprIntervalGraph].
#[allow(clippy::too_many_arguments)]
pub fn try_new(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
on: JoinOn,
filter: Option<JoinFilter>,
join_type: &JoinType,
null_equals_null: bool,
left_sort_exprs: Option<Vec<PhysicalSortExpr>>,
right_sort_exprs: Option<Vec<PhysicalSortExpr>>,
mode: StreamJoinPartitionMode,
) -> Result<Self> {
let left_schema = left.schema();
Expand Down Expand Up @@ -232,6 +240,8 @@ impl SymmetricHashJoinExec {
metrics: ExecutionPlanMetricsSet::new(),
column_indices,
null_equals_null,
left_sort_exprs,
right_sort_exprs,
mode,
})
}
Expand Down Expand Up @@ -271,6 +281,16 @@ impl SymmetricHashJoinExec {
self.mode
}

/// Get left_sort_exprs
pub fn left_sort_exprs(&self) -> Option<&[PhysicalSortExpr]> {
self.left_sort_exprs.as_deref()
}

/// Get right_sort_exprs
pub fn right_sort_exprs(&self) -> Option<&[PhysicalSortExpr]> {
self.right_sort_exprs.as_deref()
}

/// Check if order information covers every column in the filter expression.
pub fn check_if_order_information_available(&self) -> Result<bool> {
if let Some(filter) = self.filter() {
Expand Down Expand Up @@ -337,10 +357,6 @@ impl ExecutionPlan for SymmetricHashJoinExec {
Ok(children.iter().any(|u| *u))
}

fn benefits_from_input_partitioning(&self) -> Vec<bool> {
vec![false, false]
}

fn required_input_distribution(&self) -> Vec<Distribution> {
match self.mode {
StreamJoinPartitionMode::Partitioned => {
Expand All @@ -360,6 +376,17 @@ impl ExecutionPlan for SymmetricHashJoinExec {
}
}

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![
self.left_sort_exprs
.as_ref()
.map(PhysicalSortRequirement::from_sort_exprs),
self.right_sort_exprs
.as_ref()
.map(PhysicalSortRequirement::from_sort_exprs),
]
}

fn output_partitioning(&self) -> Partitioning {
let left_columns_len = self.left.schema().fields.len();
partitioned_join_output_partitioning(
Expand Down Expand Up @@ -403,6 +430,8 @@ impl ExecutionPlan for SymmetricHashJoinExec {
self.filter.clone(),
&self.join_type,
self.null_equals_null,
self.left_sort_exprs.clone(),
self.right_sort_exprs.clone(),
self.mode,
)?))
}
Expand Down Expand Up @@ -431,24 +460,21 @@ impl ExecutionPlan for SymmetricHashJoinExec {
}
// If `filter_state` and `filter` are both present, then calculate sorted filter expressions
// for both sides, and build an expression graph.
let (left_sorted_filter_expr, right_sorted_filter_expr, graph) = match (
self.left.output_ordering(),
self.right.output_ordering(),
&self.filter,
) {
(Some(left_sort_exprs), Some(right_sort_exprs), Some(filter)) => {
let (left, right, graph) = prepare_sorted_exprs(
filter,
&self.left,
&self.right,
left_sort_exprs,
right_sort_exprs,
)?;
(Some(left), Some(right), Some(graph))
}
// If `filter_state` or `filter` is not present, then return None for all three values:
_ => (None, None, None),
};
let (left_sorted_filter_expr, right_sorted_filter_expr, graph) =
match (&self.left_sort_exprs, &self.right_sort_exprs, &self.filter) {
(Some(left_sort_exprs), Some(right_sort_exprs), Some(filter)) => {
let (left, right, graph) = prepare_sorted_exprs(
filter,
&self.left,
&self.right,
left_sort_exprs,
right_sort_exprs,
)?;
(Some(left), Some(right), Some(graph))
}
// If `filter_state` or `filter` is not present, then return None for all three values:
_ => (None, None, None),
};

let (on_left, on_right) = self.on.iter().cloned().unzip();

Expand Down
6 changes: 4 additions & 2 deletions datafusion/physical-plan/src/joins/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,19 @@ pub async fn partitioned_sym_join_with_filter(

let join = SymmetricHashJoinExec::try_new(
Arc::new(RepartitionExec::try_new(
left,
left.clone(),
Partitioning::Hash(left_expr, partition_count),
)?),
Arc::new(RepartitionExec::try_new(
right,
right.clone(),
Partitioning::Hash(right_expr, partition_count),
)?),
on,
filter,
join_type,
null_equals_null,
left.output_ordering().map(|p| p.to_vec()),
right.output_ordering().map(|p| p.to_vec()),
StreamJoinPartitionMode::Partitioned,
)?;

Expand Down
2 changes: 2 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1537,6 +1537,8 @@ message SymmetricHashJoinExecNode {
StreamPartitionMode partition_mode = 6;
bool null_equals_null = 7;
JoinFilter filter = 8;
repeated PhysicalSortExprNode left_sort_exprs = 9;
repeated PhysicalSortExprNode right_sort_exprs = 10;
}

message InterleaveExecNode {
Expand Down
Loading

0 comments on commit 78d3314

Please sign in to comment.