Skip to content

Commit

Permalink
Merge pull request #1378 from quickwit-oss/test_compression
Browse files Browse the repository at this point in the history
enable setting compression level
  • Loading branch information
PSeitz authored Jun 10, 2022
2 parents fc24842 + 4d9d2b6 commit 328bd96
Show file tree
Hide file tree
Showing 9 changed files with 384 additions and 110 deletions.
59 changes: 58 additions & 1 deletion src/core/index_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ pub struct IndexMeta {
pub payload: Option<String>,
}

#[derive(Deserialize)]
#[derive(Deserialize, Debug)]
struct UntrackedIndexMeta {
pub segments: Vec<InnerSegmentMeta>,
#[serde(default)]
Expand Down Expand Up @@ -395,6 +395,7 @@ mod tests {
use super::IndexMeta;
use crate::core::index_meta::UntrackedIndexMeta;
use crate::schema::{Schema, TEXT};
use crate::store::ZstdCompressor;
use crate::{IndexSettings, IndexSortByField, Order};

#[test]
Expand Down Expand Up @@ -428,4 +429,60 @@ mod tests {
assert_eq!(index_metas.schema, deser_meta.schema);
assert_eq!(index_metas.opstamp, deser_meta.opstamp);
}

#[test]
fn test_serialize_metas_zstd_compressor() {
    // Single-field schema; the same field name is used as the sort key below.
    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("text", TEXT);
    let schema = schema_builder.build();

    let sort_by_field = Some(IndexSortByField {
        field: "text".to_string(),
        order: Order::Asc,
    });
    // Zstd with an explicit compression level, which is what this test is about.
    let docstore_compression = crate::store::Compressor::Zstd(ZstdCompressor {
        compression_level: Some(4),
    });
    let index_metas = IndexMeta {
        index_settings: IndexSettings {
            sort_by_field,
            docstore_compression,
            docstore_blocksize: 1_000_000,
        },
        segments: Vec::new(),
        schema,
        opstamp: 0u64,
        payload: None,
    };

    // The compressor is expected to serialize as the string
    // `zstd(compression_level=4)`.
    let expected_json = r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zstd(compression_level=4)","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#;
    let serialized = serde_json::ser::to_string(&index_metas).expect("serialization failed");
    assert_eq!(serialized, expected_json);

    // Round trip: deserializing the JSON must give back the same settings,
    // schema and opstamp.
    let round_tripped = serde_json::from_str::<UntrackedIndexMeta>(&serialized).unwrap();
    assert_eq!(index_metas.index_settings, round_tripped.index_settings);
    assert_eq!(index_metas.schema, round_tripped.schema);
    assert_eq!(index_metas.opstamp, round_tripped.opstamp);
}

#[test]
fn test_serialize_metas_invalid_comp() {
    // Case 1: the compressor name itself is misspelled ("zsstd").
    // Deserialization must fail and list the accepted variants.
    let unknown_variant_json = r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zsstd","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#;

    // The JSON is already a `&str`, so it is passed as-is (no extra borrow).
    let unknown_variant_err =
        serde_json::from_str::<UntrackedIndexMeta>(unknown_variant_json).unwrap_err();
    // `String` compares directly against `&str`; no allocation is needed for
    // the expected message.
    assert_eq!(
        unknown_variant_err.to_string(),
        "unknown variant `zsstd`, expected one of `none`, `lz4`, `brotli`, `snappy`, `zstd`, \
         `zstd(compression_level=5)` at line 1 column 96"
    );

    // Case 2: the compressor name is valid but carries an unsupported option
    // ("bla"). The error must name the offending option.
    let unknown_option_json = r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zstd(bla=10)","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#;

    let unknown_option_err =
        serde_json::from_str::<UntrackedIndexMeta>(unknown_option_json).unwrap_err();
    assert_eq!(
        unknown_option_err.to_string(),
        "unknown zstd option \"bla\" at line 1 column 103"
    );
}
}
2 changes: 1 addition & 1 deletion src/indexer/merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,7 @@ impl IndexMerger {
//
// take 7 in order to not walk over all checkpoints.
|| store_reader.block_checkpoints().take(7).count() < 6
|| store_reader.compressor() != store_writer.compressor()
|| store_reader.decompressor() != store_writer.compressor().into()
{
for doc_bytes_res in store_reader.iter_raw(reader.alive_bitset()) {
let doc_bytes = doc_bytes_res?;
Expand Down
8 changes: 6 additions & 2 deletions src/store/compression_zstd_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use zstd::bulk::{compress_to_buffer, decompress_to_buffer};
use zstd::DEFAULT_COMPRESSION_LEVEL;

#[inline]
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
pub fn compress(
uncompressed: &[u8],
compressed: &mut Vec<u8>,
compression_level: Option<i32>,
) -> io::Result<()> {
let count_size = std::mem::size_of::<u32>();
let max_size = zstd::zstd_safe::compress_bound(uncompressed.len()) + count_size;

Expand All @@ -14,7 +18,7 @@ pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()>
let compressed_size = compress_to_buffer(
uncompressed,
&mut compressed[count_size..],
DEFAULT_COMPRESSION_LEVEL,
compression_level.unwrap_or(DEFAULT_COMPRESSION_LEVEL),
)?;

compressed[0..count_size].copy_from_slice(&(uncompressed.len() as u32).to_le_bytes());
Expand Down
Loading

0 comments on commit 328bd96

Please sign in to comment.