Skip to content

Commit

Permalink
feat: integrate with sorted runs
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r committed Apr 14, 2024
1 parent dcb3554 commit 2c0f146
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 11 deletions.
21 changes: 19 additions & 2 deletions src/mito2/src/compaction/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ use std::cmp::Ordering;
use std::marker::PhantomData;

use common_base::BitVec;
use common_time::Timestamp;
use itertools::Itertools;

use crate::sst::file::FileHandle;

/// Trait for any items with specific range.
pub(crate) trait Ranged {
type BoundType: Ord + Copy;
Expand All @@ -42,7 +45,7 @@ pub(crate) trait Ranged {
}

// Sorts ranges by start asc and end desc.
fn sort_ranged_items<T: Ranged>(values: &mut Vec<T>) {
fn sort_ranged_items<T: Ranged>(values: &mut [T]) {
values.sort_unstable_by(|l, r| {
let (l_start, l_end) = l.range();
let (r_start, r_end) = r.range();
Expand All @@ -56,6 +59,20 @@ pub(crate) trait Item: Ranged + Clone {
fn size(&self) -> usize;
}

impl Ranged for FileHandle {
type BoundType = Timestamp;

fn range(&self) -> (Self::BoundType, Self::BoundType) {
self.time_range()
}
}

impl Item for FileHandle {
fn size(&self) -> usize {
self.size() as usize
}
}

#[derive(Debug, Clone)]
struct MergeItems<T: Item> {
items: Vec<T>,
Expand Down Expand Up @@ -352,7 +369,7 @@ fn merge_all_runs<T: Item>(mut runs: Vec<SortedRun<T>>) -> SortedRun<T> {

/// Reduces the num of runs to given target and returns items to merge.
/// The time complexity of this function is `C_{k}_{runs.len()}` where k=`runs.len()`-target+1.
fn reduce_runs<T: Item>(runs: Vec<SortedRun<T>>, target: usize) -> Vec<Vec<T>> {
pub(crate) fn reduce_runs<T: Item>(runs: Vec<SortedRun<T>>, target: usize) -> Vec<Vec<T>> {
assert_ne!(target, 0);
if target >= runs.len() {
// already satisfied.
Expand Down
22 changes: 14 additions & 8 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tokio::sync::mpsc;
use crate::access_layer::{AccessLayerRef, SstWriteRequest};
use crate::cache::CacheManagerRef;
use crate::compaction::picker::{CompactionTask, Picker};
use crate::compaction::run::find_sorted_runs;
use crate::compaction::run::{find_sorted_runs, reduce_runs};
use crate::compaction::CompactionRequest;
use crate::config::MitoConfig;
use crate::error::{self, CompactRegionSnafu};
Expand Down Expand Up @@ -96,15 +96,21 @@ impl TwcsPicker {
if let Some(active_window) = active_window
&& *window == active_window
{
let sorted_runs = find_sorted_runs(files.clone());
debug!("Find sorted runs: {:?}", sorted_runs);
// inside active window
if files.len() > self.max_active_window_runs {
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: LEVEL_COMPACTED,
inputs: files.clone(),
});
if sorted_runs.len() > self.max_active_window_runs {
output.extend(
reduce_runs(sorted_runs, self.max_active_window_runs)
.into_iter()
.map(|files| CompactionOutput {
output_file_id: FileId::random(),
output_level: LEVEL_COMPACTED,
inputs: files,
}),
);
} else {
debug!("Active window not present or no enough files in active window {:?}, window: {}", active_window, *window);
debug!("Active window not present or no enough sorted runs in active window {:?}, window: {}, current run: {}", active_window, *window, sorted_runs.len());
}
} else {
// not active writing window
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ with(
'regions'=1,
'ttl'='7d',
'compaction.type'='twcs',
'compaction.twcs.max_active_window_runs'='8',
'compaction.twcs.max_active_window_runs'='2',
'compaction.twcs.max_inactive_window_files'='2',
'compaction.twcs.time_window'='1d',
'index.inverted_index.ignore_column_ids'='1,2,3',
Expand Down

0 comments on commit 2c0f146

Please sign in to comment.