-
-
Notifications
You must be signed in to change notification settings - Fork 716
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
change format for store to make it faster with small documents #1569
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,10 +1,10 @@ | ||||||||||||
use std::io; | ||||||||||||
use std::iter::Sum; | ||||||||||||
use std::ops::AddAssign; | ||||||||||||
use std::ops::{AddAssign, Range}; | ||||||||||||
use std::sync::atomic::{AtomicUsize, Ordering}; | ||||||||||||
use std::sync::{Arc, Mutex}; | ||||||||||||
|
||||||||||||
use common::{BinarySerializable, HasLen, VInt}; | ||||||||||||
use common::{BinarySerializable, HasLen}; | ||||||||||||
use lru::LruCache; | ||||||||||||
use ownedbytes::OwnedBytes; | ||||||||||||
|
||||||||||||
|
@@ -211,17 +211,10 @@ impl StoreReader { | |||||||||||
doc_id: DocId, | ||||||||||||
checkpoint: &Checkpoint, | ||||||||||||
) -> crate::Result<OwnedBytes> { | ||||||||||||
let mut cursor = &block[..]; | ||||||||||||
let cursor_len_before = cursor.len(); | ||||||||||||
for _ in checkpoint.doc_range.start..doc_id { | ||||||||||||
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize; | ||||||||||||
cursor = &cursor[doc_length..]; | ||||||||||||
} | ||||||||||||
let doc_pos = doc_id - checkpoint.doc_range.start; | ||||||||||||
|
||||||||||||
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize; | ||||||||||||
let start_pos = cursor_len_before - cursor.len(); | ||||||||||||
let end_pos = cursor_len_before - cursor.len() + doc_length; | ||||||||||||
Ok(block.slice(start_pos..end_pos)) | ||||||||||||
let range = block_read_index(&block, doc_pos)?; | ||||||||||||
Ok(block.slice(range)) | ||||||||||||
} | ||||||||||||
|
||||||||||||
/// Iterator over all Documents in their order as they are stored in the doc store. | ||||||||||||
|
@@ -254,9 +247,7 @@ impl StoreReader { | |||||||||||
let mut curr_block = curr_checkpoint | ||||||||||||
.as_ref() | ||||||||||||
.map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind())); // map error in order to enable cloning | ||||||||||||
let mut block_start_pos = 0; | ||||||||||||
let mut num_skipped = 0; | ||||||||||||
let mut reset_block_pos = false; | ||||||||||||
let mut doc_pos = 0; | ||||||||||||
(0..last_doc_id) | ||||||||||||
.filter_map(move |doc_id| { | ||||||||||||
// filter_map is only used to resolve lifetime issues between the two closures on | ||||||||||||
|
@@ -268,24 +259,19 @@ impl StoreReader { | |||||||||||
curr_block = curr_checkpoint | ||||||||||||
.as_ref() | ||||||||||||
.map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind())); | ||||||||||||
reset_block_pos = true; | ||||||||||||
num_skipped = 0; | ||||||||||||
doc_pos = 0; | ||||||||||||
} | ||||||||||||
|
||||||||||||
let alive = alive_bitset.map_or(true, |bitset| bitset.is_alive(doc_id)); | ||||||||||||
if alive { | ||||||||||||
let ret = Some((curr_block.clone(), num_skipped, reset_block_pos)); | ||||||||||||
// the map block will move over the num_skipped, so we reset to 0 | ||||||||||||
num_skipped = 0; | ||||||||||||
reset_block_pos = false; | ||||||||||||
ret | ||||||||||||
let res = if alive { | ||||||||||||
Some((curr_block.clone(), doc_pos)) | ||||||||||||
} else { | ||||||||||||
// we keep the number of skipped documents to move forward in the map block | ||||||||||||
num_skipped += 1; | ||||||||||||
None | ||||||||||||
} | ||||||||||||
}; | ||||||||||||
doc_pos += 1; | ||||||||||||
res | ||||||||||||
}) | ||||||||||||
.map(move |(block, num_skipped, reset_block_pos)| { | ||||||||||||
.map(move |(block, doc_pos)| { | ||||||||||||
let block = block | ||||||||||||
.ok_or_else(|| { | ||||||||||||
DataCorruption::comment_only( | ||||||||||||
|
@@ -296,30 +282,9 @@ impl StoreReader { | |||||||||||
.map_err(|error_kind| { | ||||||||||||
std::io::Error::new(error_kind, "error when reading block in doc store") | ||||||||||||
})?; | ||||||||||||
// this flag is set, when filter_map moved to the next block | ||||||||||||
if reset_block_pos { | ||||||||||||
block_start_pos = 0; | ||||||||||||
} | ||||||||||||
let mut cursor = &block[block_start_pos..]; | ||||||||||||
let mut pos = 0; | ||||||||||||
// move forward 1 doc + num_skipped in block and return length of current doc | ||||||||||||
let doc_length = loop { | ||||||||||||
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize; | ||||||||||||
let num_bytes_read = block[block_start_pos..].len() - cursor.len(); | ||||||||||||
block_start_pos += num_bytes_read; | ||||||||||||
|
||||||||||||
pos += 1; | ||||||||||||
if pos == num_skipped + 1 { | ||||||||||||
break doc_length; | ||||||||||||
} else { | ||||||||||||
block_start_pos += doc_length; | ||||||||||||
cursor = &block[block_start_pos..]; | ||||||||||||
} | ||||||||||||
}; | ||||||||||||
let end_pos = block_start_pos + doc_length; | ||||||||||||
let doc_bytes = block.slice(block_start_pos..end_pos); | ||||||||||||
block_start_pos = end_pos; | ||||||||||||
Ok(doc_bytes) | ||||||||||||
|
||||||||||||
let range = block_read_index(&block, doc_pos)?; | ||||||||||||
Ok(block.slice(range)) | ||||||||||||
}) | ||||||||||||
} | ||||||||||||
|
||||||||||||
|
@@ -329,6 +294,31 @@ impl StoreReader { | |||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
fn block_read_index(block: &[u8], doc_pos: u32) -> crate::Result<Range<usize>> { | ||||||||||||
let doc_pos = doc_pos as usize; | ||||||||||||
let size_of_u32 = std::mem::size_of::<u32>(); | ||||||||||||
|
||||||||||||
let index_len_pos = block.len() - size_of_u32; | ||||||||||||
let index_len = u32::deserialize(&mut &block[index_len_pos..])? as usize; | ||||||||||||
|
||||||||||||
if doc_pos > index_len { | ||||||||||||
return Err(crate::TantivyError::InternalError( | ||||||||||||
"Attempted to read doc from wrong block".to_owned(), | ||||||||||||
)); | ||||||||||||
} | ||||||||||||
|
||||||||||||
let index_start = block.len() - (index_len + 1) * size_of_u32; | ||||||||||||
let index = &block[index_start..index_start + index_len * size_of_u32]; | ||||||||||||
|
||||||||||||
let start_pos = u32::deserialize(&mut &index[doc_pos * size_of_u32..])? as usize; | ||||||||||||
let end_pos = u32::deserialize(&mut &index[(doc_pos + 1) * size_of_u32..]) | ||||||||||||
.unwrap_or(index_start as u32) as usize; | ||||||||||||
Ok(Range { | ||||||||||||
start: start_pos, | ||||||||||||
end: end_pos, | ||||||||||||
}) | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nitpick
Suggested change
|
||||||||||||
} | ||||||||||||
|
||||||||||||
#[cfg(feature = "quickwit")] | ||||||||||||
impl StoreReader { | ||||||||||||
/// Advanced API. | ||||||||||||
|
@@ -427,7 +417,7 @@ mod tests { | |||||||||||
assert_eq!(store.cache_stats().cache_hits, 1); | ||||||||||||
assert_eq!(store.cache_stats().cache_misses, 2); | ||||||||||||
|
||||||||||||
assert_eq!(store.cache.peek_lru(), Some(9210)); | ||||||||||||
assert_eq!(store.cache.peek_lru(), Some(11163)); | ||||||||||||
|
||||||||||||
Ok(()) | ||||||||||||
} | ||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
use std::io::{self, Write}; | ||
|
||
use common::{BinarySerializable, VInt}; | ||
use common::BinarySerializable; | ||
|
||
use super::compressors::Compressor; | ||
use super::StoreReader; | ||
|
@@ -22,6 +22,7 @@ pub struct StoreWriter { | |
num_docs_in_current_block: DocId, | ||
intermediary_buffer: Vec<u8>, | ||
current_block: Vec<u8>, | ||
doc_pos: Vec<u32>, | ||
block_compressor: BlockCompressor, | ||
} | ||
|
||
|
@@ -42,6 +43,7 @@ impl StoreWriter { | |
block_size, | ||
num_docs_in_current_block: 0, | ||
intermediary_buffer: Vec::new(), | ||
doc_pos: Vec::new(), | ||
current_block: Vec::new(), | ||
block_compressor, | ||
}) | ||
|
@@ -53,12 +55,17 @@ impl StoreWriter { | |
|
||
/// The memory used (including children) | ||
pub fn mem_usage(&self) -> usize { | ||
self.intermediary_buffer.capacity() + self.current_block.capacity() | ||
self.intermediary_buffer.capacity() | ||
+ self.current_block.capacity() | ||
+ self.doc_pos.capacity() * std::mem::size_of::<u32>() | ||
} | ||
|
||
/// Checks if the current block is full, and if so, compresses and flushes it. | ||
fn check_flush_block(&mut self) -> io::Result<()> { | ||
if self.current_block.len() > self.block_size { | ||
// this does not count the VInt storing the index lenght itself, but it is negligible in | ||
// front of everything else. | ||
let index_len = self.doc_pos.len() * std::mem::size_of::<usize>(); | ||
if self.current_block.len() + index_len > self.block_size { | ||
self.send_current_block_to_compressor()?; | ||
} | ||
Ok(()) | ||
|
@@ -70,8 +77,19 @@ impl StoreWriter { | |
if self.current_block.is_empty() { | ||
return Ok(()); | ||
} | ||
|
||
let size_of_u32 = std::mem::size_of::<u32>(); | ||
self.current_block | ||
.reserve((self.doc_pos.len() + 1) * size_of_u32); | ||
|
||
for pos in self.doc_pos.iter() { | ||
pos.serialize(&mut self.current_block)?; | ||
} | ||
(self.doc_pos.len() as u32).serialize(&mut self.current_block)?; | ||
Comment on lines
+81
to
+88
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would bitpack them here, so the cost per doc is not fixed to 4bytes but depends on the block size (e.g. 3 bytes for 2MB blocks) Usage is very simple, see bitpacked.rs There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's stick to this. |
||
|
||
self.block_compressor | ||
.compress_block_and_write(&self.current_block, self.num_docs_in_current_block)?; | ||
self.doc_pos.clear(); | ||
self.current_block.clear(); | ||
self.num_docs_in_current_block = 0; | ||
Ok(()) | ||
|
@@ -87,8 +105,7 @@ impl StoreWriter { | |
// calling store bytes would be preferable for code reuse, but then we can't use | ||
// intermediary_buffer due to the borrow checker | ||
// a new buffer costs ~1% indexing performance | ||
let doc_num_bytes = self.intermediary_buffer.len(); | ||
VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block); | ||
self.doc_pos.push(self.current_block.len() as u32); | ||
self.current_block | ||
.write_all(&self.intermediary_buffer[..])?; | ||
self.num_docs_in_current_block += 1; | ||
|
@@ -101,8 +118,7 @@ impl StoreWriter { | |
/// The document id is implicitly the current number | ||
/// of documents. | ||
pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> { | ||
let doc_num_bytes = serialized_document.len(); | ||
VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block); | ||
self.doc_pos.push(self.current_block.len() as u32); | ||
self.current_block.extend_from_slice(serialized_document); | ||
self.num_docs_in_current_block += 1; | ||
self.check_flush_block()?; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_offset is imo better to name byte offsets