From 0ca97d178d8b8892445ef8ef1f0c94beee80b2ae Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Fri, 28 Jun 2024 16:17:30 +0800 Subject: [PATCH] feat!: reduce sorted runs during compaction (#3702) * feat: add functions to find and merge sorted runs * chore: refactor code * chore: remove some duplicates * chore: remove one clone * refactor: change max_active_window_files to max_active_window_runs * feat: integrate with sorted runs * fix: unit tests * feat: limit num of sorted runs during compaction * fix: some test * fix: some cr comments * feat: use smallvec * chore: rebase main * feat/reduce-sorted-runs: Refactor compaction logic and update test configurations - Refactored `merge_all_runs` function to use `sort_ranged_items` for sorting. - Improved item merging logic by iterating with `into_iter` and handling overlaps. - Updated test configurations to use `max_active_window_runs` instead of `max_active_window_files` for consistency. --------- Co-authored-by: tison --- src/mito2/src/compaction.rs | 1 + src/mito2/src/compaction/picker.rs | 4 +- src/mito2/src/compaction/run.rs | 709 ++++++++++++++++++ src/mito2/src/compaction/twcs.rs | 79 +- src/mito2/src/engine/append_mode_test.rs | 6 +- src/mito2/src/engine/compaction_test.rs | 32 +- src/mito2/src/engine/filter_deleted_test.rs | 2 +- src/mito2/src/region/options.rs | 24 +- src/mito2/src/sst/file.rs | 4 + src/store-api/src/mito_engine_options.rs | 8 +- .../common/create/create_with_options.result | 6 +- .../common/create/create_with_options.sql | 6 +- 12 files changed, 797 insertions(+), 84 deletions(-) create mode 100644 src/mito2/src/compaction/run.rs diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 95dd11ea06a1..7f7d337ef3cd 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -15,6 +15,7 @@ mod buckets; pub mod compactor; pub mod picker; +mod run; mod task; #[cfg(test)] mod test_util; diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index 3ef5e6561215..3ae1012fd86d 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -132,8 +132,8 @@ pub fn new_picker( } else { match compaction_options { CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker::new( - twcs_opts.max_active_window_files, - twcs_opts.max_inactive_window_files, + twcs_opts.max_active_window_runs, + twcs_opts.max_inactive_window_runs, twcs_opts.time_window_seconds(), )) as Arc<_>, } diff --git a/src/mito2/src/compaction/run.rs b/src/mito2/src/compaction/run.rs new file mode 100644 index 000000000000..8bd515008222 --- /dev/null +++ b/src/mito2/src/compaction/run.rs @@ -0,0 +1,709 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This file contains code to find sorted runs in a set if ranged items and +//! along with the best way to merge these items to satisfy the desired run count. + +use std::cmp::Ordering; + +use common_base::BitVec; +use common_time::Timestamp; +use itertools::Itertools; +use smallvec::{smallvec, SmallVec}; + +use crate::sst::file::FileHandle; + +/// Trait for any items with specific range. +pub(crate) trait Ranged { + type BoundType: Ord + Copy; + + /// Returns the inclusive range of item. + fn range(&self) -> (Self::BoundType, Self::BoundType); + + fn overlap(&self, other: &T) -> bool + where + T: Ranged, + { + let (lhs_start, lhs_end) = self.range(); + let (rhs_start, rhs_end) = other.range(); + match lhs_start.cmp(&rhs_start) { + Ordering::Less => lhs_end >= rhs_start, + Ordering::Equal => true, + Ordering::Greater => lhs_start <= rhs_end, + } + } +} + +// Sorts ranges by start asc and end desc. +fn sort_ranged_items(values: &mut [T]) { + values.sort_unstable_by(|l, r| { + let (l_start, l_end) = l.range(); + let (r_start, r_end) = r.range(); + l_start.cmp(&r_start).then(r_end.cmp(&l_end)) + }); +} + +/// Trait for items to merge. +pub(crate) trait Item: Ranged + Clone { + /// Size is used to calculate the cost of merging items. + fn size(&self) -> usize; +} + +impl Ranged for FileHandle { + type BoundType = Timestamp; + + fn range(&self) -> (Self::BoundType, Self::BoundType) { + self.time_range() + } +} + +impl Item for FileHandle { + fn size(&self) -> usize { + self.size() as usize + } +} + +#[derive(Debug, Clone)] +struct MergeItems { + items: SmallVec<[T; 4]>, + start: T::BoundType, + end: T::BoundType, + size: usize, +} + +impl Ranged for MergeItems { + type BoundType = T::BoundType; + + fn range(&self) -> (Self::BoundType, Self::BoundType) { + (self.start, self.end) + } +} + +impl MergeItems { + /// Creates unmerged item from given value. + pub fn new_unmerged(val: T) -> Self { + let (start, end) = val.range(); + let size = val.size(); + Self { + items: smallvec![val], + start, + end, + size, + } + } + + /// The range of current merge item + pub(crate) fn range(&self) -> (T::BoundType, T::BoundType) { + (self.start, self.end) + } + + /// Merges current item with other item. + pub(crate) fn merge(self, other: Self) -> Self { + let start = self.start.min(other.start); + let end = self.end.max(other.end); + let size = self.size + other.size; + + let mut items = SmallVec::with_capacity(self.items.len() + other.items.len()); + items.extend(self.items); + items.extend(other.items); + Self { + start, + end, + size, + items, + } + } + + /// Returns true if current item is merged from two items. + pub fn merged(&self) -> bool { + self.items.len() > 1 + } +} + +/// A set of files with non-overlapping time ranges. +#[derive(Debug, Clone)] +pub(crate) struct SortedRun { + /// Items to merge + items: Vec>, + /// penalty is defined as the total size of merged items. + penalty: usize, + /// The lower bound of all items. + start: Option, + // The upper bound of all items. + end: Option, +} + +impl Default for SortedRun +where + T: Item, +{ + fn default() -> Self { + Self { + items: vec![], + penalty: 0, + start: None, + end: None, + } + } +} + +impl SortedRun +where + T: Item, +{ + fn push_item(&mut self, t: MergeItems) { + let (file_start, file_end) = t.range(); + if t.merged() { + self.penalty += t.size; + } + self.items.push(t); + self.start = Some(self.start.map_or(file_start, |v| v.min(file_start))); + self.end = Some(self.end.map_or(file_end, |v| v.max(file_end))); + } +} + +/// Finds sorted runs in given items. +pub(crate) fn find_sorted_runs(items: &mut [T]) -> Vec> +where + T: Item, +{ + if items.is_empty() { + return vec![]; + } + // sort files + sort_ranged_items(items); + + let mut current_run = SortedRun::default(); + let mut runs = vec![]; + + let mut selection = BitVec::repeat(false, items.len()); + while !selection.all() { + // until all items are assigned to some sorted run. + for (item, mut selected) in items.iter().zip(selection.iter_mut()) { + if *selected { + // item is already assigned. + continue; + } + let current_item = MergeItems::new_unmerged(item.clone()); + match current_run.items.last() { + None => { + // current run is empty, just add current_item + selected.set(true); + current_run.push_item(current_item); + } + Some(last) => { + // the current item does not overlap with the last item in current run, + // then it belongs to current run. + if !last.overlap(¤t_item) { + // does not overlap, push to current run + selected.set(true); + current_run.push_item(current_item); + } + } + } + } + // finished an iteration, we've found a new run. + runs.push(std::mem::take(&mut current_run)); + } + runs +} + +fn merge_all_runs(runs: Vec>) -> SortedRun { + assert!(!runs.is_empty()); + let mut all_items = runs + .into_iter() + .flat_map(|r| r.items.into_iter()) + .collect::>(); + + sort_ranged_items(&mut all_items); + + let mut res = SortedRun::default(); + let mut iter = all_items.into_iter(); + // safety: all_items is not empty + let mut current_item = iter.next().unwrap(); + + for item in iter { + if current_item.overlap(&item) { + current_item = current_item.merge(item); + } else { + res.push_item(current_item); + current_item = item; + } + } + res.push_item(current_item); + res +} + +/// Reduces the num of runs to given target and returns items to merge. +/// The time complexity of this function is `C_{k}_{runs.len()}` where k=`runs.len()`-target+1. +pub(crate) fn reduce_runs(runs: Vec>, target: usize) -> Vec> { + assert_ne!(target, 0); + if target >= runs.len() { + // already satisfied. + return vec![]; + } + + let k = runs.len() + 1 - target; + runs.into_iter() + .combinations(k) // find all possible solutions + .map(|runs_to_merge| merge_all_runs(runs_to_merge)) // calculate merge penalty + .min_by(|p, r| p.penalty.cmp(&r.penalty)) // find solution with the min penalty + .unwrap() // safety: their must be at least one solution. + .items + .into_iter() + .filter(|m| m.merged()) // find all files to merge in that solution + .map(|m| m.items.to_vec()) + .collect() +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use super::*; + + #[derive(Clone, Debug)] + struct MockFile { + start: i64, + end: i64, + size: usize, + } + + impl Ranged for MockFile { + type BoundType = i64; + + fn range(&self) -> (Self::BoundType, Self::BoundType) { + (self.start, self.end) + } + } + + impl Item for MockFile { + fn size(&self) -> usize { + self.size + } + } + + fn build_items(ranges: &[(i64, i64)]) -> Vec { + ranges + .iter() + .map(|(start, end)| MockFile { + start: *start, + end: *end, + size: (*end - *start) as usize, + }) + .collect() + } + + fn check_sorted_runs( + ranges: &[(i64, i64)], + expected_runs: &[Vec<(i64, i64)>], + ) -> Vec> { + let mut files = build_items(ranges); + let runs = find_sorted_runs(&mut files); + + let result_file_ranges: Vec> = runs + .iter() + .map(|r| r.items.iter().map(|f| f.range()).collect()) + .collect(); + assert_eq!(&expected_runs, &result_file_ranges); + runs + } + + #[test] + fn test_find_sorted_runs() { + check_sorted_runs(&[], &[]); + check_sorted_runs(&[(1, 1), (2, 2)], &[vec![(1, 1), (2, 2)]]); + check_sorted_runs(&[(1, 2)], &[vec![(1, 2)]]); + check_sorted_runs(&[(1, 2), (2, 3)], &[vec![(1, 2)], vec![(2, 3)]]); + check_sorted_runs(&[(1, 2), (3, 4)], &[vec![(1, 2), (3, 4)]]); + check_sorted_runs(&[(2, 4), (1, 3)], &[vec![(1, 3)], vec![(2, 4)]]); + check_sorted_runs( + &[(1, 3), (2, 4), (4, 5)], + &[vec![(1, 3), (4, 5)], vec![(2, 4)]], + ); + + check_sorted_runs( + &[(1, 2), (3, 4), (3, 5)], + &[vec![(1, 2), (3, 5)], vec![(3, 4)]], + ); + + check_sorted_runs( + &[(1, 3), (2, 4), (5, 6)], + &[vec![(1, 3), (5, 6)], vec![(2, 4)]], + ); + + check_sorted_runs( + &[(1, 2), (3, 5), (4, 6)], + &[vec![(1, 2), (3, 5)], vec![(4, 6)]], + ); + + check_sorted_runs( + &[(1, 2), (3, 4), (4, 6), (7, 8)], + &[vec![(1, 2), (3, 4), (7, 8)], vec![(4, 6)]], + ); + check_sorted_runs( + &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)], + &[vec![(1, 2), (3, 6), (7, 8)], vec![(3, 4), (5, 6), (8, 9)]], + ); + + check_sorted_runs( + &[(10, 19), (20, 21), (20, 29), (30, 39)], + &[vec![(10, 19), (20, 29), (30, 39)], vec![(20, 21)]], + ); + + check_sorted_runs( + &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32), (32, 42)], + &[ + vec![(10, 19), (20, 29), (30, 39)], + vec![(21, 22), (31, 32)], + vec![(32, 42)], + ], + ); + } + + fn check_merge_sorted_runs( + items: &[(i64, i64)], + expected_penalty: usize, + expected: &[Vec<(i64, i64)>], + ) { + let mut items = build_items(items); + let runs = find_sorted_runs(&mut items); + assert_eq!(2, runs.len()); + let res = merge_all_runs(runs); + let penalty = res.penalty; + let ranges = res + .items + .into_iter() + .map(|i| { + i.items + .into_iter() + .map(|f| (f.start, f.end)) + .sorted_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1))) + .collect::>() + }) + .collect::>(); + assert_eq!(expected, &ranges); + assert_eq!(expected_penalty, penalty); + } + + #[test] + fn test_merge_sorted_runs() { + // [1..2] + // [1...3] + check_merge_sorted_runs(&[(1, 2), (1, 3)], 3, &[vec![(1, 2), (1, 3)]]); + + // [1..2][3..4] + // [2..3] + check_merge_sorted_runs( + &[(1, 2), (2, 3), (3, 4)], + 3, + &[vec![(1, 2), (2, 3), (3, 4)]], + ); + + // [1..10][11..20][21...30] + // [18] + check_merge_sorted_runs( + &[(1, 10), (11, 20), (21, 30), (18, 18)], + 9, + &[vec![(1, 10)], vec![(11, 20), (18, 18)], vec![(21, 30)]], + ); + + // [1..3][4..5] + // [2...4] + check_merge_sorted_runs( + &[(1, 3), (2, 4), (4, 5)], + 5, + &[vec![(1, 3), (2, 4), (4, 5)]], + ); + + // [1..2][3..4] [7..8] + // [4..6] + check_merge_sorted_runs( + &[(1, 2), (3, 4), (4, 6), (7, 8)], + 3, + &[vec![(1, 2)], vec![(3, 4), (4, 6)], vec![(7, 8)]], + ); + + // [1..2][3..4][5..6][7..8] + // [3........6] [8..9] + // + check_merge_sorted_runs( + &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)], + 7, + &[ + vec![(1, 2)], + vec![(3, 4), (3, 6), (5, 6)], + vec![(7, 8), (8, 9)], + ], + ); + + // [10.....19][20........29][30........39] + // [21..22] [31..32] + check_merge_sorted_runs( + &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32)], + 20, + &[ + vec![(10, 19)], + vec![(20, 29), (21, 22)], + vec![(30, 39), (31, 32)], + ], + ); + + // [1..10][11..20][21..30] + // [1..10] [21..30] + check_merge_sorted_runs( + &[(1, 10), (1, 10), (11, 20), (21, 30), (21, 30)], + 36, + &[ + vec![(1, 10), (1, 10)], + vec![(11, 20)], + vec![(21, 30), (21, 30)], + ], + ); + + // [1..10][11..20][21...30] + // [22..30] + check_merge_sorted_runs( + &[(1, 10), (11, 20), (21, 30), (22, 30)], + 17, + &[vec![(1, 10)], vec![(11, 20)], vec![(21, 30), (22, 30)]], + ); + } + + /// files: file arrangement with two sorted runs. + fn check_merge_all_sorted_runs( + files: &[(i64, i64)], + expected_penalty: usize, + expected: &[Vec<(i64, i64)>], + ) { + let mut files = build_items(files); + let runs = find_sorted_runs(&mut files); + let result = merge_all_runs(runs); + assert_eq!(expected_penalty, result.penalty); + assert_eq!(expected.len(), result.items.len()); + let res = result + .items + .iter() + .map(|i| { + let mut res = i.items.iter().map(|f| (f.start, f.end)).collect::>(); + res.sort_unstable_by(|l, r| l.0.cmp(&r.0)); + res + }) + .collect::>(); + assert_eq!(expected, &res); + } + + #[test] + fn test_merge_all_sorted_runs() { + // [1..2][3..4] + // [4..10] + check_merge_all_sorted_runs( + &[(1, 2), (3, 4), (4, 10)], + 7, // 1+6 + &[vec![(1, 2)], vec![(3, 4), (4, 10)]], + ); + + // [1..2] [3..4] [5..6] + // [4..........10] + check_merge_all_sorted_runs( + &[(1, 2), (3, 4), (5, 6), (4, 10)], + 8, // 1+1+6 + &[vec![(1, 2)], vec![(3, 4), (4, 10), (5, 6)]], + ); + + // [10..20] [30..40] [50....60] + // [35........55] + // [51..61] + check_merge_all_sorted_runs( + &[(10, 20), (30, 40), (50, 60), (35, 55), (51, 61)], + 50, + &[vec![(10, 20)], vec![(30, 40), (35, 55), (50, 60), (51, 61)]], + ); + } + + #[test] + fn test_sorted_runs_time_range() { + let mut files = build_items(&[(1, 2), (3, 4), (4, 10)]); + let runs = find_sorted_runs(&mut files); + assert_eq!(2, runs.len()); + let SortedRun { start, end, .. } = &runs[0]; + assert_eq!(Some(1), *start); + assert_eq!(Some(4), *end); + + let SortedRun { start, end, .. } = &runs[1]; + assert_eq!(Some(4), *start); + assert_eq!(Some(10), *end); + } + + fn check_reduce_runs( + files: &[(i64, i64)], + expected_runs: &[Vec<(i64, i64)>], + target: usize, + expected: &[Vec<(i64, i64)>], + ) { + let runs = check_sorted_runs(files, expected_runs); + let files_to_merge = reduce_runs(runs, target); + let file_timestamps = files_to_merge + .into_iter() + .map(|f| { + let mut overlapping = f.into_iter().map(|f| (f.start, f.end)).collect::>(); + overlapping.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1))); + overlapping + }) + .collect::>(); + + let expected = expected.iter().cloned().collect::>(); + assert_eq!(&expected, &file_timestamps); + } + + #[test] + fn test_reduce_runs() { + // [1..3] [5..6] + // [2..4] + check_reduce_runs( + &[(1, 3), (2, 4), (5, 6)], + &[vec![(1, 3), (5, 6)], vec![(2, 4)]], + 1, + &[vec![(1, 3), (2, 4)]], + ); + + // [1..2][3..5] + // [4..6] + check_reduce_runs( + &[(1, 2), (3, 5), (4, 6)], + &[vec![(1, 2), (3, 5)], vec![(4, 6)]], + 1, + &[vec![(3, 5), (4, 6)]], + ); + + // [1..4] + // [2..5] + // [3..6] + check_reduce_runs( + &[(1, 4), (2, 5), (3, 6)], + &[vec![(1, 4)], vec![(2, 5)], vec![(3, 6)]], + 1, + &[vec![(1, 4), (2, 5), (3, 6)]], + ); + check_reduce_runs( + &[(1, 4), (2, 5), (3, 6)], + &[vec![(1, 4)], vec![(2, 5)], vec![(3, 6)]], + 2, + &[vec![(1, 4), (2, 5)]], + ); + + // [1..2][3..4] [7..8] + // [4..6] + check_reduce_runs( + &[(1, 2), (3, 4), (4, 6), (7, 8)], + &[vec![(1, 2), (3, 4), (7, 8)], vec![(4, 6)]], + 1, + &[vec![(3, 4), (4, 6)]], + ); + + // [1..2][3........6][7..8] + // [3..4][5..6] [8..9] + check_reduce_runs( + &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)], + &[vec![(1, 2), (3, 6), (7, 8)], vec![(3, 4), (5, 6), (8, 9)]], + 2, + &[], // already satisfied + ); + + // [1..2][3........6][7..8] + // [3..4][5..6] [8..9] + check_reduce_runs( + &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)], + &[vec![(1, 2), (3, 6), (7, 8)], vec![(3, 4), (5, 6), (8, 9)]], + 1, + &[vec![(3, 4), (3, 6), (5, 6)], vec![(7, 8), (8, 9)]], + ); + + // [10..20] [30..40] [50........80] [100..110] + // [50..60] [80...100] + // [80..90] + check_reduce_runs( + &[ + (10, 20), + (30, 40), + (50, 60), + (50, 80), + (80, 90), + (80, 100), + (100, 110), + ], + &[ + vec![(10, 20), (30, 40), (50, 80), (100, 110)], + vec![(50, 60), (80, 100)], + vec![(80, 90)], + ], + 2, + &[vec![(80, 90), (80, 100)]], + ); + + // [10..20] [30..40] [50........80] [100..110] + // [50..60] [80.......100] + // [80..90] + check_reduce_runs( + &[ + (10, 20), + (30, 40), + (50, 60), + (50, 80), + (80, 90), + (80, 100), + (100, 110), + ], + &[ + vec![(10, 20), (30, 40), (50, 80), (100, 110)], + vec![(50, 60), (80, 100)], + vec![(80, 90)], + ], + 1, + &[vec![(50, 60), (50, 80), (80, 90), (80, 100), (100, 110)]], + ); + + // [0..10] + // [0...11] + // [0....12] + // [0.....13] + check_reduce_runs( + &[(0, 10), (0, 11), (0, 12), (0, 13)], + &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]], + 4, + &[], + ); + // enforce 3 runs + check_reduce_runs( + &[(0, 10), (0, 11), (0, 12), (0, 13)], + &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]], + 3, + &[vec![(0, 10), (0, 11)]], + ); + // enforce 2 runs + check_reduce_runs( + &[(0, 10), (0, 11), (0, 12), (0, 13)], + &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]], + 2, + &[vec![(0, 10), (0, 11), (0, 12)]], + ); + // enforce 1 run + check_reduce_runs( + &[(0, 10), (0, 11), (0, 12), (0, 13)], + &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]], + 1, + &[vec![(0, 10), (0, 11), (0, 12), (0, 13)]], + ); + } +} diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index bcb5e49c2dd9..81ad5268501f 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -24,85 +24,81 @@ use common_time::Timestamp; use crate::compaction::buckets::infer_time_bucket; use crate::compaction::compactor::CompactionRegion; use crate::compaction::picker::{Picker, PickerOutput}; +use crate::compaction::run::{find_sorted_runs, reduce_runs}; use crate::compaction::{get_expired_ssts, CompactionOutput}; -use crate::sst::file::{overlaps, FileHandle, FileId}; +use crate::sst::file::{overlaps, FileHandle, FileId, Level}; use crate::sst::version::LevelMeta; +const LEVEL_COMPACTED: Level = 1; + /// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction /// candidates. pub struct TwcsPicker { - max_active_window_files: usize, - max_inactive_window_files: usize, + max_active_window_runs: usize, + max_inactive_window_runs: usize, time_window_seconds: Option, } impl Debug for TwcsPicker { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("TwcsPicker") - .field("max_active_window_files", &self.max_active_window_files) - .field("max_inactive_window_files", &self.max_inactive_window_files) + .field("max_active_window_runs", &self.max_active_window_runs) + .field("max_inactive_window_runs", &self.max_inactive_window_runs) .finish() } } impl TwcsPicker { pub fn new( - max_active_window_files: usize, - max_inactive_window_files: usize, + max_active_window_runs: usize, + max_inactive_window_runs: usize, time_window_seconds: Option, ) -> Self { Self { - max_inactive_window_files, - max_active_window_files, + max_inactive_window_runs, + max_active_window_runs, time_window_seconds, } } /// Builds compaction output from files. - /// For active writing window, we allow for at most `max_active_window_files` files to alleviate + /// For active writing window, we allow for at most `max_active_window_runs` files to alleviate /// fragmentation. For other windows, we allow at most 1 file at each window. fn build_output( &self, - time_windows: &BTreeMap, + time_windows: &mut BTreeMap, active_window: Option, ) -> Vec { let mut output = vec![]; for (window, files) in time_windows { - let files_in_window = &files.files; - // we only remove deletion markers once no file in current window overlaps with any other window. - let filter_deleted = !files.overlapping; + let sorted_runs = find_sorted_runs(&mut files.files); - if let Some(active_window) = active_window + let max_runs = if let Some(active_window) = active_window && *window == active_window { - if files_in_window.len() > self.max_active_window_files { - output.push(CompactionOutput { - output_file_id: FileId::random(), - output_level: 1, // we only have two levels and always compact to l1 - inputs: files_in_window.clone(), - filter_deleted, - output_time_range: None, // we do not enforce output time range in twcs compactions. - }); - } else { - debug!("Active window not present or no enough files in active window {:?}, window: {}", active_window, *window); - } + self.max_active_window_runs } else { - // not active writing window - if files_in_window.len() > self.max_inactive_window_files { + self.max_inactive_window_runs + }; + + // we only remove deletion markers once no file in current window overlaps with any other window. + let found_runs = sorted_runs.len(); + let filter_deleted = !files.overlapping && (found_runs == 1 || max_runs == 1); + + if found_runs > max_runs { + let files_to_compact = reduce_runs(sorted_runs, max_runs); + info!("Building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, output size: {}", active_window, *window,max_runs, found_runs, files_to_compact.len()); + for inputs in files_to_compact { output.push(CompactionOutput { output_file_id: FileId::random(), - output_level: 1, - inputs: files_in_window.clone(), + output_level: LEVEL_COMPACTED, // always compact to l1 + inputs, filter_deleted, - output_time_range: None, + output_time_range: None, // we do not enforce output time range in twcs compactions.}); }); - } else { - debug!( - "No enough files, current: {}, max_inactive_window_files: {}", - files_in_window.len(), - self.max_inactive_window_files - ) } + } else { + debug!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs); } } output @@ -139,8 +135,9 @@ impl Picker for TwcsPicker { // Find active window from files in level 0. let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size); // Assign files to windows - let windows = assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size); - let outputs = self.build_output(&windows, active_window); + let mut windows = + assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size); + let outputs = self.build_output(&mut windows, active_window); if outputs.is_empty() && expired_ssts.is_empty() { return None; @@ -482,10 +479,10 @@ mod tests { impl CompactionPickerTestCase { fn check(&self) { - let windows = assign_to_windows(self.input_files.iter(), self.window_size); + let mut windows = assign_to_windows(self.input_files.iter(), self.window_size); let active_window = find_latest_window_in_seconds(self.input_files.iter(), self.window_size); - let output = TwcsPicker::new(4, 1, None).build_output(&windows, active_window); + let output = TwcsPicker::new(4, 1, None).build_output(&mut windows, active_window); let output = output .iter() diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs index 548a205f3978..0fb148be44b0 100644 --- a/src/mito2/src/engine/append_mode_test.rs +++ b/src/mito2/src/engine/append_mode_test.rs @@ -100,8 +100,8 @@ async fn test_append_mode_compaction() { let request = CreateRequestBuilder::new() .insert_option("compaction.type", "twcs") - .insert_option("compaction.twcs.max_active_window_files", "2") - .insert_option("compaction.twcs.max_inactive_window_files", "2") + .insert_option("compaction.twcs.max_active_window_runs", "2") + .insert_option("compaction.twcs.max_inactive_window_runs", "2") .insert_option("append_mode", "true") .build(); let region_dir = request.region_dir.clone(); @@ -167,7 +167,7 @@ async fn test_append_mode_compaction() { +-------+---------+---------------------+"; // Scans in parallel. let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); - assert_eq!(1, scanner.num_files()); + assert_eq!(2, scanner.num_files()); assert_eq!(1, scanner.num_memtables()); let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index 861d5e0b9065..f71b665a252f 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -140,8 +140,18 @@ async fn test_compaction_region() { assert_eq!(result.affected_rows, 0); let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + // Input: + // [0..9] + // [10...19] + // [20....29] + // -[15.........29]- + // [15.....24] + // Output: + // [0..9] + // [10..14] + // [15..24] assert_eq!( - 1, + 3, scanner.num_files(), "unexpected files: {:?}", scanner.file_ids() @@ -161,8 +171,8 @@ async fn test_compaction_region_with_overlapping() { let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new() .insert_option("compaction.type", "twcs") - .insert_option("compaction.twcs.max_active_window_files", "2") - .insert_option("compaction.twcs.max_inactive_window_files", "2") + .insert_option("compaction.twcs.max_active_window_runs", "2") + .insert_option("compaction.twcs.max_inactive_window_runs", "2") .insert_option("compaction.twcs.time_window", "1h") .build(); @@ -191,14 +201,7 @@ async fn test_compaction_region_with_overlapping() { assert_eq!(result.affected_rows, 0); let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); - assert_eq!( - 2, - scanner.num_files(), - "unexpected files: {:?}", - scanner.file_ids() - ); let stream = scanner.scan().await.unwrap(); - let vec = collect_stream_ts(stream).await; assert_eq!((3600..10800).map(|i| { i * 1000 }).collect::>(), vec); } @@ -212,8 +215,8 @@ async fn test_compaction_region_with_overlapping_delete_all() { let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new() .insert_option("compaction.type", "twcs") - .insert_option("compaction.twcs.max_active_window_files", "2") - .insert_option("compaction.twcs.max_inactive_window_files", "2") + .insert_option("compaction.twcs.max_active_window_runs", "2") + .insert_option("compaction.twcs.max_inactive_window_runs", "2") .insert_option("compaction.twcs.time_window", "1h") .build(); @@ -249,7 +252,6 @@ async fn test_compaction_region_with_overlapping_delete_all() { scanner.file_ids() ); let stream = scanner.scan().await.unwrap(); - let vec = collect_stream_ts(stream).await; assert!(vec.is_empty()); } @@ -275,7 +277,7 @@ async fn test_readonly_during_compaction() { let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new() .insert_option("compaction.type", "twcs") - .insert_option("compaction.twcs.max_active_window_files", "1") + .insert_option("compaction.twcs.max_active_window_runs", "1") .build(); let column_schemas = request @@ -289,7 +291,7 @@ async fn test_readonly_during_compaction() { .unwrap(); // Flush 2 SSTs for compaction. put_and_flush(&engine, region_id, &column_schemas, 0..10).await; - put_and_flush(&engine, region_id, &column_schemas, 10..20).await; + put_and_flush(&engine, region_id, &column_schemas, 5..20).await; // Waits until the engine receives compaction finished request. listener.wait_handle_finished().await; diff --git a/src/mito2/src/engine/filter_deleted_test.rs b/src/mito2/src/engine/filter_deleted_test.rs index 0a89f7b2ef50..167617803e28 100644 --- a/src/mito2/src/engine/filter_deleted_test.rs +++ b/src/mito2/src/engine/filter_deleted_test.rs @@ -34,7 +34,7 @@ async fn test_scan_without_filtering_deleted() { let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new() .insert_option("compaction.type", "twcs") - .insert_option("compaction.twcs.max_active_window_files", "10") + .insert_option("compaction.twcs.max_active_window_runs", "10") .build(); let column_schemas = rows_schema(&request); diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index 8531c1c0c466..0b8512ed0121 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -181,12 +181,12 @@ impl Default for CompactionOptions { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(default)] pub struct TwcsOptions { - /// Max num of files that can be kept in active writing time window. + /// Max num of sorted runs that can be kept in active writing time window. #[serde_as(as = "DisplayFromStr")] - pub max_active_window_files: usize, + pub max_active_window_runs: usize, /// Max num of files that can be kept in inactive time window. #[serde_as(as = "DisplayFromStr")] - pub max_inactive_window_files: usize, + pub max_inactive_window_runs: usize, /// Compaction time window defined when creating tables. #[serde(with = "humantime_serde")] pub time_window: Option, @@ -214,8 +214,8 @@ impl TwcsOptions { impl Default for TwcsOptions { fn default() -> Self { Self { - max_active_window_files: 4, - max_inactive_window_files: 1, + max_active_window_runs: 1, + max_inactive_window_runs: 1, time_window: None, remote_compaction: false, } @@ -439,7 +439,7 @@ mod tests { #[test] fn test_without_compaction_type() { let map = make_map(&[ - ("compaction.twcs.max_active_window_files", "8"), + ("compaction.twcs.max_active_window_runs", "8"), ("compaction.twcs.time_window", "2h"), ]); let err = RegionOptions::try_from(&map).unwrap_err(); @@ -449,14 +449,14 @@ mod tests { #[test] fn test_with_compaction_type() { let map = make_map(&[ - ("compaction.twcs.max_active_window_files", "8"), + ("compaction.twcs.max_active_window_runs", "8"), ("compaction.twcs.time_window", "2h"), ("compaction.type", "twcs"), ]); let options = RegionOptions::try_from(&map).unwrap(); let expect = RegionOptions { compaction: CompactionOptions::Twcs(TwcsOptions { - max_active_window_files: 8, + max_active_window_runs: 8, time_window: Some(Duration::from_secs(3600 * 2)), ..Default::default() }), @@ -557,8 +557,8 @@ mod tests { }); let map = make_map(&[ ("ttl", "7d"), - ("compaction.twcs.max_active_window_files", "8"), - ("compaction.twcs.max_inactive_window_files", "2"), + ("compaction.twcs.max_active_window_runs", "8"), + ("compaction.twcs.max_inactive_window_runs", "2"), ("compaction.twcs.time_window", "2h"), ("compaction.type", "twcs"), ("compaction.twcs.remote_compaction", "false"), @@ -580,8 +580,8 @@ mod tests { let expect = RegionOptions { ttl: Some(Duration::from_secs(3600 * 24 * 7)), compaction: CompactionOptions::Twcs(TwcsOptions { - max_active_window_files: 8, - max_inactive_window_files: 2, + max_active_window_runs: 8, + max_inactive_window_runs: 2, time_window: Some(Duration::from_secs(3600 * 2)), remote_compaction: false, }), diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 1ed4d6ffe6c4..a6f5c9d9f886 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -200,6 +200,10 @@ impl FileHandle { pub fn meta_ref(&self) -> &FileMeta { &self.inner.meta } + + pub fn size(&self) -> u64 { + self.inner.meta.file_size + } } /// Inner data of [FileHandle]. diff --git a/src/store-api/src/mito_engine_options.rs b/src/store-api/src/mito_engine_options.rs index 89060c7cecc3..33dcc20931dd 100644 --- a/src/store-api/src/mito_engine_options.rs +++ b/src/store-api/src/mito_engine_options.rs @@ -22,8 +22,8 @@ pub fn is_mito_engine_option_key(key: &str) -> bool { [ "ttl", "compaction.type", - "compaction.twcs.max_active_window_files", - "compaction.twcs.max_inactive_window_files", + "compaction.twcs.max_active_window_runs", + "compaction.twcs.max_inactive_window_runs", "compaction.twcs.time_window", "compaction.twcs.remote_compaction", "storage", @@ -48,10 +48,10 @@ mod tests { assert!(is_mito_engine_option_key("ttl")); assert!(is_mito_engine_option_key("compaction.type")); assert!(is_mito_engine_option_key( - "compaction.twcs.max_active_window_files" + "compaction.twcs.max_active_window_runs" )); assert!(is_mito_engine_option_key( - "compaction.twcs.max_inactive_window_files" + "compaction.twcs.max_inactive_window_runs" )); assert!(is_mito_engine_option_key("compaction.twcs.time_window")); assert!(is_mito_engine_option_key("storage")); diff --git a/tests/cases/standalone/common/create/create_with_options.result b/tests/cases/standalone/common/create/create_with_options.result index 4592d8cb9eb5..67015eeab9e2 100644 --- a/tests/cases/standalone/common/create/create_with_options.result +++ b/tests/cases/standalone/common/create/create_with_options.result @@ -67,8 +67,8 @@ engine=mito with( 'ttl'='7d', 'compaction.type'='twcs', - 'compaction.twcs.max_active_window_files'='8', - 'compaction.twcs.max_inactive_window_files'='2', + 'compaction.twcs.max_active_window_runs'='2', + 'compaction.twcs.max_inactive_window_runs'='2', 'compaction.twcs.time_window'='1d', 'index.inverted_index.ignore_column_ids'='1,2,3', 'index.inverted_index.segment_row_count'='512', @@ -90,7 +90,7 @@ create table if not exists invalid_compaction( PRIMARY KEY(host) ) engine=mito -with('compaction.type'='twcs', 'compaction.twcs.max_active_window_files'='8d'); +with('compaction.type'='twcs', 'compaction.twcs.max_active_window_runs'='8d'); Error: 1004(InvalidArguments), Invalid options: invalid digit found in string diff --git a/tests/cases/standalone/common/create/create_with_options.sql b/tests/cases/standalone/common/create/create_with_options.sql index 11ed8757b19a..f9a6d4790593 100644 --- a/tests/cases/standalone/common/create/create_with_options.sql +++ b/tests/cases/standalone/common/create/create_with_options.sql @@ -57,8 +57,8 @@ engine=mito with( 'ttl'='7d', 'compaction.type'='twcs', - 'compaction.twcs.max_active_window_files'='8', - 'compaction.twcs.max_inactive_window_files'='2', + 'compaction.twcs.max_active_window_runs'='2', + 'compaction.twcs.max_inactive_window_runs'='2', 'compaction.twcs.time_window'='1d', 'index.inverted_index.ignore_column_ids'='1,2,3', 'index.inverted_index.segment_row_count'='512', @@ -76,4 +76,4 @@ create table if not exists invalid_compaction( PRIMARY KEY(host) ) engine=mito -with('compaction.type'='twcs', 'compaction.twcs.max_active_window_files'='8d'); +with('compaction.type'='twcs', 'compaction.twcs.max_active_window_runs'='8d');