Skip to content

Commit

Permalink
settings reorganized
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Jan 27, 2023
1 parent 556b0c6 commit d6e95f7
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 43 deletions.
10 changes: 7 additions & 3 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,6 @@ config_namespace! {
/// Should DataFusion collect statistics after listing files
pub collect_statistics: bool, default = false

/// Enables parallel file scanning. Currently supported only for Parquet format
pub parallel_file_scan: bool, default = false

/// Number of partitions for query execution. Increasing partitions can increase
/// concurrency. Defaults to the number of cpu cores on the system
pub target_partitions: usize, default = num_cpus::get()
Expand Down Expand Up @@ -264,10 +261,17 @@ config_namespace! {
/// in parallel using the provided `target_partitions` level"
pub repartition_aggregations: bool, default = true

/// Minimum total files size in bytes to perform file scan repartitioning.
pub repartition_file_min_size: usize, default = 10 * 1024 * 1024

/// Should DataFusion repartition data using the join keys to execute joins in parallel
/// using the provided `target_partitions` level"
pub repartition_joins: bool, default = true

/// When set to true, file groups will be repartitioned to achieve maximum parallelism.
/// Currently supported only for Parquet format
pub repartition_file_scans: bool, default = false

/// Should DataFusion repartition data using the partitions keys to execute window
/// functions in parallel using the provided `target_partitions` level"
pub repartition_windows: bool, default = true
Expand Down
23 changes: 12 additions & 11 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1216,11 +1216,6 @@ impl SessionConfig {
self.options.execution.collect_statistics
}

/// Are file scans parallelized during execution?
pub fn parallel_file_scan(&self) -> bool {
self.options.execution.parallel_file_scan
}

/// Selects a name for the default catalog and schema
pub fn with_default_catalog_and_schema(
mut self,
Expand Down Expand Up @@ -1256,6 +1251,18 @@ impl SessionConfig {
self
}

/// Sets minimum file range size for repartitioning scans
pub fn with_repartition_file_min_size(mut self, size: usize) -> Self {
self.options.optimizer.repartition_file_min_size = size;
self
}

/// Enables or disables the use of repartitioning for file scans
pub fn with_repartition_file_scans(mut self, enabled: bool) -> Self {
self.options.optimizer.repartition_file_scans = enabled;
self
}

/// Enables or disables the use of repartitioning for window functions to improve parallelism
pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
self.options.optimizer.repartition_windows = enabled;
Expand All @@ -1279,12 +1286,6 @@ impl SessionConfig {
self
}

/// Enables or disables parallel file scanning after listing files
pub fn with_parallel_file_scan(mut self, enabled: bool) -> Self {
self.options.execution.parallel_file_scan = enabled;
self
}

/// Get the currently configured batch size
pub fn batch_size(&self) -> usize {
self.options.execution.batch_size
Expand Down
49 changes: 29 additions & 20 deletions datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ fn optimize_partitions(
is_root: bool,
can_reorder: bool,
would_benefit: bool,
parallel_file_scan: bool,
repartition_file_scans: bool,
repartition_file_min_size: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
// Recurse into children bottom-up (attempt to repartition as
// early as possible)
Expand Down Expand Up @@ -201,7 +202,8 @@ fn optimize_partitions(
false, // child is not root
can_reorder_child,
plan.benefits_from_input_partitioning(),
parallel_file_scan,
repartition_file_scans,
repartition_file_min_size,
)
})
.collect::<Result<_>>()?;
Expand Down Expand Up @@ -237,10 +239,13 @@ fn optimize_partitions(
return Ok(new_plan);
}

// For ParquetExec return internally repartitioned version of the plan in case parallel_file_scan is set
// For ParquetExec return internally repartitioned version of the plan in case `repartition_file_scans` is set
if let Some(parquet_exec) = new_plan.as_any().downcast_ref::<ParquetExec>() {
if parallel_file_scan {
return Ok(Arc::new(parquet_exec.get_repartitioned(target_partitions)));
if repartition_file_scans {
return Ok(Arc::new(parquet_exec.get_repartitioned(
target_partitions,
repartition_file_min_size,
)));
}
}

Expand All @@ -267,7 +272,9 @@ impl PhysicalOptimizerRule for Repartition {
) -> Result<Arc<dyn ExecutionPlan>> {
let target_partitions = config.execution.target_partitions;
let enabled = config.optimizer.enable_round_robin_repartition;
let parallel_file_scan = config.execution.parallel_file_scan;
let repartition_file_scans = config.optimizer.repartition_file_scans;
let repartition_file_min_size =
config.optimizer.repartition_file_min_size;
// Don't run optimizer if target_partitions == 1
if !enabled || target_partitions == 1 {
Ok(plan)
Expand All @@ -281,7 +288,8 @@ impl PhysicalOptimizerRule for Repartition {
is_root,
can_reorder,
would_benefit,
parallel_file_scan,
repartition_file_scans,
repartition_file_min_size,
)
}
}
Expand Down Expand Up @@ -486,15 +494,16 @@ mod tests {
/// Runs the repartition optimizer and asserts the plan against the expected
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, 10, false);
assert_optimized!($EXPECTED_LINES, $PLAN, 10, false, 1024);
};

($EXPECTED_LINES: expr, $PLAN: expr, $TAGRET_PARTITIONS: expr, $PARALLEL_SCAN: expr) => {
($EXPECTED_LINES: expr, $PLAN: expr, $TAGRET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();

let mut config = ConfigOptions::new();
config.execution.target_partitions = $TAGRET_PARTITIONS;
config.execution.parallel_file_scan = $PARALLEL_SCAN;
config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;

// run optimizer
let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Expand Down Expand Up @@ -900,7 +909,7 @@ mod tests {
"ParquetExec: limit=None, partitions={2 groups: [[x:0..50], [x:50..100]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true);
assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

Expand All @@ -916,7 +925,7 @@ mod tests {
"ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true);
assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

Expand All @@ -933,7 +942,7 @@ mod tests {
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true);
assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

Expand All @@ -952,7 +961,7 @@ mod tests {
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true);
assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

Expand All @@ -977,7 +986,7 @@ mod tests {
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true);
assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

Expand All @@ -995,7 +1004,7 @@ mod tests {
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true);
assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

Expand All @@ -1010,7 +1019,7 @@ mod tests {
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];

assert_optimized!(expected, plan, 2, true);
assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

Expand All @@ -1028,7 +1037,7 @@ mod tests {
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];

assert_optimized!(expected, plan, 2, true);
assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

Expand All @@ -1045,7 +1054,7 @@ mod tests {
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];

assert_optimized!(expected, plan, 2, true);
assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

Expand All @@ -1061,7 +1070,7 @@ mod tests {
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];

assert_optimized!(expected, plan, 2, true);
assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

Expand Down
51 changes: 44 additions & 7 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,11 @@ impl ParquetExec {
}

/// Redistribute files across partitions according to their size
pub fn get_repartitioned(&self, target_partitions: usize) -> Self {
pub fn get_repartitioned(
&self,
target_partitions: usize,
repartition_file_min_size: usize,
) -> Self {
let flattened_files = self
.base_config()
.file_groups
Expand All @@ -261,6 +265,10 @@ impl ParquetExec {
.iter()
.map(|f| f.object_meta.size as i64)
.sum::<i64>();
if total_size < (repartition_file_min_size as i64) {
return self.clone();
}

let target_partition_size =
(total_size as usize + (target_partitions) - 1) / (target_partitions);

Expand Down Expand Up @@ -1738,7 +1746,7 @@ mod tests {

let actual = file_groups_to_vec(
parquet_exec
.get_repartitioned(4)
.get_repartitioned(4, 10)
.base_config()
.file_groups
.clone(),
Expand Down Expand Up @@ -1775,7 +1783,7 @@ mod tests {

let actual = file_groups_to_vec(
parquet_exec
.get_repartitioned(96)
.get_repartitioned(96, 5)
.base_config()
.file_groups
.clone(),
Expand Down Expand Up @@ -1817,7 +1825,7 @@ mod tests {

let actual = file_groups_to_vec(
parquet_exec
.get_repartitioned(3)
.get_repartitioned(3, 10)
.base_config()
.file_groups
.clone(),
Expand Down Expand Up @@ -1855,7 +1863,7 @@ mod tests {

let actual = file_groups_to_vec(
parquet_exec
.get_repartitioned(2)
.get_repartitioned(2, 10)
.base_config()
.file_groups
.clone(),
Expand All @@ -1869,7 +1877,7 @@ mod tests {
}

#[tokio::test]
async fn parquet_exec_repartition_no_action() {
async fn parquet_exec_repartition_no_action_ranges() {
// No action due to Some(range) in second file
let partitioned_file_1 = PartitionedFile::new("a".to_string(), 123);
let mut partitioned_file_2 = PartitionedFile::new("b".to_string(), 144);
Expand All @@ -1893,13 +1901,42 @@ mod tests {
);

let actual = parquet_exec
.get_repartitioned(65)
.get_repartitioned(65, 10)
.base_config()
.file_groups
.clone();
assert_eq!(2, actual.len());
}

#[tokio::test]
async fn parquet_exec_repartition_no_action_min_size() {
// No action due to target_partition_size
let partitioned_file = PartitionedFile::new("a".to_string(), 123);
let single_partition = vec![vec![partitioned_file]];
let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: single_partition,
file_schema: Arc::new(Schema::empty()),
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: None,
infinite_source: false,
},
None,
None,
);

let actual = parquet_exec
.get_repartitioned(65, 500)
.base_config()
.file_groups
.clone();
assert_eq!(1, actual.len());
}

fn file_groups_to_vec(
file_groups: Vec<Vec<PartitionedFile>>,
) -> Vec<(usize, String, i64, i64)> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ datafusion.catalog.location NULL
datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
datafusion.execution.collect_statistics false
datafusion.execution.parallel_file_scan false
datafusion.execution.parquet.enable_page_index false
datafusion.execution.parquet.metadata_size_hint NULL
datafusion.execution.parquet.pruning true
Expand All @@ -132,6 +131,8 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576
datafusion.optimizer.max_passes 3
datafusion.optimizer.prefer_hash_join true
datafusion.optimizer.repartition_aggregations true
datafusion.optimizer.repartition_file_min_size 10485760
datafusion.optimizer.repartition_file_scans false
datafusion.optimizer.repartition_joins true
datafusion.optimizer.repartition_windows true
datafusion.optimizer.skip_failed_rules true
Expand Down
3 changes: 2 additions & 1 deletion docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would results in too much metadata memory consumption |
| datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting |
| datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files |
| datafusion.execution.parallel_file_scan | false | Enables parallel file scanning. Currently supported only for Parquet format |
| datafusion.execution.target_partitions | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of cpu cores on the system |
| datafusion.execution.time_zone | +00:00 | The default time zone Some functions, e.g. EXTRACT(HOUR from SOME_TIME), shift the underlying datetime according to this time zone, and then extract the hour |
| datafusion.execution.parquet.enable_page_index | false | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded. |
Expand All @@ -59,7 +58,9 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores |
| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |
| datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level" |
| datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. |
| datafusion.optimizer.repartition_joins | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level" |
| datafusion.optimizer.repartition_file_scans | false | When set to true, file groups will be repartitioned to achieve maximum parallelism. Currently supported only for Parquet format |
| datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level" |
| datafusion.optimizer.skip_failed_rules | true | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail |
| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan |
Expand Down

0 comments on commit d6e95f7

Please sign in to comment.