From 3c36a9affd53b9a47c3e03aa0de53c0373b7dc4b Mon Sep 17 00:00:00 2001 From: Xin Li Date: Sun, 21 Jul 2024 10:44:15 +0800 Subject: [PATCH 1/3] Move OutputRequirements to datafusion-physical-optimizer crate --- .../physical_optimizer/enforce_distribution.rs | 4 ++-- datafusion/core/src/physical_optimizer/mod.rs | 1 - datafusion/physical-optimizer/Cargo.toml | 2 ++ datafusion/physical-optimizer/src/lib.rs | 1 + .../src}/output_requirements.rs | 18 ++++++++++-------- 5 files changed, 15 insertions(+), 11 deletions(-) rename datafusion/{core/src/physical_optimizer => physical-optimizer/src}/output_requirements.rs (95%) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index afed5dd37535..3d80b617626a 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -24,7 +24,6 @@ use std::fmt::Debug; use std::sync::Arc; -use super::output_requirements::OutputRequirementExec; use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_optimizer::utils::{ @@ -56,6 +55,7 @@ use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAg use datafusion_physical_plan::ExecutionPlanProperties; use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_optimizer::output_requirements::OutputRequirementExec; use itertools::izip; /// The `EnforceDistribution` rule ensures that distribution requirements are @@ -1290,7 +1290,7 @@ pub(crate) mod tests { use crate::datasource::object_store::ObjectStoreUrl; use crate::datasource::physical_plan::{CsvExec, FileScanConfig, ParquetExec}; use crate::physical_optimizer::enforce_sorting::EnforceSorting; - use crate::physical_optimizer::output_requirements::OutputRequirements; + use datafusion_physical_optimizer::output_requirements::OutputRequirements; use crate::physical_optimizer::test_utils::{ check_integrity, coalesce_partitions_exec, repartition_exec, }; diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index 582f340151ae..a0c9c3697744 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -29,7 +29,6 @@ pub mod enforce_sorting; pub mod join_selection; pub mod limited_distinct_aggregation; pub mod optimizer; -pub mod output_requirements; pub mod projection_pushdown; pub mod pruning; pub mod replace_with_order_preserving_variants; diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml index 9c0ee61da52a..125ea6acc77f 100644 --- a/datafusion/physical-optimizer/Cargo.toml +++ b/datafusion/physical-optimizer/Cargo.toml @@ -33,4 +33,6 @@ workspace = true [dependencies] datafusion-common = { workspace = true, default-features = true } +datafusion-execution = { workspace = true } +datafusion-physical-expr = { workspace = true } datafusion-physical-plan = { workspace = true } diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index c5a49216f5fd..6b9df7cad5c8 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -18,5 +18,6 @@ #![deny(clippy::clone_on_ref_ptr)] mod optimizer; +pub mod output_requirements; pub use optimizer::PhysicalOptimizerRule; diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs similarity index 95% rename from datafusion/core/src/physical_optimizer/output_requirements.rs rename to datafusion/physical-optimizer/src/output_requirements.rs index cb9a0cb90e6c..d85daee7c00a 100644 --- a/datafusion/core/src/physical_optimizer/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -24,17 +24,19 @@ use std::sync::Arc; -use crate::physical_plan::sorts::sort::SortExec; -use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; +use datafusion_execution::TaskContext; +use datafusion_physical_plan::sorts::sort::SortExec; +use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream}; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{Result, Statistics}; use datafusion_physical_expr::{Distribution, LexRequirement, PhysicalSortRequirement}; -use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::{ExecutionPlanProperties, PlanProperties}; +use crate::PhysicalOptimizerRule; + /// This rule either adds or removes [`OutputRequirements`]s to/from the physical /// plan according to its `mode` attribute, which is set by the constructors /// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of @@ -86,7 +88,7 @@ enum RuleMode { /// /// See [`OutputRequirements`] for more details #[derive(Debug)] -pub(crate) struct OutputRequirementExec { +pub struct OutputRequirementExec { input: Arc, order_requirement: Option, dist_requirement: Distribution, @@ -94,7 +96,7 @@ pub(crate) struct OutputRequirementExec { } impl OutputRequirementExec { - pub(crate) fn new( + pub fn new( input: Arc, requirements: Option, dist_requirement: Distribution, @@ -108,7 +110,7 @@ impl OutputRequirementExec { } } - pub(crate) fn input(&self) -> Arc { + pub fn input(&self) -> Arc { self.input.clone() } @@ -179,8 +181,8 @@ impl ExecutionPlan for OutputRequirementExec { fn execute( &self, _partition: usize, - _context: Arc, - ) -> Result { + _context: Arc, + ) -> Result { unreachable!(); } From a3d3cbe7f249b0f77990902afa00306998f8cd76 Mon Sep 17 00:00:00 2001 From: Xin Li Date: Sun, 21 Jul 2024 10:48:35 +0800 Subject: [PATCH 2/3] Fix fmt --- .../core/src/physical_optimizer/enforce_distribution.rs | 4 ++-- datafusion/physical-optimizer/src/output_requirements.rs | 8 +++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 3d80b617626a..ea60421e6f6d 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -54,8 +54,8 @@ use datafusion_physical_expr::{ use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec}; use datafusion_physical_plan::ExecutionPlanProperties; -use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_optimizer::output_requirements::OutputRequirementExec; +use datafusion_physical_optimizer::PhysicalOptimizerRule; use itertools::izip; /// The `EnforceDistribution` rule ensures that distribution requirements are @@ -1290,7 +1290,6 @@ pub(crate) mod tests { use crate::datasource::object_store::ObjectStoreUrl; use crate::datasource::physical_plan::{CsvExec, FileScanConfig, ParquetExec}; use crate::physical_optimizer::enforce_sorting::EnforceSorting; - use datafusion_physical_optimizer::output_requirements::OutputRequirements; use crate::physical_optimizer::test_utils::{ check_integrity, coalesce_partitions_exec, repartition_exec, }; @@ -1301,6 +1300,7 @@ pub(crate) mod tests { use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics}; + use datafusion_physical_optimizer::output_requirements::OutputRequirements; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::ScalarValue; diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index d85daee7c00a..f971d8f1f0aa 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -26,7 +26,9 @@ use std::sync::Arc; use datafusion_execution::TaskContext; use datafusion_physical_plan::sorts::sort::SortExec; -use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream}; +use datafusion_physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, +}; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; @@ -111,7 +113,7 @@ impl OutputRequirementExec { } pub fn input(&self) -> Arc { - self.input.clone() + Arc::clone(&self.input) } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -277,7 +279,7 @@ fn require_top_ordering_helper( // When an operator requires an ordering, any `SortExec` below can not // be responsible for (i.e. the originator of) the global ordering. let (new_child, is_changed) = - require_top_ordering_helper(children.swap_remove(0).clone())?; + require_top_ordering_helper(Arc::clone(children.swap_remove(0)))?; Ok((plan.with_new_children(vec![new_child])?, is_changed)) } else { // Stop searching, there is no global ordering desired for the query. From 00eb3a5b8ec9a520fae993063a2d509e5c3fe38f Mon Sep 17 00:00:00 2001 From: Xin Li Date: Sun, 21 Jul 2024 11:11:27 +0800 Subject: [PATCH 3/3] Fix cargo for cli --- datafusion-cli/Cargo.lock | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 61d9c72b89d9..84bff8c87190 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -118,9 +118,9 @@ dependencies = [ [[package]] name = "arrayref" -version = "0.3.7" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545" +checksum = "9d151e35f61089500b617991b791fc8bfd237ae50cd5950803758a179b41e67a" [[package]] name = "arrayvec" @@ -875,9 +875,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.5" +version = "1.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "324c74f2155653c90b04f25b2a47a8a631360cb908f92a772695f430c7e31052" +checksum = "2aba8f4e9906c7ce3c73463f62a7f0c65183ada1a2d47e397cc8810827f9694f" dependencies = [ "jobserver", "libc", @@ -1397,6 +1397,8 @@ name = "datafusion-physical-optimizer" version = "40.0.0" dependencies = [ "datafusion-common", + "datafusion-execution", + "datafusion-physical-expr", "datafusion-physical-plan", ]