From 88a43c10ec65ea04ea56514af546b9cbf20d77d4 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Sat, 6 Jan 2024 11:02:58 +0100 Subject: [PATCH] refactor `PlanWithCorrespondingCoalescePartitions` using `TreeNode.transform_with_payload()` --- .../src/physical_optimizer/enforce_sorting.rs | 384 +++++++++--------- 1 file changed, 199 insertions(+), 185 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index ca84a7361a222..e5d57d1d57c73 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -171,87 +171,6 @@ impl TreeNode for PlanWithCorrespondingSort { } } -/// This object is used within the [`EnforceSorting`] rule to track the closest -/// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. -#[derive(Debug, Clone)] -struct PlanWithCorrespondingCoalescePartitions { - plan: Arc, - // Stores whether the plan is a `CoalescePartitionsExec` or it is connected to - // a `CoalescePartitionsExec` via its children. - coalesce_connection: bool, - children_nodes: Vec, -} - -impl PlanWithCorrespondingCoalescePartitions { - /// Creates an empty tree with empty connections. - fn new(plan: Arc) -> Self { - let children = plan.children(); - Self { - plan, - coalesce_connection: false, - children_nodes: children.into_iter().map(Self::new).collect(), - } - } - - fn update_children(mut self) -> Result { - self.coalesce_connection = if self.plan.children().is_empty() { - // Plan has no children, it cannot be a `CoalescePartitionsExec`. - false - } else if is_coalesce_partitions(&self.plan) { - // Initiate a connection - true - } else { - self.children_nodes - .iter() - .enumerate() - .map(|(idx, node)| { - // Only consider operators that don't require a - // single partition, and connected to any coalesce - node.coalesce_connection - && !matches!( - self.plan.required_input_distribution()[idx], - Distribution::SinglePartition - ) - // If all children are None. There is nothing to track, set connection false. - }) - .any(|c| c) - }; - - let children_plans = self - .children_nodes - .iter() - .map(|item| item.plan.clone()) - .collect(); - self.plan = with_new_children_if_necessary(self.plan, children_plans)?.into(); - Ok(self) - } -} - -impl TreeNode for PlanWithCorrespondingCoalescePartitions { - fn children_nodes(&self) -> Vec> { - self.children_nodes.iter().map(Cow::Borrowed).collect() - } - - fn map_children(mut self, transform: F) -> Result - where - F: FnMut(Self) -> Result, - { - if !self.children_nodes.is_empty() { - self.children_nodes = self - .children_nodes - .into_iter() - .map(transform) - .collect::>()?; - self.plan = with_new_children_if_necessary( - self.plan, - self.children_nodes.iter().map(|c| c.plan.clone()).collect(), - )? - .into(); - } - Ok(self) - } -} - /// The boolean flag `repartition_sorts` defined in the config indicates /// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`] cascades /// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us to @@ -267,11 +186,12 @@ impl PhysicalOptimizerRule for EnforceSorting { // remove unnecessary sorts, and optimize sort-sensitive operators: let adjusted = plan_requirements.transform_up(&ensure_sorting)?; let new_plan = if config.optimizer.repartition_sorts { - let plan_with_coalesce_partitions = - PlanWithCorrespondingCoalescePartitions::new(adjusted.plan); - let parallel = - plan_with_coalesce_partitions.transform_up(¶llelize_sorts)?; - parallel.plan + let (parallel, _) = adjusted.plan.transform_with_payload( + &mut propagate_unnecessary_coalesce_connections_down, + false, + &mut parallelize_sorts_up, + )?; + parallel } else { adjusted.plan }; @@ -363,72 +283,204 @@ impl PhysicalOptimizerRule for EnforceSorting { } } -/// This function turns plans of the form -/// ```text +/// For a given `plan`, `propagate_unnecessary_coalesce_connections_down` and +/// `parallelize_sorts_up` can be used with `TreeNode.transform_with_payload()` to +/// propagate down/up the information one needs to decide if unnecessary coalesce nodes +/// can be dropped so as to increase parallelism. +/// +/// The algorithm flow is simply like this: +/// 1. During the top-down traversal, keep track of operators that allow eliminating +/// descendant coalesce nodes. +/// There are 2 scenarios that this rule covers: +/// - A one partition sort or sort preserving merge node allow eliminating descendant +/// coalesce nodes that can be reached on connection that doesn't require single +/// partition distribution. In this case the sort node needs to be adjusted to sort +/// preserving merge followed by a sort node. +/// E.g the following plan: +/// ```text /// "SortExec: expr=\[a@0 ASC\]", -/// " CoalescePartitionsExec", -/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", -/// ``` -/// to -/// ```text -/// "SortPreservingMergeExec: \[a@0 ASC\]", -/// " SortExec: expr=\[a@0 ASC\]", -/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", -/// ``` -/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s. -/// By performing sorting in parallel, we can increase performance in some scenarios. -fn parallelize_sorts( - requirements: PlanWithCorrespondingCoalescePartitions, -) -> Result> { - let PlanWithCorrespondingCoalescePartitions { - mut plan, - coalesce_connection, - mut children_nodes, - } = requirements.update_children()?; - - if plan.children().is_empty() || !children_nodes[0].coalesce_connection { - // We only take an action when the plan is either a SortExec, a - // SortPreservingMergeExec or a CoalescePartitionsExec, and they - // all have a single child. Therefore, if the first child is `None`, - // we can return immediately. - return Ok(Transformed::No(PlanWithCorrespondingCoalescePartitions { - plan, - coalesce_connection, - children_nodes, - })); - } else if (is_sort(&plan) || is_sort_preserving_merge(&plan)) +/// " ..." +/// " CoalescePartitionsExec", +/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", +/// ``` +/// can be optimized to +/// ```text +/// "SortPreservingMergeExec: \[a@0 ASC\]", +/// " SortExec: expr=\[a@0 ASC\]", +/// " ... +/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", +/// ``` +/// - A coalesce node allows eliminating descendant coalesce nodes that can be reached +/// on connection that doesn't require single partition distribution. +/// 2. During the bottom-up traversal, we use the information from the top-down traversal +/// and propagate up an alternative plan that doesn't contain the unnecessary coalesce +/// nodes. +/// - If the node is a one partition sort or sort preserving merge node then the +/// alternative plan is better and is accepted. +/// - If the node is a coalesce node then then the alternative plan is better and is +/// accepted. Also, if the node can be reached from its parent via an a connection +/// that allows eliminating coalesce nodes then start propagating up continue +/// propagating up the alternative plan without the coalesce node. +/// - If the current node is something else but we got an alternative plan from its +/// children then extend the alternative plan with the current node. +#[allow(clippy::type_complexity)] +pub(crate) fn propagate_unnecessary_coalesce_connections_down( + plan: Arc, + unnecessary_coalesce_connection: bool, +) -> Result<(Transformed>, Vec, bool)> { + let children_unnecessary_coalesce_connections = if (is_sort(&plan) + && plan.output_partitioning().partition_count() <= 1) + || is_sort_preserving_merge(&plan) + || is_coalesce_partitions(&plan) + { + // Start a unnecessary coalesce connection from + // - a single partition sort or + // - a sort preserving merge or, + // - a coalesce + // node down the tree. + vec![true] + } else { + // Keep the connection towards a child that doesn't require single partition + // distribution (coalesce). + plan.required_input_distribution() + .into_iter() + .map(|d| { + unnecessary_coalesce_connection + && !matches!(d, Distribution::SinglePartition) + }) + .collect() + }; + Ok(( + Transformed::No(plan), + children_unnecessary_coalesce_connections, + unnecessary_coalesce_connection, + )) +} + +#[allow(clippy::type_complexity)] +fn parallelize_sorts_up( + plan: Arc, + unnecessary_coalesce_connection: bool, + mut unnecessary_coalesce_eliminated_children: Vec>>, +) -> Result<( + Transformed>, + Option>, +)> { + if (is_sort(&plan) || is_sort_preserving_merge(&plan)) && plan.output_partitioning().partition_count() <= 1 { - // If there is a connection between a CoalescePartitionsExec and a - // global sort that satisfy the requirements (i.e. intermediate - // executors don't require single partition), then we can replace - // the CoalescePartitionsExec + Sort cascade with a SortExec + - // SortPreservingMergeExec cascade to parallelize sorting. - let (sort_exprs, fetch) = get_sort_exprs(&plan)?; - let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs); - let sort_exprs = sort_exprs.to_vec(); - update_child_to_remove_coalesce(&mut plan, &mut children_nodes[0])?; - add_sort_above(&mut plan, &sort_reqs, fetch); - let spm = SortPreservingMergeExec::new(sort_exprs, plan).with_fetch(fetch); - - return Ok(Transformed::Yes( - PlanWithCorrespondingCoalescePartitions::new(Arc::new(spm)), - )); + // If we encounter a sort or sort preserving merge node then we might get an + // alternative plan propagated up from the child node. + // As unnecessary coalesce nodes have been removed from alternative plan it is + // always better than the original so we can accept it, but we need to make sure + // that a sort preserving merge and sort modes are placed on the top of the + // alternative plan. + if let Some(mut unnecessary_coalesce_eliminated_plan) = + unnecessary_coalesce_eliminated_children.swap_remove(0) + { + let (sort_exprs, fetch) = get_sort_exprs(&plan)?; + let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs); + let sort_exprs = sort_exprs.to_vec(); + add_sort_above(&mut unnecessary_coalesce_eliminated_plan, &sort_reqs, fetch); + let spm = Arc::new( + SortPreservingMergeExec::new( + sort_exprs, + unnecessary_coalesce_eliminated_plan, + ) + .with_fetch(fetch), + ); + Ok((Transformed::Yes(spm), None)) + } else { + Ok((Transformed::No(plan), None)) + } } else if is_coalesce_partitions(&plan) { - // There is an unnecessary `CoalescePartitionsExec` in the plan. - update_child_to_remove_coalesce(&mut plan, &mut children_nodes[0])?; - - let new_plan = Arc::new(CoalescePartitionsExec::new(plan)) as _; - return Ok(Transformed::Yes( - PlanWithCorrespondingCoalescePartitions::new(new_plan), - )); + // Drop coalesce node from the actual and alternative plans if possible. + if let Some(unnecessary_coalesce_eliminated_child_plan) = + unnecessary_coalesce_eliminated_children.swap_remove(0) + { + // If the alternative subplan already propagated up then it accept it as it + // means we manage to eliminate a coalesce node under the current coalesce + // node. + let unnecessary_coalesce_eliminated_plan = if unnecessary_coalesce_connection + { + // If the current node has a connection from its parent that allows + // eliminating unnecessary coalesce nodes then continue propagating up the + // alternative plan without the current node as even the current node + // might not be needed. + Some(unnecessary_coalesce_eliminated_child_plan.clone()) + } else { + None + }; + let new_plan = plan + .clone() + .with_new_children(vec![unnecessary_coalesce_eliminated_child_plan])?; + Ok(( + Transformed::Yes(new_plan), + unnecessary_coalesce_eliminated_plan, + )) + } else { + let unnecessary_coalesce_eliminated_plan = if unnecessary_coalesce_connection + { + // If the current node has a connection from its parent that allows + // eliminating unnecessary coalesce nodes then start propagating up an + // alternative plan without the current node. + Some(plan.children().swap_remove(0)) + } else { + None + }; + Ok((Transformed::No(plan), unnecessary_coalesce_eliminated_plan)) + } + } else if unnecessary_coalesce_connection && is_repartition(&plan) { + // Due to the removal of coalesce nodes duplicate repartition nodes can become + // adjacent and we should get rid one of them. + // Please note that although getting rid of adjacent duplicate nodes is useful the + // issue should not be specific to this optimization rule so this part might be at + // a better place in a separate rule. + // But this optimization has been introduced since + // https://github.com/apache/arrow-datafusion/commit/5fc91cc8fbb56b6d2a32e66f8b327a871f7d15ac + // so we can leave it here for now. + + // Drop repartition node from the alternative plans if possible. + let unnecessary_coalesce_eliminated_plan = + unnecessary_coalesce_eliminated_children + .swap_remove(0) + .map(|unnecessary_coalesce_eliminated_child_plan| { + if is_repartition(&unnecessary_coalesce_eliminated_child_plan) + && plan.output_partitioning() + == unnecessary_coalesce_eliminated_child_plan + .output_partitioning() + { + Ok(unnecessary_coalesce_eliminated_child_plan) + } else { + plan.clone().with_new_children(vec![ + unnecessary_coalesce_eliminated_child_plan, + ]) + } + }) + .transpose()?; + Ok((Transformed::No(plan), unnecessary_coalesce_eliminated_plan)) + } else { + // If any of the children propagated up an alternative plan then keep propagating + // up the alternative plan extended with the current node. + let unnecessary_coalesce_eliminated_plan = + if unnecessary_coalesce_eliminated_children + .iter() + .any(|opc| opc.is_some()) + { + Some( + plan.clone().with_new_children( + unnecessary_coalesce_eliminated_children + .into_iter() + .zip(plan.children().into_iter()) + .map(|(opc, c)| opc.unwrap_or(c)) + .collect(), + )?, + ) + } else { + None + }; + Ok((Transformed::No(plan), unnecessary_coalesce_eliminated_plan)) } - - Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions { - plan, - coalesce_connection, - children_nodes, - })) } /// This function enforces sorting requirements and makes optimizations without @@ -654,44 +706,6 @@ fn analyze_window_sort_removal( } } -/// Updates child to remove the unnecessary [`CoalescePartitionsExec`] below it. -fn update_child_to_remove_coalesce( - child: &mut Arc, - coalesce_onwards: &mut PlanWithCorrespondingCoalescePartitions, -) -> Result<()> { - if coalesce_onwards.coalesce_connection { - *child = remove_corresponding_coalesce_in_sub_plan(coalesce_onwards, child)?; - } - Ok(()) -} - -/// Removes the [`CoalescePartitionsExec`] from the plan in `coalesce_onwards`. -fn remove_corresponding_coalesce_in_sub_plan( - coalesce_onwards: &mut PlanWithCorrespondingCoalescePartitions, - parent: &Arc, -) -> Result> { - if is_coalesce_partitions(&coalesce_onwards.plan) { - // We can safely use the 0th index since we have a `CoalescePartitionsExec`. - let mut new_plan = coalesce_onwards.plan.children()[0].clone(); - while new_plan.output_partitioning() == parent.output_partitioning() - && is_repartition(&new_plan) - && is_repartition(parent) - { - new_plan = new_plan.children().swap_remove(0) - } - Ok(new_plan) - } else { - let plan = coalesce_onwards.plan.clone(); - let mut children = plan.children(); - for (idx, node) in coalesce_onwards.children_nodes.iter_mut().enumerate() { - if node.coalesce_connection { - children[idx] = remove_corresponding_coalesce_in_sub_plan(node, &plan)?; - } - } - plan.with_new_children(children) - } -} - /// Updates child to remove the unnecessary sort below it. fn update_child_to_remove_unnecessary_sort( child_idx: usize,