diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 2ff4850774ca..4fbffd325fbb 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -627,6 +627,12 @@ impl ScanPart { let end = current_range.1.max(part_range.1); self.time_range = Some((start, end)); } + + /// Returns true if the we can split the part into multiple parts + /// and preserving order. + pub(crate) fn can_split_preserve_order(&self) -> bool { + self.memtables.is_empty() && self.file_ranges.len() == 1 && self.file_ranges[0].len() > 1 + } } /// A trait to collect file ranges to scan. diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 51ac362dcf43..eabaec6bced3 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -334,10 +334,9 @@ impl SeqDistributor { let parallelism = parallelism.max(1); let parts = group_parts_by_range(self.parts); - let parts = maybe_merge_parts(parts, parallelism); - // TODO(yingwen): Split parts. - - parts + let parts = maybe_split_parts(parts, parallelism); + // Ensures it doesn't returns parts more than `parallelism`. + maybe_merge_parts(parts, parallelism) } } @@ -420,6 +419,62 @@ fn maybe_merge_parts(mut parts: Vec, parallelism: usize) -> Vec, parallelism: usize) -> Vec { + assert!(parallelism > 0); + if parts.len() >= parallelism { + // No need to split parts. + return parts; + } + + let has_part_to_split = parts.iter().any(|part| part.can_split_preserve_order()); + if !has_part_to_split { + // No proper parts to scan. + return parts; + } + + // Sorts parts by the number of ranges in the first file. + parts.sort_unstable_by(|a, b| { + let a_len = a.file_ranges.first().map(|file| file.len()).unwrap_or(0); + let b_len = b.file_ranges.first().map(|file| file.len()).unwrap_or(0); + a_len.cmp(&b_len).reverse() + }); + let num_parts_to_split = parallelism - parts.len(); + let mut output_parts = Vec::with_capacity(parallelism); + // Split parts up to num_parts_to_split. + for part in parts.iter_mut() { + if !part.can_split_preserve_order() { + continue; + } + // Safety: `can_split_preserve_order()` ensures file_ranges.len() == 1. + // Splits part into `num_parts_to_split + 1` new parts if possible. + let target_part_num = num_parts_to_split + 1; + let ranges_per_part = (part.file_ranges[0].len() + target_part_num - 1) / target_part_num; + // `can_split_preserve_order()` ensures part.file_ranges[0].len() > 1. + assert!(ranges_per_part > 0); + for ranges in part.file_ranges[0].chunks(ranges_per_part) { + let new_part = ScanPart { + memtables: Vec::new(), + file_ranges: smallvec![ranges.to_vec()], + time_range: part.time_range, + }; + output_parts.push(new_part); + } + // Replace the current part with the last output part as we will put the current part + // into the output parts later. + *part = output_parts.pop().unwrap(); + if output_parts.len() >= num_parts_to_split { + // We already split enough parts. + break; + } + } + // Put the remaining parts into the output parts. + output_parts.append(&mut parts); + + output_parts +} + #[cfg(test)] mod tests { use std::sync::Arc;