From 7eb03db8792b9254826d35c01e36a1e1ef95b9c5 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Mon, 3 Oct 2022 16:07:41 +0200 Subject: [PATCH 1/3] use new format for docstore blocks --- src/store/reader.rs | 95 +++++++++++++++++++++------------------------ src/store/writer.rs | 29 +++++++++----- 2 files changed, 64 insertions(+), 60 deletions(-) diff --git a/src/store/reader.rs b/src/store/reader.rs index 4daed9e589..27f608d97c 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -1,6 +1,6 @@ use std::io; use std::iter::Sum; -use std::ops::AddAssign; +use std::ops::{AddAssign, Range}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; @@ -211,17 +211,10 @@ impl StoreReader { doc_id: DocId, checkpoint: &Checkpoint, ) -> crate::Result { - let mut cursor = &block[..]; - let cursor_len_before = cursor.len(); - for _ in checkpoint.doc_range.start..doc_id { - let doc_length = VInt::deserialize(&mut cursor)?.val() as usize; - cursor = &cursor[doc_length..]; - } + let doc_pos = doc_id - checkpoint.doc_range.start; - let doc_length = VInt::deserialize(&mut cursor)?.val() as usize; - let start_pos = cursor_len_before - cursor.len(); - let end_pos = cursor_len_before - cursor.len() + doc_length; - Ok(block.slice(start_pos..end_pos)) + let range = block_read_index(&block, doc_pos)?; + Ok(block.slice(range)) } /// Iterator over all Documents in their order as they are stored in the doc store. 
@@ -254,9 +247,7 @@ impl StoreReader { let mut curr_block = curr_checkpoint .as_ref() .map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind())); // map error in order to enable cloning - let mut block_start_pos = 0; - let mut num_skipped = 0; - let mut reset_block_pos = false; + let mut doc_pos = 0; (0..last_doc_id) .filter_map(move |doc_id| { // filter_map is only used to resolve lifetime issues between the two closures on @@ -268,24 +259,19 @@ impl StoreReader { curr_block = curr_checkpoint .as_ref() .map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind())); - reset_block_pos = true; - num_skipped = 0; + doc_pos = 0; } let alive = alive_bitset.map_or(true, |bitset| bitset.is_alive(doc_id)); - if alive { - let ret = Some((curr_block.clone(), num_skipped, reset_block_pos)); - // the map block will move over the num_skipped, so we reset to 0 - num_skipped = 0; - reset_block_pos = false; - ret + let res = if alive { + Some((curr_block.clone(), doc_pos)) } else { - // we keep the number of skipped documents to move forward in the map block - num_skipped += 1; None - } + }; + doc_pos += 1; + res }) - .map(move |(block, num_skipped, reset_block_pos)| { + .map(move |(block, doc_pos)| { let block = block .ok_or_else(|| { DataCorruption::comment_only( @@ -296,30 +282,9 @@ impl StoreReader { .map_err(|error_kind| { std::io::Error::new(error_kind, "error when reading block in doc store") })?; - // this flag is set, when filter_map moved to the next block - if reset_block_pos { - block_start_pos = 0; - } - let mut cursor = &block[block_start_pos..]; - let mut pos = 0; - // move forward 1 doc + num_skipped in block and return length of current doc - let doc_length = loop { - let doc_length = VInt::deserialize(&mut cursor)?.val() as usize; - let num_bytes_read = block[block_start_pos..].len() - cursor.len(); - block_start_pos += num_bytes_read; - - pos += 1; - if pos == num_skipped + 1 { - break doc_length; - } else { - block_start_pos += doc_length; - 
cursor = &block[block_start_pos..]; - } - }; - let end_pos = block_start_pos + doc_length; - let doc_bytes = block.slice(block_start_pos..end_pos); - block_start_pos = end_pos; - Ok(doc_bytes) + + let range = block_read_index(&block, doc_pos)?; + Ok(block.slice(range)) }) } @@ -329,6 +294,34 @@ impl StoreReader { } } +fn block_read_index(block: &[u8], doc_pos: u32) -> crate::Result> { + let doc_pos = doc_pos as usize; + let mut cursor = block; + let index_len_vint = VInt::deserialize(&mut cursor)?; + let index_len = index_len_vint.val() as usize; + + if doc_pos > index_len { + return Err(crate::TantivyError::InternalError( + "Attempted to read doc from wrong block".to_owned(), + )); + } + + let size_of_u32 = std::mem::size_of::(); + let index_size = index_len * size_of_u32 + index_len_vint.serialize_into(&mut [0; 10]); + + let start_pos = u32::deserialize(&mut &cursor[doc_pos * size_of_u32..])? as usize + index_size; + let end_pos = if doc_pos + 1 == index_len { + block.len() + } else { + u32::deserialize(&mut &cursor[(doc_pos + 1) * size_of_u32..])? as usize + index_size + }; + + Ok(Range { + start: start_pos, + end: end_pos, + }) +} + #[cfg(feature = "quickwit")] impl StoreReader { /// Advanced API. 
@@ -427,7 +420,7 @@ mod tests { assert_eq!(store.cache_stats().cache_hits, 1); assert_eq!(store.cache_stats().cache_misses, 2); - assert_eq!(store.cache.peek_lru(), Some(9210)); + assert_eq!(store.cache.peek_lru(), Some(11086)); Ok(()) } diff --git a/src/store/writer.rs b/src/store/writer.rs index d0f27f4e89..c6d758620e 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -1,6 +1,6 @@ use std::io::{self, Write}; -use common::{BinarySerializable, VInt}; +use common::BinarySerializable; use super::compressors::Compressor; use super::StoreReader; @@ -22,6 +22,7 @@ pub struct StoreWriter { num_docs_in_current_block: DocId, intermediary_buffer: Vec, current_block: Vec, + doc_pos: Vec, block_compressor: BlockCompressor, } @@ -42,6 +43,7 @@ impl StoreWriter { block_size, num_docs_in_current_block: 0, intermediary_buffer: Vec::new(), + doc_pos: Vec::new(), current_block: Vec::new(), block_compressor, }) @@ -53,12 +55,17 @@ impl StoreWriter { /// The memory used (inclusive childs) pub fn mem_usage(&self) -> usize { - self.intermediary_buffer.capacity() + self.current_block.capacity() + self.intermediary_buffer.capacity() + + self.current_block.capacity() + + self.doc_pos.capacity() * std::mem::size_of::() } /// Checks if the current block is full, and if so, compresses and flushes it. fn check_flush_block(&mut self) -> io::Result<()> { - if self.current_block.len() > self.block_size { + // this does not count the VInt storing the index length itself, but it is negligible in + // front of everything else. 
+ let index_len = self.doc_pos.len() * std::mem::size_of::(); + if self.current_block.len() + index_len > self.block_size { self.send_current_block_to_compressor()?; } Ok(()) @@ -70,9 +77,15 @@ impl StoreWriter { if self.current_block.is_empty() { return Ok(()); } + + self.intermediary_buffer.clear(); + + self.doc_pos.serialize(&mut self.intermediary_buffer)?; + self.intermediary_buffer.append(&mut self.current_block); + self.block_compressor - .compress_block_and_write(&self.current_block, self.num_docs_in_current_block)?; - self.current_block.clear(); + .compress_block_and_write(&self.intermediary_buffer, self.num_docs_in_current_block)?; + self.doc_pos.clear(); self.num_docs_in_current_block = 0; Ok(()) } @@ -87,8 +100,7 @@ impl StoreWriter { // calling store bytes would be preferable for code reuse, but then we can't use // intermediary_buffer due to the borrow checker // a new buffer costs ~1% indexing performance - let doc_num_bytes = self.intermediary_buffer.len(); - VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block); + self.doc_pos.push(self.current_block.len() as u32); self.current_block .write_all(&self.intermediary_buffer[..])?; self.num_docs_in_current_block += 1; @@ -101,8 +113,7 @@ impl StoreWriter { /// The document id is implicitly the current number /// of documents. 
pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> { - let doc_num_bytes = serialized_document.len(); - VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block); + self.doc_pos.push(self.current_block.len() as u32); self.current_block.extend_from_slice(serialized_document); self.num_docs_in_current_block += 1; self.check_flush_block()?; From 8ac48870e3f946b3427cc9ef1d4eba78df259762 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Mon, 3 Oct 2022 17:45:16 +0200 Subject: [PATCH 2/3] move index to end of block it makes writing the block faster due to one less memcopy --- src/store/reader.rs | 25 +++++++++++-------------- src/store/writer.rs | 13 +++++++++---- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/src/store/reader.rs b/src/store/reader.rs index 27f608d97c..fc1db9e929 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -4,7 +4,7 @@ use std::ops::{AddAssign, Range}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; -use common::{BinarySerializable, HasLen, VInt}; +use common::{BinarySerializable, HasLen}; use lru::LruCache; use ownedbytes::OwnedBytes; @@ -296,9 +296,10 @@ impl StoreReader { fn block_read_index(block: &[u8], doc_pos: u32) -> crate::Result> { let doc_pos = doc_pos as usize; - let mut cursor = block; - let index_len_vint = VInt::deserialize(&mut cursor)?; - let index_len = index_len_vint.val() as usize; + let size_of_u32 = std::mem::size_of::(); + + let index_len_pos = block.len() - size_of_u32; + let index_len = u32::deserialize(&mut &block[index_len_pos..])? as usize; if doc_pos > index_len { return Err(crate::TantivyError::InternalError( @@ -306,16 +307,12 @@ fn block_read_index(block: &[u8], doc_pos: u32) -> crate::Result> { )); } - let size_of_u32 = std::mem::size_of::(); - let index_size = index_len * size_of_u32 + index_len_vint.serialize_into(&mut [0; 10]); - - let start_pos = u32::deserialize(&mut &cursor[doc_pos * size_of_u32..])? 
as usize + index_size; - let end_pos = if doc_pos + 1 == index_len { - block.len() - } else { - u32::deserialize(&mut &cursor[(doc_pos + 1) * size_of_u32..])? as usize + index_size - }; + let index_start = block.len() - (index_len + 1) * size_of_u32; + let index = &block[index_start..index_start + index_len * size_of_u32]; + let start_pos = u32::deserialize(&mut &index[doc_pos * size_of_u32..])? as usize; + let end_pos = u32::deserialize(&mut &index[(doc_pos + 1) * size_of_u32..]) + .unwrap_or(index_start as u32) as usize; Ok(Range { start: start_pos, end: end_pos, @@ -420,7 +417,7 @@ mod tests { assert_eq!(store.cache_stats().cache_hits, 1); assert_eq!(store.cache_stats().cache_misses, 2); - assert_eq!(store.cache.peek_lru(), Some(11086)); + assert_eq!(store.cache.peek_lru(), Some(11163)); Ok(()) } diff --git a/src/store/writer.rs b/src/store/writer.rs index c6d758620e..c034548f35 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -78,14 +78,19 @@ impl StoreWriter { return Ok(()); } - self.intermediary_buffer.clear(); + let size_of_u32 = std::mem::size_of::(); + self.current_block + .reserve((self.doc_pos.len() + 1) * size_of_u32); - self.doc_pos.serialize(&mut self.intermediary_buffer)?; - self.intermediary_buffer.append(&mut self.current_block); + for pos in self.doc_pos.iter() { + pos.serialize(&mut self.current_block)?; + } + (self.doc_pos.len() as u32).serialize(&mut self.current_block)?; self.block_compressor - .compress_block_and_write(&self.intermediary_buffer, self.num_docs_in_current_block)?; + .compress_block_and_write(&self.current_block, self.num_docs_in_current_block)?; self.doc_pos.clear(); + self.current_block.clear(); self.num_docs_in_current_block = 0; Ok(()) } From 2ab4beafbc60b2eb321585d23c7036d3a6930b4d Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Tue, 4 Oct 2022 09:41:16 +0200 Subject: [PATCH 3/3] fix nits --- src/store/reader.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/store/reader.rs 
b/src/store/reader.rs index fc1db9e929..92e9a15064 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -310,13 +310,10 @@ fn block_read_index(block: &[u8], doc_pos: u32) -> crate::Result> { let index_start = block.len() - (index_len + 1) * size_of_u32; let index = &block[index_start..index_start + index_len * size_of_u32]; - let start_pos = u32::deserialize(&mut &index[doc_pos * size_of_u32..])? as usize; - let end_pos = u32::deserialize(&mut &index[(doc_pos + 1) * size_of_u32..]) + let start_offset = u32::deserialize(&mut &index[doc_pos * size_of_u32..])? as usize; + let end_offset = u32::deserialize(&mut &index[(doc_pos + 1) * size_of_u32..]) .unwrap_or(index_start as u32) as usize; - Ok(Range { - start: start_pos, - end: end_pos, - }) + Ok(start_offset..end_offset) } #[cfg(feature = "quickwit")]