diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs index 1343c8d51b..8cd9429302 100644 --- a/src/core/index_meta.rs +++ b/src/core/index_meta.rs @@ -326,7 +326,7 @@ pub struct IndexMeta { pub payload: Option, } -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] struct UntrackedIndexMeta { pub segments: Vec, #[serde(default)] @@ -395,6 +395,7 @@ mod tests { use super::IndexMeta; use crate::core::index_meta::UntrackedIndexMeta; use crate::schema::{Schema, TEXT}; + use crate::store::ZstdCompressor; use crate::{IndexSettings, IndexSortByField, Order}; #[test] @@ -428,4 +429,60 @@ mod tests { assert_eq!(index_metas.schema, deser_meta.schema); assert_eq!(index_metas.opstamp, deser_meta.opstamp); } + + #[test] + fn test_serialize_metas_zstd_compressor() { + let schema = { + let mut schema_builder = Schema::builder(); + schema_builder.add_text_field("text", TEXT); + schema_builder.build() + }; + let index_metas = IndexMeta { + index_settings: IndexSettings { + sort_by_field: Some(IndexSortByField { + field: "text".to_string(), + order: Order::Asc, + }), + docstore_compression: crate::store::Compressor::Zstd(ZstdCompressor { + compression_level: Some(4), + }), + docstore_blocksize: 1_000_000, + }, + segments: Vec::new(), + schema, + opstamp: 0u64, + payload: None, + }; + let json = serde_json::ser::to_string(&index_metas).expect("serialization failed"); + assert_eq!( + json, + r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zstd(compression_level=4)","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"# + ); + + let deser_meta: UntrackedIndexMeta = serde_json::from_str(&json).unwrap(); + assert_eq!(index_metas.index_settings, deser_meta.index_settings); + assert_eq!(index_metas.schema, deser_meta.schema); + assert_eq!(index_metas.opstamp, deser_meta.opstamp); + } + + #[test] + fn test_serialize_metas_invalid_comp() { + let json = r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zsstd","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#; + + let err = serde_json::from_str::(&json).unwrap_err(); + assert_eq!( + err.to_string(), + "unknown variant `zsstd`, expected one of `none`, `lz4`, `brotli`, `snappy`, `zstd`, \ + `zstd(compression_level=5)` at line 1 column 96" + .to_string() + ); + + let json = r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zstd(bla=10)","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#; + + let err = serde_json::from_str::(&json).unwrap_err(); + assert_eq!( + err.to_string(), + "unknown zstd option \"bla\" at line 1 column 103".to_string() + ); + } } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 9856bc134f..455ab7b693 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1073,7 +1073,7 @@ impl IndexMerger { // // take 7 in order to not walk over all checkpoints. || store_reader.block_checkpoints().take(7).count() < 6 - || store_reader.compressor() != store_writer.compressor() + || store_reader.decompressor() != store_writer.compressor().into() { for doc_bytes_res in store_reader.iter_raw(reader.alive_bitset()) { let doc_bytes = doc_bytes_res?; diff --git a/src/store/compression_zstd_block.rs b/src/store/compression_zstd_block.rs index 6879cc2fbd..7ef0aa9df1 100644 --- a/src/store/compression_zstd_block.rs +++ b/src/store/compression_zstd_block.rs @@ -4,7 +4,11 @@ use zstd::bulk::{compress_to_buffer, decompress_to_buffer}; use zstd::DEFAULT_COMPRESSION_LEVEL; #[inline] -pub fn compress(uncompressed: &[u8], compressed: &mut Vec) -> io::Result<()> { +pub fn compress( + uncompressed: &[u8], + compressed: &mut Vec, + compression_level: Option, +) -> io::Result<()> { let count_size = std::mem::size_of::(); let max_size = zstd::zstd_safe::compress_bound(uncompressed.len()) + count_size; @@ -14,7 +18,7 @@ pub fn compress(uncompressed: &[u8], compressed: &mut Vec) -> io::Result<()> let compressed_size = compress_to_buffer( uncompressed, &mut compressed[count_size..], - DEFAULT_COMPRESSION_LEVEL, + compression_level.unwrap_or(DEFAULT_COMPRESSION_LEVEL), )?; compressed[0..count_size].copy_from_slice(&(uncompressed.len() as u32).to_le_bytes()); diff --git a/src/store/compressors.rs b/src/store/compressors.rs index 15f2b810c3..764a09beca 100644 --- a/src/store/compressors.rs +++ b/src/store/compressors.rs @@ -1,6 +1,6 @@ use std::io; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; pub trait StoreCompressor { fn compress(&self, uncompressed: &[u8], compressed: &mut Vec) -> io::Result<()>; @@ -12,23 +12,114 @@ pub trait StoreCompressor { /// the compressor used to compress the doc store. /// /// The default is Lz4Block, but also depends on the enabled feature flags. -#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug, Copy, PartialEq, Eq)] pub enum Compressor { - #[serde(rename = "none")] /// No compression None, - #[serde(rename = "lz4")] /// Use the lz4 compressor (block format) Lz4, - #[serde(rename = "brotli")] /// Use the brotli compressor Brotli, - #[serde(rename = "snappy")] /// Use the snap compressor Snappy, - #[serde(rename = "zstd")] /// Use the zstd compressor - Zstd, + Zstd(ZstdCompressor), +} + +impl Serialize for Compressor { + fn serialize(&self, serializer: S) -> Result + where S: serde::Serializer { + match *self { + Compressor::None => serializer.serialize_str("none"), + Compressor::Lz4 => serializer.serialize_str("lz4"), + Compressor::Brotli => serializer.serialize_str("brotli"), + Compressor::Snappy => serializer.serialize_str("snappy"), + Compressor::Zstd(zstd) => serializer.serialize_str(&zstd.ser_to_string()), + } + } +} + +impl<'de> Deserialize<'de> for Compressor { + fn deserialize(deserializer: D) -> Result + where D: Deserializer<'de> { + let buf = String::deserialize(deserializer)?; + let compressor = match buf.as_str() { + "none" => Compressor::None, + "lz4" => Compressor::Lz4, + "brotli" => Compressor::Brotli, + "snappy" => Compressor::Snappy, + _ => { + if buf.starts_with("zstd") { + Compressor::Zstd( + ZstdCompressor::deser_from_str(&buf).map_err(serde::de::Error::custom)?, + ) + } else { + return Err(serde::de::Error::unknown_variant( + &buf, + &[ + "none", + "lz4", + "brotli", + "snappy", + "zstd", + "zstd(compression_level=5)", + ], + )); + } + } + }; + + Ok(compressor) + } +} + +#[derive(Clone, Default, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)] +/// The Zstd compressor, with optional compression level. +pub struct ZstdCompressor { + /// The compression level, if unset defaults to zstd::DEFAULT_COMPRESSION_LEVEL = 3 + pub compression_level: Option, +} + +impl ZstdCompressor { + fn deser_from_str(val: &str) -> Result { + if !val.starts_with("zstd") { + return Err(format!("needs to start with zstd, but got {}", val)); + } + if val == "zstd" { + return Ok(ZstdCompressor::default()); + } + let options = &val["zstd".len() + 1..val.len() - 1]; + + let mut compressor = ZstdCompressor::default(); + for option in options.split(',') { + let (opt_name, value) = options + .split_once('=') + .ok_or_else(|| format!("no '=' found in option {:?}", option))?; + + match opt_name { + "compression_level" => { + let value = value.parse::().map_err(|err| { + format!( + "Could not parse value {} of option {}, e: {}", + value, opt_name, err + ) + })?; + compressor.compression_level = Some(value); + } + _ => { + return Err(format!("unknown zstd option {:?}", opt_name)); + } + } + } + Ok(compressor) + } + fn ser_to_string(&self) -> String { + if let Some(compression_level) = self.compression_level { + format!("zstd(compression_level={})", compression_level) + } else { + "zstd".to_string() + } + } } impl Default for Compressor { @@ -40,7 +131,7 @@ impl Default for Compressor { } else if cfg!(feature = "snappy-compression") { Compressor::Snappy } else if cfg!(feature = "zstd-compression") { - Compressor::Zstd + Compressor::Zstd(ZstdCompressor::default()) } else { Compressor::None } @@ -48,25 +139,6 @@ impl Default for Compressor { } impl Compressor { - pub(crate) fn from_id(id: u8) -> Compressor { - match id { - 0 => Compressor::None, - 1 => Compressor::Lz4, - 2 => Compressor::Brotli, - 3 => Compressor::Snappy, - 4 => Compressor::Zstd, - _ => panic!("unknown compressor id {:?}", id), - } - } - pub(crate) fn get_id(&self) -> u8 { - match self { - Self::None => 0, - Self::Lz4 => 1, - Self::Brotli => 2, - Self::Snappy => 3, - Self::Zstd => 4, - } - } #[inline] pub(crate) fn compress_into( &self, @@ -109,10 +181,14 @@ impl Compressor { panic!("snappy-compression feature flag not activated"); } } - Self::Zstd => { + Self::Zstd(_zstd_compressor) => { #[cfg(feature = "zstd-compression")] { - super::compression_zstd_block::compress(uncompressed, compressed) + super::compression_zstd_block::compress( + uncompressed, + compressed, + _zstd_compressor.compression_level, + ) } #[cfg(not(feature = "zstd-compression"))] { @@ -121,65 +197,56 @@ impl Compressor { } } } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn zstd_serde_roundtrip() { + let compressor = ZstdCompressor { + compression_level: Some(15), + }; + + assert_eq!( + ZstdCompressor::deser_from_str(&compressor.ser_to_string()).unwrap(), + compressor + ); - pub(crate) fn decompress(&self, compressed_block: &[u8]) -> io::Result> { - let mut decompressed_block = vec![]; - self.decompress_into(compressed_block, &mut decompressed_block)?; - Ok(decompressed_block) + assert_eq!( + ZstdCompressor::deser_from_str(&ZstdCompressor::default().ser_to_string()).unwrap(), + ZstdCompressor::default() + ); } - #[inline] - pub(crate) fn decompress_into( - &self, - compressed: &[u8], - decompressed: &mut Vec, - ) -> io::Result<()> { - match self { - Self::None => { - decompressed.clear(); - decompressed.extend_from_slice(compressed); - Ok(()) - } - Self::Lz4 => { - #[cfg(feature = "lz4-compression")] - { - super::compression_lz4_block::decompress(compressed, decompressed) - } - #[cfg(not(feature = "lz4-compression"))] - { - panic!("lz4-compression feature flag not activated"); - } - } - Self::Brotli => { - #[cfg(feature = "brotli-compression")] - { - super::compression_brotli::decompress(compressed, decompressed) - } - #[cfg(not(feature = "brotli-compression"))] - { - panic!("brotli-compression feature flag not activated"); - } - } - Self::Snappy => { - #[cfg(feature = "snappy-compression")] - { - super::compression_snap::decompress(compressed, decompressed) - } - #[cfg(not(feature = "snappy-compression"))] - { - panic!("snappy-compression feature flag not activated"); - } - } - Self::Zstd => { - #[cfg(feature = "zstd-compression")] - { - super::compression_zstd_block::decompress(compressed, decompressed) - } - #[cfg(not(feature = "zstd-compression"))] - { - panic!("zstd-compression feature flag not activated"); - } + #[test] + fn deser_zstd_test() { + assert_eq!( + ZstdCompressor::deser_from_str("zstd").unwrap(), + ZstdCompressor::default() + ); + + assert!(ZstdCompressor::deser_from_str("zzstd").is_err()); + assert!(ZstdCompressor::deser_from_str("zzstd()").is_err()); + assert_eq!( + ZstdCompressor::deser_from_str("zstd(compression_level=15)").unwrap(), + ZstdCompressor { + compression_level: Some(15) } - } + ); + assert_eq!( + ZstdCompressor::deser_from_str("zstd(compresion_level=15)").unwrap_err(), + "unknown zstd option \"compresion_level\"" + ); + assert_eq!( + ZstdCompressor::deser_from_str("zstd(compression_level->2)").unwrap_err(), + "no '=' found in option \"compression_level->2\"" + ); + assert_eq!( + ZstdCompressor::deser_from_str("zstd(compression_level=over9000)").unwrap_err(), + "Could not parse value over9000 of option compression_level, e: invalid digit found \ + in string" + ); } } diff --git a/src/store/decompressors.rs b/src/store/decompressors.rs new file mode 100644 index 0000000000..a333a5b88f --- /dev/null +++ b/src/store/decompressors.rs @@ -0,0 +1,140 @@ +use std::io; + +use serde::{Deserialize, Serialize}; + +use super::Compressor; + +pub trait StoreCompressor { + fn compress(&self, uncompressed: &[u8], compressed: &mut Vec) -> io::Result<()>; + fn decompress(&self, compressed: &[u8], decompressed: &mut Vec) -> io::Result<()>; + fn get_compressor_id() -> u8; +} + +/// Decompressor is deserialized from the doc store footer, when opening an index. +#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum Decompressor { + /// No compression + None, + /// Use the lz4 compressor (block format) + Lz4, + /// Use the brotli compressor + Brotli, + /// Use the snap compressor + Snappy, + /// Use the zstd compressor + Zstd, +} + +impl From for Decompressor { + fn from(compressor: Compressor) -> Self { + match compressor { + Compressor::None => Decompressor::None, + Compressor::Lz4 => Decompressor::Lz4, + Compressor::Brotli => Decompressor::Brotli, + Compressor::Snappy => Decompressor::Snappy, + Compressor::Zstd(_) => Decompressor::Zstd, + } + } +} + +impl Decompressor { + pub(crate) fn from_id(id: u8) -> Decompressor { + match id { + 0 => Decompressor::None, + 1 => Decompressor::Lz4, + 2 => Decompressor::Brotli, + 3 => Decompressor::Snappy, + 4 => Decompressor::Zstd, + _ => panic!("unknown compressor id {:?}", id), + } + } + + pub(crate) fn get_id(&self) -> u8 { + match self { + Self::None => 0, + Self::Lz4 => 1, + Self::Brotli => 2, + Self::Snappy => 3, + Self::Zstd => 4, + } + } + + pub(crate) fn decompress(&self, compressed_block: &[u8]) -> io::Result> { + let mut decompressed_block = vec![]; + self.decompress_into(compressed_block, &mut decompressed_block)?; + Ok(decompressed_block) + } + + #[inline] + pub(crate) fn decompress_into( + &self, + compressed: &[u8], + decompressed: &mut Vec, + ) -> io::Result<()> { + match self { + Self::None => { + decompressed.clear(); + decompressed.extend_from_slice(compressed); + Ok(()) + } + Self::Lz4 => { + #[cfg(feature = "lz4-compression")] + { + super::compression_lz4_block::decompress(compressed, decompressed) + } + #[cfg(not(feature = "lz4-compression"))] + { + panic!("lz4-compression feature flag not activated"); + } + } + Self::Brotli => { + #[cfg(feature = "brotli-compression")] + { + super::compression_brotli::decompress(compressed, decompressed) + } + #[cfg(not(feature = "brotli-compression"))] + { + panic!("brotli-compression feature flag not activated"); + } + } + Self::Snappy => { + #[cfg(feature = "snappy-compression")] + { + super::compression_snap::decompress(compressed, decompressed) + } + #[cfg(not(feature = "snappy-compression"))] + { + panic!("snappy-compression feature flag not activated"); + } + } + Self::Zstd => { + #[cfg(feature = "zstd-compression")] + { + super::compression_zstd_block::decompress(compressed, decompressed) + } + #[cfg(not(feature = "zstd-compression"))] + { + panic!("zstd-compression feature flag not activated"); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::store::Compressor; + + #[test] + fn compressor_decompressor_id_test() { + assert_eq!(Decompressor::from(Compressor::None), Decompressor::None); + assert_eq!(Decompressor::from(Compressor::Lz4), Decompressor::Lz4); + assert_eq!(Decompressor::from(Compressor::Brotli), Decompressor::Brotli); + assert_eq!(Decompressor::from(Compressor::Snappy), Decompressor::Snappy); + assert_eq!( + Decompressor::from(Compressor::Zstd(Default::default())), + Decompressor::Zstd + ); + } +} diff --git a/src/store/footer.rs b/src/store/footer.rs index 102fd675b8..880c1e2d2c 100644 --- a/src/store/footer.rs +++ b/src/store/footer.rs @@ -2,13 +2,13 @@ use std::io; use common::{BinarySerializable, FixedSize, HasLen}; +use super::Decompressor; use crate::directory::FileSlice; -use crate::store::Compressor; #[derive(Debug, Clone, PartialEq)] pub struct DocStoreFooter { pub offset: u64, - pub compressor: Compressor, + pub decompressor: Decompressor, } /// Serialises the footer to a byte-array @@ -18,7 +18,7 @@ pub struct DocStoreFooter { impl BinarySerializable for DocStoreFooter { fn serialize(&self, writer: &mut W) -> io::Result<()> { BinarySerializable::serialize(&self.offset, writer)?; - BinarySerializable::serialize(&self.compressor.get_id(), writer)?; + BinarySerializable::serialize(&self.decompressor.get_id(), writer)?; writer.write_all(&[0; 15])?; Ok(()) } @@ -30,7 +30,7 @@ impl BinarySerializable for DocStoreFooter { reader.read_exact(&mut skip_buf)?; Ok(DocStoreFooter { offset, - compressor: Compressor::from_id(compressor_id), + decompressor: Decompressor::from_id(compressor_id), }) } } @@ -40,8 +40,11 @@ impl FixedSize for DocStoreFooter { } impl DocStoreFooter { - pub fn new(offset: u64, compressor: Compressor) -> Self { - DocStoreFooter { offset, compressor } + pub fn new(offset: u64, decompressor: Decompressor) -> Self { + DocStoreFooter { + offset, + decompressor, + } } pub fn extract_footer(file: FileSlice) -> io::Result<(DocStoreFooter, FileSlice)> { diff --git a/src/store/mod.rs b/src/store/mod.rs index 1fcec19091..88ef9b579e 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -33,11 +33,13 @@ //! ! mod compressors; +mod decompressors; mod footer; mod index; mod reader; mod writer; -pub use self::compressors::Compressor; +pub use self::compressors::{Compressor, ZstdCompressor}; +pub use self::decompressors::Decompressor; pub use self::reader::StoreReader; pub use self::writer::StoreWriter; @@ -196,7 +198,7 @@ pub mod tests { #[cfg(feature = "zstd-compression")] #[test] fn test_store_zstd() -> crate::Result<()> { - test_store(Compressor::Zstd, BLOCK_SIZE) + test_store(Compressor::Zstd(ZstdCompressor::default()), BLOCK_SIZE) } #[test] @@ -267,8 +269,8 @@ pub mod tests { index.reader().unwrap().searcher().segment_readers()[0] .get_store_reader() .unwrap() - .compressor(), - Compressor::Lz4 + .decompressor(), + Decompressor::Lz4 ); // Change compressor, this disables stacking on merging let index_settings = index.settings_mut(); @@ -294,7 +296,7 @@ pub mod tests { LOREM.to_string() ); } - assert_eq!(store.compressor(), Compressor::Snappy); + assert_eq!(store.decompressor(), Decompressor::Snappy); Ok(()) } diff --git a/src/store/reader.rs b/src/store/reader.rs index 3cb4f038db..e2bd7ea912 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -8,7 +8,7 @@ use ownedbytes::OwnedBytes; use super::footer::DocStoreFooter; use super::index::SkipIndex; -use super::Compressor; +use super::Decompressor; use crate::directory::FileSlice; use crate::error::DataCorruption; use crate::fastfield::AliveBitSet; @@ -23,7 +23,7 @@ type Block = OwnedBytes; /// Reads document off tantivy's [`Store`](./index.html) pub struct StoreReader { - compressor: Compressor, + decompressor: Decompressor, data: FileSlice, skip_index: Arc, space_usage: StoreSpaceUsage, @@ -87,7 +87,7 @@ impl StoreReader { let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len()); let skip_index = SkipIndex::open(index_data); Ok(StoreReader { - compressor: footer.compressor, + decompressor: footer.decompressor, data: data_file, cache: BlockCache { cache: Mutex::new(LruCache::new(LRU_CACHE_CAPACITY)), @@ -103,8 +103,8 @@ impl StoreReader { self.skip_index.checkpoints() } - pub(crate) fn compressor(&self) -> Compressor { - self.compressor + pub(crate) fn decompressor(&self) -> Decompressor { + self.decompressor } /// Returns the cache hit and miss statistics of the store reader. @@ -141,7 +141,7 @@ impl StoreReader { let compressed_block = self.get_compressed_block(checkpoint)?; let decompressed_block = - OwnedBytes::new(self.compressor.decompress(compressed_block.as_ref())?); + OwnedBytes::new(self.decompressor.decompress(compressed_block.as_ref())?); self.cache .put_into_cache(cache_key, decompressed_block.clone()); @@ -321,7 +321,7 @@ impl StoreReader { .await?; let decompressed_block = - OwnedBytes::new(self.compressor.decompress(compressed_block.as_ref())?); + OwnedBytes::new(self.decompressor.decompress(compressed_block.as_ref())?); self.cache .put_into_cache(cache_key, decompressed_block.clone()); @@ -351,6 +351,7 @@ mod tests { use crate::directory::RamDirectory; use crate::schema::{Document, Field}; use crate::store::tests::write_lorem_ipsum_store; + use crate::store::Compressor; use crate::Directory; const BLOCK_SIZE: usize = 16_384; diff --git a/src/store/writer.rs b/src/store/writer.rs index cd53d0d28b..a351d0fcbc 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -5,7 +5,7 @@ use common::{BinarySerializable, CountingWriter, VInt}; use super::compressors::Compressor; use super::footer::DocStoreFooter; use super::index::SkipIndexBuilder; -use super::StoreReader; +use super::{Decompressor, StoreReader}; use crate::directory::{TerminatingWrite, WritePtr}; use crate::schema::Document; use crate::store::index::Checkpoint; @@ -152,7 +152,7 @@ impl StoreWriter { self.write_and_compress_block()?; } let header_offset: u64 = self.writer.written_bytes() as u64; - let footer = DocStoreFooter::new(header_offset, self.compressor); + let footer = DocStoreFooter::new(header_offset, Decompressor::from(self.compressor)); self.offset_index_writer.write(&mut self.writer)?; footer.serialize(&mut self.writer)?; self.writer.terminate()