enable setting compression level
PSeitz committed May 30, 2022
commit 4328b96 (parent: f0a2b1c)
Showing 7 changed files with 44 additions and 10 deletions.
5 changes: 5 additions & 0 deletions src/core/index_meta.rs
@@ -251,6 +251,10 @@ pub struct IndexSettings {
     #[serde(default = "default_docstore_blocksize")]
     /// The size of each block that will be compressed and written to disk
     pub docstore_blocksize: usize,
+    #[serde(default)]
+    /// The compression level, which will be forwarded to the underlying compressor (if
+    /// applicable).
+    pub docstore_compression_level: Option<i32>,
 }

 /// Must be a function to be compatible with serde defaults
@@ -264,6 +268,7 @@ impl Default for IndexSettings {
             sort_by_field: None,
             docstore_compression: Compressor::default(),
             docstore_blocksize: default_docstore_blocksize(),
+            docstore_compression_level: None,
         }
     }
 }
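Note: for context, this is how an application opts into the new setting. A minimal sketch against tantivy's public API (the `Index::builder()` scaffolding below is assumed from the surrounding codebase, not part of this diff; `Compressor::Zstd` requires the `zstd-compression` feature):

use tantivy::schema::{Schema, TEXT};
use tantivy::store::Compressor;
use tantivy::{Index, IndexSettings};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("body", TEXT);
    let schema = schema_builder.build();
    let settings = IndexSettings {
        docstore_compression: Compressor::Zstd,
        // Forwarded to the compressor; `None` keeps the compressor's default level.
        docstore_compression_level: Some(7),
        ..IndexSettings::default()
    };
    let _index = Index::builder()
        .schema(schema)
        .settings(settings)
        .create_in_ram()?;
    Ok(())
}

Thanks to `#[serde(default)]`, meta.json files written before this change still deserialize; the level simply comes back as `None`.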
3 changes: 2 additions & 1 deletion src/indexer/segment_serializer.rs
@@ -40,9 +40,10 @@ impl SegmentSerializer {
         let postings_serializer = InvertedIndexSerializer::open(&mut segment)?;
         let compressor = segment.index().settings().docstore_compression;
         let blocksize = segment.index().settings().docstore_blocksize;
+        let compression_level = segment.index().settings().docstore_compression_level;
         Ok(SegmentSerializer {
             segment,
-            store_writer: StoreWriter::new(store_write, compressor, blocksize),
+            store_writer: StoreWriter::new(store_write, compressor, blocksize, compression_level),
             fast_field_serializer,
             fieldnorms_serializer: Some(fieldnorms_serializer),
             postings_serializer,
7 changes: 6 additions & 1 deletion src/indexer/segment_writer.rs
@@ -373,9 +373,14 @@ fn remap_and_write(
         .open_write(SegmentComponent::Store)?;
     let compressor = serializer.segment().index().settings().docstore_compression;
     let block_size = serializer.segment().index().settings().docstore_blocksize;
+    let compression_level = serializer
+        .segment()
+        .index()
+        .settings()
+        .docstore_compression_level;
     let old_store_writer = std::mem::replace(
         &mut serializer.store_writer,
-        StoreWriter::new(store_write, compressor, block_size),
+        StoreWriter::new(store_write, compressor, block_size, compression_level),
     );
     old_store_writer.close()?;
     let store_read = StoreReader::open(
8 changes: 6 additions & 2 deletions src/store/compression_zstd_block.rs
@@ -4,7 +4,11 @@ use zstd::bulk::{compress_to_buffer, decompress_to_buffer};
 use zstd::DEFAULT_COMPRESSION_LEVEL;

 #[inline]
-pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
+pub fn compress(
+    uncompressed: &[u8],
+    compressed: &mut Vec<u8>,
+    compression_level: Option<i32>,
+) -> io::Result<()> {
     let count_size = std::mem::size_of::<u32>();
     let max_size = zstd::zstd_safe::compress_bound(uncompressed.len()) + count_size;

@@ -14,7 +18,7 @@ pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()>
     let compressed_size = compress_to_buffer(
         uncompressed,
         &mut compressed[count_size..],
-        DEFAULT_COMPRESSION_LEVEL,
+        compression_level.unwrap_or(DEFAULT_COMPRESSION_LEVEL),
     )?;

     compressed[0..count_size].copy_from_slice(&(uncompressed.len() as u32).to_le_bytes());
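Note: `compress` prefixes the zstd frame with the uncompressed length as a little-endian `u32`. A sketch of the matching read path under that assumed layout (the file's actual `decompress` is not shown in this diff):

use std::io;
use zstd::bulk::decompress_to_buffer;

pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()> {
    let count_size = std::mem::size_of::<u32>();
    // Read back the length header written by `compress` above.
    let uncompressed_size =
        u32::from_le_bytes(compressed[..count_size].try_into().unwrap()) as usize;
    decompressed.resize(uncompressed_size, 0);
    decompress_to_buffer(&compressed[count_size..], &mut decompressed[..])?;
    Ok(())
}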
13 changes: 11 additions & 2 deletions src/store/compressors.rs
@@ -68,7 +68,12 @@ impl Compressor {
         }
     }
     #[inline]
-    pub(crate) fn compress(&self, uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
+    pub(crate) fn compress(
+        &self,
+        uncompressed: &[u8],
+        compressed: &mut Vec<u8>,
+        _compression_level: Option<i32>,
+    ) -> io::Result<()> {
         match self {
             Self::None => {
                 compressed.clear();
@@ -108,7 +113,11 @@ Self::Zstd => {
             Self::Zstd => {
                 #[cfg(feature = "zstd-compression")]
                 {
-                    super::compression_zstd_block::compress(uncompressed, compressed)
+                    super::compression_zstd_block::compress(
+                        uncompressed,
+                        compressed,
+                        _compression_level,
+                    )
                 }
                 #[cfg(not(feature = "zstd-compression"))]
                 {
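Note: only the zstd arm consumes the level, which is why the parameter is spelled `_compression_level`. A crate-internal sketch (`compress` is `pub(crate)`, so this only compiles inside the crate; the `Lz4` variant is assumed from the rest of the enum):

let mut buf = Vec::new();
// Forwarded: zstd encodes at level 10 instead of DEFAULT_COMPRESSION_LEVEL.
Compressor::Zstd.compress(b"doc store block", &mut buf, Some(10))?;
// Ignored: the LZ4 path has no level knob here, so Some(10) is a no-op.
Compressor::Lz4.compress(b"doc store block", &mut buf, Some(10))?;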
2 changes: 1 addition & 1 deletion src/store/mod.rs
@@ -86,7 +86,7 @@ pub mod tests {
         schema_builder.add_text_field("title", TextOptions::default().set_stored());
         let schema = schema_builder.build();
         {
-            let mut store_writer = StoreWriter::new(writer, compressor, blocksize);
+            let mut store_writer = StoreWriter::new(writer, compressor, blocksize, None);
             for i in 0..num_docs {
                 let mut doc = Document::default();
                 doc.add_field_value(field_body, LOREM.to_string());
16 changes: 13 additions & 3 deletions src/store/writer.rs
@@ -21,6 +21,7 @@ use crate::DocId;
 pub struct StoreWriter {
     compressor: Compressor,
     block_size: usize,
+    compression_level: Option<i32>,
     doc: DocId,
     first_doc_in_block: DocId,
     offset_index_writer: SkipIndexBuilder,
@@ -34,10 +35,16 @@ impl StoreWriter {
     ///
     /// The store writer will write blocks to disk as
     /// documents are added.
-    pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter {
+    pub fn new(
+        writer: WritePtr,
+        compressor: Compressor,
+        block_size: usize,
+        compression_level: Option<i32>,
+    ) -> StoreWriter {
         StoreWriter {
             compressor,
             block_size,
+            compression_level,
             doc: 0,
             first_doc_in_block: 0,
             offset_index_writer: SkipIndexBuilder::new(),
@@ -129,8 +136,11 @@ impl StoreWriter {
     fn write_and_compress_block(&mut self) -> io::Result<()> {
         assert!(self.doc > 0);
         self.intermediary_buffer.clear();
-        self.compressor
-            .compress(&self.current_block[..], &mut self.intermediary_buffer)?;
+        self.compressor.compress(
+            &self.current_block[..],
+            &mut self.intermediary_buffer,
+            self.compression_level,
+        )?;
         let start_offset = self.writer.written_bytes() as usize;
         self.writer.write_all(&self.intermediary_buffer)?;
         let end_offset = self.writer.written_bytes() as usize;
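Note: end to end, every block this writer flushes is now compressed at the requested level. A minimal sketch mirroring the test in src/store/mod.rs (the `RamDirectory` scaffolding and the "store" path are assumed for illustration, not part of this commit):

use std::path::Path;
use tantivy::directory::{Directory, RamDirectory};
use tantivy::store::{Compressor, StoreWriter};

let directory = RamDirectory::create();
let writer = directory.open_write(Path::new("store"))?;
// `Some(3)` reaches `Compressor::compress` for every 16 KB block;
// `None` would fall back to the compressor's default level.
let store_writer = StoreWriter::new(writer, Compressor::Zstd, 16_384, Some(3));
store_writer.close()?;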
