From 56ba67dd5c74ee37a223c4081b236f6389802b11 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 27 Dec 2024 19:27:02 +0800 Subject: [PATCH 1/6] override memtable sequence Signed-off-by: Ruihang Xia --- src/mito2/src/access_layer.rs | 6 +++- src/mito2/src/cache/write_cache.rs | 6 +++- src/mito2/src/compaction/compactor.rs | 1 + src/mito2/src/flush.rs | 13 +++++++ src/mito2/src/memtable.rs | 9 ++++- src/mito2/src/memtable/partition_tree.rs | 18 +++++++++- src/mito2/src/memtable/time_series.rs | 17 +++++++++- src/mito2/src/sst/parquet.rs | 12 +++---- src/mito2/src/sst/parquet/format.rs | 43 ++++++++++++++++++++++-- src/mito2/src/sst/parquet/writer.rs | 5 ++- 10 files changed, 116 insertions(+), 14 deletions(-) diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 0d6204d02416..16d1480a61ed 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -19,6 +19,7 @@ use object_store::util::{join_dir, with_instrument_layers}; use object_store::ObjectStore; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; +use store_api::storage::SequenceNumber; use crate::cache::write_cache::SstUploadRequest; use crate::cache::CacheManagerRef; @@ -164,7 +165,9 @@ impl AccessLayer { request.metadata, indexer, ); - writer.write_all(request.source, write_opts).await? + writer + .write_all(request.source, request.max_sequence, write_opts) + .await? }; // Put parquet metadata to cache manager. @@ -194,6 +197,7 @@ pub(crate) struct SstWriteRequest { pub(crate) cache_manager: CacheManagerRef, #[allow(dead_code)] pub(crate) storage: Option, + pub(crate) max_sequence: Option, /// Configs for index pub(crate) index_options: IndexOptions, diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 18fe41c5f614..1efb742821bb 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -138,7 +138,9 @@ impl WriteCache { indexer, ); - let sst_info = writer.write_all(write_request.source, write_opts).await?; + let sst_info = writer + .write_all(write_request.source, write_request.max_sequence, write_opts) + .await?; timer.stop_and_record(); @@ -375,6 +377,7 @@ mod tests { metadata, source, storage: None, + max_sequence: None, cache_manager: Default::default(), index_options: IndexOptions::default(), inverted_index_config: Default::default(), @@ -468,6 +471,7 @@ mod tests { metadata, source, storage: None, + max_sequence: None, cache_manager: cache_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: Default::default(), diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 58425f4d79e3..9dd171759695 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -324,6 +324,7 @@ impl Compactor for DefaultCompactor { source: Source::Reader(reader), cache_manager, storage, + max_sequence: None, index_options, inverted_index_config, fulltext_index_config, diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index dd844a7d534c..5ef94de6894b 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -337,6 +337,18 @@ impl RegionFlushTask { } let memtables = version.memtables.immutables(); + // get max_sequence from all non-empty memetables + let max_sequence = memtables + .iter() + .filter_map(|m| { + if !m.is_empty() { + Some(m.stats().max_sequence()) + } else { + None + } + }) + .max(); + let mut file_metas = Vec::with_capacity(memtables.len()); let mut flushed_bytes = 0; for mem in memtables { @@ -357,6 +369,7 @@ impl RegionFlushTask { source, cache_manager: self.cache_manager.clone(), storage: version.options.storage.clone(), + max_sequence, index_options: self.index_options.clone(), inverted_index_config: self.engine_config.inverted_index.clone(), fulltext_index_config: self.engine_config.fulltext_index.clone(), diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 6adc6eb96aec..2e488c03f80a 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -23,7 +23,7 @@ pub use bulk::part::BulkPart; use common_time::Timestamp; use serde::{Deserialize, Serialize}; use store_api::metadata::RegionMetadataRef; -use store_api::storage::ColumnId; +use store_api::storage::{ColumnId, SequenceNumber}; use table::predicate::Predicate; use crate::config::MitoConfig; @@ -77,6 +77,8 @@ pub struct MemtableStats { num_rows: usize, /// Total number of ranges in the memtable. num_ranges: usize, + /// The maximum sequence number in the memtable. + max_sequence: SequenceNumber, } impl MemtableStats { @@ -106,6 +108,11 @@ impl MemtableStats { pub fn num_ranges(&self) -> usize { self.num_ranges } + + /// Returns the maximum sequence number in the memtable. + pub fn max_sequence(&self) -> SequenceNumber { + self.max_sequence + } } pub type BoxedBatchIterator = Box> + Send>; diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index 1376f923316c..b6dbe680ef73 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -24,7 +24,7 @@ mod shard_builder; mod tree; use std::fmt; -use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; use common_base::readable_size::ReadableSize; @@ -113,6 +113,7 @@ pub struct PartitionTreeMemtable { alloc_tracker: AllocTracker, max_timestamp: AtomicI64, min_timestamp: AtomicI64, + max_sequence: AtomicU64, /// Total written rows in memtable. This also includes deleted and duplicated rows. num_rows: AtomicUsize, } @@ -140,6 +141,12 @@ impl Memtable for PartitionTreeMemtable { self.update_stats(&metrics); + // update max_sequence + if res.is_ok() { + let sequence = kvs.mutation.sequence; + self.max_sequence.fetch_max(sequence, Ordering::Relaxed); + } + self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed); res } @@ -152,6 +159,12 @@ impl Memtable for PartitionTreeMemtable { self.update_stats(&metrics); + // update max_sequence + if res.is_ok() { + self.max_sequence + .fetch_max(key_value.sequence(), Ordering::Relaxed); + } + self.num_rows.fetch_add(1, Ordering::Relaxed); res } @@ -210,6 +223,7 @@ impl Memtable for PartitionTreeMemtable { time_range: None, num_rows: 0, num_ranges: 0, + max_sequence: 0, }; } @@ -229,6 +243,7 @@ impl Memtable for PartitionTreeMemtable { time_range: Some((min_timestamp, max_timestamp)), num_rows: self.num_rows.load(Ordering::Relaxed), num_ranges: 1, + max_sequence: self.max_sequence.load(Ordering::Relaxed), } } @@ -267,6 +282,7 @@ impl PartitionTreeMemtable { max_timestamp: AtomicI64::new(i64::MIN), min_timestamp: AtomicI64::new(i64::MAX), num_rows: AtomicUsize::new(0), + max_sequence: AtomicU64::new(0), } } diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 8ef6f4412120..933d3e1a8c45 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -15,7 +15,7 @@ use std::collections::btree_map::Entry; use std::collections::{BTreeMap, Bound, HashSet}; use std::fmt::{Debug, Formatter}; -use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; @@ -100,6 +100,7 @@ pub struct TimeSeriesMemtable { alloc_tracker: AllocTracker, max_timestamp: AtomicI64, min_timestamp: AtomicI64, + max_sequence: AtomicU64, dedup: bool, merge_mode: MergeMode, /// Total written rows in memtable. This also includes deleted and duplicated rows. @@ -134,6 +135,7 @@ impl TimeSeriesMemtable { alloc_tracker: AllocTracker::new(write_buffer_manager), max_timestamp: AtomicI64::new(i64::MIN), min_timestamp: AtomicI64::new(i64::MAX), + max_sequence: AtomicU64::new(0), dedup, merge_mode, num_rows: Default::default(), @@ -199,6 +201,10 @@ impl Memtable for TimeSeriesMemtable { // so that we can ensure writing to memtable will succeed. self.update_stats(local_stats); + // update max_sequence + let sequence = kvs.mutation.sequence; + self.max_sequence.fetch_max(sequence, Ordering::Relaxed); + self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed); Ok(()) } @@ -209,6 +215,13 @@ impl Memtable for TimeSeriesMemtable { metrics.value_bytes += std::mem::size_of::() + std::mem::size_of::(); self.update_stats(metrics); + + // update max_sequence + if res.is_ok() { + self.max_sequence + .fetch_max(key_value.sequence(), Ordering::Relaxed); + } + self.num_rows.fetch_add(1, Ordering::Relaxed); res } @@ -294,6 +307,7 @@ impl Memtable for TimeSeriesMemtable { time_range: None, num_rows: 0, num_ranges: 0, + max_sequence: 0, }; } let ts_type = self @@ -311,6 +325,7 @@ impl Memtable for TimeSeriesMemtable { time_range: Some((min_timestamp, max_timestamp)), num_rows: self.num_rows.load(Ordering::Relaxed), num_ranges: 1, + max_sequence: self.max_sequence.load(Ordering::Relaxed), } } diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 280d46b500df..4a5fe7fe43a6 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -134,7 +134,7 @@ mod tests { ); let info = writer - .write_all(source, &write_opts) + .write_all(source, None, &write_opts) .await .unwrap() .unwrap(); @@ -189,7 +189,7 @@ mod tests { ); writer - .write_all(source, &write_opts) + .write_all(source, None, &write_opts) .await .unwrap() .unwrap(); @@ -258,7 +258,7 @@ mod tests { ); let sst_info = writer - .write_all(source, &write_opts) + .write_all(source, None, &write_opts) .await .unwrap() .expect("write_all should return sst info"); @@ -297,7 +297,7 @@ mod tests { Indexer::default(), ); writer - .write_all(source, &write_opts) + .write_all(source, None, &write_opts) .await .unwrap() .unwrap(); @@ -350,7 +350,7 @@ mod tests { Indexer::default(), ); writer - .write_all(source, &write_opts) + .write_all(source, None, &write_opts) .await .unwrap() .unwrap(); @@ -386,7 +386,7 @@ mod tests { ); writer - .write_all(source, &write_opts) + .write_all(source, None, &write_opts) .await .unwrap() .unwrap(); diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index f2d6c7614b1e..125621044511 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -42,7 +42,7 @@ use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; use parquet::file::statistics::Statistics; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; -use store_api::storage::ColumnId; +use store_api::storage::{ColumnId, SequenceNumber}; use crate::error::{ ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result, @@ -65,6 +65,7 @@ pub(crate) struct WriteFormat { metadata: RegionMetadataRef, /// SST file schema. arrow_schema: SchemaRef, + override_sequence: Option, } impl WriteFormat { @@ -74,9 +75,19 @@ impl WriteFormat { WriteFormat { metadata, arrow_schema, + override_sequence: None, } } + /// Set override sequence. + pub(crate) fn with_override_sequence( + mut self, + override_sequence: Option, + ) -> Self { + self.override_sequence = override_sequence; + self + } + /// Gets the arrow schema to store in parquet. pub(crate) fn arrow_schema(&self) -> &SchemaRef { &self.arrow_schema @@ -107,7 +118,14 @@ impl WriteFormat { columns.push(batch.timestamps().to_arrow_array()); // Add internal columns: primary key, sequences, op types. columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows())); - columns.push(batch.sequences().to_arrow_array()); + + if let Some(override_sequence) = self.override_sequence { + let sequence_array = + Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()])); + columns.push(sequence_array); + } else { + columns.push(batch.sequences().to_arrow_array()); + } columns.push(batch.op_types().to_arrow_array()); RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu) @@ -756,6 +774,27 @@ mod tests { assert_eq!(expect_record, actual); } + #[test] + fn test_convert_batch_with_override_sequence() { + let metadata = build_test_region_metadata(); + let write_format = WriteFormat::new(metadata).with_override_sequence(Some(415411)); + + let num_rows = 4; + let batch = new_batch(b"test", 1, 2, num_rows); + let columns: Vec = vec![ + Arc::new(Int64Array::from(vec![2; num_rows])), // field1 + Arc::new(Int64Array::from(vec![3; num_rows])), // field0 + Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts + build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key + Arc::new(UInt64Array::from(vec![415411; num_rows])), // sequence + Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type + ]; + let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap(); + + let actual = write_format.convert_batch(&batch).unwrap(); + assert_eq!(expect_record, actual); + } + #[test] fn test_projection_indices() { let metadata = build_test_region_metadata(); diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 1d63f5e3d01b..13f7cfb3ec91 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -31,6 +31,7 @@ use parquet::schema::types::ColumnPath; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::storage::consts::SEQUENCE_COLUMN_NAME; +use store_api::storage::SequenceNumber; use tokio::io::AsyncWrite; use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt}; @@ -112,9 +113,11 @@ where pub async fn write_all( &mut self, mut source: Source, + override_sequence: Option, // override the `sequence` field from `Source` opts: &WriteOptions, ) -> Result> { - let write_format = WriteFormat::new(self.metadata.clone()); + let write_format = + WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence); let mut stats = SourceStats::default(); while let Some(res) = self From c439bba81dd3a333f4534b4c39dd2c05af98724b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 27 Dec 2024 20:11:46 +0800 Subject: [PATCH 2/6] override sst sequence Signed-off-by: Ruihang Xia --- src/mito2/src/compaction/compactor.rs | 9 ++++++++- src/mito2/src/compaction/test_util.rs | 2 ++ src/mito2/src/compaction/twcs.rs | 1 + src/mito2/src/engine/basic_test.rs | 2 +- src/mito2/src/flush.rs | 1 + src/mito2/src/manifest/tests/checkpoint.rs | 1 + src/mito2/src/sst/file.rs | 3 +++ src/mito2/src/sst/file_purger.rs | 2 ++ src/mito2/src/test_util/sst_util.rs | 1 + src/mito2/src/test_util/version_util.rs | 2 ++ 10 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 9dd171759695..fc4d084dbaed 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -303,6 +303,12 @@ impl Compactor for DefaultCompactor { let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone(); let bloom_filter_index_config = compaction_region.engine_config.bloom_filter_index.clone(); + let max_sequence = output + .inputs + .iter() + .map(|f| f.meta_ref().max_sequence) + .max() + .and_then(|x| if x == 0 { None } else { Some(x) }); futs.push(async move { let reader = CompactionSstReaderBuilder { metadata: region_metadata.clone(), @@ -324,7 +330,7 @@ impl Compactor for DefaultCompactor { source: Source::Reader(reader), cache_manager, storage, - max_sequence: None, + max_sequence, index_options, inverted_index_config, fulltext_index_config, @@ -343,6 +349,7 @@ impl Compactor for DefaultCompactor { index_file_size: sst_info.index_metadata.file_size, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, + max_sequence: max_sequence.unwrap_or(0), }); Ok(file_meta_opt) }); diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs index 1df462004f8d..97767cd6d484 100644 --- a/src/mito2/src/compaction/test_util.rs +++ b/src/mito2/src/compaction/test_util.rs @@ -39,6 +39,7 @@ pub fn new_file_handle( index_file_size: 0, num_rows: 0, num_row_groups: 0, + max_sequence: 0, }, file_purger, ) @@ -63,6 +64,7 @@ pub(crate) fn new_file_handles(file_specs: &[(i64, i64, u64)]) -> Vec FileHandle { index_file_size: 0, num_rows: 0, num_row_groups: 0, + max_sequence: 0, }, file_purger, ) diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index d4a17ffe47c7..557f29b9a84a 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -103,6 +103,7 @@ impl VersionControlBuilder { index_file_size: 0, num_rows: 0, num_row_groups: 0, + max_sequence: 0, }, ); self @@ -194,6 +195,7 @@ pub(crate) fn apply_edit( index_file_size: 0, num_rows: 0, num_row_groups: 0, + max_sequence: 0, } }) .collect(); From 3f328f89357ad761f60d2e42cb6a92654556e344 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 30 Dec 2024 16:42:38 +0800 Subject: [PATCH 3/6] chore changes per to CR comments Signed-off-by: Ruihang Xia --- src/mito2/src/compaction/compactor.rs | 9 +++++---- src/mito2/src/compaction/test_util.rs | 4 ++-- src/mito2/src/compaction/twcs.rs | 2 +- src/mito2/src/engine/basic_test.rs | 2 +- src/mito2/src/flush.rs | 18 ++++-------------- src/mito2/src/manifest/tests/checkpoint.rs | 2 +- src/mito2/src/sst/file.rs | 10 +++++++--- src/mito2/src/sst/file_purger.rs | 6 ++++-- src/mito2/src/test_util/sst_util.rs | 3 ++- src/mito2/src/test_util/version_util.rs | 5 +++-- 10 files changed, 30 insertions(+), 31 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index fc4d084dbaed..6a549e8fcc52 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::num::NonZero; use std::sync::Arc; use std::time::Duration; @@ -306,9 +307,9 @@ impl Compactor for DefaultCompactor { let max_sequence = output .inputs .iter() - .map(|f| f.meta_ref().max_sequence) + .map(|f| f.meta_ref().sequence) .max() - .and_then(|x| if x == 0 { None } else { Some(x) }); + .flatten(); futs.push(async move { let reader = CompactionSstReaderBuilder { metadata: region_metadata.clone(), @@ -330,7 +331,7 @@ impl Compactor for DefaultCompactor { source: Source::Reader(reader), cache_manager, storage, - max_sequence, + max_sequence: max_sequence.map(NonZero::get), index_options, inverted_index_config, fulltext_index_config, @@ -349,7 +350,7 @@ impl Compactor for DefaultCompactor { index_file_size: sst_info.index_metadata.file_size, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, - max_sequence: max_sequence.unwrap_or(0), + sequence: max_sequence, }); Ok(file_meta_opt) }); diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs index 97767cd6d484..4baa6a9db55a 100644 --- a/src/mito2/src/compaction/test_util.rs +++ b/src/mito2/src/compaction/test_util.rs @@ -39,7 +39,7 @@ pub fn new_file_handle( index_file_size: 0, num_rows: 0, num_row_groups: 0, - max_sequence: 0, + sequence: None, }, file_purger, ) @@ -64,7 +64,7 @@ pub(crate) fn new_file_handles(file_specs: &[(i64, i64, u64)]) -> Vec 0); // Chief says this assert can ensure the size is counted. assert_eq!(region_stat.num_rows, 10); // region total usage diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 694af86bc494..a0400deb5bc0 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -15,6 +15,7 @@ //! Flush related utilities and structs. use std::collections::HashMap; +use std::num::NonZeroU64; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -337,18 +338,6 @@ impl RegionFlushTask { } let memtables = version.memtables.immutables(); - // get max_sequence from all non-empty memetables - let max_sequence = memtables - .iter() - .filter_map(|m| { - if !m.is_empty() { - Some(m.stats().max_sequence()) - } else { - None - } - }) - .max(); - let mut file_metas = Vec::with_capacity(memtables.len()); let mut flushed_bytes = 0; for mem in memtables { @@ -357,6 +346,7 @@ impl RegionFlushTask { continue; } + let max_sequence = mem.stats().max_sequence(); let file_id = FileId::random(); let iter = mem.iter(None, None)?; let source = Source::Iter(iter); @@ -369,7 +359,7 @@ impl RegionFlushTask { source, cache_manager: self.cache_manager.clone(), storage: version.options.storage.clone(), - max_sequence, + max_sequence: Some(max_sequence), index_options: self.index_options.clone(), inverted_index_config: self.engine_config.inverted_index.clone(), fulltext_index_config: self.engine_config.fulltext_index.clone(), @@ -395,7 +385,7 @@ impl RegionFlushTask { index_file_size: sst_info.index_metadata.file_size, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, - max_sequence: max_sequence.unwrap_or(0), + sequence: NonZeroU64::new(max_sequence), }; file_metas.push(file_meta); } diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index 51aae2bd20d3..7e20bc2ced98 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -225,7 +225,7 @@ async fn checkpoint_with_different_compression_types() { index_file_size: 0, num_rows: 0, num_row_groups: 0, - max_sequence: 0, + sequence: None, }; let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit { files_to_add: vec![file_meta], diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 1f9138a0c99e..f9628c495d4c 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -15,6 +15,7 @@ //! Structures to describe metadata of files. use std::fmt; +use std::num::NonZeroU64; use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -134,8 +135,11 @@ pub struct FileMeta { /// the default value `0` doesn't means the file doesn't contains any rows, /// but instead means the number of rows is unknown. pub num_row_groups: u64, - /// Max sequence in this file - pub max_sequence: u64, + /// Sequence in this file. + /// + /// This sequence is the only sequence in this file. And it's retrived from the max + /// sequence of the rows on generating this file. + pub sequence: Option, } /// Type of index. @@ -345,7 +349,7 @@ mod tests { index_file_size: 0, num_rows: 0, num_row_groups: 0, - max_sequence: 0, + sequence: None, } } diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index dc3adbd7b924..7d81445c67fe 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -119,6 +119,8 @@ impl FilePurger for LocalFilePurger { #[cfg(test)] mod tests { + use std::num::NonZeroU64; + use common_test_util::temp_dir::create_temp_dir; use object_store::services::Fs; use object_store::ObjectStore; @@ -176,7 +178,7 @@ mod tests { index_file_size: 0, num_rows: 0, num_row_groups: 0, - max_sequence: 0, + sequence: None, }, file_purger, ); @@ -239,7 +241,7 @@ mod tests { index_file_size: 4096, num_rows: 1024, num_row_groups: 1, - max_sequence: 4096, + sequence: NonZeroU64::new(4096), }, file_purger, ); diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 5506009d107d..63c3fc09d621 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -14,6 +14,7 @@ //! Utilities for testing SSTs. +use std::num::NonZeroU64; use std::sync::Arc; use api::v1::{OpType, SemanticType}; @@ -116,7 +117,7 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { index_file_size: 0, num_rows: 0, num_row_groups: 0, - max_sequence: 0, + sequence: None, }, file_purger, ) diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index 557f29b9a84a..68534d34eeb8 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -15,6 +15,7 @@ //! Utilities to mock version. use std::collections::HashMap; +use std::num::NonZeroU64; use std::sync::Arc; use api::v1::value::ValueData; @@ -103,7 +104,7 @@ impl VersionControlBuilder { index_file_size: 0, num_rows: 0, num_row_groups: 0, - max_sequence: 0, + sequence: None, }, ); self @@ -195,7 +196,7 @@ pub(crate) fn apply_edit( index_file_size: 0, num_rows: 0, num_row_groups: 0, - max_sequence: 0, + sequence: None, } }) .collect(); From e4aef34139f4046cfaef9bca4d2413baaf31fabf Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 30 Dec 2024 17:11:48 +0800 Subject: [PATCH 4/6] use correct sequence number Signed-off-by: Ruihang Xia --- src/mito2/src/memtable/key_values.rs | 5 +++++ src/mito2/src/memtable/partition_tree.rs | 6 +++++- src/mito2/src/memtable/time_series.rs | 6 +++++- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index a98826bdb125..42237c6fd5da 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -63,6 +63,11 @@ impl KeyValues { // Safety: rows is not None. self.mutation.rows.as_ref().unwrap().rows.len() } + + /// Returns if this container is empty + pub fn is_empty(&self) -> bool { + self.mutation.rows.is_none() + } } /// Key value view of a mutation. diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index b6dbe680ef73..c8a1126e4a46 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -132,6 +132,10 @@ impl Memtable for PartitionTreeMemtable { } fn write(&self, kvs: &KeyValues) -> Result<()> { + if kvs.is_empty() { + return Ok(()); + } + // TODO(yingwen): Validate schema while inserting rows. let mut metrics = WriteMetrics::default(); @@ -143,7 +147,7 @@ impl Memtable for PartitionTreeMemtable { // update max_sequence if res.is_ok() { - let sequence = kvs.mutation.sequence; + let sequence = kvs.mutation.sequence + kvs.num_rows() as u64 - 1; self.max_sequence.fetch_max(sequence, Ordering::Relaxed); } diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 933d3e1a8c45..154bdbf261ec 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -188,6 +188,10 @@ impl Memtable for TimeSeriesMemtable { } fn write(&self, kvs: &KeyValues) -> Result<()> { + if kvs.is_empty() { + return Ok(()); + } + let mut local_stats = WriteMetrics::default(); for kv in kvs.iter() { @@ -202,7 +206,7 @@ impl Memtable for TimeSeriesMemtable { self.update_stats(local_stats); // update max_sequence - let sequence = kvs.mutation.sequence; + let sequence = kvs.mutation.sequence + kvs.num_rows() as u64 - 1; self.max_sequence.fetch_max(sequence, Ordering::Relaxed); self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed); From fd96d39f9d3ed187bf4b3d73bc86613f713a96ab Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 30 Dec 2024 17:23:17 +0800 Subject: [PATCH 5/6] wrap a method to get max sequence Signed-off-by: Ruihang Xia --- src/mito2/src/memtable/key_values.rs | 14 ++++++++++++++ src/mito2/src/memtable/partition_tree.rs | 2 +- src/mito2/src/memtable/time_series.rs | 2 +- 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index 42237c6fd5da..73013920e65b 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -68,6 +68,20 @@ impl KeyValues { pub fn is_empty(&self) -> bool { self.mutation.rows.is_none() } + + /// Return the max sequence in this container. + /// + /// When the mutation has no rows, the sequence is the same as the mutation sequence. + pub fn max_sequence(&self) -> SequenceNumber { + let mut sequence = self.mutation.sequence; + let num_rows = self.mutation.rows.as_ref().unwrap().rows.len() as u64; + sequence += num_rows; + if num_rows > 0 { + sequence -= 1; + } + + sequence + } } /// Key value view of a mutation. diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index c8a1126e4a46..d38623208300 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -147,7 +147,7 @@ impl Memtable for PartitionTreeMemtable { // update max_sequence if res.is_ok() { - let sequence = kvs.mutation.sequence + kvs.num_rows() as u64 - 1; + let sequence = kvs.max_sequence(); self.max_sequence.fetch_max(sequence, Ordering::Relaxed); } diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 154bdbf261ec..23452d783a6b 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -206,7 +206,7 @@ impl Memtable for TimeSeriesMemtable { self.update_stats(local_stats); // update max_sequence - let sequence = kvs.mutation.sequence + kvs.num_rows() as u64 - 1; + let sequence = kvs.max_sequence(); self.max_sequence.fetch_max(sequence, Ordering::Relaxed); self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed); From e188d7d3c47a5626e49ce39b5d34aa6e56708328 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 30 Dec 2024 17:27:52 +0800 Subject: [PATCH 6/6] fix typo Signed-off-by: Ruihang Xia --- src/mito2/src/sst/file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index f9628c495d4c..e9959ae562af 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -137,7 +137,7 @@ pub struct FileMeta { pub num_row_groups: u64, /// Sequence in this file. /// - /// This sequence is the only sequence in this file. And it's retrived from the max + /// This sequence is the only sequence in this file. And it's retrieved from the max /// sequence of the rows on generating this file. pub sequence: Option, }