Make filter selectivity for statistics configurable #8243

Merged 4 commits on Dec 5, 2023 (viewing changes from 3 commits).
6 changes: 6 additions & 0 deletions datafusion/common/src/config.rs
@@ -524,6 +524,11 @@ config_namespace! {
/// The maximum estimated size in bytes for one input side of a HashJoin
/// will be collected into a single partition
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024

/// The default filter selectivity used by Filter Statistics
/// when an exact selectivity cannot be determined. Valid values are
/// between 0 (no selectivity) and 100 (all rows are selected).
pub default_filter_selectivity: u8, default = 20
Contributor:

I think it makes sense to make this a float (0.2).

Contributor Author:

The two main reasons for choosing a uint are the lack of an Eq trait implementation for f32, as well as the problem that could arise when serializing numbers that cannot be perfectly represented as f32. If you had already made this consideration and you still think f32 is the better option, let me know and I will proceed.
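The representation concern can be demonstrated in a few lines of plain Rust (a standalone illustration, not part of this PR):

```rust
fn main() {
    // 0.2 has no exact binary representation, so widening f32 -> f64 changes
    // the value -- the kind of drift that bites serialization round-trips:
    assert_ne!(0.2_f32 as f64, 0.2_f64);

    // An integer percentage round-trips exactly, and u8 implements Eq/Hash:
    let selectivity: u8 = 20;
    assert_eq!(selectivity as f64 / 100.0, 0.2);
    println!("u8 percentage is exact; f32 0.2 widens to {}", 0.2_f32 as f64);
}
```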

}
}

@@ -877,6 +882,7 @@ config_field!(String);
config_field!(bool);
config_field!(usize);
config_field!(f64);
config_field!(u8);
config_field!(u64);

/// An implementation trait used to recursively walk configuration
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -303,6 +303,10 @@ fn try_swapping_with_filter(
};

FilterExec::try_new(new_predicate, make_with_child(projection, filter.input())?)
.and_then(|e| {
let selectivity = filter.default_selectivity();
e.with_default_selectivity(selectivity)
})
.map(|e| Some(Arc::new(e) as _))
}
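The `and_then` chaining above matters: without it, the rebuilt `FilterExec` would silently fall back to the default selectivity instead of keeping the configured one. A minimal standalone mimic of the rebuild-and-carry pattern (hypothetical `Filter` type, not DataFusion's):

```rust
#[derive(Debug, PartialEq)]
struct Filter {
    selectivity: u8,
}

impl Filter {
    // a fresh node always starts at the built-in default of 20
    fn try_new() -> Result<Self, String> {
        Ok(Filter { selectivity: 20 })
    }

    fn with_default_selectivity(mut self, s: u8) -> Result<Self, String> {
        if s > 100 {
            return Err("selectivity must be at most 100".to_string());
        }
        self.selectivity = s;
        Ok(self)
    }
}

// rebuild a node, carrying the old node's configured selectivity forward
fn rebuild(old: &Filter) -> Result<Filter, String> {
    Filter::try_new().and_then(|f| f.with_default_selectivity(old.selectivity))
}

fn main() {
    let configured = Filter { selectivity: 50 };
    // the rewrite preserves 50 rather than resetting to the default 20
    assert_eq!(rebuild(&configured).unwrap().selectivity, 50);
}
```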

4 changes: 3 additions & 1 deletion datafusion/core/src/physical_planner.rs
@@ -908,7 +908,9 @@ impl DefaultPhysicalPlanner {
&input_schema,
session_state,
)?;
Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?))
let selectivity = session_state.config().options().optimizer.default_filter_selectivity;
let filter = FilterExec::try_new(runtime_expr, physical_input)?;
Ok(Arc::new(filter.with_default_selectivity(selectivity)?))
}
LogicalPlan::Union(Union { inputs, schema }) => {
let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?;
46 changes: 42 additions & 4 deletions datafusion/physical-plan/src/filter.rs
@@ -62,6 +62,8 @@ pub struct FilterExec {
input: Arc<dyn ExecutionPlan>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Selectivity for statistics. 0 = no rows selected, 100 = all rows selected
default_selectivity: u8,
}

impl FilterExec {
@@ -75,13 +77,25 @@ impl FilterExec {
predicate,
input: input.clone(),
metrics: ExecutionPlanMetricsSet::new(),
default_selectivity: 20,
}),
other => {
plan_err!("Filter predicate must return boolean values, not {other:?}")
}
}
}

pub fn with_default_selectivity(
mut self,
default_selectivity: u8,
) -> Result<Self, DataFusionError> {
if default_selectivity > 100 {
return plan_err!("Default filter selectivity needs to be at most 100");
}
self.default_selectivity = default_selectivity;
Ok(self)
}

/// The expression to filter on. This expression must evaluate to a boolean value.
pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
&self.predicate
@@ -91,6 +105,11 @@ impl FilterExec {
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
}

/// The default selectivity
pub fn default_selectivity(&self) -> u8 {
self.default_selectivity
}
}

impl DisplayAs for FilterExec {
@@ -167,6 +186,10 @@ impl ExecutionPlan for FilterExec {
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
FilterExec::try_new(self.predicate.clone(), children.swap_remove(0))
.and_then(|e| {
let selectivity = e.default_selectivity();
e.with_default_selectivity(selectivity)
})
.map(|e| Arc::new(e) as _)
}

@@ -197,10 +220,7 @@ impl ExecutionPlan for FilterExec {
let input_stats = self.input.statistics()?;
let schema = self.schema();
if !check_support(predicate, &schema) {
// assume filter selects 20% of rows if we cannot do anything smarter
// tracking issue for making this configurable:
// https://github.com/apache/arrow-datafusion/issues/8133
let selectivity = 0.2_f64;
let selectivity = self.default_selectivity as f64 / 100.0;
let mut stats = input_stats.into_inexact();
stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
stats.total_byte_size = stats
@@ -994,4 +1014,22 @@ mod tests {

Ok(())
}
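The selectivity arithmetic in `statistics()` above can be sketched in isolation: the `u8` percentage becomes a fraction that scales the input row estimate. This is an illustrative standalone sketch only; DataFusion's `with_estimated_selectivity` may round inexact estimates differently.

```rust
fn main() {
    let input_rows: usize = 1_000;
    for (pct, expected) in [(20_u8, 200_usize), (50, 500), (100, 1_000)] {
        // mirrors `self.default_selectivity as f64 / 100.0` in FilterExec
        let selectivity = pct as f64 / 100.0;
        // scale the (inexact) input row estimate by the selectivity fraction
        let estimated = (input_rows as f64 * selectivity) as usize;
        assert_eq!(estimated, expected);
    }
}
```

This is also why a hard-coded `0.2` could sneak back in unnoticed: only a test comparing statistics at two different selectivity values (as suggested in the review below) distinguishes the configured fraction from the old constant.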

#[tokio::test]
async fn test_validation_filter_selectivity() -> Result<()> {
Contributor:

Shall we also add a test showing that changing the default selectivity actually affects the output statistics?

I think if the selectivity got hard coded to 0.2 again, no tests would fail 🤔 Maybe we could add another unit tests here setting selectivity to 0.5 or something and demonstrating the statistics are different

Contributor Author:

I agree we should; however, I didn't know how to observe the effect of such a parameter. Is there some form of observable state or result exposed that I can use to perform an assertion about what selectivity has been used?

Contributor:

I think you should be able to look at the output of FilterExec::statistics(): the row number estimates will change with different values of selectivity.

Contributor Author:

Will try to finish it between today and tomorrow, thanks for the suggestion

Contributor Author:

@alamb fixed with 81034c2

Contributor Author:

Not sure however how this should be handled

Contributor:

> Not sure however how this should be handled

That code will be invoked for 'complicated' predicates -- maybe we could fake it with something like sin(x) = 4.0.

let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let input = Arc::new(StatisticsExec::new(
Statistics::new_unknown(&schema),
schema,
));
// WHERE a = 10
let predicate = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Eq,
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
));
let filter = FilterExec::try_new(predicate, input)?;
assert!(filter.with_default_selectivity(120).is_err());
Ok(())
}
}
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
@@ -1365,6 +1365,7 @@ message PhysicalNegativeNode {
message FilterExecNode {
PhysicalPlanNode input = 1;
PhysicalExprNode expr = 2;
uint32 default_filter_selectivity = 3;
}
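Protobuf has no 8-bit integer type, so the selectivity travels on the wire as `uint32` and must be narrowed back to `u8` on decode — that is what the `try_into` in `physical_plan/mod.rs` guards. A standalone sketch of the round-trip (hypothetical helper, not the generated prost code):

```rust
fn decode_selectivity(wire: u32) -> Result<u8, String> {
    // narrow the wire value back to u8; range validation against 100 still
    // happens later, in FilterExec::with_default_selectivity
    u8::try_from(wire).map_err(|_| format!("filter_selectivity {wire} does not fit in u8"))
}

fn main() {
    // encode: widen u8 -> u32, as the serializer does
    let encoded = 20_u8 as u32;
    assert_eq!(decode_selectivity(encoded), Ok(20));

    // a corrupted or out-of-range wire value fails the narrowing
    assert!(decode_selectivity(300).is_err());
}
```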

message FileGroup {
20 changes: 20 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default.

12 changes: 11 additions & 1 deletion datafusion/proto/src/physical_plan/mod.rs
@@ -159,7 +159,16 @@ impl AsExecutionPlan for PhysicalPlanNode {
.to_owned(),
)
})?;
Ok(Arc::new(FilterExec::try_new(predicate, input)?))
let filter_selectivity = filter.default_filter_selectivity.try_into();
let filter = FilterExec::try_new(predicate, input)?;
match filter_selectivity {
Ok(filter_selectivity) => Ok(Arc::new(
filter.with_default_selectivity(filter_selectivity)?,
)),
Err(_) => Err(DataFusionError::Internal(
"filter_selectivity in PhysicalPlanNode is invalid".to_owned(),
)),
}
}
PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new(
parse_protobuf_file_scan_config(
@@ -991,6 +1000,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
protobuf::FilterExecNode {
input: Some(Box::new(input)),
expr: Some(exec.predicate().clone().try_into()?),
default_filter_selectivity: exec.default_selectivity() as u32,
},
))),
});
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -188,6 +188,7 @@ datafusion.explain.logical_plan_only false
datafusion.explain.physical_plan_only false
datafusion.explain.show_statistics false
datafusion.optimizer.allow_symmetric_joins_without_pruning true
datafusion.optimizer.default_filter_selectivity 20
datafusion.optimizer.enable_distinct_aggregation_soft_limit true
datafusion.optimizer.enable_round_robin_repartition true
datafusion.optimizer.enable_topk_aggregation true
@@ -261,6 +262,7 @@ datafusion.explain.logical_plan_only false When set to true, the explain stateme
datafusion.explain.physical_plan_only false When set to true, the explain statement will only print physical plans
datafusion.explain.show_statistics false When set to true, the explain statement will print operator statistics for physical plans
datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors.
datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected).
datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read.
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -99,6 +99,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys |
| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory |
| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |
| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). |
| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |