From d1214058b822a3d78bd746f57a7b4b60a69dd28d Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Mon, 20 Nov 2023 18:37:17 -0800 Subject: [PATCH] Renaming API to be more consistent with struct value --- datafusion/common/src/config.rs | 5 +-- .../physical_optimizer/projection_pushdown.rs | 14 +++----- datafusion/core/src/physical_planner.rs | 2 +- datafusion/core/src/test_util/parquet.rs | 3 +- datafusion/physical-plan/src/filter.rs | 33 +++++++------------ datafusion/proto/src/physical_plan/mod.rs | 7 ++-- .../test_files/information_schema.slt | 2 ++ docs/source/user-guide/configs.md | 4 +-- 8 files changed, 29 insertions(+), 41 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 06e0529400c13..03fb5ea320a04 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -525,8 +525,9 @@ config_namespace! { /// will be collected into a single partition pub hash_join_single_partition_threshold: usize, default = 1024 * 1024 - /// The default filter selectivity used by Filter statistics - /// when an exact selectivity cannot be determined + /// The default filter selectivity used by Filter Statistics + /// when an exact selectivity cannot be determined. Valid values are + /// between 0 (no selectivity) and 100 (all rows are selected). pub default_filter_selectivity: u8, default = 20 } } diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index c8add21bd78b7..66b144a7a00c6 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -302,16 +302,12 @@ fn try_swapping_with_filter( return Ok(None); }; - FilterExec::try_new( - new_predicate, - make_with_child(projection, filter.input())?, - ).and_then( - |e| { + FilterExec::try_new(new_predicate, make_with_child(projection, filter.input())?) 
+ .and_then(|e| { let selectivity = filter.default_selectivity(); - e.with_selectivity(selectivity) - } - ) - .map(|e| Some(Arc::new(e) as _)) + e.with_default_selectivity(selectivity) + }) + .map(|e| Some(Arc::new(e) as _)) } /// Tries to swap the projection with its input [`RepartitionExec`]. If it can be done, diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index fb88b0c78644c..afdc9ec2bd07a 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -910,7 +910,7 @@ impl DefaultPhysicalPlanner { )?; let selectivity = session_state.config().options().optimizer.default_filter_selectivity; let filter = FilterExec::try_new(runtime_expr, physical_input)?; - Ok(Arc::new(filter.with_selectivity(selectivity)?)) + Ok(Arc::new(filter.with_default_selectivity(selectivity)?)) } LogicalPlan::Union(Union { inputs, schema }) => { let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?; diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index 47bf8d6abcd73..f3c0d2987a46c 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -179,8 +179,7 @@ impl TestParquetFile { None, )); - let exec = - Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?); + let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?); Ok(exec) } else { Ok(Arc::new(ParquetExec::new(scan_config, None, None))) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 2392f984420a2..be3cbb12d94b6 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -85,7 +85,10 @@ impl FilterExec { } } - pub fn with_selectivity(mut self, default_selectivity: u8) -> Result{ + pub fn with_default_selectivity( + mut self, + default_selectivity: u8, + ) -> Result { if 
default_selectivity > 100 { return plan_err!("Default flter selectivity needs to be less than 100"); } @@ -182,16 +185,12 @@ impl ExecutionPlan for FilterExec { self: Arc, mut children: Vec>, ) -> Result> { - FilterExec::try_new( - self.predicate.clone(), - children.swap_remove(0), - ).and_then( - |e| { + FilterExec::try_new(self.predicate.clone(), children.swap_remove(0)) + .and_then(|e| { let selectivity = e.default_selectivity(); - e.with_selectivity(selectivity) - } - ) - .map(|e| Arc::new(e) as _) + e.with_default_selectivity(selectivity) + }) + .map(|e| Arc::new(e) as _) } fn execute( @@ -221,15 +220,7 @@ impl ExecutionPlan for FilterExec { let input_stats = self.input.statistics()?; let schema = self.schema(); if !check_support(predicate, &schema) { - let selectivity = self.default_selectivity as f32 / 100.0; - let mut stats = input_stats.clone().into_inexact(); - if let Precision::Inexact(n) = stats.num_rows { - stats.num_rows = Precision::Inexact((selectivity * n as f32) as usize); - } - if let Precision::Inexact(n) = stats.total_byte_size { - stats.total_byte_size = - Precision::Inexact((selectivity * n as f32) as usize); - } + let selectivity = self.default_selectivity as f64 / 100.0; let mut stats = input_stats.into_inexact(); stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity); stats.total_byte_size = stats @@ -1025,7 +1016,7 @@ mod tests { } #[tokio::test] - async fn test_validation_filter_selectivity() -> Result<()>{ + async fn test_validation_filter_selectivity() -> Result<()> { let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); let input = Arc::new(StatisticsExec::new( Statistics::new_unknown(&schema), @@ -1038,7 +1029,7 @@ mod tests { Arc::new(Literal::new(ScalarValue::Int32(Some(10)))), )); let filter = FilterExec::try_new(predicate, input)?; - assert!(filter.with_selectivity(120).is_err()); + assert!(filter.with_default_selectivity(120).is_err()); Ok(()) } } diff --git 
a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index d8bd644c26bea..db088e8605ca5 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -162,14 +162,13 @@ impl AsExecutionPlan for PhysicalPlanNode { let filter_selectivity = filter.default_filter_selectivity.try_into(); let filter = FilterExec::try_new(predicate, input)?; match filter_selectivity { - Ok(filter_selectivity) => { - Ok(Arc::new(filter.with_selectivity(filter_selectivity)?)) - } + Ok(filter_selectivity) => Ok(Arc::new( + filter.with_default_selectivity(filter_selectivity)?, + )), Err(_) => Err(DataFusionError::Internal( "filter_selectivity in PhysicalPlanNode is invalid ".to_owned(), )), } - } PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new( parse_protobuf_file_scan_config( diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 741ff724781fe..ecdb46279c304 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -188,6 +188,7 @@ datafusion.explain.logical_plan_only false datafusion.explain.physical_plan_only false datafusion.explain.show_statistics false datafusion.optimizer.allow_symmetric_joins_without_pruning true +datafusion.optimizer.default_filter_selectivity 20 datafusion.optimizer.enable_distinct_aggregation_soft_limit true datafusion.optimizer.enable_round_robin_repartition true datafusion.optimizer.enable_topk_aggregation true @@ -261,6 +262,7 @@ datafusion.explain.logical_plan_only false When set to true, the explain stateme datafusion.explain.physical_plan_only false When set to true, the explain statement will only print physical plans datafusion.explain.show_statistics false When set to true, the explain statement will print operator statistics for physical plans datafusion.optimizer.allow_symmetric_joins_without_pruning true 
Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. +datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 1e14682012d59..d5a43e429e099 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -98,8 +98,8 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | | datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | | datafusion.optimizer.prefer_hash_join | true | When set to true, the 
physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | -| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition -| datafusion.optimizer.default_filter_selectivity | 20 | The assumed filter selectivity in from 0 (no rows) to 100 (all rows) used when it is not possibl to determine exactly the number of rows returned by a filter +| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | +| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). | | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |