From 2dbff1691a94dac610cb7c1b18d0a7e917dbd1f8 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sat, 20 Aug 2022 11:50:22 +0200 Subject: [PATCH] Minor readability refactoring in the SegmentDocIdMapping --- src/indexer/doc_id_mapping.rs | 44 ++++---- src/indexer/merger.rs | 127 +++++++++++++----------- src/indexer/merger_sorted_index_test.rs | 15 ++- 3 files changed, 92 insertions(+), 94 deletions(-) diff --git a/src/indexer/doc_id_mapping.rs b/src/indexer/doc_id_mapping.rs index 9d633c2f86..81b1372533 100644 --- a/src/indexer/doc_id_mapping.rs +++ b/src/indexer/doc_id_mapping.rs @@ -2,35 +2,42 @@ //! to get mappings from old doc_id to new doc_id and vice versa, after sorting use std::cmp::Reverse; -use std::ops::Index; use super::SegmentWriter; use crate::schema::{Field, Schema}; -use crate::{DocId, IndexSortByField, Order, SegmentOrdinal, TantivyError}; +use crate::{DocAddress, DocId, IndexSortByField, Order, TantivyError}; /// Struct to provide mapping from new doc_id to old doc_id and segment. #[derive(Clone)] pub(crate) struct SegmentDocIdMapping { - new_doc_id_to_old_and_segment: Vec<(DocId, SegmentOrdinal)>, + new_doc_id_to_old_doc_addr: Vec, is_trivial: bool, } impl SegmentDocIdMapping { - pub(crate) fn new( - new_doc_id_to_old_and_segment: Vec<(DocId, SegmentOrdinal)>, - is_trivial: bool, - ) -> Self { + pub(crate) fn new(new_doc_id_to_old_and_segment: Vec, is_trivial: bool) -> Self { Self { - new_doc_id_to_old_and_segment, + new_doc_id_to_old_doc_addr: new_doc_id_to_old_and_segment, is_trivial, } } - pub(crate) fn iter(&self) -> impl Iterator { - self.new_doc_id_to_old_and_segment.iter() + + /// Returns an iterator over the old document addresses, ordered by the new document ids. + /// + /// In the returned `DocAddress`, the `segment_ord` is the ordinal of the targeted segment
 /// in the list of merged segments. 
