Skip to content

Commit

Permalink
feat: use real time range to filter memtable (#1233)
Browse files Browse the repository at this point in the history
## Rationale
Follow-up to #1225: this PR adds the same logic for memtables.

## Detailed Changes


## Test Plan
Integration tests.
  • Loading branch information
jiacai2050 authored Sep 30, 2023
1 parent c96ca8d commit d52ef2a
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 37 deletions.
2 changes: 1 addition & 1 deletion analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ impl FlushTask {
let sst_meta = MetaData {
min_key,
max_key,
time_range: memtable_state.time_range,
time_range: memtable_state.aligned_time_range,
max_sequence,
schema: self.table_data.schema(),
};
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/instance/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ impl Instance {

for memtable in read_view.memtables {
let aligned_ts = memtable
.time_range
.aligned_time_range
.inclusive_start()
.truncate_by(segment_duration);
let entry = read_view_by_time
Expand Down
8 changes: 7 additions & 1 deletion analytic_engine/src/memtable/columnar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use std::{
use arena::MonoIncArena;
use bytes_ext::Bytes;
use common_types::{
column::Column, column_schema::ColumnId, datum::Datum, row::Row, schema::Schema, SequenceNumber,
column::Column, column_schema::ColumnId, datum::Datum, row::Row, schema::Schema,
time::TimeRange, SequenceNumber,
};
use generic_error::BoxError;
use log::debug;
Expand Down Expand Up @@ -230,6 +231,11 @@ impl MemTable for ColumnarMemTable {
self.last_sequence.load(Ordering::Relaxed)
}

// TODO: implement this.
// Until it is implemented, no time range is reported for this memtable:
// the method unconditionally returns `None`.
fn time_range(&self) -> Option<TimeRange> {
None
}

fn metrics(&self) -> MemtableMetrics {
let row_raw_size = self.metrics.row_raw_size.load(Ordering::Relaxed);
let row_count = self.metrics.row_count.load(Ordering::Relaxed);
Expand Down
7 changes: 7 additions & 0 deletions analytic_engine/src/memtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use common_types::{
record_batch::RecordBatchWithKey,
row::Row,
schema::{IndexInWriterSchema, Schema},
time::TimeRange,
SequenceNumber,
};
use generic_error::GenericError;
Expand Down Expand Up @@ -134,6 +135,9 @@ pub enum Error {

#[snafu(display("msg:{msg}"))]
InternalNoCause { msg: String },

#[snafu(display("Timestamp is not found in row.\nBacktrace:\n{backtrace}"))]
TimestampNotFound { backtrace: Backtrace },
}

define_result!(Error);
Expand Down Expand Up @@ -255,6 +259,9 @@ pub trait MemTable {
/// If the memtable is empty, then the last sequence is 0.
fn last_sequence(&self) -> SequenceNumber;

/// Time range of written rows.
fn time_range(&self) -> Option<TimeRange>;

/// Metrics of inner state.
fn metrics(&self) -> Metrics;
}
Expand Down
9 changes: 4 additions & 5 deletions analytic_engine/src/memtable/skiplist/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@ impl Factory for SkiplistMemTableFactory {
fn create_memtable(&self, opts: Options) -> Result<MemTableRef> {
let arena = MonoIncArena::with_collector(opts.arena_block_size as usize, opts.collector);
let skiplist = Skiplist::with_arena(BytewiseComparator, arena);
let memtable = Arc::new(SkiplistMemTable {
schema: opts.schema,
let memtable = Arc::new(SkiplistMemTable::new(
opts.schema,
skiplist,
last_sequence: AtomicU64::new(opts.creation_sequence),
metrics: Default::default(),
});
AtomicU64::new(opts.creation_sequence),
));

Ok(memtable)
}
Expand Down
57 changes: 53 additions & 4 deletions analytic_engine/src/memtable/skiplist/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,28 @@
pub mod factory;
pub mod iter;

use std::sync::atomic::{self, AtomicU64, AtomicUsize};
use std::sync::atomic::{self, AtomicI64, AtomicU64, AtomicUsize};

use arena::{Arena, BasicStats};
use bytes_ext::Bytes;
use codec::Encoder;
use common_types::{
row::{contiguous::ContiguousRowWriter, Row},
schema::Schema,
time::TimeRange,
SequenceNumber,
};
use generic_error::BoxError;
use log::{debug, trace};
use skiplist::{BytewiseComparator, Skiplist};
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};

use crate::memtable::{
key::{ComparableInternalKey, KeySequence},
reversed_iter::ReversedColumnarIterator,
skiplist::iter::ColumnarIterImpl,
ColumnarIterPtr, EncodeInternalKey, InvalidPutSequence, InvalidRow, MemTable,
Metrics as MemtableMetrics, PutContext, Result, ScanContext, ScanRequest,
Metrics as MemtableMetrics, PutContext, Result, ScanContext, ScanRequest, TimestampNotFound,
};

#[derive(Default, Debug)]
Expand All @@ -48,7 +49,7 @@ struct Metrics {
}

/// MemTable implementation based on skiplist
pub struct SkiplistMemTable<A: Arena<Stats = BasicStats> + Clone + Sync + Send> {
pub struct SkiplistMemTable<A: Arena<Stats = BasicStats> + Clone> {
/// Schema of this memtable, is immutable.
schema: Schema,
skiplist: Skiplist<BytewiseComparator, A>,
Expand All @@ -57,6 +58,27 @@ pub struct SkiplistMemTable<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
last_sequence: AtomicU64,

metrics: Metrics,
min_time: AtomicI64,
max_time: AtomicI64,
}

impl<A: Arena<Stats = BasicStats> + Clone> SkiplistMemTable<A> {
    /// Builds a memtable on top of `skiplist` with no written-row timestamps
    /// recorded yet.
    ///
    /// `schema`, `skiplist` and `last_sequence` are stored as given; `metrics`
    /// starts from its default value.
    fn new(
        schema: Schema,
        skiplist: Skiplist<BytewiseComparator, A>,
        last_sequence: AtomicU64,
    ) -> Self {
        // Start each bound at the opposite extreme, so that folding row
        // timestamps in with `min(min_time, row.time)` / `max(max_time, row.time)`
        // yields the real bounds.
        let min_time = AtomicI64::new(i64::MAX);
        let max_time = AtomicI64::new(i64::MIN);
        let metrics = Metrics::default();

        Self {
            schema,
            skiplist,
            last_sequence,
            metrics,
            min_time,
            max_time,
        }
    }
}

impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send + 'static> MemTable
Expand Down Expand Up @@ -117,6 +139,27 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send + 'static> MemTable
let encoded_size = internal_key.len() + row_value.len();
self.skiplist.put(internal_key, row_value);

// Update min/max time
let timestamp = row.timestamp(schema).context(TimestampNotFound)?.as_i64();
_ = self
.min_time
.fetch_update(atomic::Ordering::Relaxed, atomic::Ordering::Relaxed, |v| {
if timestamp < v {
Some(timestamp)
} else {
None
}
});
_ = self
.max_time
.fetch_update(atomic::Ordering::Relaxed, atomic::Ordering::Relaxed, |v| {
if timestamp > v {
Some(timestamp)
} else {
None
}
});

// Update metrics
self.metrics
.row_raw_size
Expand Down Expand Up @@ -180,6 +223,12 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send + 'static> MemTable
self.last_sequence.load(atomic::Ordering::Relaxed)
}

/// Time range covered by the rows written so far, as
/// `[min timestamp, max timestamp + 1)`.
///
/// Returns `None` when `TimeRange::new` rejects the bounds, or when the
/// exclusive end cannot be represented (the max timestamp is `i64::MAX`).
fn time_range(&self) -> Option<TimeRange> {
    let min_time = self.min_time.load(atomic::Ordering::Relaxed);
    let max_time = self.max_time.load(atomic::Ordering::Relaxed);

    // The end bound is exclusive, so it is one past the largest timestamp.
    // A plain `max_time + 1` overflows for `i64::MAX` (panic in debug builds,
    // wrap-around to `i64::MIN` in release builds); report "unknown" instead.
    let exclusive_end = max_time.checked_add(1)?;

    // NOTE(review): presumably `TimeRange::new` returns `None` for an inverted
    // range, which is what an untouched memtable (min = i64::MAX,
    // max = i64::MIN) produces. Confirm against `TimeRange::new`.
    TimeRange::new(min_time.into(), exclusive_end.into())
}

fn metrics(&self) -> MemtableMetrics {
let row_raw_size = self.metrics.row_raw_size.load(atomic::Ordering::Relaxed);
let row_encoded_size = self
Expand Down
4 changes: 2 additions & 2 deletions analytic_engine/src/table/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ impl TableData {
)?;
let mem_state = MemTableState {
mem,
time_range,
aligned_time_range: time_range,
id: self.alloc_memtable_id(),
};

Expand Down Expand Up @@ -931,7 +931,7 @@ pub mod tests {
assert_eq!(2, mem_state.id);
let time_range =
TimeRange::bucket_of(now_ts, table_options::DEFAULT_SEGMENT_DURATION).unwrap();
assert_eq!(time_range, mem_state.time_range);
assert_eq!(time_range, mem_state.aligned_time_range);
}

#[test]
Expand Down
47 changes: 27 additions & 20 deletions analytic_engine/src/table/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub struct MemTableState {
pub mem: MemTableRef,
/// The `aligned_time_range` is estimated from the time range of the first row
/// group written to this memtable and is aligned to the segment size
pub time_range: TimeRange,
pub aligned_time_range: TimeRange,
/// Id of the memtable, newer memtable has greater id
pub id: MemTableId,
}
Expand All @@ -135,12 +135,17 @@ impl MemTableState {
/// Returns the last sequence of the wrapped memtable, as reported by
/// `self.mem.last_sequence()`.
pub fn last_sequence(&self) -> SequenceNumber {
self.mem.last_sequence()
}

/// Returns the time range reported by the memtable itself, falling back to
/// `aligned_time_range` when the memtable reports none.
pub fn real_time_range(&self) -> TimeRange {
    match self.mem.time_range() {
        Some(written_range) => written_range,
        None => self.aligned_time_range,
    }
}
}

impl fmt::Debug for MemTableState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MemTableState")
.field("time_range", &self.time_range)
.field("aligned_time_range", &self.aligned_time_range)
.field("real_time_range", &self.real_time_range())
.field("id", &self.id)
.field("mem", &self.mem.approximate_memory_usage())
.field("metrics", &self.mem.metrics())
Expand All @@ -166,7 +171,7 @@ impl MemTableForWrite {
pub fn accept_timestamp(&self, timestamp: Timestamp) -> bool {
match self {
MemTableForWrite::Sampling(_) => true,
MemTableForWrite::Normal(v) => v.time_range.contains(timestamp),
MemTableForWrite::Normal(v) => v.aligned_time_range.contains(timestamp),
}
}

Expand Down Expand Up @@ -407,7 +412,7 @@ impl MutableMemTableSet {
.range((Bound::Excluded(timestamp), Bound::Unbounded))
.next()
{
if memtable.time_range.contains(timestamp) {
if memtable.aligned_time_range.contains(timestamp) {
return Some(memtable);
}
}
Expand All @@ -419,7 +424,7 @@ impl MutableMemTableSet {
/// present.
fn insert(&mut self, memtable: MemTableState) -> Option<MemTableState> {
// Use end time of time range as key
let end = memtable.time_range.exclusive_end();
let end = memtable.aligned_time_range.exclusive_end();
self.0.insert(end, memtable)
}

Expand Down Expand Up @@ -453,10 +458,11 @@ impl MutableMemTableSet {
let iter = self
.0
.range((Bound::Excluded(inclusive_start), Bound::Unbounded));
for (_end_ts, mem) in iter {
for (_end_ts, mem_state) in iter {
// We need to iterate over all candidate memtables, as their start time is unspecified
if mem.time_range.intersect_with(time_range) {
mems.push(mem.clone());
let memtable_time_range = mem_state.real_time_range();
if memtable_time_range.intersect_with(time_range) {
mems.push(mem_state.clone());
}
}
}
Expand All @@ -482,9 +488,10 @@ impl ImmutableMemTableSet {
}

fn memtables_for_read(&self, time_range: TimeRange, mems: &mut MemTableVec) {
for mem in self.0.values() {
if mem.time_range.intersect_with(time_range) {
mems.push(mem.clone());
for mem_state in self.0.values() {
let memtable_time_range = mem_state.real_time_range();
if memtable_time_range.intersect_with(time_range) {
mems.push(mem_state.clone());
}
}
}
Expand Down Expand Up @@ -1054,11 +1061,11 @@ mod tests {
let flushable_mems = version.pick_memtables_to_flush(last_sequence);
assert!(flushable_mems.sampling_mem.unwrap().freezed);

let time_range =
let aligned_time_range =
TimeRange::bucket_of(now, table_options::DEFAULT_SEGMENT_DURATION).unwrap();

// Sampling memtable still readable after freezed.
let read_view = version.pick_read_view(time_range);
let read_view = version.pick_read_view(aligned_time_range);
assert!(read_view.contains_sampling());
assert_eq!(memtable_id1, read_view.sampling_mem.as_ref().unwrap().id);
assert!(read_view.sampling_mem.unwrap().freezed);
Expand All @@ -1067,7 +1074,7 @@ mod tests {
let memtable_id2 = 2;
let mem_state = MemTableState {
mem: memtable,
time_range,
aligned_time_range,
id: memtable_id2,
};
// Insert a mutable memtable.
Expand All @@ -1079,11 +1086,11 @@ mod tests {
.unwrap()
.unwrap();
let mutable = mutable.as_normal();
assert_eq!(time_range, mutable.time_range);
assert_eq!(aligned_time_range, mutable.aligned_time_range);
assert_eq!(memtable_id2, mutable.id);

// Need to read sampling memtable and mutable memtable.
let read_view = version.pick_read_view(time_range);
let read_view = version.pick_read_view(aligned_time_range);
assert_eq!(memtable_id1, read_view.sampling_mem.as_ref().unwrap().id);
assert_eq!(1, read_view.memtables.len());
assert_eq!(memtable_id2, read_view.memtables[0].id);
Expand Down Expand Up @@ -1120,15 +1127,15 @@ mod tests {
version.freeze_sampling_memtable();

let now = Timestamp::now();
let time_range =
let aligned_time_range =
TimeRange::bucket_of(now, table_options::DEFAULT_SEGMENT_DURATION).unwrap();

// Prepare mutable memtable.
let memtable = MemTableMocker.build();
let memtable_id2 = 2;
let mem_state = MemTableState {
mem: memtable,
time_range,
aligned_time_range,
id: memtable_id2,
};
// Insert a mutable memtable.
Expand All @@ -1140,7 +1147,7 @@ mod tests {
let max_sequence = 120;
let file_id = 13;
let add_file = AddFileMocker::new(file_id)
.time_range(time_range)
.time_range(aligned_time_range)
.max_seq(max_sequence)
.build();
let edit = VersionEdit {
Expand All @@ -1153,7 +1160,7 @@ mod tests {
version.apply_edit(edit);

// Only pick ssts after flushed.
let read_view = version.pick_read_view(time_range);
let read_view = version.pick_read_view(aligned_time_range);
assert!(!read_view.contains_sampling());
assert!(read_view.sampling_mem.is_none());
assert!(read_view.memtables.is_empty());
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/merge_memtable_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl MergeMemTableBench {

memtables.push(MemTableState {
mem: memtable,
time_range: TimeRange::min_to_max(),
aligned_time_range: TimeRange::min_to_max(),
id: *id,
});
}
Expand Down
Loading

0 comments on commit d52ef2a

Please sign in to comment.