Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change format for store to make it faster with small documents #1569

Merged
merged 3 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 39 additions & 52 deletions src/store/reader.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::io;
use std::iter::Sum;
use std::ops::AddAssign;
use std::ops::{AddAssign, Range};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use common::{BinarySerializable, HasLen, VInt};
use common::{BinarySerializable, HasLen};
use lru::LruCache;
use ownedbytes::OwnedBytes;

Expand Down Expand Up @@ -211,17 +211,10 @@ impl StoreReader {
doc_id: DocId,
checkpoint: &Checkpoint,
) -> crate::Result<OwnedBytes> {
let mut cursor = &block[..];
let cursor_len_before = cursor.len();
for _ in checkpoint.doc_range.start..doc_id {
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
cursor = &cursor[doc_length..];
}
let doc_pos = doc_id - checkpoint.doc_range.start;

let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
let start_pos = cursor_len_before - cursor.len();
let end_pos = cursor_len_before - cursor.len() + doc_length;
Ok(block.slice(start_pos..end_pos))
let range = block_read_index(&block, doc_pos)?;
Ok(block.slice(range))
}

/// Iterator over all Documents in their order as they are stored in the doc store.
Expand Down Expand Up @@ -254,9 +247,7 @@ impl StoreReader {
let mut curr_block = curr_checkpoint
.as_ref()
.map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind())); // map error in order to enable cloning
let mut block_start_pos = 0;
let mut num_skipped = 0;
let mut reset_block_pos = false;
let mut doc_pos = 0;
(0..last_doc_id)
.filter_map(move |doc_id| {
// filter_map is only used to resolve lifetime issues between the two closures on
Expand All @@ -268,24 +259,19 @@ impl StoreReader {
curr_block = curr_checkpoint
.as_ref()
.map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind()));
reset_block_pos = true;
num_skipped = 0;
doc_pos = 0;
}

