From 77ad090a8a936346e45c22c68ba93ad63a10d633 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Thu, 16 Nov 2023 19:27:28 -0800 Subject: [PATCH 1/3] Turning filter selectivity as a configurable parameter --- datafusion/common/src/config.rs | 5 ++ .../physical_optimizer/projection_pushdown.rs | 12 ++++- datafusion/core/src/physical_planner.rs | 4 +- datafusion/core/src/test_util/parquet.rs | 3 +- datafusion/physical-plan/src/filter.rs | 51 ++++++++++++++++--- datafusion/proto/proto/datafusion.proto | 1 + datafusion/proto/src/generated/pbjson.rs | 20 ++++++++ datafusion/proto/src/generated/prost.rs | 2 + datafusion/proto/src/physical_plan/mod.rs | 13 ++++- docs/source/user-guide/configs.md | 3 +- 10 files changed, 102 insertions(+), 12 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index ba2072ecc151..06e0529400c1 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -524,6 +524,10 @@ config_namespace! { /// The maximum estimated size in bytes for one input side of a HashJoin /// will be collected into a single partition pub hash_join_single_partition_threshold: usize, default = 1024 * 1024 + + /// The default filter selectivity used by Filter statistics + /// when an exact selectivity cannot be determined + pub default_filter_selectivity: u8, default = 20 } } @@ -877,6 +881,7 @@ config_field!(String); config_field!(bool); config_field!(usize); config_field!(f64); +config_field!(u8); config_field!(u64); /// An implementation trait used to recursively walk configuration diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 8e50492ae5e5..b88ab935b91f 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -302,8 +302,16 @@ fn try_swapping_with_filter( return Ok(None); }; - FilterExec::try_new(new_predicate, make_with_child(projection, filter.input())?) - .map(|e| Some(Arc::new(e) as _)) + FilterExec::try_new( + new_predicate, + make_with_child(projection, filter.input())?, + ).and_then( + |e| { + let selectivity = filter.default_selectivity(); + e.with_selectivity(selectivity) + } + ) + .map(|e| Some(Arc::new(e) as _)) } /// Tries to swap the projection with its input [`RepartitionExec`]. If it can be done, diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 9f9b529ace03..d6934a0d38de 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -913,7 +913,9 @@ impl DefaultPhysicalPlanner { &input_schema, session_state, )?; - Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?)) + let selectivity = session_state.config().options().optimizer.default_filter_selectivity; + let filter = FilterExec::try_new(runtime_expr, physical_input)?; + Ok(Arc::new(filter.with_selectivity(selectivity)?)) } LogicalPlan::Union(Union { inputs, schema }) => { let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?; diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index 0d11526703b4..1141800d8551 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -178,7 +178,8 @@ impl TestParquetFile { None, )); - let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?); + let exec = + Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?); Ok(exec) } else { Ok(Arc::new(ParquetExec::new(scan_config, None, None))) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 822ddfdf3eb0..f62688a18801 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -62,6 +62,8 @@ pub struct FilterExec { input: Arc, /// Execution metrics metrics: ExecutionPlanMetricsSet, + /// Selectivity for statistics. 0 = no rows, 100 all rows + default_selectivity: u8, } impl FilterExec { @@ -75,6 +77,7 @@ impl FilterExec { predicate, input: input.clone(), metrics: ExecutionPlanMetricsSet::new(), + default_selectivity: 20, }), other => { plan_err!("Filter predicate must return boolean values, not {other:?}") @@ -82,6 +85,14 @@ impl FilterExec { } } + pub fn with_selectivity(mut self, default_selectivity: u8) -> Result{ + if default_selectivity > 100 { + return plan_err!("Default flter selectivity needs to be less than 100"); + } + self.default_selectivity = default_selectivity; + Ok(self) + } + /// The expression to filter on. This expression must evaluate to a boolean value. pub fn predicate(&self) -> &Arc { &self.predicate @@ -91,6 +102,11 @@ impl FilterExec { pub fn input(&self) -> &Arc { &self.input } + + /// The default selectivity + pub fn default_selectivity(&self) -> u8 { + self.default_selectivity + } } impl DisplayAs for FilterExec { @@ -166,8 +182,16 @@ impl ExecutionPlan for FilterExec { self: Arc, mut children: Vec>, ) -> Result> { - FilterExec::try_new(self.predicate.clone(), children.swap_remove(0)) - .map(|e| Arc::new(e) as _) + FilterExec::try_new( + self.predicate.clone(), + children.swap_remove(0), + ).and_then( + |e| { + let selectivity = e.default_selectivity(); + e.with_selectivity(selectivity) + } + ) + .map(|e| Arc::new(e) as _) } fn execute( @@ -197,10 +221,7 @@ impl ExecutionPlan for FilterExec { let input_stats = self.input.statistics()?; let schema = self.schema(); if !check_support(predicate, &schema) { - // assume filter selects 20% of rows if we cannot do anything smarter - // tracking issue for making this configurable: - // https://github.com/apache/arrow-datafusion/issues/8133 - let selectivity = 0.2_f32; + let selectivity = self.default_selectivity as f32 / 100.0; let mut stats = input_stats.clone().into_inexact(); if let Precision::Inexact(n) = stats.num_rows { stats.num_rows = Precision::Inexact((selectivity * n as f32) as usize); @@ -1009,4 +1030,22 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_validation_filter_selectivity() -> Result<()>{ + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let input = Arc::new(StatisticsExec::new( + Statistics::new_unknown(&schema), + schema, + )); + // WHERE a = 10 + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(10)))), + )); + let filter = FilterExec::try_new(predicate, input)?; + assert!(filter.with_selectivity(120).is_err()); + Ok(()) + } } diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 793378a1ea87..71494a0788ac 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1360,6 +1360,7 @@ message PhysicalNegativeNode { message FilterExecNode { PhysicalPlanNode input = 1; PhysicalExprNode expr = 2; + uint32 default_filter_selectivity = 3; } message FileGroup { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index a78da2a51c9d..5c7e9acf77a5 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -7888,6 +7888,9 @@ impl serde::Serialize for FilterExecNode { if self.expr.is_some() { len += 1; } + if self.default_filter_selectivity != 0 { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.FilterExecNode", len)?; if let Some(v) = self.input.as_ref() { struct_ser.serialize_field("input", v)?; @@ -7895,6 +7898,9 @@ impl serde::Serialize for FilterExecNode { if let Some(v) = self.expr.as_ref() { struct_ser.serialize_field("expr", v)?; } + if self.default_filter_selectivity != 0 { + struct_ser.serialize_field("defaultFilterSelectivity", &self.default_filter_selectivity)?; + } struct_ser.end() } } @@ -7907,12 +7913,15 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { const FIELDS: &[&str] = &[ "input", "expr", + "default_filter_selectivity", + "defaultFilterSelectivity", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { Input, Expr, + DefaultFilterSelectivity, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -7936,6 +7945,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { match value { "input" => Ok(GeneratedField::Input), "expr" => Ok(GeneratedField::Expr), + "defaultFilterSelectivity" | "default_filter_selectivity" => Ok(GeneratedField::DefaultFilterSelectivity), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -7957,6 +7967,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { { let mut input__ = None; let mut expr__ = None; + let mut default_filter_selectivity__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Input => { @@ -7971,11 +7982,20 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { } expr__ = map_.next_value()?; } + GeneratedField::DefaultFilterSelectivity => { + if default_filter_selectivity__.is_some() { + return Err(serde::de::Error::duplicate_field("defaultFilterSelectivity")); + } + default_filter_selectivity__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } } } Ok(FilterExecNode { input: input__, expr: expr__, + default_filter_selectivity: default_filter_selectivity__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 7b7b0afb9216..7551fb3df424 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1908,6 +1908,8 @@ pub struct FilterExecNode { pub input: ::core::option::Option<::prost::alloc::boxed::Box>, #[prost(message, optional, tag = "2")] pub expr: ::core::option::Option, + #[prost(uint32, tag = "3")] + pub default_filter_selectivity: u32, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 1eedbe987ec1..4bf675ee271a 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -157,7 +157,17 @@ impl AsExecutionPlan for PhysicalPlanNode { .to_owned(), ) })?; - Ok(Arc::new(FilterExec::try_new(predicate, input)?)) + let filter_selectivity = filter.default_filter_selectivity.try_into(); + let filter = FilterExec::try_new(predicate, input)?; + match filter_selectivity { + Ok(filter_selectivity) => { + Ok(Arc::new(filter.with_selectivity(filter_selectivity)?)) + } + Err(_) => Err(DataFusionError::Internal( + "filter_selectivity in PhysicalPlanNode is invalid ".to_owned(), + )), + } + } PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new( parse_protobuf_file_scan_config( @@ -898,6 +908,7 @@ impl AsExecutionPlan for PhysicalPlanNode { protobuf::FilterExecNode { input: Some(Box::new(input)), expr: Some(exec.predicate().clone().try_into()?), + default_filter_selectivity: exec.default_selectivity() as u32, }, ))), }); diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 11363f0657f6..1e14682012d5 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -98,7 +98,8 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | | datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | | datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | -| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | +| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition +| datafusion.optimizer.default_filter_selectivity | 20 | The assumed filter selectivity in from 0 (no rows) to 100 (all rows) used when it is not possibl to determine exactly the number of rows returned by a filter | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans | From 8135173effdc67bdc5e824347003bc39f0c09186 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Mon, 20 Nov 2023 18:37:17 -0800 Subject: [PATCH 2/3] Renaming API to be more consistent with struct value --- datafusion/common/src/config.rs | 5 +-- .../physical_optimizer/projection_pushdown.rs | 14 +++----- datafusion/core/src/physical_planner.rs | 2 +- datafusion/core/src/test_util/parquet.rs | 3 +- datafusion/physical-plan/src/filter.rs | 33 +++++++------------ datafusion/proto/src/physical_plan/mod.rs | 7 ++-- .../test_files/information_schema.slt | 2 ++ docs/source/user-guide/configs.md | 4 +-- 8 files changed, 29 insertions(+), 41 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 06e0529400c1..03fb5ea320a0 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -525,8 +525,9 @@ config_namespace! { /// will be collected into a single partition pub hash_join_single_partition_threshold: usize, default = 1024 * 1024 - /// The default filter selectivity used by Filter statistics - /// when an exact selectivity cannot be determined + /// The default filter selectivity used by Filter Statistics + /// when an exact selectivity cannot be determined. Valid values are + /// between 0 (no selectivity) and 100 (all rows are selected). pub default_filter_selectivity: u8, default = 20 } } diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index c8add21bd78b..66b144a7a00c 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -302,16 +302,12 @@ fn try_swapping_with_filter( return Ok(None); }; - FilterExec::try_new( - new_predicate, - make_with_child(projection, filter.input())?, - ).and_then( - |e| { + FilterExec::try_new(new_predicate, make_with_child(projection, filter.input())?) + .and_then(|e| { let selectivity = filter.default_selectivity(); - e.with_selectivity(selectivity) - } - ) - .map(|e| Some(Arc::new(e) as _)) + e.with_default_selectivity(selectivity) + }) + .map(|e| Some(Arc::new(e) as _)) } /// Tries to swap the projection with its input [`RepartitionExec`]. If it can be done, diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index fb88b0c78644..afdc9ec2bd07 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -910,7 +910,7 @@ impl DefaultPhysicalPlanner { )?; let selectivity = session_state.config().options().optimizer.default_filter_selectivity; let filter = FilterExec::try_new(runtime_expr, physical_input)?; - Ok(Arc::new(filter.with_selectivity(selectivity)?)) + Ok(Arc::new(filter.with_default_selectivity(selectivity)?)) } LogicalPlan::Union(Union { inputs, schema }) => { let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?; diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index 47bf8d6abcd7..f3c0d2987a46 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -179,8 +179,7 @@ impl TestParquetFile { None, )); - let exec = - Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?); + let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?); Ok(exec) } else { Ok(Arc::new(ParquetExec::new(scan_config, None, None))) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 2392f984420a..be3cbb12d94b 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -85,7 +85,10 @@ impl FilterExec { } } - pub fn with_selectivity(mut self, default_selectivity: u8) -> Result{ + pub fn with_default_selectivity( + mut self, + default_selectivity: u8, + ) -> Result { if default_selectivity > 100 { return plan_err!("Default flter selectivity needs to be less than 100"); } @@ -182,16 +185,12 @@ impl ExecutionPlan for FilterExec { self: Arc, mut children: Vec>, ) -> Result> { - FilterExec::try_new( - self.predicate.clone(), - children.swap_remove(0), - ).and_then( - |e| { + FilterExec::try_new(self.predicate.clone(), children.swap_remove(0)) + .and_then(|e| { let selectivity = e.default_selectivity(); - e.with_selectivity(selectivity) - } - ) - .map(|e| Arc::new(e) as _) + e.with_default_selectivity(selectivity) + }) + .map(|e| Arc::new(e) as _) } fn execute( @@ -221,15 +220,7 @@ impl ExecutionPlan for FilterExec { let input_stats = self.input.statistics()?; let schema = self.schema(); if !check_support(predicate, &schema) { - let selectivity = self.default_selectivity as f32 / 100.0; - let mut stats = input_stats.clone().into_inexact(); - if let Precision::Inexact(n) = stats.num_rows { - stats.num_rows = Precision::Inexact((selectivity * n as f32) as usize); - } - if let Precision::Inexact(n) = stats.total_byte_size { - stats.total_byte_size = - Precision::Inexact((selectivity * n as f32) as usize); - } + let selectivity = self.default_selectivity as f64 / 100.0; let mut stats = input_stats.into_inexact(); stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity); stats.total_byte_size = stats @@ -1025,7 +1016,7 @@ mod tests { } #[tokio::test] - async fn test_validation_filter_selectivity() -> Result<()>{ + async fn test_validation_filter_selectivity() -> Result<()> { let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); let input = Arc::new(StatisticsExec::new( Statistics::new_unknown(&schema), @@ -1038,7 +1029,7 @@ mod tests { Arc::new(Literal::new(ScalarValue::Int32(Some(10)))), )); let filter = FilterExec::try_new(predicate, input)?; - assert!(filter.with_selectivity(120).is_err()); + assert!(filter.with_default_selectivity(120).is_err()); Ok(()) } } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index d8bd644c26be..db088e8605ca 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -162,14 +162,13 @@ impl AsExecutionPlan for PhysicalPlanNode { let filter_selectivity = filter.default_filter_selectivity.try_into(); let filter = FilterExec::try_new(predicate, input)?; match filter_selectivity { - Ok(filter_selectivity) => { - Ok(Arc::new(filter.with_selectivity(filter_selectivity)?)) - } + Ok(filter_selectivity) => Ok(Arc::new( + filter.with_default_selectivity(filter_selectivity)?, + )), Err(_) => Err(DataFusionError::Internal( "filter_selectivity in PhysicalPlanNode is invalid ".to_owned(), )), } - } PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new( parse_protobuf_file_scan_config( diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 741ff724781f..5c6bf6e2dac1 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -188,6 +188,7 @@ datafusion.explain.logical_plan_only false datafusion.explain.physical_plan_only false datafusion.explain.show_statistics false datafusion.optimizer.allow_symmetric_joins_without_pruning true +datafusion.optimizer.default_filter_selectivity 20 datafusion.optimizer.enable_distinct_aggregation_soft_limit true datafusion.optimizer.enable_round_robin_repartition true datafusion.optimizer.enable_topk_aggregation true @@ -261,6 +262,7 @@ datafusion.explain.logical_plan_only false When set to true, the explain stateme datafusion.explain.physical_plan_only false When set to true, the explain statement will only print physical plans datafusion.explain.show_statistics false When set to true, the explain statement will print operator statistics for physical plans datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. +datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 1e14682012d5..d5a43e429e09 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -98,8 +98,8 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | | datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | | datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | -| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition -| datafusion.optimizer.default_filter_selectivity | 20 | The assumed filter selectivity in from 0 (no rows) to 100 (all rows) used when it is not possibl to determine exactly the number of rows returned by a filter +| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | +| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). | | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans | From 81034c209128b0046f464a40a5245de9c53861f5 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Mon, 4 Dec 2023 16:45:15 +0000 Subject: [PATCH 3/3] Adding a filter with custom selectivity --- datafusion/physical-plan/src/filter.rs | 32 ++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index be3cbb12d94b..9df34939ec37 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -1032,4 +1032,36 @@ mod tests { assert!(filter.with_default_selectivity(120).is_err()); Ok(()) } + + #[tokio::test] + async fn test_custom_filter_selectivity() -> Result<()> { + // Need a decimal to trigger inexact selectivity + let schema = + Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(1000), + total_byte_size: Precision::Inexact(4000), + column_statistics: vec![ColumnStatistics { + ..Default::default() + }], + }, + schema, + )); + // WHERE a = 10 + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))), + )); + let filter = FilterExec::try_new(predicate, input)?; + let statistics = filter.statistics()?; + assert_eq!(statistics.num_rows, Precision::Inexact(200)); + assert_eq!(statistics.total_byte_size, Precision::Inexact(800)); + let filter = filter.with_default_selectivity(40)?; + let statistics = filter.statistics()?; + assert_eq!(statistics.num_rows, Precision::Inexact(400)); + assert_eq!(statistics.total_byte_size, Precision::Inexact(1600)); + Ok(()) + } }