From 997b63e35f35332cad90acc2f45b8cd377e9f91c Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Fri, 27 Jan 2023 20:02:29 +0300 Subject: [PATCH] settings reorganized --- datafusion/common/src/config.rs | 10 ++-- datafusion/core/src/execution/context.rs | 23 +++++---- .../src/physical_optimizer/repartition.rs | 48 +++++++++-------- .../src/physical_plan/file_format/parquet.rs | 51 ++++++++++++++++--- .../test_files/information_schema.slt | 3 +- docs/source/user-guide/configs.md | 3 +- 6 files changed, 95 insertions(+), 43 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 4e10bc4a9fe2..4a735d47e4be 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -195,9 +195,6 @@ config_namespace! { /// Should DataFusion collect statistics after listing files pub collect_statistics: bool, default = false - /// Enables parallel file scanning. Currently supported only for Parquet format - pub parallel_file_scan: bool, default = false - /// Number of partitions for query execution. Increasing partitions can increase /// concurrency. Defaults to the number of cpu cores on the system pub target_partitions: usize, default = num_cpus::get() @@ -264,10 +261,17 @@ config_namespace! { /// in parallel using the provided `target_partitions` level" pub repartition_aggregations: bool, default = true + /// Minimum total files size in bytes to perform file scan repartitioning. + pub repartition_file_min_size: usize, default = 10 * 1024 * 1024 + /// Should DataFusion repartition data using the join keys to execute joins in parallel /// using the provided `target_partitions` level" pub repartition_joins: bool, default = true + /// When set to true, file groups will be repartitioned to achieve maximum parallelism. + /// Currently supported only for Parquet format + pub repartition_file_scans: bool, default = false + /// Should DataFusion repartition data using the partitions keys to execute window /// functions in parallel using the provided `target_partitions` level" pub repartition_windows: bool, default = true diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index fb716f6ee00a..ca80267e0cf3 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1216,11 +1216,6 @@ impl SessionConfig { self.options.execution.collect_statistics } - /// Are file scans parallelized during execution? - pub fn parallel_file_scan(&self) -> bool { - self.options.execution.parallel_file_scan - } - /// Selects a name for the default catalog and schema pub fn with_default_catalog_and_schema( mut self, @@ -1256,6 +1251,18 @@ impl SessionConfig { self } + /// Sets minimum file range size for repartitioning scans + pub fn with_repartition_file_min_size(mut self, size: usize) -> Self { + self.options.optimizer.repartition_file_min_size = size; + self + } + + /// Enables or disables the use of repartitioning for file scans + pub fn with_repartition_file_scans(mut self, enabled: bool) -> Self { + self.options.optimizer.repartition_file_scans = enabled; + self + } + /// Enables or disables the use of repartitioning for window functions to improve parallelism pub fn with_repartition_windows(mut self, enabled: bool) -> Self { self.options.optimizer.repartition_windows = enabled; @@ -1279,12 +1286,6 @@ impl SessionConfig { self } - /// Enables or disables parallel file scanning after listing files - pub fn with_parallel_file_scan(mut self, enabled: bool) -> Self { - self.options.execution.parallel_file_scan = enabled; - self - } - /// Get the currently configured batch size pub fn batch_size(&self) -> usize { self.options.execution.batch_size diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index affee5550836..6edee855f294 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -168,7 +168,8 @@ fn optimize_partitions( is_root: bool, can_reorder: bool, would_benefit: bool, - parallel_file_scan: bool, + repartition_file_scans: bool, + repartition_file_min_size: usize, ) -> Result> { // Recurse into children bottom-up (attempt to repartition as // early as possible) @@ -201,7 +202,8 @@ fn optimize_partitions( false, // child is not root can_reorder_child, plan.benefits_from_input_partitioning(), - parallel_file_scan, + repartition_file_scans, + repartition_file_min_size, ) }) .collect::>()?; @@ -237,10 +239,13 @@ fn optimize_partitions( return Ok(new_plan); } - // For ParquetExec return internally repartitioned version of the plan in case parallel_file_scan is set + // For ParquetExec return internally repartitioned version of the plan in case `repartition_file_scans` is set if let Some(parquet_exec) = new_plan.as_any().downcast_ref::() { - if parallel_file_scan { - return Ok(Arc::new(parquet_exec.get_repartitioned(target_partitions))); + if repartition_file_scans { + return Ok(Arc::new( + parquet_exec + .get_repartitioned(target_partitions, repartition_file_min_size), + )); } } @@ -267,7 +272,8 @@ impl PhysicalOptimizerRule for Repartition { ) -> Result> { let target_partitions = config.execution.target_partitions; let enabled = config.optimizer.enable_round_robin_repartition; - let parallel_file_scan = config.execution.parallel_file_scan; + let repartition_file_scans = config.optimizer.repartition_file_scans; + let repartition_file_min_size = config.optimizer.repartition_file_min_size; // Don't run optimizer if target_partitions == 1 if !enabled || target_partitions == 1 { Ok(plan) @@ -281,7 +287,8 @@ impl PhysicalOptimizerRule for Repartition { is_root, can_reorder, would_benefit, - parallel_file_scan, + repartition_file_scans, + repartition_file_min_size, ) } } @@ -486,15 +493,16 @@ mod tests { /// Runs the repartition optimizer and asserts the plan against the expected macro_rules! assert_optimized { ($EXPECTED_LINES: expr, $PLAN: expr) => { - assert_optimized!($EXPECTED_LINES, $PLAN, 10, false); + assert_optimized!($EXPECTED_LINES, $PLAN, 10, false, 1024); }; - ($EXPECTED_LINES: expr, $PLAN: expr, $TAGRET_PARTITIONS: expr, $PARALLEL_SCAN: expr) => { + ($EXPECTED_LINES: expr, $PLAN: expr, $TAGRET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => { let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect(); let mut config = ConfigOptions::new(); config.execution.target_partitions = $TAGRET_PARTITIONS; - config.execution.parallel_file_scan = $PARALLEL_SCAN; + config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS; + config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE; // run optimizer let optimizers: Vec> = vec![ @@ -900,7 +908,7 @@ mod tests { "ParquetExec: limit=None, partitions={2 groups: [[x:0..50], [x:50..100]]}, projection=[c1]", ]; - assert_optimized!(expected, plan, 2, true); + assert_optimized!(expected, plan, 2, true, 10); Ok(()) } @@ -916,7 +924,7 @@ mod tests { "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, projection=[c1]", ]; - assert_optimized!(expected, plan, 2, true); + assert_optimized!(expected, plan, 2, true, 10); Ok(()) } @@ -933,7 +941,7 @@ mod tests { "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", ]; - assert_optimized!(expected, plan, 2, true); + assert_optimized!(expected, plan, 2, true, 10); Ok(()) } @@ -952,7 +960,7 @@ mod tests { "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", ]; - assert_optimized!(expected, plan, 2, true); + assert_optimized!(expected, plan, 2, true, 10); Ok(()) } @@ -977,7 +985,7 @@ mod tests { "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", ]; - assert_optimized!(expected, plan, 2, true); + assert_optimized!(expected, plan, 2, true, 10); Ok(()) } @@ -995,7 +1003,7 @@ mod tests { "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", ]; - assert_optimized!(expected, plan, 2, true); + assert_optimized!(expected, plan, 2, true, 10); Ok(()) } @@ -1010,7 +1018,7 @@ mod tests { "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", ]; - assert_optimized!(expected, plan, 2, true); + assert_optimized!(expected, plan, 2, true, 10); Ok(()) } @@ -1028,7 +1036,7 @@ mod tests { "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", ]; - assert_optimized!(expected, plan, 2, true); + assert_optimized!(expected, plan, 2, true, 10); Ok(()) } @@ -1045,7 +1053,7 @@ mod tests { "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", ]; - assert_optimized!(expected, plan, 2, true); + assert_optimized!(expected, plan, 2, true, 10); Ok(()) } @@ -1061,7 +1069,7 @@ mod tests { "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", ]; - assert_optimized!(expected, plan, 2, true); + assert_optimized!(expected, plan, 2, true, 10); Ok(()) } diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 129577460ad1..723c87eb5d77 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -243,7 +243,11 @@ impl ParquetExec { } /// Redistribute files across partitions according to their size - pub fn get_repartitioned(&self, target_partitions: usize) -> Self { + pub fn get_repartitioned( + &self, + target_partitions: usize, + repartition_file_min_size: usize, + ) -> Self { let flattened_files = self .base_config() .file_groups @@ -261,6 +265,10 @@ impl ParquetExec { .iter() .map(|f| f.object_meta.size as i64) .sum::(); + if total_size < (repartition_file_min_size as i64) { + return self.clone(); + } + let target_partition_size = (total_size as usize + (target_partitions) - 1) / (target_partitions); @@ -1738,7 +1746,7 @@ mod tests { let actual = file_groups_to_vec( parquet_exec - .get_repartitioned(4) + .get_repartitioned(4, 10) .base_config() .file_groups .clone(), @@ -1775,7 +1783,7 @@ mod tests { let actual = file_groups_to_vec( parquet_exec - .get_repartitioned(96) + .get_repartitioned(96, 5) .base_config() .file_groups .clone(), @@ -1817,7 +1825,7 @@ mod tests { let actual = file_groups_to_vec( parquet_exec - .get_repartitioned(3) + .get_repartitioned(3, 10) .base_config() .file_groups .clone(), @@ -1855,7 +1863,7 @@ mod tests { let actual = file_groups_to_vec( parquet_exec - .get_repartitioned(2) + .get_repartitioned(2, 10) .base_config() .file_groups .clone(), @@ -1869,7 +1877,7 @@ mod tests { } #[tokio::test] - async fn parquet_exec_repartition_no_action() { + async fn parquet_exec_repartition_no_action_ranges() { // No action due to Some(range) in second file let partitioned_file_1 = PartitionedFile::new("a".to_string(), 123); let mut partitioned_file_2 = PartitionedFile::new("b".to_string(), 144); @@ -1893,13 +1901,42 @@ mod tests { ); let actual = parquet_exec - .get_repartitioned(65) + .get_repartitioned(65, 10) .base_config() .file_groups .clone(); assert_eq!(2, actual.len()); } + #[tokio::test] + async fn parquet_exec_repartition_no_action_min_size() { + // No action due to target_partition_size + let partitioned_file = PartitionedFile::new("a".to_string(), 123); + let single_partition = vec![vec![partitioned_file]]; + let parquet_exec = ParquetExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::local_filesystem(), + file_groups: single_partition, + file_schema: Arc::new(Schema::empty()), + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: None, + infinite_source: false, + }, + None, + None, + ); + + let actual = parquet_exec + .get_repartitioned(65, 500) + .base_config() + .file_groups + .clone(); + assert_eq!(1, actual.len()); + } + fn file_groups_to_vec( file_groups: Vec>, ) -> Vec<(usize, String, i64, i64)> { diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt index 84345b7ba562..bd796ed71b3d 100644 --- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt +++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt @@ -115,7 +115,6 @@ datafusion.catalog.location NULL datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true datafusion.execution.collect_statistics false -datafusion.execution.parallel_file_scan false datafusion.execution.parquet.enable_page_index false datafusion.execution.parquet.metadata_size_hint NULL datafusion.execution.parquet.pruning true @@ -132,6 +131,8 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576 datafusion.optimizer.max_passes 3 datafusion.optimizer.prefer_hash_join true datafusion.optimizer.repartition_aggregations true +datafusion.optimizer.repartition_file_min_size 10485760 +datafusion.optimizer.repartition_file_scans false datafusion.optimizer.repartition_joins true datafusion.optimizer.repartition_windows true datafusion.optimizer.skip_failed_rules true diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 004ddc7e7fed..ec14b962ee80 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -47,7 +47,6 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would results in too much metadata memory consumption | | datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting | | datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files | -| datafusion.execution.parallel_file_scan | false | Enables parallel file scanning. Currently supported only for Parquet format | | datafusion.execution.target_partitions | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of cpu cores on the system | | datafusion.execution.time_zone | +00:00 | The default time zone Some functions, e.g. EXTRACT(HOUR from SOME_TIME), shift the underlying datetime according to this time zone, and then extract the hour | | datafusion.execution.parquet.enable_page_index | false | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded. | @@ -59,7 +58,9 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores | | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | | datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level" | +| datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. | | datafusion.optimizer.repartition_joins | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level" | +| datafusion.optimizer.repartition_file_scans | false | When set to true, file groups will be repartitioned to achieve maximum parallelism. Currently supported only for Parquet format | | datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level" | | datafusion.optimizer.skip_failed_rules | true | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail | | datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan |