Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: override __sequence on creating SST to save space and CPU #5252

Merged
merged 6 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use object_store::util::{join_dir, with_instrument_layers};
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;

use crate::cache::write_cache::SstUploadRequest;
use crate::cache::CacheManagerRef;
Expand Down Expand Up @@ -164,7 +165,9 @@ impl AccessLayer {
request.metadata,
indexer,
);
writer.write_all(request.source, write_opts).await?
writer
.write_all(request.source, request.max_sequence, write_opts)
.await?
};

// Put parquet metadata to cache manager.
Expand Down Expand Up @@ -194,6 +197,7 @@ pub(crate) struct SstWriteRequest {
pub(crate) cache_manager: CacheManagerRef,
#[allow(dead_code)]
pub(crate) storage: Option<String>,
pub(crate) max_sequence: Option<SequenceNumber>,

/// Configs for index
pub(crate) index_options: IndexOptions,
Expand Down
6 changes: 5 additions & 1 deletion src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ impl WriteCache {
indexer,
);

let sst_info = writer.write_all(write_request.source, write_opts).await?;
let sst_info = writer
.write_all(write_request.source, write_request.max_sequence, write_opts)
.await?;

timer.stop_and_record();

Expand Down Expand Up @@ -375,6 +377,7 @@ mod tests {
metadata,
source,
storage: None,
max_sequence: None,
cache_manager: Default::default(),
index_options: IndexOptions::default(),
inverted_index_config: Default::default(),
Expand Down Expand Up @@ -468,6 +471,7 @@ mod tests {
metadata,
source,
storage: None,
max_sequence: None,
cache_manager: cache_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: Default::default(),
Expand Down
9 changes: 9 additions & 0 deletions src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZero;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -303,6 +304,12 @@ impl Compactor for DefaultCompactor {
let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
let bloom_filter_index_config =
compaction_region.engine_config.bloom_filter_index.clone();
let max_sequence = output
.inputs
.iter()
.map(|f| f.meta_ref().sequence)
.max()
.flatten();
futs.push(async move {
let reader = CompactionSstReaderBuilder {
metadata: region_metadata.clone(),
Expand All @@ -324,6 +331,7 @@ impl Compactor for DefaultCompactor {
source: Source::Reader(reader),
cache_manager,
storage,
max_sequence: max_sequence.map(NonZero::get),
index_options,
inverted_index_config,
fulltext_index_config,
Expand All @@ -342,6 +350,7 @@ impl Compactor for DefaultCompactor {
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: max_sequence,
});
Ok(file_meta_opt)
});
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/compaction/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub fn new_file_handle(
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
},
file_purger,
)
Expand All @@ -63,6 +64,7 @@ pub(crate) fn new_file_handles(file_specs: &[(i64, i64, u64)]) -> Vec<FileHandle
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
},
file_purger.clone(),
)
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ mod tests {
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
},
Arc::new(NoopFilePurger),
)
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ async fn test_region_usage() {
flush_region(&engine, region_id, None).await;

let region_stat = region.region_statistic();
assert_eq!(region_stat.sst_size, 2790);
assert!(region_stat.sst_size > 0); // Chief says this assert can ensure the size is counted.
evenyag marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(region_stat.num_rows, 10);

// region total usage
Expand Down
4 changes: 4 additions & 0 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Flush related utilities and structs.

use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

Expand Down Expand Up @@ -345,6 +346,7 @@ impl RegionFlushTask {
continue;
}

let max_sequence = mem.stats().max_sequence();
let file_id = FileId::random();
let iter = mem.iter(None, None)?;
let source = Source::Iter(iter);
Expand All @@ -357,6 +359,7 @@ impl RegionFlushTask {
source,
cache_manager: self.cache_manager.clone(),
storage: version.options.storage.clone(),
max_sequence: Some(max_sequence),
index_options: self.index_options.clone(),
inverted_index_config: self.engine_config.inverted_index.clone(),
fulltext_index_config: self.engine_config.fulltext_index.clone(),
Expand All @@ -382,6 +385,7 @@ impl RegionFlushTask {
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: NonZeroU64::new(max_sequence),
};
file_metas.push(file_meta);
}
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/manifest/tests/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ async fn checkpoint_with_different_compression_types() {
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
};
let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
files_to_add: vec![file_meta],
Expand Down
9 changes: 8 additions & 1 deletion src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use bulk::part::BulkPart;
use common_time::Timestamp;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::config::MitoConfig;
Expand Down Expand Up @@ -77,6 +77,8 @@ pub struct MemtableStats {
num_rows: usize,
/// Total number of ranges in the memtable.
num_ranges: usize,
/// The maximum sequence number in the memtable.
max_sequence: SequenceNumber,
}

impl MemtableStats {
Expand Down Expand Up @@ -106,6 +108,11 @@ impl MemtableStats {
pub fn num_ranges(&self) -> usize {
self.num_ranges
}

/// Returns the maximum sequence number in the memtable.
///
/// This is a plain copy of the `max_sequence` field recorded in these stats.
/// NOTE(review): the `stats()` fragments visible alongside this change fill the field with
/// `0` when they also report `num_rows: 0` and no time range, so `0` presumably means
/// "nothing written yet" — confirm before relying on it as a real sequence.
pub fn max_sequence(&self) -> SequenceNumber {
self.max_sequence
}
}

pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;
Expand Down
19 changes: 19 additions & 0 deletions src/mito2/src/memtable/key_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,25 @@ impl KeyValues {
// Safety: rows is not None.
self.mutation.rows.as_ref().unwrap().rows.len()
}

/// Returns `true` if this container is empty, i.e. the mutation carries no row payload
/// (`rows` is `None`).
///
/// Only the presence of `rows` is checked; a payload that is present but holds zero rows
/// is not reported as empty.
pub fn is_empty(&self) -> bool {
self.mutation.rows.is_none()
}

/// Returns the max sequence in this container.
///
/// The result is `mutation.sequence + num_rows - 1`, where `num_rows` is the number of
/// rows in the mutation. When the mutation has no rows (the row payload is absent or
/// holds zero rows), the result is the mutation sequence itself.
pub fn max_sequence(&self) -> SequenceNumber {
    // Do not unwrap `rows`: the payload is optional (see `is_empty`), and the contract
    // above covers the no-rows case, so a missing payload counts as zero rows.
    let num_rows = self
        .mutation
        .rows
        .as_ref()
        .map_or(0, |rows| rows.rows.len() as u64);

    // `saturating_sub` keeps the zero-row case at the mutation sequence instead of
    // stepping one below it.
    self.mutation.sequence + num_rows.saturating_sub(1)
}
}

/// Key value view of a mutation.
Expand Down
22 changes: 21 additions & 1 deletion src/mito2/src/memtable/partition_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod shard_builder;
mod tree;

use std::fmt;
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

use common_base::readable_size::ReadableSize;
Expand Down Expand Up @@ -113,6 +113,7 @@ pub struct PartitionTreeMemtable {
alloc_tracker: AllocTracker,
max_timestamp: AtomicI64,
min_timestamp: AtomicI64,
max_sequence: AtomicU64,
/// Total written rows in memtable. This also includes deleted and duplicated rows.
num_rows: AtomicUsize,
}
Expand All @@ -131,6 +132,10 @@ impl Memtable for PartitionTreeMemtable {
}

fn write(&self, kvs: &KeyValues) -> Result<()> {
if kvs.is_empty() {
return Ok(());
}

// TODO(yingwen): Validate schema while inserting rows.

let mut metrics = WriteMetrics::default();
Expand All @@ -140,6 +145,12 @@ impl Memtable for PartitionTreeMemtable {

self.update_stats(&metrics);

// update max_sequence
if res.is_ok() {
let sequence = kvs.max_sequence();
self.max_sequence.fetch_max(sequence, Ordering::Relaxed);
}

self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed);
res
}
Expand All @@ -152,6 +163,12 @@ impl Memtable for PartitionTreeMemtable {

self.update_stats(&metrics);

// update max_sequence
if res.is_ok() {
self.max_sequence
.fetch_max(key_value.sequence(), Ordering::Relaxed);
}

self.num_rows.fetch_add(1, Ordering::Relaxed);
res
}
Expand Down Expand Up @@ -210,6 +227,7 @@ impl Memtable for PartitionTreeMemtable {
time_range: None,
num_rows: 0,
num_ranges: 0,
max_sequence: 0,
};
}

Expand All @@ -229,6 +247,7 @@ impl Memtable for PartitionTreeMemtable {
time_range: Some((min_timestamp, max_timestamp)),
num_rows: self.num_rows.load(Ordering::Relaxed),
num_ranges: 1,
max_sequence: self.max_sequence.load(Ordering::Relaxed),
}
}

Expand Down Expand Up @@ -267,6 +286,7 @@ impl PartitionTreeMemtable {
max_timestamp: AtomicI64::new(i64::MIN),
min_timestamp: AtomicI64::new(i64::MAX),
num_rows: AtomicUsize::new(0),
max_sequence: AtomicU64::new(0),
}
}

Expand Down
21 changes: 20 additions & 1 deletion src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, Bound, HashSet};
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -100,6 +100,7 @@ pub struct TimeSeriesMemtable {
alloc_tracker: AllocTracker,
max_timestamp: AtomicI64,
min_timestamp: AtomicI64,
max_sequence: AtomicU64,
dedup: bool,
merge_mode: MergeMode,
/// Total written rows in memtable. This also includes deleted and duplicated rows.
Expand Down Expand Up @@ -134,6 +135,7 @@ impl TimeSeriesMemtable {
alloc_tracker: AllocTracker::new(write_buffer_manager),
max_timestamp: AtomicI64::new(i64::MIN),
min_timestamp: AtomicI64::new(i64::MAX),
max_sequence: AtomicU64::new(0),
dedup,
merge_mode,
num_rows: Default::default(),
Expand Down Expand Up @@ -186,6 +188,10 @@ impl Memtable for TimeSeriesMemtable {
}

fn write(&self, kvs: &KeyValues) -> Result<()> {
if kvs.is_empty() {
return Ok(());
}

let mut local_stats = WriteMetrics::default();

for kv in kvs.iter() {
Expand All @@ -199,6 +205,10 @@ impl Memtable for TimeSeriesMemtable {
// so that we can ensure writing to memtable will succeed.
self.update_stats(local_stats);

// update max_sequence
let sequence = kvs.max_sequence();
self.max_sequence.fetch_max(sequence, Ordering::Relaxed);

self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed);
Ok(())
}
Expand All @@ -209,6 +219,13 @@ impl Memtable for TimeSeriesMemtable {
metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();

self.update_stats(metrics);

// update max_sequence
if res.is_ok() {
self.max_sequence
.fetch_max(key_value.sequence(), Ordering::Relaxed);
}

self.num_rows.fetch_add(1, Ordering::Relaxed);
res
}
Expand Down Expand Up @@ -294,6 +311,7 @@ impl Memtable for TimeSeriesMemtable {
time_range: None,
num_rows: 0,
num_ranges: 0,
max_sequence: 0,
};
}
let ts_type = self
Expand All @@ -311,6 +329,7 @@ impl Memtable for TimeSeriesMemtable {
time_range: Some((min_timestamp, max_timestamp)),
num_rows: self.num_rows.load(Ordering::Relaxed),
num_ranges: 1,
max_sequence: self.max_sequence.load(Ordering::Relaxed),
}
}

Expand Down
Loading
Loading