Skip to content

Commit

Permalink
split into compressor/decompressor
Browse files Browse the repository at this point in the history
use custom de/serializer for compressor
accept parameters like zstd(compression_level=5) as compressor
  • Loading branch information
PSeitz committed Jun 2, 2022
1 parent ed868f9 commit 4d9d2b6
Show file tree
Hide file tree
Showing 10 changed files with 381 additions and 138 deletions.
65 changes: 58 additions & 7 deletions src/core/index_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,6 @@ pub struct IndexSettings {
#[serde(default = "default_docstore_blocksize")]
/// The size of each block that will be compressed and written to disk
pub docstore_blocksize: usize,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
/// The compression level, which will be forwarded to the underlying compressor (if
/// applicaple).
pub docstore_compression_level: Option<i32>,
}

/// Must be a function to be compatible with serde defaults
Expand All @@ -269,7 +264,6 @@ impl Default for IndexSettings {
sort_by_field: None,
docstore_compression: Compressor::default(),
docstore_blocksize: default_docstore_blocksize(),
docstore_compression_level: None,
}
}
}
Expand Down Expand Up @@ -332,7 +326,7 @@ pub struct IndexMeta {
pub payload: Option<String>,
}

#[derive(Deserialize)]
#[derive(Deserialize, Debug)]
struct UntrackedIndexMeta {
pub segments: Vec<InnerSegmentMeta>,
#[serde(default)]
Expand Down Expand Up @@ -401,6 +395,7 @@ mod tests {
use super::IndexMeta;
use crate::core::index_meta::UntrackedIndexMeta;
use crate::schema::{Schema, TEXT};
use crate::store::ZstdCompressor;
use crate::{IndexSettings, IndexSortByField, Order};

#[test]
Expand Down Expand Up @@ -434,4 +429,60 @@ mod tests {
assert_eq!(index_metas.schema, deser_meta.schema);
assert_eq!(index_metas.opstamp, deser_meta.opstamp);
}

#[test]
fn test_serialize_metas_zstd_compressor() {
let schema = {
let mut schema_builder = Schema::builder();
schema_builder.add_text_field("text", TEXT);
schema_builder.build()
};
let index_metas = IndexMeta {
index_settings: IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "text".to_string(),
order: Order::Asc,
}),
docstore_compression: crate::store::Compressor::Zstd(ZstdCompressor {
compression_level: Some(4),
}),
docstore_blocksize: 1_000_000,
},
segments: Vec::new(),
schema,
opstamp: 0u64,
payload: None,
};
let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
assert_eq!(
json,
r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zstd(compression_level=4)","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
);

let deser_meta: UntrackedIndexMeta = serde_json::from_str(&json).unwrap();
assert_eq!(index_metas.index_settings, deser_meta.index_settings);
assert_eq!(index_metas.schema, deser_meta.schema);
assert_eq!(index_metas.opstamp, deser_meta.opstamp);
}

#[test]
fn test_serialize_metas_invalid_comp() {
let json = r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zsstd","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#;

let err = serde_json::from_str::<UntrackedIndexMeta>(&json).unwrap_err();
assert_eq!(
err.to_string(),
"unknown variant `zsstd`, expected one of `none`, `lz4`, `brotli`, `snappy`, `zstd`, \
`zstd(compression_level=5)` at line 1 column 96"
.to_string()
);

let json = r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zstd(bla=10)","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#;

let err = serde_json::from_str::<UntrackedIndexMeta>(&json).unwrap_err();
assert_eq!(
err.to_string(),
"unknown zstd option \"bla\" at line 1 column 103".to_string()
);
}
}
2 changes: 1 addition & 1 deletion src/indexer/merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,7 @@ impl IndexMerger {
//
// take 7 in order to not walk over all checkpoints.
|| store_reader.block_checkpoints().take(7).count() < 6
|| store_reader.compressor() != store_writer.compressor()
|| store_reader.decompressor() != store_writer.compressor().into()
{
for doc_bytes_res in store_reader.iter_raw(reader.alive_bitset()) {
let doc_bytes = doc_bytes_res?;
Expand Down
3 changes: 1 addition & 2 deletions src/indexer/segment_serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ impl SegmentSerializer {
let postings_serializer = InvertedIndexSerializer::open(&mut segment)?;
let compressor = segment.index().settings().docstore_compression;
let blocksize = segment.index().settings().docstore_blocksize;
let compression_level = segment.index().settings().docstore_compression_level;
Ok(SegmentSerializer {
segment,
store_writer: StoreWriter::new(store_write, compressor, blocksize, compression_level),
store_writer: StoreWriter::new(store_write, compressor, blocksize),
fast_field_serializer,
fieldnorms_serializer: Some(fieldnorms_serializer),
postings_serializer,
Expand Down
7 changes: 1 addition & 6 deletions src/indexer/segment_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,14 +373,9 @@ fn remap_and_write(
.open_write(SegmentComponent::Store)?;
let compressor = serializer.segment().index().settings().docstore_compression;
let block_size = serializer.segment().index().settings().docstore_blocksize;
let compression_level = serializer
.segment()
.index()
.settings()
.docstore_compression_level;
let old_store_writer = std::mem::replace(
&mut serializer.store_writer,
StoreWriter::new(store_write, compressor, block_size, compression_level),
StoreWriter::new(store_write, compressor, block_size),
);
old_store_writer.close()?;
let store_read = StoreReader::open(
Expand Down
Loading

0 comments on commit 4d9d2b6

Please sign in to comment.