Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement memtable range #4162

Merged
merged 10 commits into from
Jun 18, 2024
Prev Previous commit
Next Next commit
refactor: MemRange -> MemtableRange
evenyag committed Jun 18, 2024

Verified

This commit was signed with the committer’s verified signature.
evenyag Yingwen
commit 6e0d1a9b9f520a89b18eb87b4fc7d0c5a7249652
18 changes: 9 additions & 9 deletions src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
@@ -115,7 +115,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Vec<MemRange>;
) -> Vec<MemtableRange>;
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved

/// Returns true if the memtable is empty.
fn is_empty(&self) -> bool;
@@ -295,33 +295,33 @@ pub trait IterBuilder: Send + Sync {
pub type BoxedIterBuilder = Box<dyn IterBuilder>;

/// Context shared by ranges of the same memtable.
pub struct MemRangeContext {
pub struct MemtableRangeContext {
/// Id of the memtable.
id: MemtableId,
/// Iterator builder.
builder: BoxedIterBuilder,
}

pub type MemRangeContextRef = Arc<MemRangeContext>;
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemRangeContext {
/// Creates a new [MemRangeContext].
impl MemtableRangeContext {
/// Creates a new [MemtableRangeContext].
pub fn new(id: MemtableId, builder: BoxedIterBuilder) -> Self {
Self { id, builder }
}
}

/// A range in the memtable.
#[derive(Clone)]
pub struct MemRange {
pub struct MemtableRange {
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
/// Shared context.
context: MemRangeContextRef,
context: MemtableRangeContextRef,
// TODO(yingwen): Id to identify the range in the memtable.
}

impl MemRange {
impl MemtableRange {
/// Creates a new range from context.
pub fn new(context: MemRangeContextRef) -> Self {
pub fn new(context: MemtableRangeContextRef) -> Self {
Self { context }
}

10 changes: 5 additions & 5 deletions src/mito2/src/memtable/partition_tree.rs
Original file line number Diff line number Diff line change
@@ -40,8 +40,8 @@ use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::metrics::WriteMetrics;
use crate::memtable::partition_tree::tree::PartitionTree;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemRange, MemRangeContext, Memtable,
MemtableBuilder, MemtableId, MemtableRef, MemtableStats,
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
};

/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
@@ -160,16 +160,16 @@ impl Memtable for PartitionTreeMemtable {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Vec<MemRange> {
) -> Vec<MemtableRange> {
let projection = projection.map(|ids| ids.to_vec());
let builder = Box::new(PartitionTreeIterBuilder {
tree: self.tree.clone(),
projection,
predicate,
});
let context = Arc::new(MemRangeContext::new(self.id, builder));
let context = Arc::new(MemtableRangeContext::new(self.id, builder));

vec![MemRange::new(context)]
vec![MemtableRange::new(context)]
}

fn is_empty(&self) -> bool {
10 changes: 5 additions & 5 deletions src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
@@ -40,8 +40,8 @@ use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismat
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemRange, MemRangeContext, Memtable,
MemtableBuilder, MemtableId, MemtableRef, MemtableStats,
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
};
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::{Batch, BatchBuilder, BatchColumn};
@@ -248,7 +248,7 @@ impl Memtable for TimeSeriesMemtable {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Vec<MemRange> {
) -> Vec<MemtableRange> {
let projection = if let Some(projection) = projection {
projection.iter().copied().collect()
} else {
@@ -263,9 +263,9 @@ impl Memtable for TimeSeriesMemtable {
predicate,
dedup: self.dedup,
});
let context = Arc::new(MemRangeContext::new(self.id, builder));
let context = Arc::new(MemtableRangeContext::new(self.id, builder));

vec![MemRange::new(context)]
vec![MemtableRange::new(context)]
}

fn is_empty(&self) -> bool {
22 changes: 12 additions & 10 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@ use crate::access_layer::AccessLayerRef;
use crate::cache::file_cache::FileCacheRef;
use crate::cache::CacheManagerRef;
use crate::error::Result;
use crate::memtable::{MemRange, MemtableRef};
use crate::memtable::{MemtableRange, MemtableRef};
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch};
use crate::read::projection::ProjectionMapper;
@@ -631,8 +631,8 @@ pub(crate) type FileRangesGroup = SmallVec<[Vec<FileRange>; 4]>;
/// It contains memtables and file ranges to scan.
#[derive(Default)]
pub(crate) struct ScanPart {
/// Memtables to scan.
pub(crate) mem_ranges: Vec<MemRange>,
/// Memtable ranges to scan.
pub(crate) memtable_ranges: Vec<MemtableRange>,
/// File ranges to scan.
pub(crate) file_ranges: FileRangesGroup,
/// Optional time range of the part (inclusive).
@@ -643,8 +643,8 @@ impl fmt::Debug for ScanPart {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"ScanPart({} mem ranges, {} file ranges",
self.mem_ranges.len(),
"ScanPart({} memtable ranges, {} file ranges",
self.memtable_ranges.len(),
self.file_ranges
.iter()
.map(|ranges| ranges.len())
@@ -670,7 +670,7 @@ impl ScanPart {

/// Merges given `part` to this part.
pub(crate) fn merge(&mut self, mut part: ScanPart) {
self.mem_ranges.append(&mut part.mem_ranges);
self.memtable_ranges.append(&mut part.memtable_ranges);
self.file_ranges.append(&mut part.file_ranges);
let Some(part_range) = part.time_range else {
return;
@@ -687,7 +687,9 @@ impl ScanPart {
/// Returns true if the we can split the part into multiple parts
/// and preserving order.
pub(crate) fn can_split_preserve_order(&self) -> bool {
self.mem_ranges.is_empty() && self.file_ranges.len() == 1 && self.file_ranges[0].len() > 1
self.memtable_ranges.is_empty()
&& self.file_ranges.len() == 1
&& self.file_ranges[0].len() > 1
}
}

@@ -738,10 +740,10 @@ impl ScanPartList {
self.0.as_ref().map_or(0, |parts| parts.len())
}

/// Returns the number of mem ranges.
/// Returns the number of memtable ranges.
pub(crate) fn num_mem_ranges(&self) -> usize {
self.0.as_ref().map_or(0, |parts| {
parts.iter().map(|part| part.mem_ranges.len()).sum()
parts.iter().map(|part| part.memtable_ranges.len()).sum()
})
}

@@ -791,7 +793,7 @@ impl StreamContext {
Ok(inner) => match t {
DisplayFormatType::Default => write!(
f,
"partition_count={} ({} mem ranges, {} file ranges)",
"partition_count={} ({} memtable ranges, {} file ranges)",
inner.len(),
inner.num_mem_ranges(),
inner.num_file_ranges()
20 changes: 10 additions & 10 deletions src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
@@ -92,9 +92,9 @@ impl SeqScan {

/// Builds sources from a [ScanPart].
fn build_part_sources(part: &ScanPart, sources: &mut Vec<Source>) -> Result<()> {
sources.reserve(part.mem_ranges.len() + part.file_ranges.len());
sources.reserve(part.memtable_ranges.len() + part.file_ranges.len());
// Read memtables.
for mem in &part.mem_ranges {
for mem in &part.memtable_ranges {
let iter = mem.build_iter()?;
sources.push(Source::Iter(iter));
}
@@ -323,7 +323,7 @@ impl FileRangeCollector for SeqDistributor {
return;
}
let part = ScanPart {
mem_ranges: Vec::new(),
memtable_ranges: Vec::new(),
file_ranges: smallvec![ranges],
time_range: Some(file_meta.time_range),
};
@@ -346,15 +346,15 @@ impl SeqDistributor {
continue;
}
let part = ScanPart {
mem_ranges,
memtable_ranges: mem_ranges,
file_ranges: smallvec![],
time_range: stats.time_range(),
};
self.parts.push(part);
}
}

/// Groups file ranges and mem ranges by time ranges.
/// Groups file ranges and memtable ranges by time ranges.
/// The output number of parts may be `<= parallelism`. If `parallelism` is 0, it will be set to 1.
///
/// Output parts have non-overlapping time ranges.
@@ -417,9 +417,9 @@ fn maybe_merge_parts(mut parts: Vec<ScanPart>, parallelism: usize) -> Vec<ScanPa

// Sort parts by number of memtables and ranges in reverse order.
parts.sort_unstable_by(|a, b| {
a.mem_ranges
a.memtable_ranges
.len()
.cmp(&b.mem_ranges.len())
.cmp(&b.memtable_ranges.len())
.then_with(|| {
let a_ranges_len = a
.file_ranges
@@ -482,7 +482,7 @@ fn maybe_split_parts(mut parts: Vec<ScanPart>, parallelism: usize) -> Vec<ScanPa
assert!(ranges_per_part > 0);
for ranges in part.file_ranges[0].chunks(ranges_per_part) {
let new_part = ScanPart {
mem_ranges: Vec::new(),
memtable_ranges: Vec::new(),
file_ranges: smallvec![ranges.to_vec()],
time_range: part.time_range,
};
@@ -522,7 +522,7 @@ mod tests {
Timestamp::new(*end, TimeUnit::Second),
);
ScanPart {
mem_ranges: vec![mem_range_for_test(*id)],
memtable_ranges: vec![mem_range_for_test(*id)],
file_ranges: smallvec![],
time_range: Some(range),
}
@@ -532,7 +532,7 @@ mod tests {
let actual: Vec<_> = output
.iter()
.map(|part| {
let ids: Vec<_> = part.mem_ranges.iter().map(|mem| mem.id()).collect();
let ids: Vec<_> = part.memtable_ranges.iter().map(|mem| mem.id()).collect();
let range = part.time_range.unwrap();
(ids, range.0.value(), range.1.value())
})
12 changes: 6 additions & 6 deletions src/mito2/src/read/unordered_scan.rs
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ use table::predicate::Predicate;

use crate::cache::CacheManager;
use crate::error::Result;
use crate::memtable::{MemRange, MemtableRef};
use crate::memtable::{MemtableRange, MemtableRef};
use crate::read::compat::CompatBatch;
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{
@@ -153,7 +153,7 @@ impl RegionScanner for UnorderedScan {

let mapper = &stream_ctx.input.mapper;
let memtable_sources = part
.mem_ranges
.memtable_ranges
.iter()
.map(|mem| {
let iter = mem.build_iter()?;
@@ -256,7 +256,7 @@ async fn maybe_init_parts(
/// is no output ordering guarantee of each partition.
#[derive(Default)]
struct UnorderedDistributor {
mem_ranges: Vec<MemRange>,
mem_ranges: Vec<MemtableRange>,
file_ranges: Vec<FileRange>,
}

@@ -295,7 +295,7 @@ impl UnorderedDistributor {
if parallelism <= 1 {
// Returns a single part.
let part = ScanPart {
mem_ranges: self.mem_ranges.clone(),
memtable_ranges: self.mem_ranges.clone(),
file_ranges: smallvec![self.file_ranges],
time_range: None,
};
@@ -316,15 +316,15 @@ impl UnorderedDistributor {
.mem_ranges
.chunks(mems_per_part)
.map(|mems| ScanPart {
mem_ranges: mems.to_vec(),
memtable_ranges: mems.to_vec(),
file_ranges: smallvec![Vec::new()], // Ensures there is always one group.
time_range: None,
})
.collect::<Vec<_>>();
for (i, ranges) in self.file_ranges.chunks(ranges_per_part).enumerate() {
if i == scan_parts.len() {
scan_parts.push(ScanPart {
mem_ranges: Vec::new(),
memtable_ranges: Vec::new(),
file_ranges: smallvec![ranges.to_vec()],
time_range: None,
});
12 changes: 6 additions & 6 deletions src/mito2/src/test_util/memtable_util.rs
Original file line number Diff line number Diff line change
@@ -33,8 +33,8 @@ use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
use crate::memtable::{
BoxedBatchIterator, IterBuilder, KeyValues, MemRange, MemRangeContext, Memtable,
MemtableBuilder, MemtableId, MemtableRef, MemtableStats,
BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder, MemtableId,
MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::parquet::format::ReadFormat;
@@ -92,7 +92,7 @@ impl Memtable for EmptyMemtable {
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
) -> Vec<MemRange> {
) -> Vec<MemtableRange> {
vec![]
}

@@ -362,9 +362,9 @@ pub(crate) fn collect_iter_timestamps(iter: BoxedBatchIterator) -> Vec<i64> {
}

/// Builds a memtable range for test.
pub(crate) fn mem_range_for_test(id: MemtableId) -> MemRange {
pub(crate) fn mem_range_for_test(id: MemtableId) -> MemtableRange {
let builder = Box::new(EmptyIterBuilder::default());

let context = Arc::new(MemRangeContext::new(id, builder));
MemRange::new(context)
let context = Arc::new(MemtableRangeContext::new(id, builder));
MemtableRange::new(context)
}