Skip to content

Commit

Permalink
minor changes & review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Jan 27, 2023
1 parent 659d9dc commit 556b0c6
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 13 deletions.
20 changes: 19 additions & 1 deletion datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,24 @@ mod tests {
Ok(())
}

/// A `SortPreservingMergeExec` sitting on top of a `UnionExec` of sorted
/// parquet scans is expected to come out of the optimizer unchanged.
#[test]
fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> {
    // Two sorted parquet scans; unioning them concatenates the partitions
    // while keeping each partition's sort order intact.
    let sorted_scans = vec![parquet_exec_sorted(); 2];
    let plan = sort_preserving_merge_exec(union_exec(sorted_scans));

    // The input is already sorted, so neither a repartition nor a sort
    // should be added to the plan.
    let expected = &[
        "SortPreservingMergeExec: [c1@0 ASC]",
        "UnionExec",
        "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
        "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
    ];

    assert_optimized!(expected, plan);
    Ok(())
}

#[test]
fn repartition_does_not_destroy_sort() -> Result<()> {
// SortRequired
Expand Down Expand Up @@ -1010,7 +1028,7 @@ mod tests {
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];

assert_optimized!(expected, plan);
assert_optimized!(expected, plan, 2, true);
Ok(())
}

Expand Down
20 changes: 8 additions & 12 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,32 +244,28 @@ impl ParquetExec {

/// Redistribute files across partitions according to their size
pub fn get_repartitioned(&self, target_partitions: usize) -> Self {
// Perform redistribution only in case all files should be read from beginning to end
let has_ranges = self
let flattened_files = self
.base_config()
.file_groups
.iter()
.flatten()
.any(|f| f.range.is_some());
.collect::<Vec<_>>();

// Perform redistribution only in case all files should be read from beginning to end
let has_ranges = flattened_files.iter().any(|f| f.range.is_some());
if has_ranges {
return self.clone();
}

let total_size = self
.base_config()
.file_groups
let total_size = flattened_files
.iter()
.flatten()
.map(|f| f.object_meta.size as i64)
.sum::<i64>();
let target_partition_size =
(total_size as usize + (target_partitions) - 1) / (target_partitions);

let repartitioned_files = self
.base_config()
.file_groups
.iter()
.flatten()
let repartitioned_files = flattened_files
.into_iter()
.scan(RepartitionState::default(), |state, source_file| {
let mut produced_files = vec![];
let mut range_start = 0;
Expand Down

0 comments on commit 556b0c6

Please sign in to comment.