diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs index 5e95562033e6..d749ba403dd4 100644 --- a/datafusion-examples/examples/rewrite_expr.rs +++ b/datafusion-examples/examples/rewrite_expr.rs @@ -17,7 +17,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue}; use datafusion_expr::{ AggregateUDF, Between, Expr, Filter, LogicalPlan, ScalarUDF, TableSource, WindowUDF, @@ -95,12 +95,9 @@ impl MyAnalyzerRule { Ok(match plan { LogicalPlan::Filter(filter) => { let predicate = Self::analyze_expr(filter.predicate.clone())?; - Transformed::Yes(LogicalPlan::Filter(Filter::try_new( - predicate, - filter.input, - )?)) + LogicalPlan::Filter(Filter::try_new(predicate, filter.input)?) } - _ => Transformed::No(plan), + _ => plan, }) }) } @@ -111,11 +108,9 @@ impl MyAnalyzerRule { Ok(match expr { Expr::Literal(ScalarValue::Int64(i)) => { // transform to UInt64 - Transformed::Yes(Expr::Literal(ScalarValue::UInt64( - i.map(|i| i as u64), - ))) + Expr::Literal(ScalarValue::UInt64(i.map(|i| i as u64))) } - _ => Transformed::No(expr), + _ => expr, }) }) } @@ -175,12 +170,12 @@ fn my_rewrite(expr: Expr) -> Result { let low: Expr = *low; let high: Expr = *high; if negated { - Transformed::Yes(expr.clone().lt(low).or(expr.gt(high))) + expr.clone().lt(low).or(expr.gt(high)) } else { - Transformed::Yes(expr.clone().gt_eq(low).and(expr.lt_eq(high))) + expr.clone().gt_eq(low).and(expr.lt_eq(high)) } } - _ => Transformed::No(expr), + _ => expr, }) }) } diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index 5f11c8cc1d11..2ee246253c26 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -113,7 +113,7 @@ pub trait TreeNode: Sized + Clone { /// The default tree traversal direction is transform_up(Postorder Traversal). fn transform(self, op: &F) -> Result where - F: Fn(Self) -> Result>, + F: Fn(Self) -> Result, { self.transform_up(op) } @@ -123,9 +123,9 @@ pub trait TreeNode: Sized + Clone { /// When the `op` does not apply to a given node, it is left unchanged. fn transform_down(self, op: &F) -> Result where - F: Fn(Self) -> Result>, + F: Fn(Self) -> Result, { - let after_op = op(self)?.into(); + let after_op = op(self)?; after_op.map_children(|node| node.transform_down(op)) } @@ -134,9 +134,9 @@ pub trait TreeNode: Sized + Clone { /// When the `op` does not apply to a given node, it is left unchanged. fn transform_down_mut(self, op: &mut F) -> Result where - F: FnMut(Self) -> Result>, + F: FnMut(Self) -> Result, { - let after_op = op(self)?.into(); + let after_op = op(self)?; after_op.map_children(|node| node.transform_down_mut(op)) } @@ -145,11 +145,11 @@ pub trait TreeNode: Sized + Clone { /// When the `op` does not apply to a given node, it is left unchanged. fn transform_up(self, op: &F) -> Result where - F: Fn(Self) -> Result>, + F: Fn(Self) -> Result, { let after_op_children = self.map_children(|node| node.transform_up(op))?; - let new_node = op(after_op_children)?.into(); + let new_node = op(after_op_children)?; Ok(new_node) } @@ -158,11 +158,11 @@ pub trait TreeNode: Sized + Clone { /// When the `op` does not apply to a given node, it is left unchanged. fn transform_up_mut(self, op: &mut F) -> Result where - F: FnMut(Self) -> Result>, + F: FnMut(Self) -> Result, { let after_op_children = self.map_children(|node| node.transform_up_mut(op))?; - let new_node = op(after_op_children)?.into(); + let new_node = op(after_op_children)?; Ok(new_node) } @@ -314,29 +314,6 @@ pub enum VisitRecursion { Stop, } -pub enum Transformed { - /// The item was transformed / rewritten somehow - Yes(T), - /// The item was not transformed - No(T), -} - -impl Transformed { - pub fn into(self) -> T { - match self { - Transformed::Yes(t) => t, - Transformed::No(t) => t, - } - } - - pub fn into_pair(self) -> (T, bool) { - match self { - Transformed::Yes(t) => (t, true), - Transformed::No(t) => (t, false), - } - } -} - /// Helper trait for implementing [`TreeNode`] that have children stored as Arc's /// /// If some trait object, such as `dyn T`, implements this trait, diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs index 7b66ca529094..e34a78264b85 100644 --- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs +++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs @@ -27,7 +27,7 @@ use crate::{ repartition::RepartitionExec, Partitioning, }, }; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use std::sync::Arc; /// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that @@ -71,12 +71,9 @@ impl PhysicalOptimizerRule for CoalesceBatches { }) .unwrap_or(false); if wrap_in_coalesce { - Ok(Transformed::Yes(Arc::new(CoalesceBatchesExec::new( - plan, - target_batch_size, - )))) + Ok(Arc::new(CoalesceBatchesExec::new(plan, target_batch_size))) } else { - Ok(Transformed::No(plan)) + Ok(plan) } }) } diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index 61eb2381c63b..077649afb5e6 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -26,7 +26,7 @@ use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGro use crate::physical_plan::ExecutionPlan; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{AggregateExpr, PhysicalExpr}; @@ -109,9 +109,9 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { }); Ok(if let Some(transformed) = transformed { - Transformed::Yes(transformed) + transformed } else { - Transformed::No(plan) + plan }) }) } @@ -185,9 +185,9 @@ fn discard_column_index(group_expr: Arc) -> Arc None, }; Ok(if let Some(normalized_form) = normalized_form { - Transformed::Yes(normalized_form) + normalized_form } else { - Transformed::No(expr) + expr }) }) .unwrap_or(group_expr) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index bf3f9ef0f3e6..181e1cbb910f 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -48,7 +48,7 @@ use crate::physical_plan::{ }; use arrow::compute::SortOptions; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use datafusion_expr::logical_plan::JoinType; use datafusion_physical_expr::expressions::{Column, NoOp}; use datafusion_physical_expr::utils::map_columns_before_projection; @@ -205,9 +205,7 @@ impl PhysicalOptimizerRule for EnforceDistribution { adjusted.plan } else { // Run a bottom-up process - plan.transform_up(&|plan| { - Ok(Transformed::Yes(reorder_join_keys_to_inputs(plan)?)) - })? + plan.transform_up(&reorder_join_keys_to_inputs)? }; let distribution_context = DistributionContext::new(adjusted); @@ -272,7 +270,7 @@ impl PhysicalOptimizerRule for EnforceDistribution { /// fn adjust_input_keys_ordering( mut requirements: PlanWithKeyRequirements, -) -> Result> { +) -> Result { let parent_required = requirements.required_key_ordering.clone(); let plan_any = requirements.plan.as_any(); @@ -309,7 +307,6 @@ fn adjust_input_keys_ordering( vec![], &join_constructor, ) - .map(Transformed::Yes) } PartitionMode::CollectLeft => { let new_right_request = match join_type { @@ -329,13 +326,11 @@ fn adjust_input_keys_ordering( // Push down requirements to the right side requirements.children[1].required_key_ordering = new_right_request.unwrap_or(vec![]); - Ok(Transformed::Yes(requirements)) + Ok(requirements) } PartitionMode::Auto => { // Can not satisfy, clear the current requirements and generate new empty requirements - Ok(Transformed::Yes(PlanWithKeyRequirements::new( - requirements.plan, - ))) + Ok(PlanWithKeyRequirements::new(requirements.plan)) } } } else if let Some(CrossJoinExec { left, .. }) = @@ -345,7 +340,7 @@ fn adjust_input_keys_ordering( // Push down requirements to the right side requirements.children[1].required_key_ordering = shift_right_required(&parent_required, left_columns_len).unwrap_or_default(); - Ok(Transformed::Yes(requirements)) + Ok(requirements) } else if let Some(SortMergeJoinExec { left, right, @@ -375,7 +370,6 @@ fn adjust_input_keys_ordering( sort_options.clone(), &join_constructor, ) - .map(Transformed::Yes) } else if let Some(aggregate_exec) = plan_any.downcast_ref::() { if !parent_required.is_empty() { match aggregate_exec.mode() { @@ -383,15 +377,12 @@ fn adjust_input_keys_ordering( requirements.plan.clone(), &parent_required, aggregate_exec, - ) - .map(Transformed::Yes), - _ => Ok(Transformed::Yes(PlanWithKeyRequirements::new( - requirements.plan, - ))), + ), + _ => Ok(PlanWithKeyRequirements::new(requirements.plan)), } } else { // Keep everything unchanged - Ok(Transformed::No(requirements)) + Ok(requirements) } } else if let Some(proj) = plan_any.downcast_ref::() { let expr = proj.expr(); @@ -401,26 +392,22 @@ fn adjust_input_keys_ordering( let new_required = map_columns_before_projection(&parent_required, expr); if new_required.len() == parent_required.len() { requirements.children[0].required_key_ordering = new_required; - Ok(Transformed::Yes(requirements)) + Ok(requirements) } else { // Can not satisfy, clear the current requirements and generate new empty requirements - Ok(Transformed::Yes(PlanWithKeyRequirements::new( - requirements.plan, - ))) + Ok(PlanWithKeyRequirements::new(requirements.plan)) } } else if plan_any.downcast_ref::().is_some() || plan_any.downcast_ref::().is_some() || plan_any.downcast_ref::().is_some() { - Ok(Transformed::Yes(PlanWithKeyRequirements::new( - requirements.plan, - ))) + Ok(PlanWithKeyRequirements::new(requirements.plan)) } else { // By default, push down the parent requirements to children requirements.children.iter_mut().for_each(|child| { child.required_key_ordering = parent_required.clone(); }); - Ok(Transformed::Yes(requirements)) + Ok(requirements) } } @@ -1139,11 +1126,11 @@ fn add_sort_preserving_partitions( fn ensure_distribution( dist_context: DistributionContext, config: &ConfigOptions, -) -> Result> { +) -> Result { let dist_context = dist_context.update_children()?; if dist_context.plan.children().is_empty() { - return Ok(Transformed::No(dist_context)); + return Ok(dist_context); } let target_partitions = config.execution.target_partitions; @@ -1333,7 +1320,7 @@ fn ensure_distribution( children_nodes, }; - Ok(Transformed::Yes(new_distribution_context)) + Ok(new_distribution_context) } /// A struct to keep track of distribution changing operators @@ -1395,7 +1382,7 @@ impl DistributionContext { .collect::>(); Ok(Self { - plan: with_new_children_if_necessary(self.plan, children_plans)?.into(), + plan: with_new_children_if_necessary(self.plan, children_plans)?, distribution_connection: false, children_nodes: self.children_nodes, }) @@ -1420,8 +1407,7 @@ impl TreeNode for DistributionContext { self.plan = with_new_children_if_necessary( self.plan, self.children_nodes.iter().map(|c| c.plan.clone()).collect(), - )? - .into(); + )?; } Ok(self) } @@ -1484,8 +1470,7 @@ impl TreeNode for PlanWithKeyRequirements { self.plan = with_new_children_if_necessary( self.plan, self.children.iter().map(|c| c.plan.clone()).collect(), - )? - .into(); + )?; } Ok(self) } @@ -1912,7 +1897,7 @@ pub(crate) mod tests { config.optimizer.repartition_file_scans = false; config.optimizer.repartition_file_min_size = 1024; config.optimizer.prefer_existing_sort = prefer_existing_sort; - ensure_distribution(distribution_context, &config).map(|item| item.into().plan) + ensure_distribution(distribution_context, &config).map(|item| item.plan) } /// Test whether plan matches with expected plan diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index f609ddea66cf..9f3ea182203f 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -58,7 +58,7 @@ use crate::physical_plan::{ with_new_children_if_necessary, Distribution, ExecutionPlan, InputOrderMode, }; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use datafusion_common::{plan_err, DataFusionError}; use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement}; use datafusion_physical_plan::repartition::RepartitionExec; @@ -135,7 +135,7 @@ impl PlanWithCorrespondingSort { .iter() .map(|item| item.plan.clone()) .collect::>(); - let plan = with_new_children_if_necessary(parent_plan, children_plans)?.into(); + let plan = with_new_children_if_necessary(parent_plan, children_plans)?; Ok(Self { plan, @@ -163,8 +163,7 @@ impl TreeNode for PlanWithCorrespondingSort { self.plan = with_new_children_if_necessary( self.plan, self.children_nodes.iter().map(|c| c.plan.clone()).collect(), - )? - .into(); + )?; } Ok(self) } @@ -221,7 +220,7 @@ impl PlanWithCorrespondingCoalescePartitions { .iter() .map(|item| item.plan.clone()) .collect(); - self.plan = with_new_children_if_necessary(self.plan, children_plans)?.into(); + self.plan = with_new_children_if_necessary(self.plan, children_plans)?; Ok(self) } } @@ -244,8 +243,7 @@ impl TreeNode for PlanWithCorrespondingCoalescePartitions { self.plan = with_new_children_if_necessary( self.plan, self.children_nodes.iter().map(|c| c.plan.clone()).collect(), - )? - .into(); + )?; } Ok(self) } @@ -319,7 +317,7 @@ impl PhysicalOptimizerRule for EnforceSorting { /// By performing sorting in parallel, we can increase performance in some scenarios. fn parallelize_sorts( requirements: PlanWithCorrespondingCoalescePartitions, -) -> Result> { +) -> Result { let PlanWithCorrespondingCoalescePartitions { mut plan, coalesce_connection, @@ -331,11 +329,11 @@ fn parallelize_sorts( // SortPreservingMergeExec or a CoalescePartitionsExec, and they // all have a single child. Therefore, if the first child is `None`, // we can return immediately. - return Ok(Transformed::No(PlanWithCorrespondingCoalescePartitions { + return Ok(PlanWithCorrespondingCoalescePartitions { plan, coalesce_connection, children_nodes, - })); + }); } else if (is_sort(&plan) || is_sort_preserving_merge(&plan)) && plan.output_partitioning().partition_count() <= 1 { @@ -351,31 +349,27 @@ fn parallelize_sorts( add_sort_above(&mut plan, &sort_reqs, fetch); let spm = SortPreservingMergeExec::new(sort_exprs, plan).with_fetch(fetch); - return Ok(Transformed::Yes( - PlanWithCorrespondingCoalescePartitions::new(Arc::new(spm)), - )); + return Ok(PlanWithCorrespondingCoalescePartitions::new(Arc::new(spm))); } else if is_coalesce_partitions(&plan) { // There is an unnecessary `CoalescePartitionsExec` in the plan. update_child_to_remove_coalesce(&mut plan, &mut children_nodes[0])?; let new_plan = Arc::new(CoalescePartitionsExec::new(plan)) as _; - return Ok(Transformed::Yes( - PlanWithCorrespondingCoalescePartitions::new(new_plan), - )); + return Ok(PlanWithCorrespondingCoalescePartitions::new(new_plan)); } - Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions { + Ok(PlanWithCorrespondingCoalescePartitions { plan, coalesce_connection, children_nodes, - })) + }) } /// This function enforces sorting requirements and makes optimizations without /// violating these requirements whenever possible. fn ensure_sorting( requirements: PlanWithCorrespondingSort, -) -> Result> { +) -> Result { let requirements = PlanWithCorrespondingSort::update_children( requirements.plan, requirements.children_nodes, @@ -383,10 +377,10 @@ fn ensure_sorting( // Perform naive analysis at the beginning -- remove already-satisfied sorts: if requirements.plan.children().is_empty() { - return Ok(Transformed::No(requirements)); + return Ok(requirements); } if let Some(result) = analyze_immediate_sort_removal(&requirements) { - return Ok(Transformed::Yes(result)); + return Ok(result); } let plan = requirements.plan; @@ -441,7 +435,7 @@ fn ensure_sorting( if is_window(&plan) && children_nodes[0].sort_connection { if let Some(result) = analyze_window_sort_removal(&mut children_nodes[0], &plan)? { - return Ok(Transformed::Yes(result)); + return Ok(result); } } else if is_sort_preserving_merge(&plan) && children_nodes[0] @@ -453,11 +447,9 @@ fn ensure_sorting( // This SortPreservingMergeExec is unnecessary, input already has a // single partition. let child_node = children_nodes.swap_remove(0); - return Ok(Transformed::Yes(child_node)); + return Ok(child_node); } - Ok(Transformed::Yes( - PlanWithCorrespondingSort::update_children(plan, children_nodes)?, - )) + PlanWithCorrespondingSort::update_children(plan, children_nodes) } /// Analyzes a given [`SortExec`] (`plan`) to determine whether its input diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index ba66dca55b35..a8ee2aab6f65 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -38,7 +38,7 @@ use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::ExecutionPlan; use arrow_schema::Schema; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use datafusion_common::{internal_err, JoinSide}; use datafusion_common::{DataFusionError, JoinType}; use datafusion_physical_expr::expressions::Column; @@ -367,7 +367,7 @@ fn partitioned_hash_join(hash_join: &HashJoinExec) -> Result, collect_left_threshold: usize, -) -> Result>> { +) -> Result> { let transformed = if let Some(hash_join) = plan.as_any().downcast_ref::() { @@ -414,9 +414,9 @@ fn statistical_join_selection_subrule( }; Ok(if let Some(transformed) = transformed { - Transformed::Yes(transformed) + transformed } else { - Transformed::No(plan) + plan }) } @@ -645,7 +645,7 @@ fn apply_subrules( mut input: PipelineStatePropagator, subrules: &Vec>, config_options: &ConfigOptions, -) -> Result> { +) -> Result { for subrule in subrules { if let Some(value) = subrule(input.clone(), config_options).transpose()? { input = value; @@ -662,7 +662,7 @@ fn apply_subrules( // catch this and raise an error anyway. .unwrap_or(true); input.unbounded = is_unbounded; - Ok(Transformed::Yes(input)) + Ok(input) } #[cfg(test)] diff --git a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs index 9855247151b8..e44ce05614ab 100644 --- a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs @@ -23,7 +23,7 @@ use crate::physical_plan::aggregates::AggregateExec; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::ExecutionPlan; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use datafusion_common::Result; use itertools::Itertools; use std::sync::Arc; @@ -106,7 +106,7 @@ impl LimitedDistinctAggregation { let mut rewrite_applicable = true; let mut closure = |plan: Arc| { if !rewrite_applicable { - return Ok(Transformed::No(plan)); + return Ok(plan); } if let Some(aggr) = plan.as_any().downcast_ref::() { if found_match_aggr { @@ -117,7 +117,7 @@ impl LimitedDistinctAggregation { // a partial and final aggregation with different groupings disqualifies // rewriting the child aggregation rewrite_applicable = false; - return Ok(Transformed::No(plan)); + return Ok(plan); } } } @@ -128,12 +128,12 @@ impl LimitedDistinctAggregation { Some(new_aggr) => { match_aggr = plan; found_match_aggr = true; - return Ok(Transformed::Yes(new_aggr)); + return Ok(new_aggr); } } } rewrite_applicable = false; - Ok(Transformed::No(plan)) + Ok(plan) }; let child = child.clone().transform_down_mut(&mut closure).ok()?; if is_global_limit { @@ -165,9 +165,9 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation { if let Some(plan) = LimitedDistinctAggregation::transform_limit(plan.clone()) { - Transformed::Yes(plan) + plan } else { - Transformed::No(plan) + plan }, ) })? diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs b/datafusion/core/src/physical_optimizer/output_requirements.rs index 4d03840d3dd3..8134e300a75d 100644 --- a/datafusion/core/src/physical_optimizer/output_requirements.rs +++ b/datafusion/core/src/physical_optimizer/output_requirements.rs @@ -30,7 +30,7 @@ use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use arrow_schema::SchemaRef; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use datafusion_common::{Result, Statistics}; use datafusion_physical_expr::{ Distribution, LexRequirement, PhysicalSortExpr, PhysicalSortRequirement, @@ -200,9 +200,9 @@ impl PhysicalOptimizerRule for OutputRequirements { if let Some(sort_req) = plan.as_any().downcast_ref::() { - Ok(Transformed::Yes(sort_req.input())) + Ok(sort_req.input()) } else { - Ok(Transformed::No(plan)) + Ok(plan) } }), } diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs index e281d0e7c23e..6b10faadf96e 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs @@ -28,7 +28,7 @@ use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; use datafusion_common::config::OptimizerOptions; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use datafusion_common::{plan_err, DataFusionError}; use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported}; use datafusion_physical_plan::joins::SymmetricHashJoinExec; @@ -109,8 +109,7 @@ impl TreeNode for PipelineStatePropagator { self.plan = with_new_children_if_necessary( self.plan, self.children.iter().map(|c| c.plan.clone()).collect(), - )? - .into(); + )?; } Ok(self) } @@ -121,7 +120,7 @@ impl TreeNode for PipelineStatePropagator { pub fn check_finiteness_requirements( mut input: PipelineStatePropagator, optimizer_options: &OptimizerOptions, -) -> Result> { +) -> Result { if let Some(exec) = input.plan.as_any().downcast_ref::() { if !(optimizer_options.allow_symmetric_joins_without_pruning || (exec.check_if_order_information_available()? && is_prunable(exec))) @@ -136,7 +135,7 @@ pub fn check_finiteness_requirements( .unbounded_output(&input.children_unbounded()) .map(|value| { input.unbounded = value; - Transformed::Yes(input) + input }) } diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 34d1af85565a..c309c0165f79 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -43,7 +43,7 @@ use crate::physical_plan::{Distribution, ExecutionPlan}; use arrow_schema::SchemaRef; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; +use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_common::JoinSide; use datafusion_physical_expr::expressions::{Column, Literal}; use datafusion_physical_expr::{ @@ -90,14 +90,14 @@ impl PhysicalOptimizerRule for ProjectionPushdown { /// by leveraging source providers with built-in projection capabilities. pub fn remove_unnecessary_projections( plan: Arc, -) -> Result>> { +) -> Result> { let maybe_modified = if let Some(projection) = plan.as_any().downcast_ref::() { // If the projection does not cause any change on the input, we can // safely remove it: if is_projection_removable(projection) { - return Ok(Transformed::Yes(projection.input().clone())); + return Ok(projection.input().clone()); } // If it does, check if we can push it under its child(ren): let input = projection.input().as_any(); @@ -111,7 +111,7 @@ pub fn remove_unnecessary_projections( // To unify 3 or more sequential projections: remove_unnecessary_projections(new_plan) } else { - Ok(Transformed::No(plan)) + Ok(plan) }; } else if let Some(output_req) = input.downcast_ref::() { try_swapping_with_output_req(projection, output_req)? @@ -147,10 +147,10 @@ pub fn remove_unnecessary_projections( None } } else { - return Ok(Transformed::No(plan)); + return Ok(plan); }; - Ok(maybe_modified.map_or(Transformed::No(plan), Transformed::Yes)) + Ok(maybe_modified.unwrap_or(plan)) } /// Tries to embed `projection` to its input (`csv`). If possible, returns @@ -885,21 +885,21 @@ fn update_expr( .clone() .transform_up_mut(&mut |expr: Arc| { if state == RewriteState::RewrittenInvalid { - return Ok(Transformed::No(expr)); + return Ok(expr); } let Some(column) = expr.as_any().downcast_ref::() else { - return Ok(Transformed::No(expr)); + return Ok(expr); }; if sync_with_child { state = RewriteState::RewrittenValid; // Update the index of `column`: - Ok(Transformed::Yes(projected_exprs[column.index()].0.clone())) + Ok(projected_exprs[column.index()].0.clone()) } else { // default to invalid, in case we can't find the relevant column state = RewriteState::RewrittenInvalid; // Determine how to update `column` to accommodate `projected_exprs` - projected_exprs + Ok(projected_exprs .iter() .enumerate() .find_map(|(index, (projected_expr, alias))| { @@ -912,10 +912,7 @@ fn update_expr( }, ) }) - .map_or_else( - || Ok(Transformed::No(expr)), - |c| Ok(Transformed::Yes(c)), - ) + .unwrap_or(expr)) } }); diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index b68dbabc0437..cb36b003f485 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -36,10 +36,7 @@ use arrow::{ record_batch::RecordBatch, }; use arrow_array::cast::AsArray; -use datafusion_common::{ - internal_err, plan_err, - tree_node::{Transformed, TreeNode}, -}; +use datafusion_common::{internal_err, plan_err, tree_node::TreeNode}; use datafusion_common::{plan_datafusion_err, ScalarValue}; use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee}; use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef}; @@ -840,11 +837,11 @@ fn rewrite_column_expr( e.transform(&|expr| { if let Some(column) = expr.as_any().downcast_ref::() { if column == column_old { - return Ok(Transformed::Yes(Arc::new(column_new.clone()))); + return Ok(Arc::new(column_new.clone())); } } - Ok(Transformed::No(expr)) + Ok(expr) }) } diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index e49b358608aa..ec55ba157a14 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -30,7 +30,7 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use datafusion_physical_plan::unbounded_output; /// For a given `plan`, this object carries the information one needs from its @@ -97,8 +97,7 @@ impl OrderPreservationContext { self.plan = with_new_children_if_necessary( self.plan, self.children_nodes.iter().map(|c| c.plan.clone()).collect(), - )? - .into(); + )?; self.ordering_connection = false; Ok(self) } @@ -122,8 +121,7 @@ impl TreeNode for OrderPreservationContext { self.plan = with_new_children_if_necessary( self.plan, self.children_nodes.iter().map(|c| c.plan.clone()).collect(), - )? - .into(); + )?; } Ok(self) } @@ -235,12 +233,12 @@ pub(crate) fn replace_with_order_preserving_variants( // should only be made to fix the pipeline (streaming). is_spm_better: bool, config: &ConfigOptions, -) -> Result> { +) -> Result { let mut requirements = requirements.update_children()?; if !(is_sort(&requirements.plan) && requirements.children_nodes[0].ordering_connection) { - return Ok(Transformed::No(requirements)); + return Ok(requirements); } // For unbounded cases, replace with the order-preserving variant in @@ -264,12 +262,12 @@ pub(crate) fn replace_with_order_preserving_variants( for child in updated_sort_input.children_nodes.iter_mut() { child.ordering_connection = false; } - Ok(Transformed::Yes(updated_sort_input)) + Ok(updated_sort_input) } else { for child in requirements.children_nodes.iter_mut() { child.ordering_connection = false; } - Ok(Transformed::Yes(requirements)) + Ok(requirements) } } diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index f0a8c8cfd3cb..801384239c95 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -29,7 +29,7 @@ use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use datafusion_common::{plan_err, DataFusionError, JoinSide, Result}; use datafusion_expr::JoinType; use datafusion_physical_expr::expressions::Column; @@ -89,16 +89,13 @@ impl TreeNode for SortPushDown { self.plan = with_new_children_if_necessary( self.plan, self.children_nodes.iter().map(|c| c.plan.clone()).collect(), - )? - .into(); + )?; } Ok(self) } } -pub(crate) fn pushdown_sorts( - mut requirements: SortPushDown, -) -> Result> { +pub(crate) fn pushdown_sorts(mut requirements: SortPushDown) -> Result { let plan = &requirements.plan; let parent_required = requirements.required_ordering.as_deref().unwrap_or(&[]); @@ -128,12 +125,12 @@ pub(crate) fn pushdown_sorts( } // Can push down requirements child.required_ordering = None; - Ok(Transformed::Yes(child)) + Ok(child) } else { // Can not push down requirements let mut empty_node = SortPushDown::new(requirements.plan); empty_node.assign_initial_requirements(); - Ok(Transformed::Yes(empty_node)) + Ok(empty_node) } } else { // Executors other than SortExec @@ -146,7 +143,7 @@ pub(crate) fn pushdown_sorts( for (child, order) in requirements.children_nodes.iter_mut().zip(reqs) { child.required_ordering = order; } - return Ok(Transformed::Yes(requirements)); + return Ok(requirements); } // Can not satisfy the parent requirements, check whether the requirements can be pushed down: if let Some(adjusted) = pushdown_requirement_to_children(plan, parent_required)? { @@ -154,7 +151,7 @@ pub(crate) fn pushdown_sorts( c.required_ordering = o; } requirements.required_ordering = None; - Ok(Transformed::Yes(requirements)) + Ok(requirements) } else { // Can not push down requirements, add new SortExec: let mut new_plan = requirements.plan; @@ -162,7 +159,7 @@ pub(crate) fn pushdown_sorts( let mut new_empty = SortPushDown::new(new_plan); new_empty.assign_initial_requirements(); // Can not push down requirements - Ok(Transformed::Yes(new_empty)) + Ok(new_empty) } } } diff --git a/datafusion/core/src/physical_optimizer/topk_aggregation.rs b/datafusion/core/src/physical_optimizer/topk_aggregation.rs index dd0261420304..c01dcf4c743e 100644 --- a/datafusion/core/src/physical_optimizer/topk_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/topk_aggregation.rs @@ -26,7 +26,7 @@ use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::ExecutionPlan; use arrow_schema::DataType; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use datafusion_common::Result; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::PhysicalSortExpr; @@ -101,13 +101,13 @@ impl TopKAggregation { let mut cardinality_preserved = true; let mut closure = |plan: Arc| { if !cardinality_preserved { - return Ok(Transformed::No(plan)); + return Ok(plan); } if let Some(aggr) = plan.as_any().downcast_ref::() { // either we run into an Aggregate and transform it match Self::transform_agg(aggr, order, limit) { None => cardinality_preserved = false, - Some(plan) => return Ok(Transformed::Yes(plan)), + Some(plan) => return Ok(plan), } } else { // or we continue down whitelisted nodes of other types @@ -115,7 +115,7 @@ impl TopKAggregation { cardinality_preserved = false; } } - Ok(Transformed::No(plan)) + Ok(plan) }; let child = child.clone().transform_down_mut(&mut closure).ok()?; let sort = SortExec::new(sort.expr().to_vec(), child) @@ -141,9 +141,9 @@ impl PhysicalOptimizerRule for TopKAggregation { plan.transform_down(&|plan| { Ok( if let Some(plan) = TopKAggregation::transform_sort(plan.clone()) { - Transformed::Yes(plan) + plan } else { - Transformed::No(plan) + plan }, ) })? diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 5617d217eb9f..e64393c147be 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -27,7 +27,7 @@ use crate::{aggregate_function, ExprSchemable}; use crate::{built_in_function, BuiltinScalarFunction}; use crate::{built_in_window_function, udaf}; use arrow::datatypes::DataType; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use datafusion_common::{internal_err, DFSchema, OwnedTableReference}; use datafusion_common::{plan_err, Column, DataFusionError, Result, ScalarValue}; use std::collections::HashSet; @@ -1263,7 +1263,7 @@ impl Expr { rewrite_placeholder(low.as_mut(), expr.as_ref(), schema)?; rewrite_placeholder(high.as_mut(), expr.as_ref(), schema)?; } - Ok(Transformed::Yes(expr)) + Ok(expr) }) } } diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index 1f04c80833f0..0e473af672f2 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -20,7 +20,7 @@ use crate::expr::Alias; use crate::logical_plan::Projection; use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder}; -use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; +use datafusion_common::tree_node::{TreeNode, TreeNodeRewriter}; use datafusion_common::Result; use datafusion_common::{Column, DFSchema}; use std::collections::HashMap; @@ -37,9 +37,9 @@ pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result { Ok({ if let Expr::Column(c) = expr { let col = LogicalPlanBuilder::normalize(plan, c)?; - Transformed::Yes(Expr::Column(col)) + Expr::Column(col) } else { - Transformed::No(expr) + expr } }) }) @@ -61,9 +61,9 @@ pub fn normalize_col_with_schemas( Ok({ if let Expr::Column(c) = expr { let col = c.normalize_with_schemas(schemas, using_columns)?; - Transformed::Yes(Expr::Column(col)) + Expr::Column(col) } else { - Transformed::No(expr) + expr } }) }) @@ -80,9 +80,9 @@ pub fn normalize_col_with_schemas_and_ambiguity_check( if let Expr::Column(c) = expr { let col = c.normalize_with_schemas_and_ambiguity_check(schemas, using_columns)?; - Transformed::Yes(Expr::Column(col)) + Expr::Column(col) } else { - Transformed::No(expr) + expr } }) }) @@ -106,11 +106,11 @@ pub fn replace_col(expr: Expr, replace_map: &HashMap<&Column, &Column>) -> Resul Ok({ if let Expr::Column(c) = &expr { match replace_map.get(c) { - Some(new_c) => Transformed::Yes(Expr::Column((*new_c).to_owned())), - None => Transformed::No(expr), + Some(new_c) => Expr::Column((*new_c).to_owned()), + None => expr, } } else { - Transformed::No(expr) + expr } }) }) @@ -129,9 +129,9 @@ pub fn unnormalize_col(expr: Expr) -> Expr { relation: None, name: c.name, }; - Transformed::Yes(Expr::Column(col)) + Expr::Column(col) } else { - Transformed::No(expr) + expr } }) }) @@ -167,9 +167,9 @@ pub fn strip_outer_reference(expr: Expr) -> Expr { expr.transform(&|expr| { Ok({ if let Expr::OuterReferenceColumn(_, col) = expr { - Transformed::Yes(Expr::Column(col)) + Expr::Column(col) } else { - Transformed::No(expr) + expr } }) }) @@ -289,7 +289,7 @@ mod test { #[test] fn rewriter_rewrite() { // rewrites all "foo" string literals to "bar" - let transformer = |expr: Expr| -> Result> { + let transformer = |expr: Expr| -> Result { match expr { Expr::Literal(ScalarValue::Utf8(Some(utf8_val))) => { let utf8_val = if utf8_val == "foo" { @@ -297,10 +297,10 @@ mod test { } else { utf8_val }; - Ok(Transformed::Yes(lit(utf8_val))) + Ok(lit(utf8_val)) } // otherwise, return None - _ => Ok(Transformed::No(expr)), + _ => Ok(expr), } }; diff --git a/datafusion/expr/src/expr_rewriter/order_by.rs b/datafusion/expr/src/expr_rewriter/order_by.rs index c87a724d5646..f7688abd23a3 100644 --- a/datafusion/expr/src/expr_rewriter/order_by.rs +++ b/datafusion/expr/src/expr_rewriter/order_by.rs @@ -20,7 +20,7 @@ use crate::expr::{Alias, Sort}; use crate::expr_rewriter::normalize_col; use crate::{Cast, Expr, ExprSchemable, LogicalPlan, TryCast}; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use datafusion_common::{Column, Result}; /// Rewrite sort on aggregate expressions to sort on the column of aggregate output @@ -91,7 +91,7 @@ fn rewrite_in_terms_of_projection( .to_field(input.schema()) .map(|f| f.qualified_column())?, ); - return Ok(Transformed::Yes(col)); + return Ok(col); } // if that doesn't work, try to match the expression as an @@ -103,7 +103,7 @@ fn rewrite_in_terms_of_projection( e } else { // The expr is not based on Aggregate plan output. Skip it. - return Ok(Transformed::No(expr)); + return Ok(expr); }; // expr is an actual expr like min(t.c2), but we are looking @@ -118,7 +118,7 @@ fn rewrite_in_terms_of_projection( // look for the column named the same as this expr if let Some(found) = proj_exprs.iter().find(|a| expr_match(&search_col, a)) { let found = found.clone(); - return Ok(Transformed::Yes(match normalized_expr { + return Ok(match normalized_expr { Expr::Cast(Cast { expr: _, data_type }) => Expr::Cast(Cast { expr: Box::new(found), data_type, @@ -128,10 +128,10 @@ fn rewrite_in_terms_of_projection( data_type, }), _ => found, - })); + }); } - Ok(Transformed::No(expr)) + Ok(expr) }) } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 93a38fb40df5..f247ded8bc11 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -45,8 +45,7 @@ use crate::{ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::tree_node::{ - RewriteRecursion, Transformed, TreeNode, TreeNodeRewriter, TreeNodeVisitor, - VisitRecursion, + RewriteRecursion, TreeNode, TreeNodeRewriter, TreeNodeVisitor, VisitRecursion, }; use datafusion_common::{ aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints, @@ -1219,17 +1218,17 @@ impl LogicalPlan { let value = param_values .get_placeholders_with_values(id, data_type.as_ref())?; // Replace the placeholder with the value - Ok(Transformed::Yes(Expr::Literal(value))) + Ok(Expr::Literal(value)) } Expr::ScalarSubquery(qry) => { let subquery = Arc::new(qry.subquery.replace_params_with_values(param_values)?); - Ok(Transformed::Yes(Expr::ScalarSubquery(Subquery { + Ok(Expr::ScalarSubquery(Subquery { subquery, outer_ref_columns: qry.outer_ref_columns.clone(), - }))) + })) } - _ => Ok(Transformed::No(expr)), + _ => Ok(expr), } }) } @@ -3245,9 +3244,9 @@ digraph { Arc::new(LogicalPlan::TableScan(table)), ) .unwrap(); - Ok(Transformed::Yes(LogicalPlan::Filter(filter))) + Ok(LogicalPlan::Filter(filter)) } - x => Ok(Transformed::No(x)), + x => Ok(x), }) .unwrap(); diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs index 953716713e41..4e8696df5140 100644 --- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs +++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs @@ -17,7 +17,7 @@ use crate::analyzer::AnalyzerRule; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; +use datafusion_common::tree_node::{TreeNode, TreeNodeRewriter}; use datafusion_common::Result; use datafusion_expr::expr::{AggregateFunction, AggregateFunctionDefinition, InSubquery}; use datafusion_expr::expr_rewriter::rewrite_preserving_name; @@ -51,7 +51,7 @@ impl AnalyzerRule for CountWildcardRule { } } -fn analyze_internal(plan: LogicalPlan) -> Result> { +fn analyze_internal(plan: LogicalPlan) -> Result { let mut rewriter = CountWildcardRewriter {}; match plan { LogicalPlan::Window(window) => { @@ -61,11 +61,9 @@ fn analyze_internal(plan: LogicalPlan) -> Result> { .map(|expr| rewrite_preserving_name(expr.clone(), &mut rewriter)) .collect::>>()?; - Ok(Transformed::Yes( - LogicalPlanBuilder::from((*window.input).clone()) - .window(window_expr)? - .build()?, - )) + Ok(LogicalPlanBuilder::from((*window.input).clone()) + .window(window_expr)? + .build()?) } LogicalPlan::Aggregate(agg) => { let aggr_expr = agg @@ -74,20 +72,22 @@ fn analyze_internal(plan: LogicalPlan) -> Result> { .map(|expr| rewrite_preserving_name(expr.clone(), &mut rewriter)) .collect::>>()?; - Ok(Transformed::Yes(LogicalPlan::Aggregate( - Aggregate::try_new(agg.input.clone(), agg.group_expr, aggr_expr)?, - ))) + Ok(LogicalPlan::Aggregate(Aggregate::try_new( + agg.input.clone(), + agg.group_expr, + aggr_expr, + )?)) } LogicalPlan::Sort(Sort { expr, input, fetch }) => { let sort_expr = expr .iter() .map(|expr| rewrite_preserving_name(expr.clone(), &mut rewriter)) .collect::>>()?; - Ok(Transformed::Yes(LogicalPlan::Sort(Sort { + Ok(LogicalPlan::Sort(Sort { expr: sort_expr, input, fetch, - }))) + })) } LogicalPlan::Projection(projection) => { let projection_expr = projection @@ -95,20 +95,19 @@ fn analyze_internal(plan: LogicalPlan) -> Result> { .iter() .map(|expr| rewrite_preserving_name(expr.clone(), &mut rewriter)) .collect::>>()?; - Ok(Transformed::Yes(LogicalPlan::Projection( - Projection::try_new(projection_expr, projection.input)?, - ))) + Ok(LogicalPlan::Projection(Projection::try_new( + projection_expr, + projection.input, + )?)) } LogicalPlan::Filter(Filter { predicate, input, .. }) => { let predicate = rewrite_preserving_name(predicate, &mut rewriter)?; - Ok(Transformed::Yes(LogicalPlan::Filter(Filter::try_new( - predicate, input, - )?))) + Ok(LogicalPlan::Filter(Filter::try_new(predicate, input)?)) } - _ => Ok(Transformed::No(plan)), + _ => Ok(plan), } } diff --git a/datafusion/optimizer/src/analyzer/inline_table_scan.rs b/datafusion/optimizer/src/analyzer/inline_table_scan.rs index 90af7aec8293..3a4f51c97374 100644 --- a/datafusion/optimizer/src/analyzer/inline_table_scan.rs +++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use crate::analyzer::AnalyzerRule; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use datafusion_common::Result; use datafusion_expr::expr::Exists; use datafusion_expr::expr::InSubquery; @@ -50,7 +50,7 @@ impl AnalyzerRule for InlineTableScan { } } -fn analyze_internal(plan: LogicalPlan) -> Result> { +fn analyze_internal(plan: LogicalPlan) -> Result { Ok(match plan { // Match only on scans without filter / projection / fetch // Views and DataFrames won't have those added @@ -64,33 +64,29 @@ fn analyze_internal(plan: LogicalPlan) -> Result> { }) if filters.is_empty() && source.get_logical_plan().is_some() => { let sub_plan = source.get_logical_plan().unwrap(); let projection_exprs = generate_projection_expr(&projection, sub_plan)?; - let plan = LogicalPlanBuilder::from(sub_plan.clone()) + LogicalPlanBuilder::from(sub_plan.clone()) .project(projection_exprs)? // Ensures that the reference to the inlined table remains the // same, meaning we don't have to change any of the parent nodes // that reference this table. .alias(table_name)? - .build()?; - Transformed::Yes(plan) + .build()? } LogicalPlan::Filter(filter) => { let new_expr = filter.predicate.transform(&rewrite_subquery)?; - Transformed::Yes(LogicalPlan::Filter(Filter::try_new( - new_expr, - filter.input, - )?)) + LogicalPlan::Filter(Filter::try_new(new_expr, filter.input)?) } - _ => Transformed::No(plan), + _ => plan, }) } -fn rewrite_subquery(expr: Expr) -> Result> { +fn rewrite_subquery(expr: Expr) -> Result { match expr { Expr::Exists(Exists { subquery, negated }) => { let plan = subquery.subquery.as_ref().clone(); let new_plan = plan.transform_up(&analyze_internal)?; let subquery = subquery.with_plan(Arc::new(new_plan)); - Ok(Transformed::Yes(Expr::Exists(Exists { subquery, negated }))) + Ok(Expr::Exists(Exists { subquery, negated })) } Expr::InSubquery(InSubquery { expr, @@ -100,17 +96,15 @@ fn rewrite_subquery(expr: Expr) -> Result> { let plan = subquery.subquery.as_ref().clone(); let new_plan = plan.transform_up(&analyze_internal)?; let subquery = subquery.with_plan(Arc::new(new_plan)); - Ok(Transformed::Yes(Expr::InSubquery(InSubquery::new( - expr, subquery, negated, - )))) + Ok(Expr::InSubquery(InSubquery::new(expr, subquery, negated))) } Expr::ScalarSubquery(subquery) => { let plan = subquery.subquery.as_ref().clone(); let new_plan = plan.transform_up(&analyze_internal)?; let subquery = subquery.with_plan(Arc::new(new_plan)); - Ok(Transformed::Yes(Expr::ScalarSubquery(subquery))) + Ok(Expr::ScalarSubquery(subquery)) } - _ => Ok(Transformed::No(expr)), + _ => Ok(expr), } } diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index b1000f042c98..fe87e699239e 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -17,9 +17,7 @@ use crate::simplify_expressions::{ExprSimplifier, SimplifyContext}; use crate::utils::collect_subquery_cols; -use datafusion_common::tree_node::{ - RewriteRecursion, Transformed, TreeNode, TreeNodeRewriter, -}; +use datafusion_common::tree_node::{RewriteRecursion, TreeNode, TreeNodeRewriter}; use datafusion_common::{plan_err, Result}; use datafusion_common::{Column, DFSchemaRef, DataFusionError, ScalarValue}; use datafusion_expr::expr::{AggregateFunctionDefinition, Alias}; @@ -376,22 +374,20 @@ fn agg_exprs_evaluation_result_on_empty_batch( match func_def { AggregateFunctionDefinition::BuiltIn(fun) => { if matches!(fun, datafusion_expr::AggregateFunction::Count) { - Transformed::Yes(Expr::Literal(ScalarValue::Int64(Some( - 0, - )))) + Expr::Literal(ScalarValue::Int64(Some(0))) } else { - Transformed::Yes(Expr::Literal(ScalarValue::Null)) + Expr::Literal(ScalarValue::Null) } } AggregateFunctionDefinition::UDF { .. } => { - Transformed::Yes(Expr::Literal(ScalarValue::Null)) + Expr::Literal(ScalarValue::Null) } AggregateFunctionDefinition::Name(_) => { - Transformed::Yes(Expr::Literal(ScalarValue::Null)) + Expr::Literal(ScalarValue::Null) } } } - _ => Transformed::No(expr), + _ => expr, }; Ok(new_expr) })?; @@ -418,12 +414,12 @@ fn proj_exprs_evaluation_result_on_empty_batch( let result_expr = expr.clone().transform_up(&|expr| { if let Expr::Column(Column { name, .. }) = &expr { if let Some(result_expr) = input_expr_result_map_for_count_bug.get(name) { - Ok(Transformed::Yes(result_expr.clone())) + Ok(result_expr.clone()) } else { - Ok(Transformed::No(expr)) + Ok(expr) } } else { - Ok(Transformed::No(expr)) + Ok(expr) } })?; if result_expr.ne(expr) { @@ -451,12 +447,12 @@ fn filter_exprs_evaluation_result_on_empty_batch( let result_expr = filter_expr.clone().transform_up(&|expr| { if let Expr::Column(Column { name, .. }) = &expr { if let Some(result_expr) = input_expr_result_map_for_count_bug.get(name) { - Ok(Transformed::Yes(result_expr.clone())) + Ok(result_expr.clone()) } else { - Ok(Transformed::No(expr)) + Ok(expr) } } else { - Ok(Transformed::No(expr)) + Ok(expr) } })?; let pull_up_expr = if result_expr.ne(filter_expr) { diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 4eb925ac0629..9f0d170e4f86 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; -use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; +use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_common::{ internal_err, plan_datafusion_err, Column, DFSchema, DFSchemaRef, DataFusionError, JoinConstraint, Result, @@ -1019,11 +1019,11 @@ pub fn replace_cols_by_name( e.transform_up(&|expr| { Ok(if let Expr::Column(c) = &expr { match replace_map.get(&c.flat_name()) { - Some(new_c) => Transformed::Yes(new_c.clone()), - None => Transformed::No(expr), + Some(new_c) => new_c.clone(), + None => expr, } } else { - Transformed::No(expr) + expr }) }) } diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 34ed4a9475cb..eccefc3a341c 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -20,9 +20,7 @@ use crate::optimizer::ApplyOrder; use crate::utils::replace_qualified_name; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::alias::AliasGenerator; -use datafusion_common::tree_node::{ - RewriteRecursion, Transformed, TreeNode, TreeNodeRewriter, -}; +use datafusion_common::tree_node::{RewriteRecursion, TreeNode, TreeNodeRewriter}; use datafusion_common::{plan_err, Column, DataFusionError, Result, ScalarValue}; use datafusion_expr::expr_rewriter::create_col_from_scalar_expr; use datafusion_expr::logical_plan::{JoinType, Subquery}; @@ -92,12 +90,12 @@ impl OptimizerRule for ScalarSubqueryToJoin { if let Some(map_expr) = expr_check_map.get(&col.name) { - Ok(Transformed::Yes(map_expr.clone())) + Ok(map_expr.clone()) } else { - Ok(Transformed::No(expr)) + Ok(expr) } } else { - Ok(Transformed::No(expr)) + Ok(expr) } })?; } @@ -147,12 +145,12 @@ impl OptimizerRule for ScalarSubqueryToJoin { if let Some(map_expr) = expr_check_map.get(&col.name) { - Ok(Transformed::Yes(map_expr.clone())) + Ok(map_expr.clone()) } else { - Ok(Transformed::No(expr)) + Ok(expr) } } else { - Ok(Transformed::No(expr)) + Ok(expr) } })?; expr_to_rewrite_expr_map.insert(expr, new_expr); diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index f0bd1740d5d2..0b61aff2865e 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -23,7 +23,7 @@ use crate::{ PhysicalSortRequirement, }; use datafusion_common::tree_node::TreeNode; -use datafusion_common::{tree_node::Transformed, JoinType}; +use datafusion_common::JoinType; use std::sync::Arc; /// An `EquivalenceClass` is a set of [`Arc`]s that are known @@ -263,10 +263,10 @@ impl EquivalenceGroup { .transform(&|expr| { for cls in self.iter() { if cls.contains(&expr) { - return Ok(Transformed::Yes(cls.canonical_expr().unwrap())); + return Ok(cls.canonical_expr().unwrap()); } } - Ok(Transformed::No(expr)) + Ok(expr) }) .unwrap_or(expr) } diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs index 387dce2cdc8b..e431c2c3af95 100644 --- a/datafusion/physical-expr/src/equivalence/mod.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -22,7 +22,7 @@ mod properties; use crate::expressions::Column; use crate::{LexRequirement, PhysicalExpr, PhysicalSortRequirement}; pub use class::{EquivalenceClass, EquivalenceGroup}; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; pub use ordering::OrderingEquivalenceClass; pub use projection::ProjectionMapping; pub use properties::{join_equivalence_properties, EquivalenceProperties}; @@ -48,11 +48,8 @@ pub fn add_offset_to_expr( offset: usize, ) -> Arc { expr.transform_down(&|e| match e.as_any().downcast_ref::() { - Some(col) => Ok(Transformed::Yes(Arc::new(Column::new( - col.name(), - offset + col.index(), - )))), - None => Ok(Transformed::No(e)), + Some(col) => Ok(Arc::new(Column::new(col.name(), offset + col.index()))), + None => Ok(e), }) .unwrap() // Note that we can safely unwrap here since our transform always returns diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs index 0f92b2c2f431..473ef3a0fb56 100644 --- a/datafusion/physical-expr/src/equivalence/projection.rs +++ b/datafusion/physical-expr/src/equivalence/projection.rs @@ -21,7 +21,7 @@ use crate::expressions::Column; use crate::PhysicalExpr; use arrow::datatypes::SchemaRef; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use datafusion_common::Result; /// Stores the mapping between source expressions and target expressions for a @@ -68,9 +68,9 @@ impl ProjectionMapping { let matching_input_field = input_schema.field(idx); let matching_input_column = Column::new(matching_input_field.name(), idx); - Ok(Transformed::Yes(Arc::new(matching_input_column))) + Ok(Arc::new(matching_input_column)) } - None => Ok(Transformed::No(e)), + None => Ok(e), }) .map(|source_expr| (source_expr, target_expr)) }) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 31c1cf61193a..2464314c2f66 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -34,7 +34,7 @@ use crate::{ physical_exprs_contains, LexOrdering, LexOrderingRef, LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, }; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use super::ordering::collapse_lex_ordering; @@ -798,7 +798,7 @@ impl EquivalenceProperties { fn update_ordering( mut node: ExprOrdering, eq_properties: &EquivalenceProperties, -) -> Transformed { +) -> ExprOrdering { // We have a Column, which is one of the two possible leaf node types: let normalized_expr = eq_properties.eq_group.normalize_expr(node.expr.clone()); if eq_properties.is_expr_constant(&normalized_expr) { @@ -815,9 +815,9 @@ fn update_ordering( // We have a Literal, which is the other possible leaf node type: node.state = node.expr.get_ordering(&[]); } else { - return Transformed::No(node); + return node; } - Transformed::Yes(node) + node } /// This function determines whether the provided expression is constant diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index 52fb85657f4e..a4c3c9886410 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -407,7 +407,7 @@ mod tests { use arrow::datatypes::*; use datafusion_common::cast::{as_float64_array, as_int32_array}; use datafusion_common::plan_err; - use datafusion_common::tree_node::{Transformed, TreeNode}; + use datafusion_common::tree_node::TreeNode; use datafusion_common::ScalarValue; use datafusion_expr::type_coercion::binary::comparison_coercion; use datafusion_expr::Operator; @@ -956,9 +956,9 @@ mod tests { _ => None, }; Ok(if let Some(transformed) = transformed { - Transformed::Yes(transformed) + transformed } else { - Transformed::No(e) + e }) }) .unwrap(); @@ -977,9 +977,9 @@ mod tests { _ => None, }; Ok(if let Some(transformed) = transformed { - Transformed::Yes(transformed) + transformed } else { - Transformed::No(e) + e }) }) .unwrap(); diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index 64a62dc7820d..4856dab6ccfe 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -28,9 +28,7 @@ use crate::{PhysicalExpr, PhysicalSortExpr}; use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData}; use arrow::compute::{and_kleene, is_not_null, SlicesIterator}; use arrow::datatypes::SchemaRef; -use datafusion_common::tree_node::{ - Transformed, TreeNode, TreeNodeRewriter, VisitRecursion, -}; +use datafusion_common::tree_node::{TreeNode, TreeNodeRewriter, VisitRecursion}; use datafusion_common::Result; use datafusion_expr::Operator; @@ -275,12 +273,9 @@ pub fn reassign_predicate_columns( Err(_) if ignore_not_found => usize::MAX, Err(e) => return Err(e.into()), }; - return Ok(Transformed::Yes(Arc::new(Column::new( - column.name(), - index, - )))); + return Ok(Arc::new(Column::new(column.name(), index))); } - Ok(Transformed::No(expr)) + Ok(expr) }) } diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 41c8dbed1453..063979654ea2 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -165,7 +165,7 @@ mod tests { let schema = test::aggr_test_schema(); let empty = Arc::new(EmptyExec::new(schema.clone())); - let empty2 = with_new_children_if_necessary(empty.clone(), vec![])?.into(); + let empty2 = with_new_children_if_necessary(empty.clone(), vec![])?; assert_eq!(empty.schema(), empty2.schema()); let too_many_kids = vec![empty2]; diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs index 9a4c98927683..dc240fc8c5c3 100644 --- a/datafusion/physical-plan/src/joins/stream_join_utils.rs +++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs @@ -31,7 +31,7 @@ use arrow::compute::concat_batches; use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, RecordBatch}; use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder}; use arrow_schema::{Schema, SchemaRef}; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use datafusion_common::{ arrow_datafusion_err, plan_datafusion_err, DataFusionError, JoinSide, Result, ScalarValue, @@ -287,8 +287,8 @@ pub fn convert_sort_expr_with_filter_schema( let converted_filter_expr = expr.transform_up(&|p| { convert_filter_columns(p.as_ref(), &column_map).map(|transformed| { match transformed { - Some(transformed) => Transformed::Yes(transformed), - None => Transformed::No(p), + Some(transformed) => transformed, + None => p, } }) })?; diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 1dd1392b9d86..87d6e5497ff7 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -29,7 +29,6 @@ use crate::sorts::sort_preserving_merge::SortPreservingMergeExec; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use datafusion_common::tree_node::Transformed; use datafusion_common::utils::DataPtr; use datafusion_common::{plan_err, DataFusionError, Result}; use datafusion_execution::TaskContext; @@ -477,7 +476,7 @@ pub fn need_data_exchange(plan: Arc) -> bool { pub fn with_new_children_if_necessary( plan: Arc, children: Vec>, -) -> Result>> { +) -> Result> { let old_children = plan.children(); if children.len() != old_children.len() { internal_err!("Wrong number of children") @@ -487,9 +486,9 @@ pub fn with_new_children_if_necessary( .zip(old_children.iter()) .any(|(c1, c2)| !Arc::data_ptr_eq(c1, c2)) { - Ok(Transformed::Yes(plan.with_new_children(children)?)) + Ok(plan.with_new_children(children)?) } else { - Ok(Transformed::No(plan)) + Ok(plan) } } diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index 3ab3de62f37a..f8f68820f448 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -171,8 +171,7 @@ mod tests { let placeholder = Arc::new(PlaceholderRowExec::new(schema)); - let placeholder_2 = - with_new_children_if_necessary(placeholder.clone(), vec![])?.into(); + let placeholder_2 = with_new_children_if_necessary(placeholder.clone(), vec![])?; assert_eq!(placeholder.schema(), placeholder_2.schema()); let too_many_kids = vec![placeholder_2]; diff --git a/datafusion/physical-plan/src/tree_node.rs b/datafusion/physical-plan/src/tree_node.rs index bce906a00c4d..c54250a9d682 100644 --- a/datafusion/physical-plan/src/tree_node.rs +++ b/datafusion/physical-plan/src/tree_node.rs @@ -18,7 +18,7 @@ //! This module provides common traits for visiting or rewriting tree nodes easily. use crate::{with_new_children_if_necessary, ExecutionPlan}; -use datafusion_common::tree_node::{DynTreeNode, Transformed}; +use datafusion_common::tree_node::DynTreeNode; use datafusion_common::Result; use std::sync::Arc; @@ -32,6 +32,6 @@ impl DynTreeNode for dyn ExecutionPlan { arc_self: Arc, new_children: Vec>, ) -> Result> { - with_new_children_if_necessary(arc_self, new_children).map(Transformed::into) + with_new_children_if_necessary(arc_self, new_children) } } diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index 616a2fc74932..1b9706265052 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -20,7 +20,7 @@ use arrow_schema::{ DataType, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DECIMAL_DEFAULT_SCALE, }; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::TreeNode; use sqlparser::ast::Ident; use datafusion_common::{exec_err, internal_err, plan_err}; @@ -37,11 +37,11 @@ pub(crate) fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result { match nested_expr { Expr::Column(col) => { let field = plan.schema().field_from_column(&col)?; - Ok(Transformed::Yes(Expr::Column(field.qualified_column()))) + Ok(Expr::Column(field.qualified_column())) } _ => { // keep recursing - Ok(Transformed::No(nested_expr)) + Ok(nested_expr) } } }) @@ -68,9 +68,9 @@ pub(crate) fn rebase_expr( ) -> Result { expr.clone().transform_up(&|nested_expr| { if base_exprs.contains(&nested_expr) { - Ok(Transformed::Yes(expr_as_column_expr(&nested_expr, plan)?)) + Ok(expr_as_column_expr(&nested_expr, plan)?) } else { - Ok(Transformed::No(nested_expr)) + Ok(nested_expr) } }) } @@ -173,12 +173,12 @@ pub(crate) fn resolve_aliases_to_exprs( expr.clone().transform_up(&|nested_expr| match nested_expr { Expr::Column(c) if c.relation.is_none() => { if let Some(aliased_expr) = aliases.get(&c.name) { - Ok(Transformed::Yes(aliased_expr.clone())) + Ok(aliased_expr.clone()) } else { - Ok(Transformed::No(Expr::Column(c))) + Ok(Expr::Column(c)) } } - _ => Ok(Transformed::No(nested_expr)), + _ => Ok(nested_expr), }) } diff --git a/docs/source/library-user-guide/working-with-exprs.md b/docs/source/library-user-guide/working-with-exprs.md index 96be8ef7f1ae..0662a3a27cd9 100644 --- a/docs/source/library-user-guide/working-with-exprs.md +++ b/docs/source/library-user-guide/working-with-exprs.md @@ -92,7 +92,7 @@ In our example, we'll use rewriting to update our `add_one` UDF, to be rewritten ### Rewriting with `transform` -To implement the inlining, we'll need to write a function that takes an `Expr` and returns a `Result`. If the expression is _not_ to be rewritten `Transformed::No` is used to wrap the original `Expr`. If the expression _is_ to be rewritten, `Transformed::Yes` is used to wrap the new `Expr`. +To implement the inlining, we'll need to write a function that takes an `Expr` and returns a `Result`. ```rust fn rewrite_add_one(expr: Expr) -> Result { @@ -102,9 +102,9 @@ fn rewrite_add_one(expr: Expr) -> Result { let input_arg = scalar_fun.args[0].clone(); let new_expression = input_arg + lit(1i64); - Transformed::Yes(new_expression) + new_expression } - _ => Transformed::No(expr), + _ => expr, }) }) }