diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs
index 1343c8d51b..5a09788113 100644
--- a/src/core/index_meta.rs
+++ b/src/core/index_meta.rs
@@ -251,6 +251,10 @@ pub struct IndexSettings {
     #[serde(default = "default_docstore_blocksize")]
     /// The size of each block that will be compressed and written to disk
     pub docstore_blocksize: usize,
+    #[serde(default)]
+    /// The compression level, which will be forwarded to the underlying compressor (if
+    /// applicable).
+    pub docstore_compression_level: Option<i32>,
 }
 
 /// Must be a function to be compatible with serde defaults
@@ -264,6 +268,7 @@ impl Default for IndexSettings {
             sort_by_field: None,
             docstore_compression: Compressor::default(),
             docstore_blocksize: default_docstore_blocksize(),
+            docstore_compression_level: None,
         }
     }
 }
diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs
index 554503e668..891fdcaa32 100644
--- a/src/indexer/segment_serializer.rs
+++ b/src/indexer/segment_serializer.rs
@@ -40,9 +40,10 @@ impl SegmentSerializer {
         let postings_serializer = InvertedIndexSerializer::open(&mut segment)?;
         let compressor = segment.index().settings().docstore_compression;
         let blocksize = segment.index().settings().docstore_blocksize;
+        let compression_level = segment.index().settings().docstore_compression_level;
         Ok(SegmentSerializer {
             segment,
-            store_writer: StoreWriter::new(store_write, compressor, blocksize),
+            store_writer: StoreWriter::new(store_write, compressor, blocksize, compression_level),
             fast_field_serializer,
             fieldnorms_serializer: Some(fieldnorms_serializer),
             postings_serializer,
diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs
index c1ae1c6e88..dac596df94 100644
--- a/src/indexer/segment_writer.rs
+++ b/src/indexer/segment_writer.rs
@@ -373,9 +373,14 @@ fn remap_and_write(
         .open_write(SegmentComponent::Store)?;
     let compressor = serializer.segment().index().settings().docstore_compression;
     let block_size = serializer.segment().index().settings().docstore_blocksize;
+    let compression_level = serializer
+        .segment()
+        .index()
+        .settings()
+        .docstore_compression_level;
     let old_store_writer = std::mem::replace(
         &mut serializer.store_writer,
-        StoreWriter::new(store_write, compressor, block_size),
+        StoreWriter::new(store_write, compressor, block_size, compression_level),
     );
     old_store_writer.close()?;
     let store_read = StoreReader::open(
diff --git a/src/store/compression_zstd_block.rs b/src/store/compression_zstd_block.rs
index 6879cc2fbd..7ef0aa9df1 100644
--- a/src/store/compression_zstd_block.rs
+++ b/src/store/compression_zstd_block.rs
@@ -4,7 +4,11 @@ use zstd::bulk::{compress_to_buffer, decompress_to_buffer};
 use zstd::DEFAULT_COMPRESSION_LEVEL;
 
 #[inline]
-pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
+pub fn compress(
+    uncompressed: &[u8],
+    compressed: &mut Vec<u8>,
+    compression_level: Option<i32>,
+) -> io::Result<()> {
     let count_size = std::mem::size_of::<u32>();
     let max_size = zstd::zstd_safe::compress_bound(uncompressed.len()) + count_size;
 
@@ -14,7 +18,7 @@
     let compressed_size = compress_to_buffer(
         uncompressed,
         &mut compressed[count_size..],
-        DEFAULT_COMPRESSION_LEVEL,
+        compression_level.unwrap_or(DEFAULT_COMPRESSION_LEVEL),
     )?;
 
     compressed[0..count_size].copy_from_slice(&(uncompressed.len() as u32).to_le_bytes());
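For context, the zstd path above length-prefixes each block with its uncompressed size before handing it to the bulk compressor. Below is a minimal standalone sketch of that same logic, for illustration only (`compress_block` is a hypothetical name; the `zstd` crate's bulk API is the one the patch itself uses). Passing `None` preserves the old behaviour via `DEFAULT_COMPRESSION_LEVEL`, while `Some(level)` is forwarded verbatim:

```rust
use std::io;

use zstd::bulk::compress_to_buffer;
use zstd::DEFAULT_COMPRESSION_LEVEL;

/// Hypothetical standalone version of the patched `compress` above.
fn compress_block(uncompressed: &[u8], compression_level: Option<i32>) -> io::Result<Vec<u8>> {
    // Reserve room for a little-endian u32 length prefix plus the
    // worst-case compressed size reported by zstd.
    let count_size = std::mem::size_of::<u32>();
    let max_size = zstd::zstd_safe::compress_bound(uncompressed.len()) + count_size;
    let mut compressed = vec![0u8; max_size];
    let compressed_size = compress_to_buffer(
        uncompressed,
        &mut compressed[count_size..],
        compression_level.unwrap_or(DEFAULT_COMPRESSION_LEVEL),
    )?;
    // Store the uncompressed length so decompression can size its output buffer.
    compressed[..count_size].copy_from_slice(&(uncompressed.len() as u32).to_le_bytes());
    compressed.truncate(count_size + compressed_size);
    Ok(compressed)
}
```

Note that the level only affects the encoder: blocks written at any level remain readable by the unchanged decompression path.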
diff --git a/src/store/compressors.rs b/src/store/compressors.rs
index 2e3ae5293e..b2482b93e7 100644
--- a/src/store/compressors.rs
+++ b/src/store/compressors.rs
@@ -68,7 +68,12 @@ impl Compressor {
         }
     }
     #[inline]
-    pub(crate) fn compress(&self, uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
+    pub(crate) fn compress(
+        &self,
+        uncompressed: &[u8],
+        compressed: &mut Vec<u8>,
+        _compression_level: Option<i32>,
+    ) -> io::Result<()> {
         match self {
             Self::None => {
                 compressed.clear();
@@ -108,7 +113,11 @@ impl Compressor {
             Self::Zstd => {
                 #[cfg(feature = "zstd-compression")]
                 {
-                    super::compression_zstd_block::compress(uncompressed, compressed)
+                    super::compression_zstd_block::compress(
+                        uncompressed,
+                        compressed,
+                        _compression_level,
+                    )
                 }
                 #[cfg(not(feature = "zstd-compression"))]
                 {
diff --git a/src/store/mod.rs b/src/store/mod.rs
index 1fcec19091..5efd22cec6 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -86,7 +86,7 @@ pub mod tests {
         schema_builder.add_text_field("title", TextOptions::default().set_stored());
         let schema = schema_builder.build();
         {
-            let mut store_writer = StoreWriter::new(writer, compressor, blocksize);
+            let mut store_writer = StoreWriter::new(writer, compressor, blocksize, None);
             for i in 0..num_docs {
                 let mut doc = Document::default();
                 doc.add_field_value(field_body, LOREM.to_string());
diff --git a/src/store/writer.rs b/src/store/writer.rs
index 96864c75bb..bc5abb0a57 100644
--- a/src/store/writer.rs
+++ b/src/store/writer.rs
@@ -21,6 +21,7 @@ use crate::DocId;
 pub struct StoreWriter {
     compressor: Compressor,
     block_size: usize,
+    compression_level: Option<i32>,
     doc: DocId,
     first_doc_in_block: DocId,
     offset_index_writer: SkipIndexBuilder,
@@ -34,10 +35,16 @@ impl StoreWriter {
     ///
     /// The store writer will writes blocks on disc as
     /// document are added.
-    pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter {
+    pub fn new(
+        writer: WritePtr,
+        compressor: Compressor,
+        block_size: usize,
+        compression_level: Option<i32>,
+    ) -> StoreWriter {
         StoreWriter {
             compressor,
             block_size,
+            compression_level,
             doc: 0,
             first_doc_in_block: 0,
             offset_index_writer: SkipIndexBuilder::new(),
@@ -129,8 +136,11 @@ impl StoreWriter {
     fn write_and_compress_block(&mut self) -> io::Result<()> {
         assert!(self.doc > 0);
         self.intermediary_buffer.clear();
-        self.compressor
-            .compress(&self.current_block[..], &mut self.intermediary_buffer)?;
+        self.compressor.compress(
+            &self.current_block[..],
+            &mut self.intermediary_buffer,
+            self.compression_level,
+        )?;
         let start_offset = self.writer.written_bytes() as usize;
         self.writer.write_all(&self.intermediary_buffer)?;
         let end_offset = self.writer.written_bytes() as usize;
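End to end, a caller opts in through `IndexSettings`; everything below the settings struct is plumbing. A hedged usage sketch, assuming tantivy's `Index::builder()` API and the `zstd-compression` feature (the `body` field and level 10 are illustrative):

```rust
use tantivy::schema::{Schema, STORED, TEXT};
use tantivy::store::Compressor;
use tantivy::{Index, IndexSettings};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("body", TEXT | STORED);
    let schema = schema_builder.build();

    // zstd accepts levels 1..=22; `Some(10)` trades indexing throughput for
    // a smaller doc store, while `None` keeps zstd's default level.
    let settings = IndexSettings {
        docstore_compression: Compressor::Zstd,
        docstore_compression_level: Some(10),
        ..IndexSettings::default()
    };
    let _index = Index::builder()
        .schema(schema)
        .settings(settings)
        .create_in_ram()?;
    Ok(())
}
```

Because the new field is `#[serde(default)]` and defaults to `None`, index metas written by older versions deserialize unchanged and keep the current compression behaviour.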