From 54fdc5dd94d586b5f22b0103206bdf43502c0865 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 30 Oct 2024 12:42:16 -0400 Subject: [PATCH 1/2] Add config option `skip_physical_aggregate_schema_check ` (#13176) * Add option to skip physical aggregate check * tweak wording * update test --- datafusion/common/src/config.rs | 11 +++++++++++ datafusion/core/src/physical_planner.rs | 8 +++++--- .../sqllogictest/test_files/information_schema.slt | 2 ++ docs/source/user-guide/configs.md | 1 + 4 files changed, 19 insertions(+), 3 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 1e1c5d5424b0..312e7c1b7950 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -268,6 +268,17 @@ config_namespace! { /// Defaults to the number of CPU cores on the system pub planning_concurrency: usize, default = num_cpus::get() + /// When set to true, skips verifying that the schema produced by + /// planning the input of `LogicalPlan::Aggregate` exactly matches the + /// schema of the input plan. + /// + /// When set to false, if the schema does not match exactly + /// (including nullability and metadata), a planning error will be raised. + /// + /// This is used to workaround bugs in the planner that are now caught by + /// the new schema verification step. + pub skip_physical_aggregate_schema_check: bool, default = false + /// Specifies the reserved memory for each spillable sort operation to /// facilitate an in-memory merge. /// diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index cc35255dfe29..9efc1f76c2d5 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -665,14 +665,16 @@ impl DefaultPhysicalPlanner { aggr_expr, .. }) => { + let options = session_state.config().options(); // Initially need to perform the aggregate and then merge the partitions let input_exec = children.one()?; let physical_input_schema = input_exec.schema(); let logical_input_schema = input.as_ref().schema(); - let physical_input_schema_from_logical: Arc = - logical_input_schema.as_ref().clone().into(); + let physical_input_schema_from_logical = logical_input_schema.inner(); - if physical_input_schema != physical_input_schema_from_logical { + if &physical_input_schema != physical_input_schema_from_logical + && !options.execution.skip_physical_aggregate_schema_check + { return internal_err!("Physical input schema should be the same as the one converted from logical input schema."); } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 7acdf25b6596..bc3e0b9cb66b 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -208,6 +208,7 @@ datafusion.execution.parquet.writer_version 1.0 datafusion.execution.planning_concurrency 13 datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 +datafusion.execution.skip_physical_aggregate_schema_check false datafusion.execution.soft_max_rows_per_output_file 50000000 datafusion.execution.sort_in_place_threshold_bytes 1048576 datafusion.execution.sort_spill_reservation_bytes 10485760 @@ -298,6 +299,7 @@ datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer ve datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode +datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index a2103c50d37e..be43486e05cc 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -78,6 +78,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | +| datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | | datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | | datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics | From fb4270e914049dded2cab78406ecdf96aeb589f4 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 30 Oct 2024 14:37:34 -0400 Subject: [PATCH 2/2] Change default value --- datafusion/common/src/config.rs | 4 +++- datafusion/sqllogictest/test_files/information_schema.slt | 4 ++-- docs/source/user-guide/configs.md | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 312e7c1b7950..18f314e5f584 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -277,7 +277,9 @@ config_namespace! { /// /// This is used to workaround bugs in the planner that are now caught by /// the new schema verification step. - pub skip_physical_aggregate_schema_check: bool, default = false + /// + /// This configuration option will default to `false` in future releases. + pub skip_physical_aggregate_schema_check: bool, default = true /// Specifies the reserved memory for each spillable sort operation to /// facilitate an in-memory merge. diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index bc3e0b9cb66b..a58e446736b1 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -208,7 +208,7 @@ datafusion.execution.parquet.writer_version 1.0 datafusion.execution.planning_concurrency 13 datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 -datafusion.execution.skip_physical_aggregate_schema_check false +datafusion.execution.skip_physical_aggregate_schema_check true datafusion.execution.soft_max_rows_per_output_file 50000000 datafusion.execution.sort_in_place_threshold_bytes 1048576 datafusion.execution.sort_spill_reservation_bytes 10485760 @@ -299,7 +299,7 @@ datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer ve datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode -datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. +datafusion.execution.skip_physical_aggregate_schema_check true When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. This configuration option will default to `false` in future releases. datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index be43486e05cc..86be17eea8ec 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -78,7 +78,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | -| datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | +| datafusion.execution.skip_physical_aggregate_schema_check | true | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. This configuration option will default to `false` in future releases. | | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | | datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | | datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics |