Skip to content

Commit

Permalink
Fix partition count assertion (#1597)
Browse files Browse the repository at this point in the history
The actual asserted bound is a bit more complex because the last partial
partition is merged into multiple existing ones instead of choosing to
have one smaller partition.
  • Loading branch information
AdamGS authored Dec 6, 2024
1 parent 77a292d commit 0907599
Showing 1 changed file with 19 additions and 9 deletions.
28 changes: 19 additions & 9 deletions vortex-datafusion/src/persistent/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,23 +168,29 @@ fn repartition_by_size(
curr_partition_size += file.object_meta.size;
curr_partition.push(file);

if curr_partition_size > target_partition_size {
if curr_partition_size >= target_partition_size {
curr_partition_size = 0;
partitions.push(std::mem::take(&mut curr_partition));
}
}

// if there's anything left, we shove it into existing partitions
for (idx, file) in curr_partition.into_iter().enumerate() {
let part_idx = idx % partitions.len();
partitions[part_idx].push(file);
// If we're still missing the last partition
if !curr_partition.is_empty() && partitions.len() != desired_partitions {
partitions.push(std::mem::take(&mut curr_partition));
// If we already have enough partitions
} else if !curr_partition.is_empty() {
for (idx, file) in curr_partition.into_iter().enumerate() {
let new_part_idx = idx % partitions.len();
partitions[new_part_idx].push(file);
}
}

// Assert that we have the correct number of partitions and that the total number of files is right
assert_eq!(
partitions.len(),
usize::min(total_file_count, desired_partitions),
"The final number of partitions should be smallest between the total number of files and the desired partition count."
usize::min(desired_partitions, total_file_count)
);
assert_eq!(total_file_count, partitions.iter().flatten().count());

partitions
}
Expand All @@ -203,8 +209,12 @@ mod tests {
PartitionedFile::new("e", 50),
]];

let output = repartition_by_size(file_groups, 2);
repartition_by_size(file_groups, 2);

let file_groups = vec![(0..100)
.map(|idx| PartitionedFile::new(format!("{idx}"), idx))
.collect()];

assert_eq!(output.len(), 2);
repartition_by_size(file_groups, 16);
}
}

0 comments on commit 0907599

Please sign in to comment.