diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 806ed1d4b45e..81f6b2edac13 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -129,32 +129,47 @@ impl Repartition { } } -/// Recursively visits all `plan`s puts and then optionally adds a -/// `RepartitionExec` at the output of `plan` to match -/// `target_partitions` in an attempt to increase the overall parallelism. +/// Recursively attempts to increase the overall parallelism of the +/// plan, while respecting ordering, by adding a `RepartitionExec` at +/// the output of `plan` if it would help parallelism and not destroy +/// any possibly useful ordering. /// -/// It does so using depth first scan of the tree, and repartitions +/// It does so using a depth first scan of the tree, and repartitions /// any plan that: /// /// 1. Has fewer partitions than `target_partitions` /// /// 2. Has a direct parent that `benefits_from_input_partitioning` /// -/// if `can_reorder` is false, means that the output of this node -/// can not be reordered as as the final output is relying on that order +/// 3. Does not destroy any existing sort order if the parent is +/// relying on it. /// -/// If 'would_benefit` is false, the upstream operator doesn't -/// benefit from additional repartition +/// if `can_reorder` is false, it means the parent node of `plan` is +/// trying to take advantage of the output sort order of plan, so it +/// should not be repartitioned if doing so would destroy the output +/// sort order. /// +/// (Parent) - If can_reorder is false, means this parent node is +/// trying to use the output sort order of this plan. 
If true +/// means parent doesn't care about sort order +/// +/// (plan) - We are deciding to add a repartition above here +/// +/// (children) - Recursively visit all children first +/// +/// If `would_benefit` is true, the upstream operator would benefit +/// from additional partitions and thus repartitioning is considered. +/// +/// if `is_root` is true, no repartition is added. fn optimize_partitions( target_partitions: usize, plan: Arc, + is_root: bool, can_reorder: bool, would_benefit: bool, ) -> Result> { // Recurse into children bottom-up (attempt to repartition as // early as possible) - let new_plan = if plan.children().is_empty() { // leaf node - don't replace children plan @@ -163,10 +178,34 @@ fn optimize_partitions( .children() .iter() .map(|child| { + // does plan itself (not parent) require its input to + // be sorted in some way? + let required_input_ordering = + plan_has_required_input_ordering(plan.as_ref()); + + let can_reorder_child = if can_reorder { + // parent of `plan` will not use any particular order + + // if `plan` itself doesn't need order OR + !required_input_ordering || + // child has no order to preserve + child.output_ordering().is_none() + } else { + // parent would like to use the `plan`'s output + // order. 
+ + // if `plan` doesn't maintain the input order and + // doesn't need the child's output order itself + (!plan.maintains_input_order() && !required_input_ordering) || + // child has no ordering to preserve + child.output_ordering().is_none() + }; + optimize_partitions( target_partitions, child.clone(), - can_reorder || child.output_ordering().is_none(), + false, // child is not root + can_reorder_child, plan.benefits_from_input_partitioning(), ) }) @@ -191,6 +230,11 @@ fn optimize_partitions( && stats.num_rows.map(|num_rows| num_rows > 1).unwrap_or(true); } + // don't repartition the root of the plan + if is_root { + could_repartition = false; + } + if would_benefit && could_repartition && can_reorder { Ok(Arc::new(RepartitionExec::try_new( new_plan, @@ -201,6 +245,14 @@ fn optimize_partitions( } } +/// Returns true if `plan` requires any of its inputs to be sorted in some +/// way for correctness. If this is true, its output should not be +/// repartitioned if it would destroy the required order. +fn plan_has_required_input_ordering(plan: &dyn ExecutionPlan) -> bool { + // NB: checking `is_empty()` is not the right check! 
+ plan.required_input_ordering().iter().any(Option::is_some) +} + impl PhysicalOptimizerRule for Repartition { fn optimize( &self, @@ -213,11 +265,15 @@ impl PhysicalOptimizerRule for Repartition { if !enabled || target_partitions == 1 { Ok(plan) } else { + let is_root = true; + let can_reorder = plan.output_ordering().is_none(); + let would_benefit = false; optimize_partitions( target_partitions, plan.clone(), - plan.output_ordering().is_none(), - false, + is_root, + can_reorder, + would_benefit, ) } } @@ -230,6 +286,13 @@ impl PhysicalOptimizerRule for Repartition { true } } + +#[cfg(test)] +#[ctor::ctor] +fn init() { + let _ = env_logger::try_init(); +} + #[cfg(test)] mod tests { use arrow::compute::SortOptions; @@ -251,12 +314,13 @@ mod tests { use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::union::UnionExec; - use crate::physical_plan::{displayable, Statistics}; + use crate::physical_plan::{displayable, DisplayFormatType, Statistics}; fn schema() -> SchemaRef { Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)])) } + /// Create a non sorted parquet exec fn parquet_exec() -> Arc { Arc::new(ParquetExec::new( FileScanConfig { @@ -275,6 +339,30 @@ mod tests { )) } + /// Create a sorted parquet exec + fn parquet_exec_sorted() -> Arc { + let sort_exprs = vec![PhysicalSortExpr { + expr: col("c1", &schema()).unwrap(), + options: SortOptions::default(), + }]; + + Arc::new(ParquetExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), + file_schema: schema(), + file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: Some(sort_exprs), + infinite_source: false, + }, + None, + None, + )) + } + fn sort_preserving_merge_exec( input: Arc, ) -> Arc { @@ -350,6 +438,14 @@ mod tests { 
)) } + fn union_exec(input: Vec>) -> Arc { + Arc::new(UnionExec::new(input)) + } + + fn sort_required_exec(input: Arc) -> Arc { + Arc::new(SortRequiredExec::new(input)) + } + fn trim_plan_display(plan: &str) -> Vec<&str> { plan.split('\n') .map(|s| s.trim()) @@ -550,8 +646,7 @@ mod tests { #[test] fn repartition_ignores_union() -> Result<()> { - let plan: Arc = - Arc::new(UnionExec::new(vec![parquet_exec(); 5])); + let plan = union_exec(vec![parquet_exec(); 5]); let expected = &[ "UnionExec", @@ -568,9 +663,11 @@ mod tests { } #[test] - fn repartition_ignores_sort_preserving_merge() -> Result<()> { + fn repartition_through_sort_preserving_merge() -> Result<()> { + // sort preserving merge with non-sorted input let plan = sort_preserving_merge_exec(parquet_exec()); + // need repartition and resort as the data was not sorted correctly let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", "SortExec: [c1@0 ASC]", @@ -583,9 +680,94 @@ mod tests { } #[test] - fn repartition_does_not_repartition_transitively() -> Result<()> { + fn repartition_ignores_sort_preserving_merge() -> Result<()> { + // sort preserving merge with already sorted input + let plan = sort_preserving_merge_exec(parquet_exec_sorted()); + + // should not repartition / sort (as the data was already sorted) + let expected = &[ + "SortPreservingMergeExec: [c1@0 ASC]", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + + #[test] + fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> { + // 2 sorted parquet files unioned (partitions are concatenated, sort is preserved) + let input = union_exec(vec![parquet_exec_sorted(); 2]); + let plan = sort_preserving_merge_exec(input); + + // should not repartition / sort (as the data was already sorted) + let expected = &[ + "SortPreservingMergeExec: [c1@0 ASC]", + "UnionExec", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[c1@0 ASC], projection=[c1]", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + + #[test] + fn repartition_does_not_destroy_sort() -> Result<()> { + // SortRequired + // Parquet(sorted) + + let plan = sort_required_exec(parquet_exec_sorted()); + + // should not repartition as doing so destroys the necessary sort order + let expected = &[ + "SortRequiredExec", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + + #[test] + fn repartition_does_not_destroy_sort_more_complex() -> Result<()> { + // model a more complicated scenario where one child of a union can be repartitioned for performance + // but the other can not be + // + // Union + // SortRequired + // Parquet(sorted) + // Filter + // Parquet(unsorted) + + let input1 = sort_required_exec(parquet_exec_sorted()); + let input2 = filter_exec(parquet_exec()); + let plan = union_exec(vec![input1, input2]); + + // should not repartition below the SortRequired as that + // destroys the sort order but should still repartition for + // FilterExec + let expected = &[ + "UnionExec", + // union input 1: no repartitioning + "SortRequiredExec", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + // union input 2: should repartition + "FilterExec: c1@0", + "RepartitionExec: partitioning=RoundRobinBatch(10)", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + + #[test] + fn repartition_transitively_with_projection() -> Result<()> { + // non sorted input let plan = sort_preserving_merge_exec(projection_exec(parquet_exec())); + // needs to repartition / sort as the data was not sorted correctly let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", "SortExec: 
[c1@0 ASC]", @@ -598,6 +780,22 @@ mod tests { Ok(()) } + #[test] + fn repartition_ignores_transitively_with_projection() -> Result<()> { + // sorted input + let plan = sort_preserving_merge_exec(projection_exec(parquet_exec_sorted())); + + // data should not be repartitioned / resorted + let expected = &[ + "SortPreservingMergeExec: [c1@0 ASC]", + "ProjectionExec: expr=[c1@0 as c1]", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + #[test] fn repartition_transitively_past_sort_with_projection() -> Result<()> { let plan = @@ -655,4 +853,73 @@ mod tests { assert_optimized!(expected, plan); Ok(()) } + + /// Models operators like BoundedWindowExec that require an input + /// ordering but is easy to construct + #[derive(Debug)] + struct SortRequiredExec { + input: Arc, + } + + impl SortRequiredExec { + fn new(input: Arc) -> Self { + Self { input } + } + } + + impl ExecutionPlan for SortRequiredExec { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> SchemaRef { + self.input.schema() + } + + fn output_partitioning(&self) -> crate::physical_plan::Partitioning { + self.input.output_partitioning() + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.input.output_ordering() + } + + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + + // model that it requires the output ordering of its input + fn required_input_ordering(&self) -> Vec> { + vec![self.input.output_ordering()] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + assert_eq!(children.len(), 1); + let child = children.pop().unwrap(); + Ok(Arc::new(Self::new(child))) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unreachable!(); + } + + fn statistics(&self) -> Statistics { + self.input.statistics() + } + + fn fmt_as( + &self, + _t: DisplayFormatType, + f: &mut 
std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "SortRequiredExec") + } + } } diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index b9a0d9707ee1..0417814f3c80 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -136,7 +136,12 @@ pub trait ExecutionPlan: Debug + Send + Sync { } /// Specifies the ordering requirements for all of the children - /// For each child, it's the local ordering requirement within each partition rather than the global ordering + /// For each child, it's the local ordering requirement within + /// each partition rather than the global ordering + /// + /// NOTE that checking `!is_empty()` does **not** check for a + /// required input ordering. Instead, the correct check is that at + /// least one entry must be `Some` fn required_input_ordering(&self) -> Vec> { vec![None; self.children().len()] } diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 0cc65abf0850..1e4f081f4789 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -86,11 +86,10 @@ async fn explain_analyze_baseline_metrics() { "CoalesceBatchesExec: target_batch_size=4096", "metrics=[output_rows=5, elapsed_compute" ); - // The number of output rows becomes less after changing the global sort to the local sort with limit push down assert_metrics!( &formatted, "CoalescePartitionsExec", - "metrics=[output_rows=3, elapsed_compute=" + "metrics=[output_rows=5, elapsed_compute=" ); assert_metrics!( &formatted,