Skip to content

Commit

Permalink
Renaming API to be more consistent with struct value
Browse files Browse the repository at this point in the history
  • Loading branch information
edmondop committed Nov 21, 2023
1 parent 879987b commit d121405
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 41 deletions.
5 changes: 3 additions & 2 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,9 @@ config_namespace! {
/// will be collected into a single partition
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024

/// The default filter selectivity used by Filter statistics
/// when an exact selectivity cannot be determined
/// The default filter selectivity used by Filter Statistics
/// when an exact selectivity cannot be determined. Valid values are
/// between 0 (no selectivity) and 100 (all rows are selected).
pub default_filter_selectivity: u8, default = 20
}
}
Expand Down
14 changes: 5 additions & 9 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,16 +302,12 @@ fn try_swapping_with_filter(
return Ok(None);
};

FilterExec::try_new(
new_predicate,
make_with_child(projection, filter.input())?,
).and_then(
|e| {
FilterExec::try_new(new_predicate, make_with_child(projection, filter.input())?)
.and_then(|e| {
let selectivity = filter.default_selectivity();
e.with_selectivity(selectivity)
}
)
.map(|e| Some(Arc::new(e) as _))
e.with_default_selectivity(selectivity)
})
.map(|e| Some(Arc::new(e) as _))
}

/// Tries to swap the projection with its input [`RepartitionExec`]. If it can be done,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ impl DefaultPhysicalPlanner {
)?;
let selectivity = session_state.config().options().optimizer.default_filter_selectivity;
let filter = FilterExec::try_new(runtime_expr, physical_input)?;
Ok(Arc::new(filter.with_selectivity(selectivity)?))
Ok(Arc::new(filter.with_default_selectivity(selectivity)?))
}
LogicalPlan::Union(Union { inputs, schema }) => {
let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?;
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/test_util/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ impl TestParquetFile {
None,
));

let exec =
Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
Ok(exec)
} else {
Ok(Arc::new(ParquetExec::new(scan_config, None, None)))
Expand Down
33 changes: 12 additions & 21 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ impl FilterExec {
}
}

pub fn with_selectivity(mut self, default_selectivity: u8) -> Result<Self, DataFusionError>{
pub fn with_default_selectivity(
mut self,
default_selectivity: u8,
) -> Result<Self, DataFusionError> {
if default_selectivity > 100 {
return plan_err!("Default flter selectivity needs to be less than 100");
}
Expand Down Expand Up @@ -182,16 +185,12 @@ impl ExecutionPlan for FilterExec {
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
FilterExec::try_new(
self.predicate.clone(),
children.swap_remove(0),
).and_then(
|e| {
FilterExec::try_new(self.predicate.clone(), children.swap_remove(0))
.and_then(|e| {
let selectivity = e.default_selectivity();
e.with_selectivity(selectivity)
}
)
.map(|e| Arc::new(e) as _)
e.with_default_selectivity(selectivity)
})
.map(|e| Arc::new(e) as _)
}

fn execute(
Expand Down Expand Up @@ -221,15 +220,7 @@ impl ExecutionPlan for FilterExec {
let input_stats = self.input.statistics()?;
let schema = self.schema();
if !check_support(predicate, &schema) {
let selectivity = self.default_selectivity as f32 / 100.0;
let mut stats = input_stats.clone().into_inexact();
if let Precision::Inexact(n) = stats.num_rows {
stats.num_rows = Precision::Inexact((selectivity * n as f32) as usize);
}
if let Precision::Inexact(n) = stats.total_byte_size {
stats.total_byte_size =
Precision::Inexact((selectivity * n as f32) as usize);
}
let selectivity = self.default_selectivity as f64 / 100.0;
let mut stats = input_stats.into_inexact();
stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
stats.total_byte_size = stats
Expand Down Expand Up @@ -1025,7 +1016,7 @@ mod tests {
}

#[tokio::test]
async fn test_validation_filter_selectivity() -> Result<()>{
async fn test_validation_filter_selectivity() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let input = Arc::new(StatisticsExec::new(
Statistics::new_unknown(&schema),
Expand All @@ -1038,7 +1029,7 @@ mod tests {
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
));
let filter = FilterExec::try_new(predicate, input)?;
assert!(filter.with_selectivity(120).is_err());
assert!(filter.with_default_selectivity(120).is_err());
Ok(())
}
}
7 changes: 3 additions & 4 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,13 @@ impl AsExecutionPlan for PhysicalPlanNode {
let filter_selectivity = filter.default_filter_selectivity.try_into();
let filter = FilterExec::try_new(predicate, input)?;
match filter_selectivity {
Ok(filter_selectivity) => {
Ok(Arc::new(filter.with_selectivity(filter_selectivity)?))
}
Ok(filter_selectivity) => Ok(Arc::new(
filter.with_default_selectivity(filter_selectivity)?,
)),
Err(_) => Err(DataFusionError::Internal(
"filter_selectivity in PhysicalPlanNode is invalid ".to_owned(),
)),
}

}
PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new(
parse_protobuf_file_scan_config(
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ datafusion.explain.logical_plan_only false
datafusion.explain.physical_plan_only false
datafusion.explain.show_statistics false
datafusion.optimizer.allow_symmetric_joins_without_pruning true
datafusion.optimizer.default_filter_selectivity 20
datafusion.optimizer.enable_distinct_aggregation_soft_limit true
datafusion.optimizer.enable_round_robin_repartition true
datafusion.optimizer.enable_topk_aggregation true
Expand Down Expand Up @@ -261,6 +262,7 @@ datafusion.explain.logical_plan_only false When set to true, the explain stateme
datafusion.explain.physical_plan_only false When set to true, the explain statement will only print physical plans
datafusion.explain.show_statistics false When set to true, the explain statement will print operator statistics for physical plans
datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors.
datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter statistics when an exact selectivity cannot be determined
datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read.
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible
Expand Down
4 changes: 2 additions & 2 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan |
| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys |
| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory |
| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition
| datafusion.optimizer.default_filter_selectivity | 20 | The assumed filter selectivity in from 0 (no rows) to 100 (all rows) used when it is not possibl to determine exactly the number of rows returned by a filter
| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |
| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). |
| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |
Expand Down

0 comments on commit d121405

Please sign in to comment.