+ pub(crate) fn iter_old_doc_addrs(&self) -> impl Iterator + '_ { + self.new_doc_id_to_old_doc_addr.iter().copied() } + pub(crate) fn len(&self) -> usize { - self.new_doc_id_to_old_and_segment.len() + self.new_doc_id_to_old_doc_addr.len() } + + pub(crate) fn get_old_doc_addr(&self, new_doc_id: DocId) -> DocAddress { + self.new_doc_id_to_old_doc_addr[new_doc_id as usize] + } + /// This flags means the segments are simply stacked in the order of their ordinal. /// e.g. [(0, 1), .. (n, 1), (0, 2)..., (m, 2)] /// @@ -39,21 +46,6 @@ impl SegmentDocIdMapping { self.is_trivial } } -impl Index for SegmentDocIdMapping { - type Output = (DocId, SegmentOrdinal); - - fn index(&self, idx: usize) -> &Self::Output { - &self.new_doc_id_to_old_and_segment[idx] - } -} -impl IntoIterator for SegmentDocIdMapping { - type Item = (DocId, SegmentOrdinal); - type IntoIter = std::vec::IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.new_doc_id_to_old_and_segment.into_iter() - } -} /// Struct to provide mapping from old doc_id to new doc_id and vice versa within a segment. pub struct DocIdMapping { diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index b55747da86..c6662398cc 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -21,8 +21,8 @@ use crate::schema::{Cardinality, Field, FieldType, Schema}; use crate::store::StoreWriter; use crate::termdict::{TermMerger, TermOrdinal}; use crate::{ - DocId, IndexSettings, IndexSortByField, InvertedIndexReader, Order, SegmentComponent, - SegmentOrdinal, + DocAddress, DocId, IndexSettings, IndexSortByField, InvertedIndexReader, Order, + SegmentComponent, SegmentOrdinal, }; /// Segment's max doc must be `< MAX_DOC_LIMIT`. 
@@ -260,9 +260,9 @@ impl IndexMerger { .iter() .map(|reader| reader.get_fieldnorms_reader(field)) .collect::>()?; - for (doc_id, reader_ordinal) in doc_id_mapping.iter() { - let fieldnorms_reader = &fieldnorms_readers[*reader_ordinal as usize]; - let fieldnorm_id = fieldnorms_reader.fieldnorm_id(*doc_id); + for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() { + let fieldnorms_reader = &fieldnorms_readers[old_doc_addr.segment_ord as usize]; + let fieldnorm_id = fieldnorms_reader.fieldnorm_id(old_doc_addr.doc_id); fieldnorms_data.push(fieldnorm_id); } @@ -377,8 +377,11 @@ impl IndexMerger { } impl<'a> FastFieldDataAccess for SortedDocIdFieldAccessProvider<'a> { fn get_val(&self, doc: u64) -> u64 { - let (doc_id, reader_ordinal) = self.doc_id_mapping[doc as usize]; - self.fast_field_readers[reader_ordinal as usize].get(doc_id) + let DocAddress { + doc_id, + segment_ord, + } = self.doc_id_mapping.get_old_doc_addr(doc as u32); + self.fast_field_readers[segment_ord as usize].get(doc_id) } } let fastfield_accessor = SortedDocIdFieldAccessProvider { @@ -386,9 +389,9 @@ impl IndexMerger { fast_field_readers: &fast_field_readers, }; let iter_gen = || { - doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| { - let fast_field_reader = &fast_field_readers[*reader_ordinal as usize]; - fast_field_reader.get(*doc_id) + doc_id_mapping.iter_old_doc_addrs().map(|old_doc_addr| { + let fast_field_reader = &fast_field_readers[old_doc_addr.segment_ord as usize]; + fast_field_reader.get(old_doc_addr.doc_id) }) }; fast_field_serializer.create_auto_detect_u64_fast_field( @@ -469,15 +472,11 @@ impl IndexMerger { let doc_id_reader_pair = reader_ordinal_and_field_accessors .iter() - .map(|reader_and_field_accessor| { - let reader = &self.readers[reader_and_field_accessor.0 as usize]; - reader.doc_ids_alive().map(move |doc_id| { - ( - doc_id, - reader_and_field_accessor.0, - &reader_and_field_accessor.1, - ) - }) + .map(|(reader_ord, ff_reader)| { + let reader = &self.readers[*reader_ord 
as usize]; + reader + .doc_ids_alive() + .map(move |doc_id| (doc_id, reader_ord, ff_reader)) }); let total_num_new_docs = self @@ -486,7 +485,7 @@ impl IndexMerger { .map(|reader| reader.num_docs() as usize) .sum(); - let mut sorted_doc_ids = Vec::with_capacity(total_num_new_docs); + let mut sorted_doc_ids: Vec = Vec::with_capacity(total_num_new_docs); // create iterator tuple of (old doc_id, reader) in order of the new doc_ids sorted_doc_ids.extend( @@ -501,7 +500,10 @@ impl IndexMerger { val1 > val2 } }) - .map(|(doc_id, reader_with_id, _)| (doc_id, reader_with_id)), + .map(|(doc_id, &segment_ord, _)| DocAddress { + doc_id, + segment_ord, + }), ); Ok(SegmentDocIdMapping::new(sorted_doc_ids, false)) } @@ -550,10 +552,10 @@ impl IndexMerger { let mut offsets = Vec::with_capacity(doc_id_mapping.len()); let mut offset = 0; - for (doc_id, reader) in doc_id_mapping.iter() { - let reader = &reader_and_field_accessors[*reader as usize].1; + for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() { + let reader = &reader_and_field_accessors[old_doc_addr.segment_ord as usize].1; offsets.push(offset); - offset += reader.get_len(*doc_id) as u64; + offset += reader.get_len(old_doc_addr.doc_id) as u64; } offsets.push(offset); @@ -631,12 +633,12 @@ impl IndexMerger { fast_field_serializer.new_u64_fast_field_with_idx(field, 0u64, max_term_ord, 1)?; let mut vals = Vec::with_capacity(100); - for (old_doc_id, reader_ordinal) in doc_id_mapping.iter() { + for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() { let term_ordinal_mapping: &[TermOrdinal] = - term_ordinal_mappings.get_segment(*reader_ordinal as usize); + term_ordinal_mappings.get_segment(old_doc_addr.segment_ord as usize); - let ff_reader = &fast_field_reader[*reader_ordinal as usize]; - ff_reader.get_vals(*old_doc_id, &mut vals); + let ff_reader = &fast_field_reader[old_doc_addr.segment_ord as usize]; + ff_reader.get_vals(old_doc_addr.doc_id, &mut vals); for &prev_term_ord in &vals { let new_term_ord = 
term_ordinal_mapping[prev_term_ord as usize]; serialize_vals.add_val(new_term_ord)?; @@ -657,16 +659,17 @@ impl IndexMerger { .map(|reader| reader.num_docs() as usize) .sum(); - let mut mapping = Vec::with_capacity(total_num_new_docs); + let mut mapping: Vec = Vec::with_capacity(total_num_new_docs); mapping.extend( self.readers .iter() .enumerate() - .flat_map(|(reader_ordinal, reader)| { - reader - .doc_ids_alive() - .map(move |doc_id| (doc_id, reader_ordinal as SegmentOrdinal)) + .flat_map(|(segment_ord, reader)| { + reader.doc_ids_alive().map(move |doc_id| DocAddress { + segment_ord: segment_ord as u32, + doc_id, + }) }), ); Ok(SegmentDocIdMapping::new(mapping, true)) @@ -740,22 +743,24 @@ impl IndexMerger { fn get_val(&self, pos: u64) -> u64 { // use the offsets index to find the doc_id which will contain the position. // the offsets are strictly increasing so we can do a simple search on it. - let new_doc_id = self - .offsets - .iter() - .position(|&offset| offset > pos) - .expect("pos is out of bounds") - - 1; + let new_doc_id: DocId = + self.offsets + .iter() + .position(|&offset| offset > pos) + .expect("pos is out of bounds") as DocId + - 1u32; // now we need to find the position of `pos` in the multivalued bucket - let num_pos_covered_until_now = self.offsets[new_doc_id]; + let num_pos_covered_until_now = self.offsets[new_doc_id as usize]; let pos_in_values = pos - num_pos_covered_until_now; - let (old_doc_id, reader_ordinal) = self.doc_id_mapping[new_doc_id as usize]; - let num_vals = self.fast_field_readers[reader_ordinal as usize].get_len(old_doc_id); + let old_doc_addr = self.doc_id_mapping.get_old_doc_addr(new_doc_id); + let num_vals = self.fast_field_readers[old_doc_addr.segment_ord as usize] + .get_len(old_doc_addr.doc_id); assert!(num_vals >= pos_in_values); - let mut vals = vec![]; - self.fast_field_readers[reader_ordinal as usize].get_vals(old_doc_id, &mut vals); + let mut vals = Vec::new(); + self.fast_field_readers[old_doc_addr.segment_ord as 
usize] + .get_vals(old_doc_addr.doc_id, &mut vals); vals[pos_in_values as usize] } @@ -766,12 +771,14 @@ impl IndexMerger { offsets, }; let iter_gen = || { - doc_id_mapping.iter().flat_map(|(doc_id, reader_ordinal)| { - let ff_reader = &ff_readers[*reader_ordinal as usize]; - let mut vals = vec![]; - ff_reader.get_vals(*doc_id, &mut vals); - vals.into_iter() - }) + doc_id_mapping + .iter_old_doc_addrs() + .flat_map(|old_doc_addr| { + let ff_reader = &ff_readers[old_doc_addr.segment_ord as usize]; + let mut vals = Vec::new(); + ff_reader.get_vals(old_doc_addr.doc_id, &mut vals); + vals.into_iter() + }) }; fast_field_serializer.create_auto_detect_u64_fast_field_with_idx( field, @@ -810,9 +817,9 @@ impl IndexMerger { )?; let mut serialize_vals = fast_field_serializer.new_bytes_fast_field_with_idx(field, 1); - for (doc_id, reader_ordinal) in doc_id_mapping.iter() { - let bytes_reader = &reader_and_field_accessors[*reader_ordinal as usize].1; - let val = bytes_reader.get_bytes(*doc_id); + for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() { + let bytes_reader = &reader_and_field_accessors[old_doc_addr.segment_ord as usize].1; + let val = bytes_reader.get_bytes(old_doc_addr.doc_id); serialize_vals.write_all(val)?; } @@ -868,9 +875,9 @@ impl IndexMerger { segment_local_map }) .collect(); - for (new_doc_id, (old_doc_id, segment_ord)) in doc_id_mapping.iter().enumerate() { - let segment_map = &mut merged_doc_id_map[*segment_ord as usize]; - segment_map[*old_doc_id as usize] = Some(new_doc_id as DocId); + for (new_doc_id, old_doc_addr) in doc_id_mapping.iter_old_doc_addrs().enumerate() { + let segment_map = &mut merged_doc_id_map[old_doc_addr.segment_ord as usize]; + segment_map[old_doc_addr.doc_id as usize] = Some(new_doc_id as DocId); } // Note that the total number of tokens is not exact. 
@@ -1045,15 +1052,15 @@ impl IndexMerger { .map(|(i, store)| store.iter_raw(self.readers[i].alive_bitset())) .collect(); - for (old_doc_id, reader_ordinal) in doc_id_mapping.iter() { - let doc_bytes_it = &mut document_iterators[*reader_ordinal as usize]; + for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() { + let doc_bytes_it = &mut document_iterators[old_doc_addr.segment_ord as usize]; if let Some(doc_bytes_res) = doc_bytes_it.next() { let doc_bytes = doc_bytes_res?; store_writer.store_bytes(&doc_bytes)?; } else { return Err(DataCorruption::comment_only(&format!( - "unexpected missing document in docstore on merge, doc id {:?}", - old_doc_id + "unexpected missing document in docstore on merge, doc address \ + {old_doc_addr:?}", )) .into()); } diff --git a/src/indexer/merger_sorted_index_test.rs b/src/indexer/merger_sorted_index_test.rs index cd3ee2f038..d8a80534cd 100644 --- a/src/indexer/merger_sorted_index_test.rs +++ b/src/indexer/merger_sorted_index_test.rs @@ -484,7 +484,7 @@ mod bench_sorted_index_merge { // use cratedoc_id, readerdoc_id_mappinglet vals = reader.fate::schema; use crate::fastfield::{DynamicFastFieldReader, FastFieldReader}; use crate::indexer::merger::IndexMerger; - use crate::schema::{Cardinality, Document, NumericOptions, Schema}; + use crate::schema::{Cardinality, NumericOptions, Schema}; use crate::{IndexSettings, IndexSortByField, IndexWriter, Order}; fn create_index(sort_by_field: Option) -> Index { let mut schema_builder = Schema::builder(); @@ -503,9 +503,7 @@ mod bench_sorted_index_merge { { let mut index_writer = index.writer_for_tests().unwrap(); let index_doc = |index_writer: &mut IndexWriter, val: u64| { - let mut doc = Document::default(); - doc.add_u64(int_field, val); - index_writer.add_document(doc).unwrap(); + index_writer.add_document(doc!(int_field=>val)).unwrap(); }; // 3 segments with 10_000 values in the fast fields for _ in 0..3 { @@ -518,6 +516,7 @@ mod bench_sorted_index_merge { } index } + #[bench] fn 
create_sorted_index_walk_overkmerge_on_merge_fastfield( b: &mut Bencher, @@ -533,19 +532,19 @@ mod bench_sorted_index_merge { IndexMerger::open(index.schema(), index.settings().clone(), &segments[..])?; let doc_id_mapping = merger.generate_doc_id_mapping(&sort_by_field).unwrap(); b.iter(|| { - let sorted_doc_ids = doc_id_mapping.iter().map(|(doc_id, ordinal)| { - let reader = &merger.readers[*ordinal as usize]; + let sorted_doc_ids = doc_id_mapping.iter_old_doc_addrs().map(|doc_addr| { + let reader = &merger.readers[doc_addr.segment_ord as usize]; let u64_reader: DynamicFastFieldReader = reader.fast_fields().typed_fast_field_reader(field).expect( "Failed to find a reader for single fast field. This is a tantivy bug and \ it should never happen.", ); - (doc_id, reader, u64_reader) + (doc_addr.doc_id, reader, u64_reader) }); // add values in order of the new doc_ids let mut val = 0; for (doc_id, _reader, field_reader) in sorted_doc_ids { - val = field_reader.get(*doc_id); + val = field_reader.get(doc_id); } val