diff --git a/src/store/reader.rs b/src/store/reader.rs index 4daed9e589..92e9a15064 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -1,10 +1,10 @@ use std::io; use std::iter::Sum; -use std::ops::AddAssign; +use std::ops::{AddAssign, Range}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; -use common::{BinarySerializable, HasLen, VInt}; +use common::{BinarySerializable, HasLen}; use lru::LruCache; use ownedbytes::OwnedBytes; @@ -211,17 +211,10 @@ impl StoreReader { doc_id: DocId, checkpoint: &Checkpoint, ) -> crate::Result { - let mut cursor = &block[..]; - let cursor_len_before = cursor.len(); - for _ in checkpoint.doc_range.start..doc_id { - let doc_length = VInt::deserialize(&mut cursor)?.val() as usize; - cursor = &cursor[doc_length..]; - } + let doc_pos = doc_id - checkpoint.doc_range.start; - let doc_length = VInt::deserialize(&mut cursor)?.val() as usize; - let start_pos = cursor_len_before - cursor.len(); - let end_pos = cursor_len_before - cursor.len() + doc_length; - Ok(block.slice(start_pos..end_pos)) + let range = block_read_index(&block, doc_pos)?; + Ok(block.slice(range)) } /// Iterator over all Documents in their order as they are stored in the doc store. 
@@ -254,9 +247,7 @@ impl StoreReader { let mut curr_block = curr_checkpoint .as_ref() .map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind())); // map error in order to enable cloning - let mut block_start_pos = 0; - let mut num_skipped = 0; - let mut reset_block_pos = false; + let mut doc_pos = 0; (0..last_doc_id) .filter_map(move |doc_id| { // filter_map is only used to resolve lifetime issues between the two closures on @@ -268,24 +259,19 @@ impl StoreReader { curr_block = curr_checkpoint .as_ref() .map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind())); - reset_block_pos = true; - num_skipped = 0; + doc_pos = 0; } let alive = alive_bitset.map_or(true, |bitset| bitset.is_alive(doc_id)); - if alive { - let ret = Some((curr_block.clone(), num_skipped, reset_block_pos)); - // the map block will move over the num_skipped, so we reset to 0 - num_skipped = 0; - reset_block_pos = false; - ret + let res = if alive { + Some((curr_block.clone(), doc_pos)) } else { - // we keep the number of skipped documents to move forward in the map block - num_skipped += 1; None - } + }; + doc_pos += 1; + res }) - .map(move |(block, num_skipped, reset_block_pos)| { + .map(move |(block, doc_pos)| { let block = block .ok_or_else(|| { DataCorruption::comment_only( @@ -296,30 +282,9 @@ impl StoreReader { .map_err(|error_kind| { std::io::Error::new(error_kind, "error when reading block in doc store") })?; - // this flag is set, when filter_map moved to the next block - if reset_block_pos { - block_start_pos = 0; - } - let mut cursor = &block[block_start_pos..]; - let mut pos = 0; - // move forward 1 doc + num_skipped in block and return length of current doc - let doc_length = loop { - let doc_length = VInt::deserialize(&mut cursor)?.val() as usize; - let num_bytes_read = block[block_start_pos..].len() - cursor.len(); - block_start_pos += num_bytes_read; - - pos += 1; - if pos == num_skipped + 1 { - break doc_length; - } else { - block_start_pos += doc_length; - 
cursor = &block[block_start_pos..]; - } - }; - let end_pos = block_start_pos + doc_length; - let doc_bytes = block.slice(block_start_pos..end_pos); - block_start_pos = end_pos; - Ok(doc_bytes) + + let range = block_read_index(&block, doc_pos)?; + Ok(block.slice(range)) }) } @@ -329,6 +294,28 @@ impl StoreReader { } } +fn block_read_index(block: &[u8], doc_pos: u32) -> crate::Result> { + let doc_pos = doc_pos as usize; + let size_of_u32 = std::mem::size_of::(); + + let index_len_pos = block.len() - size_of_u32; + let index_len = u32::deserialize(&mut &block[index_len_pos..])? as usize; + + if doc_pos > index_len { + return Err(crate::TantivyError::InternalError( + "Attempted to read doc from wrong block".to_owned(), + )); + } + + let index_start = block.len() - (index_len + 1) * size_of_u32; + let index = &block[index_start..index_start + index_len * size_of_u32]; + + let start_offset = u32::deserialize(&mut &index[doc_pos * size_of_u32..])? as usize; + let end_offset = u32::deserialize(&mut &index[(doc_pos + 1) * size_of_u32..]) + .unwrap_or(index_start as u32) as usize; + Ok(start_offset..end_offset) +} + #[cfg(feature = "quickwit")] impl StoreReader { /// Advanced API. 
@@ -427,7 +414,7 @@ mod tests { assert_eq!(store.cache_stats().cache_hits, 1); assert_eq!(store.cache_stats().cache_misses, 2); - assert_eq!(store.cache.peek_lru(), Some(9210)); + assert_eq!(store.cache.peek_lru(), Some(11163)); Ok(()) } diff --git a/src/store/writer.rs b/src/store/writer.rs index d0f27f4e89..c034548f35 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -1,6 +1,6 @@ use std::io::{self, Write}; -use common::{BinarySerializable, VInt}; +use common::BinarySerializable; use super::compressors::Compressor; use super::StoreReader; @@ -22,6 +22,7 @@ pub struct StoreWriter { num_docs_in_current_block: DocId, intermediary_buffer: Vec, current_block: Vec, + doc_pos: Vec, block_compressor: BlockCompressor, } @@ -42,6 +43,7 @@ impl StoreWriter { block_size, num_docs_in_current_block: 0, intermediary_buffer: Vec::new(), + doc_pos: Vec::new(), current_block: Vec::new(), block_compressor, }) @@ -53,12 +55,17 @@ impl StoreWriter { /// The memory used (inclusive childs) pub fn mem_usage(&self) -> usize { - self.intermediary_buffer.capacity() + self.current_block.capacity() + self.intermediary_buffer.capacity() + + self.current_block.capacity() + + self.doc_pos.capacity() * std::mem::size_of::() } /// Checks if the current block is full, and if so, compresses and flushes it. fn check_flush_block(&mut self) -> io::Result<()> { - if self.current_block.len() > self.block_size { + // this does not count the u32 storing the index length itself, but it is negligible in
+ let index_len = self.doc_pos.len() * std::mem::size_of::(); + if self.current_block.len() + index_len > self.block_size { self.send_current_block_to_compressor()?; } Ok(()) @@ -70,8 +77,19 @@ impl StoreWriter { if self.current_block.is_empty() { return Ok(()); } + + let size_of_u32 = std::mem::size_of::(); + self.current_block + .reserve((self.doc_pos.len() + 1) * size_of_u32); + + for pos in self.doc_pos.iter() { + pos.serialize(&mut self.current_block)?; + } + (self.doc_pos.len() as u32).serialize(&mut self.current_block)?; + self.block_compressor .compress_block_and_write(&self.current_block, self.num_docs_in_current_block)?; + self.doc_pos.clear(); self.current_block.clear(); self.num_docs_in_current_block = 0; Ok(()) @@ -87,8 +105,7 @@ impl StoreWriter { // calling store bytes would be preferable for code reuse, but then we can't use // intermediary_buffer due to the borrow checker // a new buffer costs ~1% indexing performance - let doc_num_bytes = self.intermediary_buffer.len(); - VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block); + self.doc_pos.push(self.current_block.len() as u32); self.current_block .write_all(&self.intermediary_buffer[..])?; self.num_docs_in_current_block += 1; @@ -101,8 +118,7 @@ impl StoreWriter { /// The document id is implicitly the current number /// of documents. pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> { - let doc_num_bytes = serialized_document.len(); - VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block); + self.doc_pos.push(self.current_block.len() as u32); self.current_block.extend_from_slice(serialized_document); self.num_docs_in_current_block += 1; self.check_flush_block()?;