let alive = alive_bitset.map_or(true, |bitset| bitset.is_alive(doc_id));
if alive {
let ret = Some((curr_block.clone(), num_skipped, reset_block_pos));
// the map block will move over the num_skipped, so we reset to 0
num_skipped = 0;
reset_block_pos = false;
ret
let res = if alive {
Some((curr_block.clone(), doc_pos))
} else {
// we keep the number of skipped documents to move forward in the map block
num_skipped += 1;
None
}
};
doc_pos += 1;
res
})
.map(move |(block, num_skipped, reset_block_pos)| {
.map(move |(block, doc_pos)| {
let block = block
.ok_or_else(|| {
DataCorruption::comment_only(
Expand All @@ -296,30 +282,9 @@ impl StoreReader {
.map_err(|error_kind| {
std::io::Error::new(error_kind, "error when reading block in doc store")
})?;
// this flag is set, when filter_map moved to the next block
if reset_block_pos {
block_start_pos = 0;
}
let mut cursor = &block[block_start_pos..];
let mut pos = 0;
// move forward 1 doc + num_skipped in block and return length of current doc
let doc_length = loop {
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
let num_bytes_read = block[block_start_pos..].len() - cursor.len();
block_start_pos += num_bytes_read;

pos += 1;
if pos == num_skipped + 1 {
break doc_length;
} else {
block_start_pos += doc_length;
cursor = &block[block_start_pos..];
}
};
let end_pos = block_start_pos + doc_length;
let doc_bytes = block.slice(block_start_pos..end_pos);
block_start_pos = end_pos;
Ok(doc_bytes)

let range = block_read_index(&block, doc_pos)?;
Ok(block.slice(range))
})
}

Expand All @@ -329,6 +294,28 @@ impl StoreReader {
}
}

/// Returns the byte range of the document at position `doc_pos` inside a
/// decompressed doc-store block.
///
/// Block layout: `[serialized docs][index: index_len * u32 start offsets][index_len: u32]`.
/// The start offset of doc `i` is the i-th `u32` of the index; its end offset is the
/// start offset of doc `i + 1`, or the beginning of the index for the last doc.
///
/// Returns an `InternalError` when `doc_pos` does not belong to this block or when the
/// block is too small / corrupt to contain a valid index.
fn block_read_index(block: &[u8], doc_pos: u32) -> crate::Result<Range<usize>> {
    let doc_pos = doc_pos as usize;
    let size_of_u32 = std::mem::size_of::<u32>();

    // The last u32 of the block stores the number of entries in the offset index.
    // checked_sub: a truncated block should surface as an error, not a panic.
    let index_len_pos = block.len().checked_sub(size_of_u32).ok_or_else(|| {
        crate::TantivyError::InternalError(
            "doc store block too small to contain an index".to_owned(),
        )
    })?;
    let index_len = u32::deserialize(&mut &block[index_len_pos..])? as usize;

    // Valid positions are 0..index_len. Was `doc_pos > index_len`, which let
    // `doc_pos == index_len` fall through to an opaque deserialization error below.
    if doc_pos >= index_len {
        return Err(crate::TantivyError::InternalError(
            "Attempted to read doc from wrong block".to_owned(),
        ));
    }

    // The index (one u32 start offset per doc) sits immediately before its length.
    let index_start = index_len_pos
        .checked_sub(index_len * size_of_u32)
        .ok_or_else(|| {
            crate::TantivyError::InternalError(
                "corrupt doc store block: index larger than block".to_owned(),
            )
        })?;
    let index = &block[index_start..index_len_pos];

    let start_offset = u32::deserialize(&mut &index[doc_pos * size_of_u32..])? as usize;
    // For the last doc the slice past the index is empty, deserialization fails, and
    // the end offset falls back to the start of the index (i.e. the end of doc data).
    let end_offset = u32::deserialize(&mut &index[(doc_pos + 1) * size_of_u32..])
        .unwrap_or(index_start as u32) as usize;
    Ok(start_offset..end_offset)
}

#[cfg(feature = "quickwit")]
impl StoreReader {
/// Advanced API.
Expand Down Expand Up @@ -427,7 +414,7 @@ mod tests {
assert_eq!(store.cache_stats().cache_hits, 1);
assert_eq!(store.cache_stats().cache_misses, 2);

assert_eq!(store.cache.peek_lru(), Some(9210));
assert_eq!(store.cache.peek_lru(), Some(11163));

Ok(())
}
Expand Down
30 changes: 23 additions & 7 deletions src/store/writer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::io::{self, Write};

use common::{BinarySerializable, VInt};
use common::BinarySerializable;

use super::compressors::Compressor;
use super::StoreReader;
Expand All @@ -22,6 +22,7 @@ pub struct StoreWriter {
num_docs_in_current_block: DocId,
intermediary_buffer: Vec<u8>,
current_block: Vec<u8>,
doc_pos: Vec<u32>,
block_compressor: BlockCompressor,
}

Expand All @@ -42,6 +43,7 @@ impl StoreWriter {
block_size,
num_docs_in_current_block: 0,
intermediary_buffer: Vec::new(),
doc_pos: Vec::new(),
current_block: Vec::new(),
block_compressor,
})
Expand All @@ -53,12 +55,17 @@ impl StoreWriter {

/// The memory used (including children)
pub fn mem_usage(&self) -> usize {
self.intermediary_buffer.capacity() + self.current_block.capacity()
self.intermediary_buffer.capacity()
+ self.current_block.capacity()
+ self.doc_pos.capacity() * std::mem::size_of::<u32>()
}

/// Checks if the current block is full, and if so, compresses and flushes it.
fn check_flush_block(&mut self) -> io::Result<()> {
if self.current_block.len() > self.block_size {
// this does not count the u32 storing the index length itself, but it is negligible in
// front of everything else.
let index_len = self.doc_pos.len() * std::mem::size_of::<usize>();
if self.current_block.len() + index_len > self.block_size {
self.send_current_block_to_compressor()?;
}
Ok(())
Expand All @@ -70,8 +77,19 @@ impl StoreWriter {
if self.current_block.is_empty() {
return Ok(());
}

let size_of_u32 = std::mem::size_of::<u32>();
self.current_block
.reserve((self.doc_pos.len() + 1) * size_of_u32);

for pos in self.doc_pos.iter() {
pos.serialize(&mut self.current_block)?;
}
(self.doc_pos.len() as u32).serialize(&mut self.current_block)?;
Comment on lines +81 to +88
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would bitpack them here, so the cost per doc is not fixed to 4bytes but depends on the block size (e.g. 3 bytes for 2MB blocks)

Usage is very simple, see bitpacked.rs

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's stick to this.


self.block_compressor
.compress_block_and_write(&self.current_block, self.num_docs_in_current_block)?;
self.doc_pos.clear();
self.current_block.clear();
self.num_docs_in_current_block = 0;
Ok(())
Expand All @@ -87,8 +105,7 @@ impl StoreWriter {
// calling store bytes would be preferable for code reuse, but then we can't use
// intermediary_buffer due to the borrow checker
// a new buffer costs ~1% indexing performance
let doc_num_bytes = self.intermediary_buffer.len();
VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block);
self.doc_pos.push(self.current_block.len() as u32);
self.current_block
.write_all(&self.intermediary_buffer[..])?;
self.num_docs_in_current_block += 1;
Expand All @@ -101,8 +118,7 @@ impl StoreWriter {
/// The document id is implicitly the current number
/// of documents.
pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> {
let doc_num_bytes = serialized_document.len();
VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block);
self.doc_pos.push(self.current_block.len() as u32);
self.current_block.extend_from_slice(serialized_document);
self.num_docs_in_current_block += 1;
self.check_flush_block()?;
Expand Down